package tw.org.nchc.code;

import java.io.IOException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
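/**
 * Parses one line of flex-parsed Snort alert output. The line is assumed to
 * be semicolon-separated in this order (derived from the constructor below):
 * gid;sid;version;alert;classtype;priority;month;day;hour;min;sec;
 * src[:port];dst[:port];type
 * A hypothetical example line:
 * 1;2003;8;ICMP PING;misc-activity;3;01;20;12;34;56;192.168.1.10;10.0.0.5:80;ICMP
 */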
class Log {
	String gid, sid, version;
	String alert_name, class_type, priority;
	String source, destination, type;
	// String ttl, tos, id, iplen, dgmlen;
	String srcport, dstport, tmp;
	long timestamp;
public Log(String data) {
String[] arr = data.split(";");
this.gid = arr[0];
this.sid = arr[1];
this.version = arr[2];
this.alert_name = arr[3];
this.class_type = arr[4];
this.priority = arr[5];
this.timestamp = getTime(arr[7] + "/" + arr[6] + ":" + arr[8] + ":"
+ arr[9] + ":" + arr[10]);
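		// getIP() returns the host part and leaves the port (or "0" when no
		// port is present) in this.tmp as a side effect.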
this.source = getIP(arr[11]);
this.srcport = this.tmp;
this.destination = getIP(arr[12]);
this.dstport = this.tmp;
this.type = arr[13];
}
	// Splits "host:port"; returns the host and stashes the port in this.tmp
	// ("0" when no port is present).
	String getIP(String str) {
		String res;
		int n = str.indexOf(":");
		if (n == -1) {
			res = str;
			this.tmp = "0";
		} else {
			String[] vec = str.split(":");
			res = vec[0];
			this.tmp = vec[1];
		}
		return res;
	}
	// Converts "dd/MM:HH:mm:ss" to epoch milliseconds; parse() returns null
	// on failure here rather than throwing, so guard against it.
	long getTime(String str) {
		SimpleDateFormat sdf = new SimpleDateFormat("dd/MM:HH:mm:ss",
				Locale.TAIWAN);
		Date date = sdf.parse(str, new ParsePosition(0));
		return (date == null) ? 0L : date.getTime();
	}
}
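/**
 * Loads parsed Snort alerts into HBase with a map-only MapReduce job. Each
 * alert is stored under its destination IP as the row key, spread across the
 * id:, name:, direction:, and payload: column families created in
 * createTable(). Written against the early HBase batch-update API
 * (startUpdate/put/commit).
 */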
public class SnortBase {
	static HBaseConfiguration conf = new HBaseConfiguration();
	// JobConf key under which the target table name is passed to the job
	public static final String TABLE = "table.name";
	static String tableName = "flex";
	static HTable table = null;
public static class MapClass extends MapReduceBase implements
Mapper<WritableComparable, Text, Text, Writable> {
		public void configure(JobConf job) {
			// Read the target table name passed via jobConf.set(TABLE, ...);
			// without this, that setting would be silently ignored.
			tableName = job.get(TABLE, tableName);
		}
public void map(WritableComparable key, Text value,
OutputCollector<Text, Writable> output, Reporter reporter)
throws IOException {
			Log log = new Log(value.toString());
			// Lazily open the HTable once per task JVM and reuse it across calls
			if (table == null)
				table = new HTable(conf, new Text(tableName));
			long lockId = table.startUpdate(new Text(log.destination));
table.put(lockId, new Text("id:gid"), log.gid.getBytes());
table.put(lockId, new Text("id:sid"), log.sid.getBytes());
table.put(lockId, new Text("id:version"), log.version.getBytes());
table.put(lockId, new Text("name:name"), log.alert_name.getBytes());
table
.put(lockId, new Text("name:class"), log.class_type
.getBytes());
table.put(lockId, new Text("id:priority"), log.priority
.getBytes());
table.put(lockId, new Text("direction:soure"), log.source.getBytes());
table.put(lockId, new Text("direction:srcport"), log.srcport.getBytes());
table.put(lockId, new Text("direction:dstport"), log.dstport.getBytes());
table.put(lockId, new Text("payload:type"), log.type.getBytes());
table.commit(lockId, log.timestamp);
}
}
	// Replacement for the deprecated FileSystem.listPaths(): lists the
	// children of path via listStatus() and returns their Paths.
	public static Path[] listPaths(FileSystem fsm, Path path)
			throws IOException {
FileStatus[] fss = fsm.listStatus(path);
int length = fss.length;
Path[] pi = new Path[length];
for (int i = 0; i < length; i++) {
pi[i] = fss[i].getPath();
}
return pi;
}
	public static void runMapReduce(String tableName, String inpath)
			throws IOException {
		Path tempDir = new Path("/tmp/Mylog/");
		Path inputPath = new Path(inpath);
		FileSystem fs = FileSystem.get(conf);
		JobConf jobConf = new JobConf(conf, SnortBase.class);
		jobConf.setJobName("Snort Parse");
		jobConf.set(TABLE, tableName);
		jobConf.setInputPath(inputPath);
		// Map-only job: the real output is written to HBase from the mapper,
		// so the job output goes to a scratch dir that is removed afterwards.
		jobConf.setOutputPath(tempDir);
		jobConf.setMapperClass(MapClass.class);
		JobClient client = new JobClient(jobConf);
		ClusterStatus cluster = client.getClusterStatus();
		// Hint at the cluster's map-slot capacity, not the number of maps
		// currently running.
		jobConf.setNumMapTasks(cluster.getMaxMapTasks());
		jobConf.setNumReduceTasks(0);
		fs.delete(tempDir);
		JobClient.runJob(jobConf);
		fs.delete(tempDir);
		fs.close();
	}
	public static void createTable(String table) throws IOException {
		HBaseAdmin admin = new HBaseAdmin(conf);
		if (!admin.tableExists(new Text(table))) {
			System.out.println("1. creating table " + table
					+ " ... please wait");
			HTableDescriptor tableDesc = new HTableDescriptor(table);
			tableDesc.addFamily(new HColumnDescriptor("id:"));
			tableDesc.addFamily(new HColumnDescriptor("name:"));
			tableDesc.addFamily(new HColumnDescriptor("direction:"));
			tableDesc.addFamily(new HColumnDescriptor("payload:"));
			admin.createTable(tableDesc);
		} else {
			System.out.println("1. table " + table + " already exists.");
		}
		System.out.println("2. parsing Snort alert files with map/reduce");
	}
	public static void main(String[] args) throws IOException {
		String path = "/user/waue/snort-log/alert_flex_parsed.txt";
		createTable(tableName);
		runMapReduce(tableName, path);
	}
}
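/*
 * Hypothetical invocation (jar name assumed), after building the job jar:
 *   hadoop jar snortbase.jar tw.org.nchc.code.SnortBase
 * Note that the input path and table name are hard-coded in main() above.
 */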