source: sample/hadoop-0.16/tw/org/nchc/code/LogParserGo.java @ 64

Last change on this file since 64 was 45, checked in by waue, 16 years ago

SnortBase? is needed to debug.

File size: 7.7 KB
Line 
1/**
2 * Program: LogParserGo.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwn
5 * Last Update Date: 07/02/2008
6 */
7/**
8 * Purpose :
9 *  This program will parse your apache log and store it into Hbase.
10 *
11 * HowToUse :
12 *  Make sure two thing :
13 *  1. Upload apache logs ( /var/log/apache2/access.log* ) to \
14 *    hdfs (default: /user/waue/apache-log) \
15 *   $ bin/hadoop dfs -put /var/log/apache2/ apache-log
16 *  2. parameter "dir" in main contains the logs.
17 *  3. you should filter the exception contents manually, \
18 *    ex:  ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
19 * 
20 * Check Result:
21 *  Go to hbase console, type :
22 *    hql > select * from apache-log;
23
24+-------------------------+-------------------------+-------------------------+
25| Row                     | Column                  | Cell                    |
26+-------------------------+-------------------------+-------------------------+
27| 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
28|                         |                         |  MSIE 4.01; Windows 95) |
29+-------------------------+-------------------------+-------------------------+
30| 118.170.101.250         | http:bytesize           | 318                     |
31+-------------------------+-------------------------+-------------------------+
32..........(skip)........
33+-------------------------+-------------------------+-------------------------+
34| 87.65.93.58             | http:method             | OPTIONS                 |
35+-------------------------+-------------------------+-------------------------+
36| 87.65.93.58             | http:protocol           | HTTP/1.1                |
37+-------------------------+-------------------------+-------------------------+
38| 87.65.93.58             | referrer:-              | *                       |
39+-------------------------+-------------------------+-------------------------+
40| 87.65.93.58             | url:*                   | -                       |
41+-------------------------+-------------------------+-------------------------+
4231 row(s) in set. (0.58 sec)
43
44
45
46 */
47package tw.org.nchc.code;
48
49import java.io.File;
50import java.io.FileWriter;
51import java.io.IOException;
52
53import org.apache.hadoop.fs.FileStatus;
54import org.apache.hadoop.fs.FileSystem;
55import org.apache.hadoop.fs.Path;
56import org.apache.hadoop.hbase.HBaseAdmin;
57import org.apache.hadoop.hbase.HBaseConfiguration;
58import org.apache.hadoop.hbase.HColumnDescriptor;
59import org.apache.hadoop.hbase.HTable;
60import org.apache.hadoop.hbase.HTableDescriptor;
61import org.apache.hadoop.io.Text;
62import org.apache.hadoop.io.Writable;
63import org.apache.hadoop.io.WritableComparable;
64import org.apache.hadoop.mapred.ClusterStatus;
65import org.apache.hadoop.mapred.JobClient;
66import org.apache.hadoop.mapred.JobConf;
67import org.apache.hadoop.mapred.MapReduceBase;
68import org.apache.hadoop.mapred.Mapper;
69import org.apache.hadoop.mapred.OutputCollector;
70import org.apache.hadoop.mapred.Reporter;
71
72// import AccessLogParser
73/**
74 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
75 * W3CExtended, IISw3cExtended)
76 */
77public class LogParserGo {
78  static HBaseConfiguration conf = new HBaseConfiguration();
79
80  public static final String TABLE = "table.name";
81
82  static String tableName;
83
84  static HTable table = null;
85 
86  static void print(String str){
87    System.out.println("STR  = "+str);
88  }
89  public static class MapClass extends MapReduceBase implements
90      Mapper<WritableComparable, Text, Text, Writable> {
91
92    @Override
93    // MapReduceBase.configure(JobConf job)
94    // Default implementation that does nothing.
95    public void configure(JobConf job) {
96      // String get(String name,String defaultValue)
97      // Get the value of the name property. If no such property exists,\
98      //  then defaultValue is returned.
99      tableName = job.get(TABLE, "");
100    }
101
102    public void map(WritableComparable key, Text value,
103        OutputCollector<Text, Writable> output, Reporter reporter)
104        throws IOException {
105     
106      try {
107        LogParser log = new LogParser(value.toString());
108        print(value.toString());
109        FileWriter out = new FileWriter(new File(
110        "/home/waue/Desktop/mr-result.txt"));
111        out.write(value.toString());
112        out.flush();
113        out.close();
114
115        if (table == null)
116          table = new HTable(conf, new Text(tableName));
117        long lockId = table.startUpdate(new Text(log.getIp()));
118        table.put(lockId, new Text("http:protocol"), log.getProtocol()
119            .getBytes());
120        table.put(lockId, new Text("http:method"), log.getMethod()
121            .getBytes());
122        table.put(lockId, new Text("http:code"), log.getCode()
123            .getBytes());
124        table.put(lockId, new Text("http:bytesize"), log.getByteSize()
125            .getBytes());
126        table.put(lockId, new Text("http:agent"), log.getAgent()
127            .getBytes());
128        table.put(lockId, new Text("url:" + log.getUrl()), log
129            .getReferrer().getBytes());
130        table.put(lockId, new Text("referrer:" + log.getReferrer()),
131            log.getUrl().getBytes());
132        table.commit(lockId, log.getTimestamp());
133       
134      } catch (Exception e) {
135        e.printStackTrace();
136      }
137     
138    }
139  }
140
141  // do it to resolve warning : FileSystem.listPaths
142  static public Path[] listPaths(FileSystem fsm, Path path)
143      throws IOException {
144    FileStatus[] fss = fsm.listStatus(path);
145    int length = fss.length;
146    Path[] pi = new Path[length];
147    for (int i = 0; i < length; i++) {
148      pi[i] = fss[i].getPath();
149    }
150    return pi;
151  }
152
153  public static void runMapReduce(String table, String dir)
154      throws IOException {
155    Path tempDir = new Path("/tmp/Mylog/");
156    Path InputDir = new Path(dir);
157    FileSystem fs = FileSystem.get(conf);
158    JobConf jobConf = new JobConf(conf, LogParserGo.class);
159    jobConf.setJobName("apache log fetcher");
160    jobConf.set(TABLE, table);
161    Path[] in = listPaths(fs, InputDir);
162    if (fs.isFile(InputDir)) {
163      jobConf.setInputPath(InputDir);
164    } else {
165      for (int i = 0; i < in.length; i++) {
166        if (fs.isFile(in[i])) {
167          jobConf.addInputPath(in[i]);
168        } else {
169          Path[] sub = listPaths(fs, in[i]);
170          for (int j = 0; j < sub.length; j++) {
171            if (fs.isFile(sub[j])) {
172              jobConf.addInputPath(sub[j]);
173            }
174          }
175        }
176      }
177    }
178    jobConf.setOutputPath(tempDir);
179
180    jobConf.setMapperClass(MapClass.class);
181
182    JobClient client = new JobClient(jobConf);
183    ClusterStatus cluster = client.getClusterStatus();
184    jobConf.setNumMapTasks(cluster.getMapTasks());
185    jobConf.setNumReduceTasks(0);
186
187    JobClient.runJob(jobConf);
188
189    fs.delete(tempDir);
190    fs.close();
191  }
192
193  public static void creatTable(String table) throws IOException {
194    HBaseAdmin admin = new HBaseAdmin(conf);
195    if (!admin.tableExists(new Text(table))) {
196      System.out.println("1. " + table
197          + " table creating ... please wait");
198      HTableDescriptor tableDesc = new HTableDescriptor(table);
199      tableDesc.addFamily(new HColumnDescriptor("http:"));
200      tableDesc.addFamily(new HColumnDescriptor("url:"));
201      tableDesc.addFamily(new HColumnDescriptor("referrer:"));
202      admin.createTable(tableDesc);
203    } else {
204      System.out.println("1. " + table + " table already exists.");
205    }
206    System.out.println("2. access_log files fetching using map/reduce");
207  }
208
209  public static void main(String[] args) throws IOException {
210    String table_name = "apache-log2";
211    String dir = "/user/waue/apache-log";
212   
213    // if (eclipseRun) {
214    // table_name = "log";
215    // dir = "apache-log";
216    // } else if (args.length < 2) {
217    // System.out
218    // .println("Usage: logfetcher <access_log file or directory>
219    // <table_name>");
220    // System.exit(1);
221    // } else {
222    // table_name = args[1];
223    // dir = args[0];
224    // }
225
226    creatTable(table_name);
227    runMapReduce(table_name, dir);
228
229  }
230
231}
Note: See TracBrowser for help on using the repository browser.