source: sample/hadoop-0.16/tw/org/nchc/code/SnortBase.java @ 70

Last change on this file since 70 was 48, checked in by waue, 16 years ago

change the database structure

File size: 9.9 KB
Line 
1  /**
2 * Program: LogParserGo.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwn
5 * Last Update Date: 07/02/2008
6 */
7/**
8 * Purpose :
9 *  This program will parse your apache log and store it into Hbase.
10 *
11 * HowToUse :
12 *  Make sure two thing :
13 *  1. Upload apache logs ( /var/log/apache2/access.log* ) to \
14 *    hdfs (default: /user/waue/apache-log) \
15 *   $ bin/hadoop dfs -put /var/log/apache2/ apache-log
16 *  2. parameter "dir" in main contains the logs.
17 *  3. you should filter the exception contents manually, \
18 *    ex:  ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
19 * 
20 * Check Result:
21 *  Go to hbase console, type :
22 *    hql > select * from apache-log;
23
24 +-------------------------+-------------------------+-------------------------+
25 | Row                     | Column                  | Cell                    |
26 +-------------------------+-------------------------+-------------------------+
27 | 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
28 |                         |                         |  MSIE 4.01; Windows 95) |
29 +-------------------------+-------------------------+-------------------------+
30 | 118.170.101.250         | http:bytesize           | 318                     |
31 +-------------------------+-------------------------+-------------------------+
32 ..........(skip)........
33 +-------------------------+-------------------------+-------------------------+
34 | 87.65.93.58             | http:method             | OPTIONS                 |
35 +-------------------------+-------------------------+-------------------------+
36 | 87.65.93.58             | http:protocol           | HTTP/1.1                |
37 +-------------------------+-------------------------+-------------------------+
38 | 87.65.93.58             | referrer:-              | *                       |
39 +-------------------------+-------------------------+-------------------------+
40 | 87.65.93.58             | url:*                   | -                       |
41 +-------------------------+-------------------------+-------------------------+
42 31 row(s) in set. (0.58 sec)
43
44
45
46 */
47package tw.org.nchc.code;
48
49import java.io.IOException;
50import java.text.ParsePosition;
51import java.text.SimpleDateFormat;
52import java.util.Date;
53import java.util.Locale;
54
55import org.apache.hadoop.fs.FileStatus;
56import org.apache.hadoop.fs.FileSystem;
57import org.apache.hadoop.fs.Path;
58import org.apache.hadoop.hbase.HBaseAdmin;
59import org.apache.hadoop.hbase.HBaseConfiguration;
60import org.apache.hadoop.hbase.HColumnDescriptor;
61import org.apache.hadoop.hbase.HTable;
62import org.apache.hadoop.hbase.HTableDescriptor;
63import org.apache.hadoop.io.Text;
64import org.apache.hadoop.io.Writable;
65import org.apache.hadoop.io.WritableComparable;
66import org.apache.hadoop.mapred.ClusterStatus;
67import org.apache.hadoop.mapred.JobClient;
68import org.apache.hadoop.mapred.JobConf;
69import org.apache.hadoop.mapred.MapReduceBase;
70import org.apache.hadoop.mapred.Mapper;
71import org.apache.hadoop.mapred.OutputCollector;
72import org.apache.hadoop.mapred.Reporter;
73
74class Log {
75  String gid, sid, version;
76
77  String alert_name, class_type, priority;
78
79  String source, destination, type;
80
81  // String ttl, tos, id, iplen, dgmlen;
82
83  String srcport, dstport, tmp;
84
85  public Log(String data) {
86
87    String[] arr = data.split(";");
88    this.gid = arr[0];
89    this.sid = arr[1];
90    this.version = arr[2];
91    this.alert_name = arr[3];
92    this.class_type = arr[4];
93    this.priority = arr[5];
94    this.timestamp = getTime(arr[7] + "/" + arr[6] + ":" + arr[8] + ":"
95        + arr[9] + ":" + arr[10]);
96    this.source = getIP(arr[11]);
97    this.srcport = this.tmp;
98    this.destination = getIP(arr[12]);
99    this.dstport = this.tmp;
100    this.type = arr[13];
101
102  }
103
104  long timestamp;
105
106  String getIP(String str) {
107    String res;
108    int n = str.indexOf(":");
109    if (n == -1) {
110      res = str;
111      this.tmp = "0";
112    } else {
113      String[] vec = str.split(":");
114      res = vec[0];
115      this.tmp = vec[1];
116    }
117    return res;
118  }
119
120  long getTime(String str) {
121    SimpleDateFormat sdf = new SimpleDateFormat("dd/MM:HH:mm:ss",
122        Locale.TAIWAN);
123    Long timestamp = sdf.parse(str, new ParsePosition(0)).getTime();
124    return timestamp;
125  }
126}
127
128// import AccessLogParser
129public class SnortBase {
130  static HBaseConfiguration conf = new HBaseConfiguration();
131
132  public static final String TABLE = "table.name";
133
134  static String tableName = "NewTable2";
135
136  static HTable table = null;
137
138  public static class MapClass extends MapReduceBase implements
139      Mapper<WritableComparable, Text, Text, Writable> {
140
141    @Override
142    // MapReduceBase.configure(JobConf job)
143    // Default implementation that does nothing.
144    public void configure(JobConf job) {
145      // String get(String name,String defaultValue)
146      // Get the value of the name property. If no such property exists,\
147      // then defaultValue is returned.
148    }
149
150    public void map(WritableComparable key, Text value,
151        OutputCollector<Text, Writable> output, Reporter reporter)
152        throws IOException {
153
154      // try {
155
156      Log log = new Log(value.toString());
157
158      // 查看value的值
159      // FileWriter out = new FileWriter(new File(
160      // "/home/waue/Desktop/snort-result.txt"));
161      // out.write(value.toString() + "_time=" + log.timestamp + "\n");
162      // out.flush();
163      // out.close();
164
165      if (table == null)
166        table = new HTable(conf, new Text(tableName));
167      // 實驗三
168
169      String property_name = "name=" + log.alert_name + ";priority="
170          + log.priority + ";class=" + log.class_type + ";dst_port="
171          + log.dstport + ";type=" + log.type;
172      long lockId = table.startUpdate(new Text(log.destination));
173      table.put(lockId, new Text("SourceSID:" + log.source + "("
174          + log.sid+")"), property_name.getBytes());
175      // 實驗二
176      // long lockId = table.startUpdate(new
177      // Text(log.destination+":"+log.sid));
178      // String property_name =
179      // "priority="+log.priority+
180      // ";class="+log.class_type+
181      // ";snort_id="+log.sid;
182      // String property_source =
183      // log.source+":"+log.srcport+" => "
184      // +log.destination+":"+log.dstport;
185      // String property_payload = log.type;
186      // table.put(lockId, new Text("name:"+log.alert_name),
187      // property_name.getBytes());
188      // table.put(lockId, new Text("from:"+log.source),
189      // property_source.getBytes());
190      // table.put(lockId, new Text("payload:"+log.type),
191      // property_payload.getBytes());
192      // 實驗一
193      // table.put(lockId, new Text("property:gen_id"),
194      // log.gid.getBytes());
195      // table.put(lockId, new Text("property:name"), log.sid.getBytes());
196      // table.put(lockId, new Text("id:version"),
197      // log.version.getBytes());
198      // table.put(lockId, new Text("name:name"),
199      // log.alert_name.getBytes());
200      // table
201      // .put(lockId, new Text("name:class"), log.class_type
202      // .getBytes());
203      // table.put(lockId, new Text("id:priority"), log.priority
204      // .getBytes());
205      // table.put(lockId, new Text("direction:soure"),
206      // log.source.getBytes());
207      // table.put(lockId, new Text("direction:destination"),
208      // log.destination.getBytes());
209      // table.put(lockId, new Text("direction:srcport"),
210      // log.srcport.getBytes());
211      // table.put(lockId, new Text("direction:dstport"),
212      // log.dstport.getBytes());
213      // table.put(lockId, new Text("payload:type"), log.type.getBytes());
214
215      table.commit(lockId, log.timestamp);
216
217      // } catch (Exception e) {
218      // e.printStackTrace();
219      // }
220
221    }
222  }
223
224  // do it to resolve warning : FileSystem.listPaths
225  static public Path[] listPaths(FileSystem fsm, Path path)
226      throws IOException {
227    FileStatus[] fss = fsm.listStatus(path);
228    int length = fss.length;
229    Path[] pi = new Path[length];
230    for (int i = 0; i < length; i++) {
231      pi[i] = fss[i].getPath();
232    }
233    return pi;
234  }
235
236  public static void runMapReduce(String tableName, String inpath)
237      throws IOException {
238    Path tempDir = new Path("/tmp/Mylog/");
239    Path InputPath = new Path(inpath);
240    FileSystem fs = FileSystem.get(conf);
241    JobConf jobConf = new JobConf(conf, SnortBase.class);
242    jobConf.setJobName("Snort Parse");
243    jobConf.set(TABLE, tableName);
244    // 先省略 自動搜尋目錄的功能
245    // Path InputDir = new Path(inpath);
246    // Path[] in = listPaths(fs, InputDir);
247    // if (fs.isFile(InputDir))
248    // {
249    // jobConf.setInputPath(InputDir);
250    // }
251    // else{
252    // for (int i = 0; i < in.length; i++){
253    // if (fs.isFile(in[i])){
254    // jobConf.addInputPath(in[i]);
255    // } else
256    // {
257    // Path[] sub = listPaths(fs, in[i]);
258    // for (int j = 0; j < sub.length; j++)
259    // {
260    // if (fs.isFile(sub[j]))
261    // {
262    // jobConf.addInputPath(sub[j]);
263    // } } } } }
264
265    jobConf.setInputPath(InputPath);
266    jobConf.setOutputPath(tempDir);
267    jobConf.setMapperClass(MapClass.class);
268    JobClient client = new JobClient(jobConf);
269    ClusterStatus cluster = client.getClusterStatus();
270    jobConf.setNumMapTasks(cluster.getMapTasks());
271    jobConf.setNumReduceTasks(0);
272    fs.delete(tempDir);
273    JobClient.runJob(jobConf);
274    fs.delete(tempDir);
275    fs.close();
276  }
277
278  public static void creatTable(String table) throws IOException {
279    HBaseAdmin admin = new HBaseAdmin(conf);
280    if (!admin.tableExists(new Text(table))) {
281      System.out.println("1. " + table
282          + " table creating ... please wait");
283      HTableDescriptor tableDesc = new HTableDescriptor(table);
284      // 實驗三
285      tableDesc.addFamily(new HColumnDescriptor("SourceSID:"));
286      // 實驗二
287      // tableDesc.addFamily(new HColumnDescriptor("name:"));
288      // tableDesc.addFamily(new HColumnDescriptor("from:"));
289      // tableDesc.addFamily(new HColumnDescriptor("payload:"));
290      admin.createTable(tableDesc);
291    } else {
292      System.out.println("1. " + table + " table already exists.");
293    }
294    System.out.println("2. access_log files fetching using map/reduce");
295  }
296
297  public static void main(String[] args) throws IOException, Exception {
298
299    String path = "/user/waue/snort-log/alert_flex_parsed.txt";
300
301    // 先省略掉 parse完後自動上傳部份
302    /*
303     * SnortParser sp = new
304     * SnortParser("/tmp/alert","/tmp/alert_SnortBase"); sp.parseToLine();
305     */
306    creatTable(tableName);
307    Long start_time = (new Date()).getTime();
308    runMapReduce(tableName, path);
309    Long end_time = (new Date()).getTime();
310    System.out.println(end_time - start_time);
311  }
312
313}
Note: See TracBrowser for help on using the repository browser.