Changes between Initial Version and Version 1 of ExperimentLog3


Ignore:
Timestamp:
Aug 11, 2008, 3:59:12 PM (16 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • ExperimentLog3

    v1 v1  
     1{{{
     2#!java
     3
     4package tw.org.nchc.code;
     5
     6import java.io.IOException;
     7import java.text.ParsePosition;
     8import java.text.SimpleDateFormat;
     9import java.util.Date;
     10import java.util.Locale;
     11
     12import org.apache.hadoop.fs.FileStatus;
     13import org.apache.hadoop.fs.FileSystem;
     14import org.apache.hadoop.fs.Path;
     15import org.apache.hadoop.hbase.HBaseAdmin;
     16import org.apache.hadoop.hbase.HBaseConfiguration;
     17import org.apache.hadoop.hbase.HColumnDescriptor;
     18import org.apache.hadoop.hbase.HTable;
     19import org.apache.hadoop.hbase.HTableDescriptor;
     20import org.apache.hadoop.io.Text;
     21import org.apache.hadoop.io.Writable;
     22import org.apache.hadoop.io.WritableComparable;
     23import org.apache.hadoop.mapred.ClusterStatus;
     24import org.apache.hadoop.mapred.JobClient;
     25import org.apache.hadoop.mapred.JobConf;
     26import org.apache.hadoop.mapred.MapReduceBase;
     27import org.apache.hadoop.mapred.Mapper;
     28import org.apache.hadoop.mapred.OutputCollector;
     29import org.apache.hadoop.mapred.Reporter;
     30
     31class Log {
     32        String gid, sid, version;
     33
     34        String alert_name, class_type, priority;
     35
     36        String source, destination, type;
     37
     38        // String ttl, tos, id, iplen, dgmlen;
     39       
     40        String srcport, dstport,tmp;
     41        public Log(String data) {
     42
     43                String[] arr = data.split(";");
     44                this.gid = arr[0];
     45                this.sid = arr[1];
     46                this.version = arr[2];
     47                this.alert_name = arr[3];
     48                this.class_type = arr[4];
     49                this.priority = arr[5];
     50                this.timestamp = getTime(arr[7] + "/" + arr[6] + ":" + arr[8] + ":"
     51                                + arr[9] + ":" + arr[10]);
     52                this.source = getIP(arr[11]);
     53                this.srcport = this.tmp;
     54                this.destination = getIP(arr[12]);
     55                this.dstport = this.tmp;
     56                this.type = arr[13];
     57
     58               
     59        }
     60        long timestamp;
     61
     62
     63        String getIP(String str){
     64                String res;
     65                int n = str.indexOf(":");
     66                if (n == -1) {
     67                        res = str;
     68                        this.tmp = "0";
     69                } else {
     70                        String[] vec = str.split(":");
     71                        res = vec[0];
     72                        this.tmp = vec[1];
     73                }
     74                return res;
     75        }
     76
     77        long getTime(String str) {
     78                SimpleDateFormat sdf = new SimpleDateFormat("dd/MM:HH:mm:ss",
     79                                Locale.TAIWAN);
     80                Long timestamp = sdf.parse(str, new ParsePosition(0)).getTime();
     81                return timestamp;
     82        }
     83}
     84
     85// import AccessLogParser
     86public class SnortBase {
     87        static HBaseConfiguration conf = new HBaseConfiguration();
     88
     89        public static final String TABLE = "table.name";
     90
     91        static String tableName = "flex";
     92
     93        static HTable table = null;
     94
     95        public static class MapClass extends MapReduceBase implements
     96                        Mapper<WritableComparable, Text, Text, Writable> {
     97
     98                public void configure(JobConf job) {
     99
     100                }
     101
     102                public void map(WritableComparable key, Text value,
     103                                OutputCollector<Text, Writable> output, Reporter reporter)
     104                                throws IOException {
     105
     106                        Log log = new Log(value.toString());
     107
     108
     109                        if (table == null)
     110                                table = new HTable(conf, new Text(tableName));
     111
     112                        long lockId = table.startUpdate(new Text(log.destination));
     113                        table.put(lockId, new Text("id:gid"), log.gid.getBytes());
     114                        table.put(lockId, new Text("id:sid"), log.sid.getBytes());
     115                        table.put(lockId, new Text("id:version"), log.version.getBytes());
     116                        table.put(lockId, new Text("name:name"), log.alert_name.getBytes());
     117                        table
     118                                        .put(lockId, new Text("name:class"), log.class_type
     119                                                        .getBytes());
     120                        table.put(lockId, new Text("id:priority"), log.priority
     121                                        .getBytes());
     122                        table.put(lockId, new Text("direction:soure"), log.source.getBytes());
     123                        table.put(lockId, new Text("direction:srcport"), log.srcport.getBytes());
     124                        table.put(lockId, new Text("direction:dstport"), log.dstport.getBytes());
     125                        table.put(lockId, new Text("payload:type"), log.type.getBytes());
     126
     127                        table.commit(lockId, log.timestamp);
     128
     129                }
     130        }
     131
     132        // do it to resolve warning : FileSystem.listPaths
     133        static public Path[] listPaths(FileSystem fsm, Path path)
     134                        throws IOException {
     135                FileStatus[] fss = fsm.listStatus(path);
     136                int length = fss.length;
     137                Path[] pi = new Path[length];
     138                for (int i = 0; i < length; i++) {
     139                        pi[i] = fss[i].getPath();
     140                }
     141                return pi;
     142        }
     143
     144        public static void runMapReduce(String tableName, String inpath)
     145                        throws IOException {
     146                Path tempDir = new Path("/tmp/Mylog/");
     147                Path InputPath = new Path(inpath);
     148                FileSystem fs = FileSystem.get(conf);
     149                JobConf jobConf = new JobConf(conf, SnortBase.class);
     150                jobConf.setJobName("Snort Parse");
     151                jobConf.set(TABLE, tableName);
     152
     153                jobConf.setInputPath(InputPath);
     154                jobConf.setOutputPath(tempDir);
     155                jobConf.setMapperClass(MapClass.class);
     156                JobClient client = new JobClient(jobConf);
     157                ClusterStatus cluster = client.getClusterStatus();
     158                jobConf.setNumMapTasks(cluster.getMapTasks());
     159                jobConf.setNumReduceTasks(0);
     160                fs.delete(tempDir);
     161                JobClient.runJob(jobConf);
     162                fs.delete(tempDir);
     163                fs.close();
     164        }
     165
     166        public static void creatTable(String table) throws IOException {
     167                HBaseAdmin admin = new HBaseAdmin(conf);
     168                if (!admin.tableExists(new Text(table))) {
     169                        System.out.println("1. " + table
     170                                        + " table creating ... please wait");
     171                        HTableDescriptor tableDesc = new HTableDescriptor(table);
     172                        tableDesc.addFamily(new HColumnDescriptor("id:"));
     173                        tableDesc.addFamily(new HColumnDescriptor("name:"));
     174                        tableDesc.addFamily(new HColumnDescriptor("direction:"));
     175                        tableDesc.addFamily(new HColumnDescriptor("payload:"));
     176                        admin.createTable(tableDesc);
     177                } else {
     178                        System.out.println("1. " + table + " table already exists.");
     179                }
     180                System.out.println("2. access_log files fetching using map/reduce");
     181        }
     182
     183        public static void main(String[] args) throws IOException, Exception {
     184
     185                String path = "/user/waue/snort-log/alert_flex_parsed.txt";
     186
     187                creatTable(tableName);
     188
     189                runMapReduce(tableName, path);
     190
     191        }
     192}
     193}}}
     194
     195{{{
     196#!html
     197
     198}}}