source: sample/hadoop-0.17/tw/org/nchc/demo/LogFetcher.java @ 20

Last change on this file since 20 was 20, checked in by waue, 16 years ago

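將改完的 hadoop 0.17版package 放來備份 (English: back up here the package already ported to Hadoop 0.17.)
目前繼續開發 hadoop 0.16 + hbase 1.3 (English: development currently continues on Hadoop 0.16 + HBase 1.3.)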

File size: 6.0 KB
Line 
1/**
2 * Copyright 2007 The Apache Software Foundation
3 *
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements.  See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership.  The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License.  You may obtain a copy of the License at
11 *
12 *     http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20package tw.org.nchc.demo;
21
22import java.io.IOException;
23import java.text.ParseException;
24
25import org.apache.hadoop.fs.FileSystem;
26import org.apache.hadoop.fs.Path;
27import org.apache.hadoop.hbase.HBaseAdmin;
28import org.apache.hadoop.hbase.HBaseConfiguration;
29import org.apache.hadoop.hbase.HColumnDescriptor;
30import org.apache.hadoop.hbase.HTable;
31import org.apache.hadoop.hbase.HTableDescriptor;
32import org.apache.hadoop.io.Text;
33import org.apache.hadoop.io.Writable;
34import org.apache.hadoop.io.WritableComparable;
35import org.apache.hadoop.mapred.ClusterStatus;
36import org.apache.hadoop.mapred.JobClient;
37import org.apache.hadoop.mapred.JobConf;
38import org.apache.hadoop.mapred.MapReduceBase;
39import org.apache.hadoop.mapred.Mapper;
40import org.apache.hadoop.mapred.OutputCollector;
41import org.apache.hadoop.mapred.Reporter;
42
43import tw.org.nchc.code.Convert;
44
45/**
46 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
47 * W3CExtended, IISw3cExtended)
48 */
49public class LogFetcher {
50  static HBaseConfiguration conf = new HBaseConfiguration();
51
52  public static final String TABLE = "table.name";
53
54  static String tableName;
55
56  static HTable table = null;
57
58  static boolean eclipseRun = false;
59
60  public static class MapClass extends MapReduceBase implements
61      Mapper<WritableComparable, Text, Text, Writable> {
62
63    @Override
64    public void configure(JobConf job) {
65      tableName = job.get(TABLE, "");
66    }
67
68    public void map(WritableComparable key, Text value,
69        OutputCollector<Text, Writable> output, Reporter reporter)
70        throws IOException {
71      try {
72        AccessLogParser log = new AccessLogParser(value.toString());
73        if (table == null)
74          table = new HTable(conf, new Text(tableName));
75        long lockId = table.startUpdate(new Text(log.getIp()));
76        table.put(lockId, new Text("http:protocol"), log.getProtocol()
77            .getBytes());
78        table.put(lockId, new Text("http:method"), log.getMethod()
79            .getBytes());
80        table.put(lockId, new Text("http:code"), log.getCode()
81            .getBytes());
82        table.put(lockId, new Text("http:bytesize"), log.getByteSize()
83            .getBytes());
84        table.put(lockId, new Text("http:agent"), log.getAgent()
85            .getBytes());
86        table.put(lockId, new Text("url:" + log.getUrl()), log
87            .getReferrer().getBytes());
88        table.put(lockId, new Text("referrer:" + log.getReferrer()),
89            log.getUrl().getBytes());
90        table.commit(lockId, log.getTimestamp());
91      } catch (ParseException e) {
92        e.printStackTrace();
93      } catch (Exception e) {
94        e.printStackTrace();
95      }
96    }
97  }
98
99  public static void runMapReduce(String table, String dir)
100      throws IOException {
101    Path tempDir = new Path("log/temp");
102    Path InputDir = new Path(dir);
103    FileSystem fs = FileSystem.get(conf);
104    JobConf jobConf = new JobConf(conf, LogFetcher.class);
105    jobConf.setJobName("apache log fetcher");
106    jobConf.set(TABLE, table);
107    // my convert function from 0.16 to 0.17
108    Path[] in = Convert.listPaths(fs, InputDir);
109    if (fs.isFile(InputDir)) {
110      // 0.16
111//      jobConf.setInputPath(InputDir);
112      Convert.setInputPath(jobConf, InputDir);
113    } else {
114      for (int i = 0; i < in.length; i++) {
115        if (fs.isFile(in[i])) {
116          // 0.16
117//          jobConf.addInputPath(in[i]);
118          Convert.addInputPath(jobConf,in[i]);
119        } else {
120          // my convert function from 0.16 to 0.17
121          Path[] sub = Convert.listPaths(fs, in[i]);
122          for (int j = 0; j < sub.length; j++) {
123            if (fs.isFile(sub[j])) {
124              // 0.16
125//              jobConf.addInputPath(sub[j]);
126              Convert.addInputPath(jobConf, sub[j]);
127            }
128          }
129        }
130      }
131    }
132    // 0.16
133//    jobConf.setOutputPath(tempDir);
134    Convert.setOutputPath(jobConf, tempDir);
135   
136    jobConf.setMapperClass(MapClass.class);
137
138    JobClient client = new JobClient(jobConf);
139    ClusterStatus cluster = client.getClusterStatus();
140    jobConf.setNumMapTasks(cluster.getMapTasks());
141    jobConf.setNumReduceTasks(0);
142
143    JobClient.runJob(jobConf);
144    // 0.16
145//    fs.delete(tempDir);
146    fs.delete(tempDir,true);
147   
148    fs.close();
149  }
150
151  public static void creatTable(String table) throws IOException {
152    HBaseAdmin admin = new HBaseAdmin(conf);
153    if (!admin.tableExists(new Text(table))) {
154      System.out.println("1. " + table
155          + " table creating ... please wait");
156      HTableDescriptor tableDesc = new HTableDescriptor(table);
157      tableDesc.addFamily(new HColumnDescriptor("http:"));
158      tableDesc.addFamily(new HColumnDescriptor("url:"));
159      tableDesc.addFamily(new HColumnDescriptor("referrer:"));
160      admin.createTable(tableDesc);
161    } else {
162      System.out.println("1. " + table + " table already exists.");
163    }
164    System.out.println("2. access_log files fetching using map/reduce");
165  }
166
167  @SuppressWarnings("deprecation")
168  public static void main(String[] args) throws IOException {
169    String table_name = "log";
170    String dir = "apache-log";
171
172    if (eclipseRun) {
173      table_name = "log";
174      dir = "apache-log";
175    } else if (args.length < 2) {
176      System.out
177          .println("Usage: logfetcher <access_log file or directory> <table_name>");
178      System.exit(1);
179    } else {
180      table_name = args[1];
181      dir = args[0];
182    }
183    creatTable(table_name);
184    runMapReduce(table_name, dir);
185
186  }
187
188}
Note: See TracBrowser for help on using the repository browser.