source: sample/hadoop-0.16/tw/org/nchc/demo/LogFetcher.java @ 218

Last change on this file since 218 was 31, checked in by waue, 16 years ago

update some new ..

File size: 7.2 KB
RevLine 
[21]1/**
[27]2 * Program: LogFetcher.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwn
5 * Last Update Date: 07/02/2008
6 */
7/**
[31]8 * Purpose :
9 *  This program will parse your apache log and store it into Hbase.
10 *
11 * HowToUse :
12 *  Make sure two thing :
13 *  1. Upload apache logs ( /var/log/apache2/access.log* ) to \
14 *    hdfs (default: /user/waue/apache-log) \
15 *   $ bin/hadoop dfs -put /var/log/apache2/ apache-log
16 *  2. parameter "dir" in main contains the logs.
17 *  3. you should filter the exception contents manually, \
18 *    ex:  ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
19 * 
20 * Check Result:
21 *  Go to hbase console, type :
22 *    hql > select * from apache-log;
23
24+-------------------------+-------------------------+-------------------------+
25| Row                     | Column                  | Cell                    |
26+-------------------------+-------------------------+-------------------------+
27| 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
28|                         |                         |  MSIE 4.01; Windows 95) |
29+-------------------------+-------------------------+-------------------------+
30| 118.170.101.250         | http:bytesize           | 318                     |
31+-------------------------+-------------------------+-------------------------+
32..........(skip)........
33+-------------------------+-------------------------+-------------------------+
34| 87.65.93.58             | http:method             | OPTIONS                 |
35+-------------------------+-------------------------+-------------------------+
36| 87.65.93.58             | http:protocol           | HTTP/1.1                |
37+-------------------------+-------------------------+-------------------------+
38| 87.65.93.58             | referrer:-              | *                       |
39+-------------------------+-------------------------+-------------------------+
40| 87.65.93.58             | url:*                   | -                       |
41+-------------------------+-------------------------+-------------------------+
4231 row(s) in set. (0.58 sec)
43
[21]44 */
[31]45
46
[21]47package tw.org.nchc.demo;
48
49import java.io.IOException;
50import java.text.ParseException;
51
[25]52import org.apache.hadoop.fs.FileStatus;
[21]53import org.apache.hadoop.fs.FileSystem;
54import org.apache.hadoop.fs.Path;
55import org.apache.hadoop.hbase.HBaseAdmin;
56import org.apache.hadoop.hbase.HBaseConfiguration;
57import org.apache.hadoop.hbase.HColumnDescriptor;
58import org.apache.hadoop.hbase.HTable;
59import org.apache.hadoop.hbase.HTableDescriptor;
60import org.apache.hadoop.io.Text;
61import org.apache.hadoop.io.Writable;
62import org.apache.hadoop.io.WritableComparable;
63import org.apache.hadoop.mapred.ClusterStatus;
64import org.apache.hadoop.mapred.JobClient;
65import org.apache.hadoop.mapred.JobConf;
66import org.apache.hadoop.mapred.MapReduceBase;
67import org.apache.hadoop.mapred.Mapper;
68import org.apache.hadoop.mapred.OutputCollector;
69import org.apache.hadoop.mapred.Reporter;
[29]70// import AccessLogParser
[21]71/**
72 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
73 * W3CExtended, IISw3cExtended)
74 */
75public class LogFetcher {
76  static HBaseConfiguration conf = new HBaseConfiguration();
77
78  public static final String TABLE = "table.name";
79
80  static String tableName;
81
82  static HTable table = null;
83
84  static boolean eclipseRun = false;
85
86  public static class MapClass extends MapReduceBase implements
87      Mapper<WritableComparable, Text, Text, Writable> {
88
89    @Override
90    public void configure(JobConf job) {
91      tableName = job.get(TABLE, "");
92    }
93
94    public void map(WritableComparable key, Text value,
95        OutputCollector<Text, Writable> output, Reporter reporter)
96        throws IOException {
97      try {
[29]98       
[21]99        AccessLogParser log = new AccessLogParser(value.toString());
100        if (table == null)
101          table = new HTable(conf, new Text(tableName));
102        long lockId = table.startUpdate(new Text(log.getIp()));
103        table.put(lockId, new Text("http:protocol"), log.getProtocol()
104            .getBytes());
105        table.put(lockId, new Text("http:method"), log.getMethod()
106            .getBytes());
107        table.put(lockId, new Text("http:code"), log.getCode()
108            .getBytes());
109        table.put(lockId, new Text("http:bytesize"), log.getByteSize()
110            .getBytes());
111        table.put(lockId, new Text("http:agent"), log.getAgent()
112            .getBytes());
113        table.put(lockId, new Text("url:" + log.getUrl()), log
114            .getReferrer().getBytes());
115        table.put(lockId, new Text("referrer:" + log.getReferrer()),
116            log.getUrl().getBytes());
117        table.commit(lockId, log.getTimestamp());
118      } catch (ParseException e) {
119        e.printStackTrace();
120      } catch (Exception e) {
121        e.printStackTrace();
122      }
123    }
124  }
[27]125//   do it to resolve warning : FileSystem.listPaths
[25]126  static public Path[] listPaths(FileSystem fsm,Path path) throws IOException
127  {
128    FileStatus[] fss = fsm.listStatus(path);
129    int length = fss.length;
130    Path[] pi = new Path[length];
131    for (int i=0 ; i< length; i++)
132    {
133      pi[i] = fss[i].getPath();
134    }
135    return pi;
136  } 
[21]137  public static void runMapReduce(String table, String dir)
138      throws IOException {
139    Path tempDir = new Path("log/temp");
140    Path InputDir = new Path(dir);
141    FileSystem fs = FileSystem.get(conf);
142    JobConf jobConf = new JobConf(conf, LogFetcher.class);
143    jobConf.setJobName("apache log fetcher");
144    jobConf.set(TABLE, table);
[25]145    Path[] in = listPaths(fs, InputDir);
[21]146    if (fs.isFile(InputDir)) {
[25]147      jobConf.setInputPath(InputDir);
[21]148    } else {
149      for (int i = 0; i < in.length; i++) {
150        if (fs.isFile(in[i])) {
[25]151          jobConf.addInputPath(in[i]);
[21]152        } else {
[25]153          Path[] sub = listPaths(fs, in[i]);
[21]154          for (int j = 0; j < sub.length; j++) {
155            if (fs.isFile(sub[j])) {
[25]156              jobConf.addInputPath(sub[j]);
[21]157            }
158          }
159        }
160      }
161    }
[25]162    jobConf.setOutputPath(tempDir);
[21]163   
164    jobConf.setMapperClass(MapClass.class);
165
166    JobClient client = new JobClient(jobConf);
167    ClusterStatus cluster = client.getClusterStatus();
168    jobConf.setNumMapTasks(cluster.getMapTasks());
169    jobConf.setNumReduceTasks(0);
170
171    JobClient.runJob(jobConf);
[25]172
173    fs.delete(tempDir);   
[21]174    fs.close();
175  }
176
177  public static void creatTable(String table) throws IOException {
178    HBaseAdmin admin = new HBaseAdmin(conf);
179    if (!admin.tableExists(new Text(table))) {
180      System.out.println("1. " + table
181          + " table creating ... please wait");
182      HTableDescriptor tableDesc = new HTableDescriptor(table);
183      tableDesc.addFamily(new HColumnDescriptor("http:"));
184      tableDesc.addFamily(new HColumnDescriptor("url:"));
185      tableDesc.addFamily(new HColumnDescriptor("referrer:"));
186      admin.createTable(tableDesc);
187    } else {
188      System.out.println("1. " + table + " table already exists.");
189    }
190    System.out.println("2. access_log files fetching using map/reduce");
191  }
192
193  public static void main(String[] args) throws IOException {
194    String table_name = "log";
195    String dir = "apache-log";
196
197    if (eclipseRun) {
198      table_name = "log";
199      dir = "apache-log";
200    } else if (args.length < 2) {
201      System.out
202          .println("Usage: logfetcher <access_log file or directory> <table_name>");
203      System.exit(1);
204    } else {
205      table_name = args[1];
206      dir = args[0];
207    }
208    creatTable(table_name);
209    runMapReduce(table_name, dir);
210
211  }
212
213}
Note: See TracBrowser for help on using the repository browser.