source: sample/hadoop-0.16/tw/org/nchc/demo/LogFetcher.java @ 25

Last change on this file since 25 was 25, checked in by waue, 16 years ago

Downgrade from 0.17 to 0.16.
Not yet tested to confirm it works.

File size: 6.0 KB
/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package tw.org.nchc.demo;

import java.io.IOException;
import java.text.ParseException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Access_log fetcher: loads Apache access_log entries into an HBase table
 * through a map-only MapReduce job. TODO: FgnStatLog, Error_log, Access_log
 * (Default, W3CExtended, IISw3cExtended)
 */
public class LogFetcher {
  static HBaseConfiguration conf = new HBaseConfiguration();

  public static final String TABLE = "table.name";

  static String tableName;

  static HTable table = null;

  static boolean eclipseRun = false;

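  /**
   * Map-only mapper: each access_log line is parsed with AccessLogParser and
   * written straight into the HBase table named by the "table.name" job
   * property. The client IP becomes the row key, the parsed fields go into
   * the http:, url: and referrer: families, and the log entry's own timestamp
   * is used as the cell timestamp.
   */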
  public static class MapClass extends MapReduceBase implements
      Mapper<WritableComparable, Text, Text, Writable> {

    @Override
    public void configure(JobConf job) {
      tableName = job.get(TABLE, "");
    }

    public void map(WritableComparable key, Text value,
        OutputCollector<Text, Writable> output, Reporter reporter)
        throws IOException {
      try {
        AccessLogParser log = new AccessLogParser(value.toString());
        if (table == null)
          table = new HTable(conf, new Text(tableName));
        long lockId = table.startUpdate(new Text(log.getIp()));
        table.put(lockId, new Text("http:protocol"), log.getProtocol().getBytes());
        table.put(lockId, new Text("http:method"), log.getMethod().getBytes());
        table.put(lockId, new Text("http:code"), log.getCode().getBytes());
        table.put(lockId, new Text("http:bytesize"), log.getByteSize().getBytes());
        table.put(lockId, new Text("http:agent"), log.getAgent().getBytes());
        table.put(lockId, new Text("url:" + log.getUrl()), log.getReferrer().getBytes());
        table.put(lockId, new Text("referrer:" + log.getReferrer()), log.getUrl().getBytes());
        table.commit(lockId, log.getTimestamp());
      } catch (ParseException e) {
        e.printStackTrace();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
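  /**
   * Small helper that builds a Path[] from FileSystem.listStatus(), apparently
   * standing in for the older listPaths() call that the original comments flag
   * as the piece that changed between Hadoop 0.16 and 0.17.
   */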
  public static Path[] listPaths(FileSystem fsm, Path path) throws IOException {
    FileStatus[] fss = fsm.listStatus(path);
    int length = fss.length;
    Path[] pi = new Path[length];
    for (int i = 0; i < length; i++) {
      pi[i] = fss[i].getPath();
    }
    return pi;
  }
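  /**
   * Configures and submits the map-only job. The input may be a single file,
   * a directory of files, or a directory of directories (one level deep);
   * every regular file found is added as an input path.
   */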
  public static void runMapReduce(String table, String dir)
      throws IOException {
    Path tempDir = new Path("log/temp");
    Path inputDir = new Path(dir);
    FileSystem fs = FileSystem.get(conf);
    JobConf jobConf = new JobConf(conf, LogFetcher.class);
    jobConf.setJobName("apache log fetcher");
    jobConf.set(TABLE, table);
    // enumerate the input with the listStatus-based helper (0.16/0.17 API difference)
    Path[] in = listPaths(fs, inputDir);
    if (fs.isFile(inputDir)) {
      jobConf.setInputPath(inputDir);
    } else {
      for (int i = 0; i < in.length; i++) {
        if (fs.isFile(in[i])) {
          jobConf.addInputPath(in[i]);
        } else {
          // one level deeper: add the files inside this subdirectory
          Path[] sub = listPaths(fs, in[i]);
          for (int j = 0; j < sub.length; j++) {
            if (fs.isFile(sub[j])) {
              jobConf.addInputPath(sub[j]);
            }
          }
        }
      }
    }
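    // The file output is only a placeholder: all real output is written to
    // HBase by the mapper, and tempDir is deleted once the job finishes.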
    jobConf.setOutputPath(tempDir);

    jobConf.setMapperClass(MapClass.class);

    JobClient client = new JobClient(jobConf);
    ClusterStatus cluster = client.getClusterStatus();
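    // Derive the number of map tasks from the cluster status and run with no
    // reduce phase.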
    jobConf.setNumMapTasks(cluster.getMapTasks());
    jobConf.setNumReduceTasks(0);

    JobClient.runJob(jobConf);

    fs.delete(tempDir);
    fs.close();
  }

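  /**
   * Creates the target table with the http:, url: and referrer: column
   * families if it does not exist yet.
   */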
  public static void createTable(String table) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists(new Text(table))) {
      System.out.println("1. Creating table " + table + " ... please wait");
      HTableDescriptor tableDesc = new HTableDescriptor(table);
      tableDesc.addFamily(new HColumnDescriptor("http:"));
      tableDesc.addFamily(new HColumnDescriptor("url:"));
      tableDesc.addFamily(new HColumnDescriptor("referrer:"));
      admin.createTable(tableDesc);
    } else {
      System.out.println("1. Table " + table + " already exists.");
    }
    System.out.println("2. Fetching access_log files using map/reduce");
  }

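  /**
   * Entry point. With eclipseRun set to true the hard-coded table name and
   * input directory are used; otherwise they come from the command line, for
   * example (the jar name here is illustrative only):
   *
   *   hadoop jar nchc-demo.jar tw.org.nchc.demo.LogFetcher apache-log log
   */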
  @SuppressWarnings("deprecation")
  public static void main(String[] args) throws IOException {
    String table_name = "log";
    String dir = "apache-log";

    if (eclipseRun) {
      table_name = "log";
      dir = "apache-log";
    } else if (args.length < 2) {
      System.out.println("Usage: logfetcher <access_log file or directory> <table_name>");
      System.exit(1);
    } else {
      table_name = args[1];
      dir = args[0];
    }
    createTable(table_name);
    runMapReduce(table_name, dir);
  }

}