/** * Program: LogParserGo.java * Editor: Waue Chen * From : NCHC. Taiwn * Last Update Date: 07/02/2008 */ /** * Purpose : * This program will parse your apache log and store it into Hbase. * * HowToUse : * Make sure two thing : * 1. Upload apache logs ( /var/log/apache2/access.log* ) to \ * hdfs (default: /user/waue/apache-log) \ * $ bin/hadoop dfs -put /var/log/apache2/ apache-log * 2. parameter "dir" in main contains the logs. * 3. you should filter the exception contents manually, \ * ex: ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "... * * Check Result: * Go to hbase console, type : * hql > select * from apache-log; +-------------------------+-------------------------+-------------------------+ | Row | Column | Cell | +-------------------------+-------------------------+-------------------------+ | 118.170.101.250 | http:agent | Mozilla/4.0 (compatible;| | | | MSIE 4.01; Windows 95) | +-------------------------+-------------------------+-------------------------+ | 118.170.101.250 | http:bytesize | 318 | +-------------------------+-------------------------+-------------------------+ ..........(skip)........ +-------------------------+-------------------------+-------------------------+ | 87.65.93.58 | http:method | OPTIONS | +-------------------------+-------------------------+-------------------------+ | 87.65.93.58 | http:protocol | HTTP/1.1 | +-------------------------+-------------------------+-------------------------+ | 87.65.93.58 | referrer:- | * | +-------------------------+-------------------------+-------------------------+ | 87.65.93.58 | url:* | - | +-------------------------+-------------------------+-------------------------+ 31 row(s) in set. (0.58 sec) */ package tw.org.nchc.code; import java.io.IOException; import java.text.ParsePosition; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseAdmin; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTable; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; class Log { String gid, sid, version; String alert_name, class_type, priority; String source, destination, type; // String ttl, tos, id, iplen, dgmlen; String srcport, dstport, tmp; public Log(String data) { String[] arr = data.split(";"); this.gid = arr[0]; this.sid = arr[1]; this.version = arr[2]; this.alert_name = arr[3]; this.class_type = arr[4]; this.priority = arr[5]; this.timestamp = getTime(arr[7] + "/" + arr[6] + ":" + arr[8] + ":" + arr[9] + ":" + arr[10]); this.source = getIP(arr[11]); this.srcport = this.tmp; this.destination = getIP(arr[12]); this.dstport = this.tmp; this.type = arr[13]; } long timestamp; String getIP(String str) { String res; int n = str.indexOf(":"); if (n == -1) { res = str; this.tmp = "0"; } else { String[] vec = str.split(":"); res = vec[0]; this.tmp = vec[1]; } return res; } long getTime(String str) { SimpleDateFormat sdf = new SimpleDateFormat("dd/MM:HH:mm:ss", Locale.TAIWAN); Long timestamp = sdf.parse(str, new ParsePosition(0)).getTime(); return timestamp; } } // import AccessLogParser public class SnortBase { static HBaseConfiguration conf = new HBaseConfiguration(); public static final String TABLE = "table.name"; static String tableName = "NewTable2"; static HTable table = null; public static class MapClass extends MapReduceBase implements Mapper { @Override // MapReduceBase.configure(JobConf job) // Default implementation that does nothing. public void configure(JobConf job) { // String get(String name,String defaultValue) // Get the value of the name property. If no such property exists,\ // then defaultValue is returned. } public void map(WritableComparable key, Text value, OutputCollector output, Reporter reporter) throws IOException { // try { Log log = new Log(value.toString()); // 查看value的值 // FileWriter out = new FileWriter(new File( // "/home/waue/Desktop/snort-result.txt")); // out.write(value.toString() + "_time=" + log.timestamp + "\n"); // out.flush(); // out.close(); if (table == null) table = new HTable(conf, new Text(tableName)); // 實驗三 String property_name = "name=" + log.alert_name + ";priority=" + log.priority + ";class=" + log.class_type + ";dst_port=" + log.dstport + ";type=" + log.type; long lockId = table.startUpdate(new Text(log.destination)); table.put(lockId, new Text("SourceSID:" + log.source + "(" + log.sid+")"), property_name.getBytes()); // 實驗二 // long lockId = table.startUpdate(new // Text(log.destination+":"+log.sid)); // String property_name = // "priority="+log.priority+ // ";class="+log.class_type+ // ";snort_id="+log.sid; // String property_source = // log.source+":"+log.srcport+" => " // +log.destination+":"+log.dstport; // String property_payload = log.type; // table.put(lockId, new Text("name:"+log.alert_name), // property_name.getBytes()); // table.put(lockId, new Text("from:"+log.source), // property_source.getBytes()); // table.put(lockId, new Text("payload:"+log.type), // property_payload.getBytes()); // 實驗一 // table.put(lockId, new Text("property:gen_id"), // log.gid.getBytes()); // table.put(lockId, new Text("property:name"), log.sid.getBytes()); // table.put(lockId, new Text("id:version"), // log.version.getBytes()); // table.put(lockId, new Text("name:name"), // log.alert_name.getBytes()); // table // .put(lockId, new Text("name:class"), log.class_type // .getBytes()); // table.put(lockId, new Text("id:priority"), log.priority // .getBytes()); // table.put(lockId, new Text("direction:soure"), // log.source.getBytes()); // table.put(lockId, new Text("direction:destination"), // log.destination.getBytes()); // table.put(lockId, new Text("direction:srcport"), // log.srcport.getBytes()); // table.put(lockId, new Text("direction:dstport"), // log.dstport.getBytes()); // table.put(lockId, new Text("payload:type"), log.type.getBytes()); table.commit(lockId, log.timestamp); // } catch (Exception e) { // e.printStackTrace(); // } } } // do it to resolve warning : FileSystem.listPaths static public Path[] listPaths(FileSystem fsm, Path path) throws IOException { FileStatus[] fss = fsm.listStatus(path); int length = fss.length; Path[] pi = new Path[length]; for (int i = 0; i < length; i++) { pi[i] = fss[i].getPath(); } return pi; } public static void runMapReduce(String tableName, String inpath) throws IOException { Path tempDir = new Path("/tmp/Mylog/"); Path InputPath = new Path(inpath); FileSystem fs = FileSystem.get(conf); JobConf jobConf = new JobConf(conf, SnortBase.class); jobConf.setJobName("Snort Parse"); jobConf.set(TABLE, tableName); // 先省略 自動搜尋目錄的功能 // Path InputDir = new Path(inpath); // Path[] in = listPaths(fs, InputDir); // if (fs.isFile(InputDir)) // { // jobConf.setInputPath(InputDir); // } // else{ // for (int i = 0; i < in.length; i++){ // if (fs.isFile(in[i])){ // jobConf.addInputPath(in[i]); // } else // { // Path[] sub = listPaths(fs, in[i]); // for (int j = 0; j < sub.length; j++) // { // if (fs.isFile(sub[j])) // { // jobConf.addInputPath(sub[j]); // } } } } } jobConf.setInputPath(InputPath); jobConf.setOutputPath(tempDir); jobConf.setMapperClass(MapClass.class); JobClient client = new JobClient(jobConf); ClusterStatus cluster = client.getClusterStatus(); jobConf.setNumMapTasks(cluster.getMapTasks()); jobConf.setNumReduceTasks(0); fs.delete(tempDir); JobClient.runJob(jobConf); fs.delete(tempDir); fs.close(); } public static void creatTable(String table) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); if (!admin.tableExists(new Text(table))) { System.out.println("1. " + table + " table creating ... please wait"); HTableDescriptor tableDesc = new HTableDescriptor(table); // 實驗三 tableDesc.addFamily(new HColumnDescriptor("SourceSID:")); // 實驗二 // tableDesc.addFamily(new HColumnDescriptor("name:")); // tableDesc.addFamily(new HColumnDescriptor("from:")); // tableDesc.addFamily(new HColumnDescriptor("payload:")); admin.createTable(tableDesc); } else { System.out.println("1. " + table + " table already exists."); } System.out.println("2. access_log files fetching using map/reduce"); } public static void main(String[] args) throws IOException, Exception { String path = "/user/waue/snort-log/alert_flex_parsed.txt"; // 先省略掉 parse完後自動上傳部份 /* * SnortParser sp = new * SnortParser("/tmp/alert","/tmp/alert_SnortBase"); sp.parseToLine(); */ creatTable(tableName); Long start_time = (new Date()).getTime(); runMapReduce(tableName, path); Long end_time = (new Date()).getTime(); System.out.println(end_time - start_time); } }