source: sample/hadoop-0.16/tw/org/nchc/code/LogParserGo.java @ 30

Last change on this file since 30 was 30, checked in by waue, 16 years ago


File size: 6.1 KB
/**
 * Program: LogFetcher.java
 * Editor: Waue Chen
 * From :  NCHC. Taiwan
 * Last Update Date: 07/02/2008
 */
/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package tw.org.nchc.code;

import java.io.IOException;
import java.text.ParseException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
// AccessLogParser (a separate class in this package, not shown here) parses one access_log line
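// Flow: main() first creates the HBase table if it does not exist, then
// runs a map-only MapReduce job over the access_log files; each mapper
// parses its lines and writes them straight into HBase.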
/**
 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
 * W3CExtended, IISw3cExtended)
 */
public class LogFetcher {
  static HBaseConfiguration conf = new HBaseConfiguration();

  // JobConf key used to hand the target table name to the mappers
  public static final String TABLE = "table.name";

  static String tableName;

  // one lazily-opened HTable handle per mapper JVM (see map() below)
  static HTable table = null;

  // set to true to run with the hard-coded defaults instead of command-line arguments
  static boolean eclipseRun = false;

  public static class MapClass extends MapReduceBase implements
      Mapper<WritableComparable, Text, Text, Writable> {

    @Override
    public void configure(JobConf job) {
      tableName = job.get(TABLE, "");
    }

    public void map(WritableComparable key, Text value,
        OutputCollector<Text, Writable> output, Reporter reporter)
        throws IOException {
      try {
        AccessLogParser log = new AccessLogParser(value.toString());
        if (table == null)
          table = new HTable(conf, new Text(tableName));
        // pre-0.20 HBase row-update API: startUpdate() opens a batched
        // row update, put() stages column writes, and commit() applies
        // them all under the log entry's own timestamp
        long lockId = table.startUpdate(new Text(log.getIp()));
        table.put(lockId, new Text("http:protocol"), log.getProtocol()
            .getBytes());
        table.put(lockId, new Text("http:method"), log.getMethod()
            .getBytes());
        table.put(lockId, new Text("http:code"), log.getCode()
            .getBytes());
        table.put(lockId, new Text("http:bytesize"), log.getByteSize()
            .getBytes());
        table.put(lockId, new Text("http:agent"), log.getAgent()
            .getBytes());
        table.put(lockId, new Text("url:" + log.getUrl()), log
            .getReferrer().getBytes());
        table.put(lockId, new Text("referrer:" + log.getReferrer()),
            log.getUrl().getBytes());
        table.commit(lockId, log.getTimestamp());
      } catch (ParseException e) {
        // skip lines that do not match the expected access_log format
        e.printStackTrace();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  // stand-in for the deprecated FileSystem.listPaths(): list a path's
  // children via listStatus() and return their Paths
  static public Path[] listPaths(FileSystem fsm, Path path)
      throws IOException {
    FileStatus[] fss = fsm.listStatus(path);
    int length = fss.length;
    Path[] pi = new Path[length];
    for (int i = 0; i < length; i++) {
      pi[i] = fss[i].getPath();
    }
    return pi;
  }
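
  // Sets up a map-only job (zero reduce tasks). The real output is written
  // to HBase from inside map(); tempDir only receives the job's empty file
  // output and is deleted after the job completes.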
  public static void runMapReduce(String table, String dir)
      throws IOException {
    Path tempDir = new Path("log/temp");
    Path inputDir = new Path(dir);
    FileSystem fs = FileSystem.get(conf);
    JobConf jobConf = new JobConf(conf, LogFetcher.class);
    jobConf.setJobName("apache log fetcher");
    jobConf.set(TABLE, table);
    // add the input file itself, or every file found up to one directory
    // level beneath the input directory
    Path[] in = listPaths(fs, inputDir);
    if (fs.isFile(inputDir)) {
      jobConf.setInputPath(inputDir);
    } else {
      for (int i = 0; i < in.length; i++) {
        if (fs.isFile(in[i])) {
          jobConf.addInputPath(in[i]);
        } else {
          Path[] sub = listPaths(fs, in[i]);
          for (int j = 0; j < sub.length; j++) {
            if (fs.isFile(sub[j])) {
              jobConf.addInputPath(sub[j]);
            }
          }
        }
      }
    }
    jobConf.setOutputPath(tempDir);

    jobConf.setMapperClass(MapClass.class);

    JobClient client = new JobClient(jobConf);
    ClusterStatus cluster = client.getClusterStatus();
    // size the map phase from the cluster status; no reduce phase
    jobConf.setNumMapTasks(cluster.getMapTasks());
    jobConf.setNumReduceTasks(0);

    JobClient.runJob(jobConf);

    fs.delete(tempDir);
    fs.close();
  }

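  // The three column families created below (http:, url:, referrer:) must
  // match the column names written in MapClass.map().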
  public static void createTable(String table) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists(new Text(table))) {
      System.out.println("1. " + table
          + " table creating ... please wait");
      HTableDescriptor tableDesc = new HTableDescriptor(table);
      tableDesc.addFamily(new HColumnDescriptor("http:"));
      tableDesc.addFamily(new HColumnDescriptor("url:"));
      tableDesc.addFamily(new HColumnDescriptor("referrer:"));
      admin.createTable(tableDesc);
    } else {
      System.out.println("1. " + table + " table already exists.");
    }
    System.out.println("2. access_log files fetching using map/reduce");
  }

  public static void main(String[] args) throws IOException {
    String table_name = "log";
    String dir = "apache-log";

    if (eclipseRun) {
      table_name = "log";
      dir = "apache-log";
    } else if (args.length < 2) {
      System.out
          .println("Usage: logfetcher <access_log file or directory> <table_name>");
      System.exit(1);
    } else {
      table_name = args[1];
      dir = args[0];
    }
    createTable(table_name);
    runMapReduce(table_name, dir);
  }
}
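
The AccessLogParser class the mapper depends on is not part of this file. For illustration only, a minimal sketch providing the accessor methods the mapper calls might look like the following; it assumes the Apache "combined" log format, and the real NCHC class may differ:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the AccessLogParser used above -- the real class
// lives elsewhere in tw.org.nchc.code and may differ. Assumes the Apache
// "combined" log format.
public class AccessLogParser {
  private static final Pattern LINE = Pattern.compile(
      "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" "
          + "(\\d{3}) (\\S+) \"([^\"]*)\" \"([^\"]*)\"$");

  private final Matcher m;

  public AccessLogParser(String line) throws ParseException {
    m = LINE.matcher(line);
    if (!m.matches())
      throw new ParseException("unparsable log line: " + line, 0);
  }

  public String getIp()       { return m.group(1); }
  public String getMethod()   { return m.group(3); }
  public String getUrl()      { return m.group(4); }
  public String getProtocol() { return m.group(5); }
  public String getCode()     { return m.group(6); }
  public String getByteSize() { return m.group(7); }
  public String getReferrer() { return m.group(8); }
  public String getAgent()    { return m.group(9); }

  public long getTimestamp() throws ParseException {
    // e.g. 10/Oct/2000:13:55:36 -0700
    return new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US)
        .parse(m.group(2)).getTime();
  }
}

Throwing ParseException from the constructor matches the catch block in map(), so malformed lines are simply skipped rather than failing the task.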