wiki:ExperimentLog3

Version 1 (modified by waue, 16 years ago) (diff)

--

package tw.org.nchc.code;

import java.io.IOException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

class Log {
  String gid, sid, version;

  String alert_name, class_type, priority;

  String source, destination, type;

  // String ttl, tos, id, iplen, dgmlen;
  
  String srcport, dstport,tmp;
  public Log(String data) {

    String[] arr = data.split(";");
    this.gid = arr[0];
    this.sid = arr[1];
    this.version = arr[2];
    this.alert_name = arr[3];
    this.class_type = arr[4];
    this.priority = arr[5];
    this.timestamp = getTime(arr[7] + "/" + arr[6] + ":" + arr[8] + ":"
        + arr[9] + ":" + arr[10]);
    this.source = getIP(arr[11]);
    this.srcport = this.tmp;
    this.destination = getIP(arr[12]);
    this.dstport = this.tmp;
    this.type = arr[13];

    
  }
  long timestamp;


  String getIP(String str){
    String res;
    int n = str.indexOf(":");
    if (n == -1) {
      res = str;
      this.tmp = "0";
    } else {
      String[] vec = str.split(":");
      res = vec[0];
      this.tmp = vec[1];
    }
    return res;
  }

  long getTime(String str) {
    SimpleDateFormat sdf = new SimpleDateFormat("dd/MM:HH:mm:ss",
        Locale.TAIWAN);
    Long timestamp = sdf.parse(str, new ParsePosition(0)).getTime();
    return timestamp;
  }
}

// import AccessLogParser
public class SnortBase {
  static HBaseConfiguration conf = new HBaseConfiguration();

  public static final String TABLE = "table.name";

  static String tableName = "flex";

  static HTable table = null;

  public static class MapClass extends MapReduceBase implements
      Mapper<WritableComparable, Text, Text, Writable> {

    public void configure(JobConf job) {

    }

    public void map(WritableComparable key, Text value,
        OutputCollector<Text, Writable> output, Reporter reporter)
        throws IOException {

      Log log = new Log(value.toString());


      if (table == null)
        table = new HTable(conf, new Text(tableName));

      long lockId = table.startUpdate(new Text(log.destination));
      table.put(lockId, new Text("id:gid"), log.gid.getBytes());
      table.put(lockId, new Text("id:sid"), log.sid.getBytes());
      table.put(lockId, new Text("id:version"), log.version.getBytes());
      table.put(lockId, new Text("name:name"), log.alert_name.getBytes());
      table
          .put(lockId, new Text("name:class"), log.class_type
              .getBytes());
      table.put(lockId, new Text("id:priority"), log.priority
          .getBytes());
      table.put(lockId, new Text("direction:soure"), log.source.getBytes());
      table.put(lockId, new Text("direction:srcport"), log.srcport.getBytes());
      table.put(lockId, new Text("direction:dstport"), log.dstport.getBytes());
      table.put(lockId, new Text("payload:type"), log.type.getBytes());

      table.commit(lockId, log.timestamp);

    }
  }

  // do it to resolve warning : FileSystem.listPaths
  static public Path[] listPaths(FileSystem fsm, Path path)
      throws IOException {
    FileStatus[] fss = fsm.listStatus(path);
    int length = fss.length;
    Path[] pi = new Path[length];
    for (int i = 0; i < length; i++) {
      pi[i] = fss[i].getPath();
    }
    return pi;
  }

  public static void runMapReduce(String tableName, String inpath)
      throws IOException {
    Path tempDir = new Path("/tmp/Mylog/");
    Path InputPath = new Path(inpath);
    FileSystem fs = FileSystem.get(conf);
    JobConf jobConf = new JobConf(conf, SnortBase.class);
    jobConf.setJobName("Snort Parse");
    jobConf.set(TABLE, tableName);

    jobConf.setInputPath(InputPath);
    jobConf.setOutputPath(tempDir);
    jobConf.setMapperClass(MapClass.class);
    JobClient client = new JobClient(jobConf);
    ClusterStatus cluster = client.getClusterStatus();
    jobConf.setNumMapTasks(cluster.getMapTasks());
    jobConf.setNumReduceTasks(0);
    fs.delete(tempDir);
    JobClient.runJob(jobConf);
    fs.delete(tempDir);
    fs.close();
  }

  public static void creatTable(String table) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists(new Text(table))) {
      System.out.println("1. " + table
          + " table creating ... please wait");
      HTableDescriptor tableDesc = new HTableDescriptor(table);
      tableDesc.addFamily(new HColumnDescriptor("id:"));
      tableDesc.addFamily(new HColumnDescriptor("name:"));
      tableDesc.addFamily(new HColumnDescriptor("direction:"));
      tableDesc.addFamily(new HColumnDescriptor("payload:"));
      admin.createTable(tableDesc);
    } else {
      System.out.println("1. " + table + " table already exists.");
    }
    System.out.println("2. access_log files fetching using map/reduce");
  }

  public static void main(String[] args) throws IOException, Exception {

    String path = "/user/waue/snort-log/alert_flex_parsed.txt";

    creatTable(tableName);

    runMapReduce(tableName, path);

  }
}