source: sample/hadoop-0.16/tw/org/nchc/code/SnortUploadHbase.java @ 77

Last change on this file since 77 was 43, checked in by waue, 16 years ago

HBaseRecordPro.java -> only modified for debuging SnortUploadHbase?.java

SnortParser?.java -> catch a bug and fix it

SnortUploadHbase?.java -> it can run finally ..

File size: 4.7 KB
Line 
1/**
2 * Program: SnortUploadHbase.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwn
5 * Last Update Date: 07/23/2008
6 */
7
8/**
9 * Purpose :
10 *  The program will parse the log of snort (/var/log/snort/alert)
11 *    into Hbase table "snort".
12 *
13 * HowToUse :
14 *  Run by eclipse ! (dependency by SnortParser.java)
15
16 * Check Result:
17 *  Go to hbase console, type :
18 *    hql > select * from snort;
19
20
21 */
22
23package tw.org.nchc.code;
24
25import java.io.File;
26import java.io.IOException;
27import java.util.Iterator;
28
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32import org.apache.hadoop.hbase.mapred.TableReduce;
33import org.apache.hadoop.io.LongWritable;
34import org.apache.hadoop.io.MapWritable;
35import org.apache.hadoop.io.Text;
36import org.apache.hadoop.mapred.JobClient;
37import org.apache.hadoop.mapred.JobConf;
38import org.apache.hadoop.mapred.OutputCollector;
39import org.apache.hadoop.mapred.Reporter;
40import org.apache.hadoop.mapred.lib.IdentityMapper;
41import org.apache.hadoop.mapred.lib.IdentityReducer;
42
43import com.sun.org.apache.xerces.internal.impl.xpath.regex.ParseException;
44
45public class SnortUploadHbase {
46  /* Major parameter */
47  // it indicates local path, not hadoop file system path
48  final static String source_file = "/var/log/snort/alert";
49
50  /* Minor parameter */
51  // column family name
52  final static String column_family = "snort:";
53
54  // table name
55  final static String table_name = "Snort";
56
57  // separate char
58  final static String sp = ";";
59 
60  // data source tmp
61  final static String text_tmp = "/tmp/alert_my";
62
63  // on this sample, map is nonuse, we use reduce to handle
64  private static class ReduceClass extends TableReduce<LongWritable, Text> {
65    public void reduce(LongWritable key, Iterator<Text> values,
66        OutputCollector<Text, MapWritable> output, Reporter reporter)
67        throws IOException {
68
69      String first_line = "gid;sid;version;alert name;" +
70          "class;priority;month;day;hour;min;second;source;" +
71          "destination;type;ttl;tos;id; iplen;dgmlen;";
72
73      // extract cf data
74      String[] cf = first_line.split(sp);
75      int length = cf.length;
76       
77      // values.next().getByte() can get value and transfer to byte form,
78      String stro = new String(values.next().getBytes());
79      String str[] = stro.split(sp);
80
81      // Column id is created dymanically,
82      Text[] col_n = new Text[length];
83      byte[][] b_l = new byte[length][];
84      // contents must be ImmutableBytesWritable
85      ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
86
87      // This map connect to hbase table and holds the columns per row
88      MapWritable map = new MapWritable();
89      map.clear();
90
91      // prepare to write data into map
92      for (int i = 0; i < length; i++) {
93        col_n[i] = new Text(column_family + cf[i]);
94        b_l[i] = str[i].getBytes();
95        w_l[i] = new ImmutableBytesWritable(b_l[i]);
96        // populate the current row
97        map.put(col_n[i], w_l[i]);
98      }
99      // add the row with the key as the row id
100      output.collect(new Text(key.toString()), map);
101    }
102  }
103
104  public SnortUploadHbase() {
105  }
106 
107  // tmp file delete
108  boolean deleteFile(String str)throws IOException{
109    File df = new File(str);
110   
111    if(df.exists()){
112      if(!df.delete()){
113        System.err.print("delete file error !");
114      }
115    }else{
116      System.out.println("file not exit!");
117    }
118    return true;
119  }
120  /**
121   * Runs the demo.
122   */
123  public static void main(String[] args) throws IOException,ParseException,Exception {
124   
125    String[] col_family = {column_family};
126    Path text_path = new Path(text_tmp);
127   
128//    setup.parseFirstLine(source_file, text_tmp);
129//    System.out.println(first_line);
130    SnortParser sp = new SnortParser(source_file,text_tmp);
131    sp.parseToLine();
132   
133   
134    BuildHTable build_table = new BuildHTable(table_name,
135        col_family);
136    if (!build_table.checkTableExist(table_name)) {
137      if (!build_table.createTable()) {
138        System.out.println("create table error !");
139      }
140    } else {
141      System.out.println("Table \"" + table_name
142          + "\" has already existed !");
143    }
144    JobConf conf = new JobConf(SnortUploadHbase.class);
145    FileSystem fileconf = FileSystem.get(conf);
146    fileconf.copyFromLocalFile(true, text_path, text_path);
147    // Job name; you can modify to any you like
148    conf.setJobName("SnortDataBase");
149    final int mapTasks = 1;
150    final int reduceTasks = 1;
151    // Hbase table name must be correct , in our profile is t1_table
152    TableReduce.initJob(table_name, ReduceClass.class, conf);
153
154    // below are map-reduce profile
155    conf.setNumMapTasks(mapTasks);
156    conf.setNumReduceTasks(reduceTasks);
157   
158    conf.setInputPath(text_path);
159
160   
161    conf.setMapperClass(IdentityMapper.class);
162    conf.setCombinerClass(IdentityReducer.class);
163    conf.setReducerClass(ReduceClass.class);
164
165    JobClient.runJob(conf);
166   
167    // delete tmp file
168    // 0.16
169    FileSystem.get(conf).delete(text_path);
170   
171  }
172}
Note: See TracBrowser for help on using the repository browser.