source: sample/hadoop-0.16/tw/org/nchc/code/SnortUploadHbase.java @ 33

Last change on this file since 33 was 33, checked in by waue, 16 years ago

little tuning of SnortParser.java

new SnortUploadHbase to parse data into HBase, testing

File size: 4.8 KB
/**
 * Program: SnortUploadHbase.java
 * Editor: Waue Chen
 * From :  NCHC. Taiwan
 * Last Update Date: 07/02/2008
 */

/**
 * Purpose :
 *  First, the program parses your record file and creates the HBase table.
 *  Then it takes the first line as the column qualifiers.
 *  Finally it stores the records in HBase automatically.
 *
 * HowToUse :
 *  Make sure two things :
 *  1. source_file must be formatted as follows:
 *    first line: qualifier1;qualifier2;...;qualifierN
 *    other lines: record1;record2;...;recordN
 *    (fields are separated by the character defined in "sp" below)
 *  2. source_file path must be correct.
 *
 * Check Result:
 *  Go to the hbase console and type :
 *    hql > select * from SnortTable;
 */
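/*
 * A hypothetical example of what a parsed record file could look like (the
 * values below are invented purely for illustration): the first line lists
 * the qualifiers and every following line is one alert, fields separated
 * by ";":
 *
 *   gid;sid;version;alert name;class;priority;...
 *   1;2003;8;MS-SQL Worm propagation attempt;Misc Attack;2;...
 */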

package tw.org.nchc.code;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class SnortUploadHbase {
  /* Major parameter */
  // this is a local path, not a Hadoop file system path
  final static String source_file = "/var/log/snort/alert";

  /* Minor parameters */
  // column family name
  final static String column_family = "snort:";

  // table name
  final static String table_name = "SnortTable";

  // separator character
  final static String sp = ";";

  // temporary data file
  final static String text_tmp = "/tmp/HBaseRecord.text.tmp";

  // in this sample the map phase is an identity pass; the reduce phase does the work
  private static class ReduceClass extends TableReduce<LongWritable, Text> {
    public void reduce(LongWritable key, Iterator<Text> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
        throws IOException {

      String first_line = "gid;sid;version;alert name;" +
          "class;priority;year;month;day;hour;min;second;source;" +
          "destination;type;ttl;tos;id;iplen;dgmlen";

      // extract the column qualifiers
      String[] cf = first_line.split(sp);
      int length = cf.length;

      // take the value of the current record as a String
      String stro = values.next().toString();
      String str[] = stro.split(sp);

      // column ids are created dynamically
      Text[] col_n = new Text[length];
      byte[][] b_l = new byte[length][];
      // contents must be ImmutableBytesWritable
      ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];

      // this map holds the columns of one HBase row
      MapWritable map = new MapWritable();
      map.clear();

      // prepare to write data into the map
      for (int i = 0; i < length; i++) {
        col_n[i] = new Text(column_family + cf[i]);
        b_l[i] = str[i].getBytes();
        w_l[i] = new ImmutableBytesWritable(b_l[i]);
        // populate the current row
        map.put(col_n[i], w_l[i]);
      }
      // add the row with the key as the row id
      output.collect(new Text(key.toString()), map);
    }
  }

  public SnortUploadHbase() {
  }

  // delete the tmp file
  boolean deleteFile(String str) throws IOException {
    File df = new File(str);

    if (df.exists()) {
      if (!df.delete()) {
        System.err.print("delete file error !");
        return false;
      }
    } else {
      System.out.println("file does not exist!");
    }
    return true;
  }
  /**
   * Runs the demo.
   */
  public static void main(String[] args) throws IOException {

    String[] col_family = {column_family};
    Path text_path = new Path(text_tmp);

//    setup.parseFirstLine(source_file, text_tmp);
//    System.out.println(first_line);
    new SnortParser(source_file, text_tmp);
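    // SnortParser (defined elsewhere in this package) is assumed to read the raw
    // Snort alert log and write the ";"-separated records into text_tmp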


    BuildHTable build_table = new BuildHTable(table_name,
        col_family);
    if (!build_table.checkTableExist(table_name)) {
      if (!build_table.createTable()) {
        System.out.println("create table error !");
      }
    } else {
      System.out.println("Table \"" + table_name
          + "\" already exists !");
    }
    JobConf conf = new JobConf(SnortUploadHbase.class);
    FileSystem fileconf = FileSystem.get(conf);
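    // copy the local tmp file into HDFS at the same path (the "true" flag
    // deletes the local source after copying)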
    fileconf.copyFromLocalFile(true, text_path, text_path);
    // Job name; you can change it to anything you like
    conf.setJobName("SnortDataBase");
    final int mapTasks = 1;
    final int reduceTasks = 1;
    // the HBase table name must be correct; here it is SnortTable
    TableReduce.initJob(table_name, ReduceClass.class, conf);

    // map-reduce job settings
    conf.setNumMapTasks(mapTasks);
    conf.setNumReduceTasks(reduceTasks);

    conf.setInputPath(text_path);

    conf.setMapperClass(IdentityMapper.class);
    conf.setCombinerClass(IdentityReducer.class);
    conf.setReducerClass(ReduceClass.class);

    JobClient.runJob(conf);

    // delete the tmp file from HDFS (0.16 API)
    FileSystem.get(conf).delete(text_path);

  }
}