/**
 * Program: SnortUploadHbase.java
 * Editor: Waue Chen
 * From : NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */

/**
 * Purpose :
 *   First, the program parses the Snort alert log and creates the HBase table.
 *   Then it uses a fixed list of field names as the column qualifiers.
 *   Finally it stores every record into HBase automatically.
 *
 * HowToUse :
 *   Make sure two things :
 *   1. source_file must be a regular Snort alert log; SnortParser turns it
 *      into separator-delimited records (record1;record2;...;recordN), and
 *      the column qualifiers come from the fixed first_line string in
 *      ReduceClass.
 *   2. The source_file path must be correct.
 *
 * Check Result:
 *   Go to the hbase console and type :
 *   hql > select * from SnortTable;
 */
package tw.org.nchc.code;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class SnortUploadHbase {
    /* Major parameter */
    // local file system path of the Snort alert log (not a Hadoop FS path)
    final static String source_file = "/var/log/snort/alert";

    /* Minor parameters */
    // column family name
    final static String column_family = "snort:";

    // table name
    final static String table_name = "SnortTable";

    // field separator character
    final static String sp = ";";

    // temporary file that holds the parsed records
    final static String text_tmp = "/tmp/HBaseRecord.text.tmp";

    // In this sample the map phase does no real work; the reduce phase
    // handles writing every record into HBase.
    private static class ReduceClass extends TableReduce<LongWritable, Text> {
        public void reduce(LongWritable key, Iterator<Text> values,
                OutputCollector<Text, MapWritable> output, Reporter reporter)
                throws IOException {
            String first_line = "gid;sid;version;alert name;"
                    + "class;priority;year;month;day;hour;min;second;source;"
                    + "destination;type;ttl;tos;id;iplen;dgmlen";

            // extract the column-qualifier names
            String[] cf = first_line.split(sp);
            int length = cf.length;

            // values.next().getBytes() returns the record value in byte form
            String stro = new String(values.next().getBytes());
            String str[] = stro.split(sp);

            // column ids are created dynamically
            Text[] col_n = new Text[length];
            byte[][] b_l = new byte[length][];
            // cell contents must be ImmutableBytesWritable
            ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];

            // this map holds the columns of the current row before it is
            // written to the HBase table
            MapWritable map = new MapWritable();
            map.clear();

            // prepare to write the data into the map
            for (int i = 0; i < length; i++) {
                col_n[i] = new Text(column_family + cf[i]);
                b_l[i] = str[i].getBytes();
                w_l[i] = new ImmutableBytesWritable(b_l[i]);
                // populate the current row
                map.put(col_n[i], w_l[i]);
            }
            // emit the row, using the key as the row id
            output.collect(new Text(key.toString()), map);
        }
    }

    public SnortUploadHbase() {
    }

    // delete the temporary file
    boolean deleteFile(String str) throws IOException {
        File df = new File(str);
        if (df.exists()) {
            if (!df.delete()) {
                System.err.print("delete file error !");
            }
        } else {
            System.out.println("file does not exist!");
        }
        return true;
    }
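    /*
     * Hedged sketch, not part of the original program: ReduceClass assumes
     * that every parsed record in text_tmp carries exactly as many
     * sp-separated fields as the hard-coded first_line (20 of them),
     * otherwise the per-field loop above fails. The helper below is one way
     * that assumption could be checked before submitting the job; the method
     * name and its use are illustrative only.
     */
    static boolean checkParsedFile(String path, int expectedFields)
            throws IOException {
        // fully-qualified java.io types so no extra imports are needed
        java.io.BufferedReader br = new java.io.BufferedReader(
                new java.io.FileReader(path));
        String line;
        boolean ok = true;
        while ((line = br.readLine()) != null) {
            if (line.split(sp).length != expectedFields) {
                System.err.println("irregular record : " + line);
                ok = false;
            }
        }
        br.close();
        return ok;
    }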
    /**
     * Runs the demo.
     */
    public static void main(String[] args) throws IOException {
        String[] col_family = { column_family };
        Path text_path = new Path(text_tmp);

        // setup.parseFirstLine(source_file, text_tmp);
        // System.out.println(first_line);
        new SnortParser(source_file, text_tmp);

        BuildHTable build_table = new BuildHTable(table_name, col_family);
        if (!build_table.checkTableExist(table_name)) {
            if (!build_table.createTable()) {
                System.out.println("create table error !");
            }
        } else {
            System.out.println("Table \"" + table_name
                    + "\" already exists !");
        }
        JobConf conf = new JobConf(SnortUploadHbase.class);
        FileSystem fileconf = FileSystem.get(conf);
        fileconf.copyFromLocalFile(true, text_path, text_path);
        // job name; change it to anything you like
        conf.setJobName("SnortDataBase");
        final int mapTasks = 1;
        final int reduceTasks = 1;

        // the HBase table name must be correct; here it is table_name
        // ("SnortTable"), created above by BuildHTable
        TableReduce.initJob(table_name, ReduceClass.class, conf);

        // map-reduce profile
        conf.setNumMapTasks(mapTasks);
        conf.setNumReduceTasks(reduceTasks);
        conf.setInputPath(text_path);
        conf.setMapperClass(IdentityMapper.class);
        conf.setCombinerClass(IdentityReducer.class);
        conf.setReducerClass(ReduceClass.class);
        JobClient.runJob(conf);

        // delete tmp file
        // 0.16
        FileSystem.get(conf).delete(text_path);
    }
}
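/*
 * Hedged usage note (not from the original source): on the Hadoop 0.16 /
 * HBase 0.1-era setup this code targets, the job would typically be launched
 * with something like the following, where "your-job.jar" is a placeholder
 * for whatever jar this class is packaged into:
 *
 *   bin/hadoop jar your-job.jar tw.org.nchc.code.SnortUploadHbase
 *
 * and the result can then be checked from the hbase console :
 *
 *   hql > select * from SnortTable;
 */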