icas 0.1版完成
/** * Program SnortProduce.java * Editor: Waue Chen * From : GTD. NCHC. Taiwn * Last Update Date: 07/29/2009 * support version : hadoop 0.18.3, hbase 0.18.1 */ package tw.org.nchc.code; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapred.TableReduce; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ICAS extends Configured implements Tool { HBaseConfiguration hbase_conf; HBaseAdmin hbase_admin; public ICAS() throws IOException { hbase_conf = new HBaseConfiguration(); hbase_admin = new HBaseAdmin(hbase_conf); } public static class ICAS_M extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { String getip(String str) { String[] ret = str.split(":"); return ret[0]; } public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { // [3] sig [4]class [5]y [6]m [7]d [8]h [9]M [10]s [11]s [12]d [13]t String line = value.toString(); String[] str = line.split(";"); String source_ip = getip(str[11]); String dest_ip = getip(str[12]); String fkey = dest_ip ; String date = str[5] + "/" + str[6] + "/" + str[7] + "_" + str[8] + ":" + str[9] + ":" + str[10]; // source @ date @ sig @ class @ type String fvalue = source_ip + "@" + date + "@" + str[3] +"@" +str[4] + "@"+ str[13]; output.collect(new Text(fkey), new Text(fvalue)); } } public static class ICAS_R extends TableReduce<Text, Text> { public void reduce(Text key, Iterator<Text> values, OutputCollector<ImmutableBytesWritable, BatchUpdate> output, Reporter reporter) throws IOException { HTable table = new HTable("ICAS"); String source_ip; String date; String sig_class; String type; String signature; String rawstr = new String(values.next().getBytes()); String[] str = rawstr.split("@"); source_ip = str[0]; date = str[1]; signature = str[2]; sig_class = str[3]; type = str[4]; // values.next().getByte() can get value and transfer to byte form, while (values.hasNext()) { // source_ip + "@" + date + "@" + sig + "@" + class + "@" + type; rawstr = new String(values.next().getBytes()); str = rawstr.split("@"); source_ip = source_ip + " ; " + str[0]; date = date + " ; " + str[1]; signature = signature + ";" + str[2]; } reporter.setStatus("amp emitting cell for row'" + key.toString() + "'"); BatchUpdate map = new BatchUpdate(key.toString()); // map.setTimestamp(timestamp); map.put("infor:source_ip", Bytes.toBytes(source_ip)); map.put("infor:signature", Bytes.toBytes(signature)); map.put("infor:date", Bytes.toBytes(date)); map.put("infor:class", Bytes.toBytes(sig_class)); map.put("infor:type", Bytes.toBytes(type)); table.commit(map); // ImmutableBytesWritable Hkey = new ImmutableBytesWritable(Bytes // .toBytes(key.toString())); // output.collect(Hkey, map); } } public static class ICAS_T extends MapReduceBase implements Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { StringBuilder ret = new StringBuilder("\n"); while (values.hasNext()) { String v = values.next().toString().trim(); if (v.length() > 0) ret.append(v + "\n"); } output.collect((Text) key, new Text(ret.toString())); } } /** * A reducer class that just emits the sum of the input values. */ public boolean checkTableExist(String table_name) throws IOException { if (!hbase_admin.tableExists(table_name)) { return false; } return true; } // create Hbase table , success = 1 ; failse = 0; Table_exist = 2; public boolean createTable(String table_name, String[] column_family) throws IOException { // check whether Table name exite or not if (!checkTableExist(table_name)) { HTableDescriptor tableDesc = new HTableDescriptor(table_name); for (int i = 0; i < column_family.length; i++) { String cf = column_family[i]; // check and correct name format "string:" if (!cf.endsWith(":")) { column_family[i] = cf + ":"; } // add column family tableDesc.addFamily(new HColumnDescriptor(column_family[i])); } hbase_admin.createTable(tableDesc); return true; } else { return false; } } static boolean hdfsput(String source_file, String text_tmp) { try { Process p = Runtime.getRuntime().exec( "hadoop dfs -put " + source_file + " " + text_tmp); BufferedReader in = new BufferedReader(new InputStreamReader(p .getErrorStream())); String err = null; while ((err = in.readLine()) != null) { System.err.println(err); } System.err.println("finish"); return true; } catch (Exception e) { e.printStackTrace(); return false; } } int printUsage() { System.out .println("ICAS <sourceFile> <tempFile> <tableName> <mapNumber> <reduceNumber> "); ToolRunner.printGenericCommandUsage(System.out); return -1; } public int run(String[] args) throws Exception { Path text_path = new Path(args[1]); // text path Path output = new Path("/tmp/output"); // /tmp/output /* set conf */ JobConf conf = new JobConf(getConf(), ICAS.class); conf.setJobName("SnortProduce"); // key point conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); /* set mapper and reducer */ conf.setMapperClass(ICAS_M.class); conf.setReducerClass(ICAS_R.class); // conf.setMapperClass(IdentityMapper.class); // conf.setReducerClass(IdentityReducer.class); /* delete previous output dir */ if (FileSystem.get(conf).exists(output)) FileSystem.get(conf).delete(output, true); conf.setNumMapTasks(Integer.parseInt(args[3])); conf.setNumReduceTasks(Integer.parseInt(args[4])); /* set input path */ FileInputFormat.setInputPaths(conf, text_path); FileOutputFormat.setOutputPath(conf, output); /* run */ JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { /* Major parameter */ // it indicates local path, not hadoop file system path String source_file = "/home/waue/log"; // data source tmp String text_tmp = "/user/waue/parsed"; /* Minor parameter */ String table_name = "ICAS"; String[] argv = { source_file, text_tmp, table_name, "1", "1" }; args = argv; /* build table */ String[] col_family = { "infor:" }; BuildHTable build_table = new BuildHTable(); if (!build_table.checkTableExist(args[2])) { if (!build_table.createTable(args[2], col_family)) { System.err.println("create table error !"); } } else { System.err.println("Table \"" + args[2] + "\" has already existed !"); } // if (!hdfsput(args[0], args[1])) { // System.out.println("directory is existed."); // } int res = ToolRunner.run(new Configuration(), new ICAS(), args); System.exit(res); } }
測試資料:
1;2404;5;attack_method_1;Attempted_Privilege;1;08;06;16;09;46;1.2.3.4:3394;140.110.138.22:445;TCP; 1;2404;5;attack_method_1;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP; 1;2404;5;attack_method_2;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP; 1;2404;5;attack_method_3;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP; 1;2404;5;attack_method_4;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.138.22:445;TCP; 1;2404;5;attack_method_1;Attempted_Privilege;1;08;06;16;09;48;5.6.7.8:3394;140.110.141.33:445;TCP;
結果:
hbase(main):005:0> scan 'ICAS' ROW COLUMN+CELL 140.110.138.22 column=infor:class, timestamp=1248863177234, value=Attempted_Privilege 140.110.138.22 column=infor:date, timestamp=1248863177234, value=1/08/06_16:09:46 ; 1/08/06_16:0 9:48 ; 1/08/06_16:09:48 ; 1/08/06_16:09:48 ; 1/08/06_16:09:48 140.110.138.22 column=infor:signature, timestamp=1248863177234, value=attack_method_1;attack_met hod_1;attack_method_2;attack_method_3;attack_method_4 140.110.138.22 column=infor:source_ip, timestamp=1248863177234, value=1.2.3.4 ; 5.6.7.8 ; 5.6.7. 8 ; 5.6.7.8 ; 5.6.7.8 140.110.138.22 column=infor:type, timestamp=1248863177234, value=TCP 140.110.141.33 column=infor:class, timestamp=1248863177268, value=Attempted_Privilege 140.110.141.33 column=infor:date, timestamp=1248863177268, value=1/08/06_16:09:48 140.110.141.33 column=infor:signature, timestamp=1248863177268, value=attack_method_1 140.110.141.33 column=infor:source_ip, timestamp=1248863177268, value=5.6.7.8 140.110.141.33 column=infor:type, timestamp=1248863177268, value=TCP 10 row(s) in 0.1550 seconds
- 問題1: 如何讓run裡宣告的物件,map可使用,reduce也可以呼叫到
public class ICAS extends Configured implements Tool { HBaseConfiguration hbase_conf; HBaseAdmin hbase_admin; public ICAS() throws IOException { hbase_conf = new HBaseConfiguration(); hbase_admin = new HBaseAdmin(hbase_conf); } public static class ICAS_M extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { //無法用hbase_admin, hbase_conf } public static class ICAS_R extends TableReduce<Text, Text> { //無法用hbase_admin, hbase_conf } public static void main(String[] args) throws Exception { //可以用hbase_admin, hbase_conf }
- 問題二、如何不覆蓋原本hbase內的資料,而是累加進去
- 要先從原本的資料庫把資料撈出來,再整合後放進去
- 問題三、如何讓reduce 顯示進度
- 問題四、如何讓<key,value>後的value 在進行一次<key,value>
- 現在:
key= dest ip value= infor: sip host M host1; host2; host1
- 期望值:(也就是多出來的host1要被濾掉)
key= dest ip value= infor: sip host M host1; host2
Last modified 15 years ago
Last modified on Jul 29, 2009, 6:28:19 PM