/**
 * Program: SnortUploadHbase.java
 * Editor: Waue Chen
 * From : NCHC, Taiwan
 * Last Update Date: 07/23/2008
 */

/**
 * Purpose :
 *   Parse the snort log (/var/log/snort/alert) and upload it
 *   into the HBase table "Snort".
 *
 * HowToUse :
 *   Run from Eclipse (depends on SnortParser.java).
 * Check Result:
 *   Go to the HBase console and type:
 *     hql > select * from Snort;
 */
package tw.org.nchc.code;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

import com.sun.org.apache.xerces.internal.impl.xpath.regex.ParseException;

public class SnortUploadHbase {
	/* Major parameter */
	// local filesystem path of the snort log (not an HDFS path)
	final static String source_file = "/var/log/snort/alert";

	/* Minor parameters */
	// column family name
	final static String column_family = "snort:";

	// table name
	final static String table_name = "Snort";

	// field separator character
	final static String sp = ";";

	// temporary file for the parsed data
	final static String text_tmp = "/tmp/alert_my";

	// In this sample the map phase is an identity pass; all work is done in the reducer.
	private static class ReduceClass extends TableReduce<LongWritable, Text> {
		public void reduce(LongWritable key, Iterator<Text> values,
				OutputCollector<Text, MapWritable> output, Reporter reporter)
				throws IOException {
			String first_line = "gid;sid;version;alert name;"
					+ "class;priority;month;day;hour;min;second;source;"
					+ "destination;type;ttl;tos;id;iplen;dgmlen;";

			// extract the column (qualifier) names
			String[] cf = first_line.split(sp);
			int length = cf.length;

			// values.next().getBytes() returns the record as a byte array
			String stro = new String(values.next().getBytes());
			String[] str = stro.split(sp);

			// column names are created dynamically
			Text[] col_n = new Text[length];
			byte[][] b_l = new byte[length][];
			// cell contents must be ImmutableBytesWritable
			ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];

			// this map holds the column data for one HBase row
			MapWritable map = new MapWritable();
			map.clear();

			// prepare to write data into the map
			for (int i = 0; i < length; i++) {
				col_n[i] = new Text(column_family + cf[i]);
				b_l[i] = str[i].getBytes();
				w_l[i] = new ImmutableBytesWritable(b_l[i]);
				// populate the current row
				map.put(col_n[i], w_l[i]);
			}
			// emit the row, using the key as the row id
			output.collect(new Text(key.toString()), map);
		}
	}

	public SnortUploadHbase() {
	}

	// delete the tmp file
	boolean deleteFile(String str) throws IOException {
		File df = new File(str);
		if (df.exists()) {
			if (!df.delete()) {
				System.err.print("delete file error !");
			}
		} else {
			System.out.println("file does not exist !");
		}
		return true;
	}
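	/*
	 * Overview of the flow in main() below: SnortParser writes the parsed
	 * alerts to text_tmp as one semicolon-separated record per line, the file
	 * is copied into HDFS, and a MapReduce job (IdentityMapper + ReduceClass)
	 * loads each record into the "Snort" table. A hypothetical record,
	 * assuming SnortParser emits fields in the order listed in first_line
	 * (illustrative values only):
	 *
	 *   1;1234;5;ICMP PING;Misc activity;3;07;23;10;15;30;192.168.1.10;192.168.1.1;ICMP;64;0x0;0;60;84
	 */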
	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException,
			ParseException, Exception {
		String[] col_family = { column_family };
		Path text_path = new Path(text_tmp);

		// setup.parseFirstLine(source_file, text_tmp);
		// System.out.println(first_line);

		// parse the snort alert log into the tmp file
		SnortParser sp = new SnortParser(source_file, text_tmp);
		sp.parseToLine();

		// create the HBase table if it does not exist yet
		BuildHTable build_table = new BuildHTable(table_name, col_family);
		if (!build_table.checkTableExist(table_name)) {
			if (!build_table.createTable()) {
				System.out.println("create table error !");
			}
		} else {
			System.out.println("Table \"" + table_name
					+ "\" already exists !");
		}

		JobConf conf = new JobConf(SnortUploadHbase.class);
		FileSystem fileconf = FileSystem.get(conf);
		// copy the parsed data from the local filesystem to HDFS
		fileconf.copyFromLocalFile(true, text_path, text_path);

		// job name; change it to whatever you like
		conf.setJobName("SnortDataBase");
		final int mapTasks = 1;
		final int reduceTasks = 1;

		// the HBase table name must match the table created above
		TableReduce.initJob(table_name, ReduceClass.class, conf);

		// map-reduce settings
		conf.setNumMapTasks(mapTasks);
		conf.setNumReduceTasks(reduceTasks);
		conf.setInputPath(text_path);
		conf.setMapperClass(IdentityMapper.class);
		conf.setCombinerClass(IdentityReducer.class);
		conf.setReducerClass(ReduceClass.class);

		JobClient.runJob(conf);

		// delete the tmp file (Hadoop 0.16 API)
		FileSystem.get(conf).delete(text_path);
	}
}