source: sample/hadoop-0.16/tw/org/nchc/demo/LogFetcher.java @ 28

Last change on this file since 28 was 27, checked in by waue, 16 years ago

test!

File size: 6.2 KB
Line 
1/**
2 * Program: LogFetcher.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 07/02/2008
6 */
7/**
8 * Copyright 2007 The Apache Software Foundation
9 *
10 * Licensed to the Apache Software Foundation (ASF) under one
11 * or more contributor license agreements.  See the NOTICE file
12 * distributed with this work for additional information
13 * regarding copyright ownership.  The ASF licenses this file
14 * to you under the Apache License, Version 2.0 (the
15 * "License"); you may not use this file except in compliance
16 * with the License.  You may obtain a copy of the License at
17 *
18 *     http://www.apache.org/licenses/LICENSE-2.0
19 *
20 * Unless required by applicable law or agreed to in writing, software
21 * distributed under the License is distributed on an "AS IS" BASIS,
22 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23 * See the License for the specific language governing permissions and
24 * limitations under the License.
25 */
26package tw.org.nchc.demo;
27
28import java.io.IOException;
29import java.text.ParseException;
30
31import org.apache.hadoop.fs.FileStatus;
32import org.apache.hadoop.fs.FileSystem;
33import org.apache.hadoop.fs.Path;
34import org.apache.hadoop.hbase.HBaseAdmin;
35import org.apache.hadoop.hbase.HBaseConfiguration;
36import org.apache.hadoop.hbase.HColumnDescriptor;
37import org.apache.hadoop.hbase.HTable;
38import org.apache.hadoop.hbase.HTableDescriptor;
39import org.apache.hadoop.io.Text;
40import org.apache.hadoop.io.Writable;
41import org.apache.hadoop.io.WritableComparable;
42import org.apache.hadoop.mapred.ClusterStatus;
43import org.apache.hadoop.mapred.JobClient;
44import org.apache.hadoop.mapred.JobConf;
45import org.apache.hadoop.mapred.MapReduceBase;
46import org.apache.hadoop.mapred.Mapper;
47import org.apache.hadoop.mapred.OutputCollector;
48import org.apache.hadoop.mapred.Reporter;
49
50/**
51 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
52 * W3CExtended, IISw3cExtended)
53 */
54public class LogFetcher {
55  static HBaseConfiguration conf = new HBaseConfiguration();
56
57  public static final String TABLE = "table.name";
58
59  static String tableName;
60
61  static HTable table = null;
62
63  static boolean eclipseRun = false;
64
65  public static class MapClass extends MapReduceBase implements
66      Mapper<WritableComparable, Text, Text, Writable> {
67
68    @Override
69    public void configure(JobConf job) {
70      tableName = job.get(TABLE, "");
71    }
72
73    public void map(WritableComparable key, Text value,
74        OutputCollector<Text, Writable> output, Reporter reporter)
75        throws IOException {
76      try {
77        AccessLogParser log = new AccessLogParser(value.toString());
78        if (table == null)
79          table = new HTable(conf, new Text(tableName));
80        long lockId = table.startUpdate(new Text(log.getIp()));
81        table.put(lockId, new Text("http:protocol"), log.getProtocol()
82            .getBytes());
83        table.put(lockId, new Text("http:method"), log.getMethod()
84            .getBytes());
85        table.put(lockId, new Text("http:code"), log.getCode()
86            .getBytes());
87        table.put(lockId, new Text("http:bytesize"), log.getByteSize()
88            .getBytes());
89        table.put(lockId, new Text("http:agent"), log.getAgent()
90            .getBytes());
91        table.put(lockId, new Text("url:" + log.getUrl()), log
92            .getReferrer().getBytes());
93        table.put(lockId, new Text("referrer:" + log.getReferrer()),
94            log.getUrl().getBytes());
95        table.commit(lockId, log.getTimestamp());
96      } catch (ParseException e) {
97        e.printStackTrace();
98      } catch (Exception e) {
99        e.printStackTrace();
100      }
101    }
102  }
103//   do it to resolve warning : FileSystem.listPaths
104  static public Path[] listPaths(FileSystem fsm,Path path) throws IOException
105  {
106    FileStatus[] fss = fsm.listStatus(path);
107    int length = fss.length;
108    Path[] pi = new Path[length];
109    for (int i=0 ; i< length; i++)
110    {
111      pi[i] = fss[i].getPath();
112    }
113    return pi;
114  } 
115  public static void runMapReduce(String table, String dir)
116      throws IOException {
117    Path tempDir = new Path("log/temp");
118    Path InputDir = new Path(dir);
119    FileSystem fs = FileSystem.get(conf);
120    JobConf jobConf = new JobConf(conf, LogFetcher.class);
121    jobConf.setJobName("apache log fetcher");
122    jobConf.set(TABLE, table);
123//     Path[] in = FileSystem.listPaths(fs, InputDir);
124    Path[] in = listPaths(fs, InputDir);
125    if (fs.isFile(InputDir)) {
126      jobConf.setInputPath(InputDir);
127    } else {
128      for (int i = 0; i < in.length; i++) {
129        if (fs.isFile(in[i])) {
130          jobConf.addInputPath(in[i]);
131        } else {
132//          Path[] sub = FileSystem.listPaths(fs, in[i]);
133          Path[] sub = listPaths(fs, in[i]);
134          for (int j = 0; j < sub.length; j++) {
135            if (fs.isFile(sub[j])) {
136              jobConf.addInputPath(sub[j]);
137            }
138          }
139        }
140      }
141    }
142    jobConf.setOutputPath(tempDir);
143   
144    jobConf.setMapperClass(MapClass.class);
145
146    JobClient client = new JobClient(jobConf);
147    ClusterStatus cluster = client.getClusterStatus();
148    jobConf.setNumMapTasks(cluster.getMapTasks());
149    jobConf.setNumReduceTasks(0);
150
151    JobClient.runJob(jobConf);
152
153    fs.delete(tempDir);   
154    fs.close();
155  }
156
157  public static void creatTable(String table) throws IOException {
158    HBaseAdmin admin = new HBaseAdmin(conf);
159    if (!admin.tableExists(new Text(table))) {
160      System.out.println("1. " + table
161          + " table creating ... please wait");
162      HTableDescriptor tableDesc = new HTableDescriptor(table);
163      tableDesc.addFamily(new HColumnDescriptor("http:"));
164      tableDesc.addFamily(new HColumnDescriptor("url:"));
165      tableDesc.addFamily(new HColumnDescriptor("referrer:"));
166      admin.createTable(tableDesc);
167    } else {
168      System.out.println("1. " + table + " table already exists.");
169    }
170    System.out.println("2. access_log files fetching using map/reduce");
171  }
172
173  @SuppressWarnings("deprecation")
174  public static void main(String[] args) throws IOException {
175    String table_name = "log";
176    String dir = "apache-log";
177
178    if (eclipseRun) {
179      table_name = "log";
180      dir = "apache-log";
181    } else if (args.length < 2) {
182      System.out
183          .println("Usage: logfetcher <access_log file or directory> <table_name>");
184      System.exit(1);
185    } else {
186      table_name = args[1];
187      dir = args[0];
188    }
189    creatTable(table_name);
190    runMapReduce(table_name, dir);
191
192  }
193
194}
Note: See TracBrowser for help on using the repository browser.