/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package tw.org.nchc.demo;

import java.io.IOException;
import java.text.ParseException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Access_log fetcher.
 *
 * <p>Creates an HBase table (if it does not exist yet) and runs a map-only
 * MapReduce job whose mapper parses each input line with
 * {@code AccessLogParser} and writes the parsed fields into that table.
 *
 * <p>TODO: FgnStatLog, Error_log, Access_log (Default, W3CExtended,
 * IISw3cExtended)
 */
public class LogFetcher {
  /** Configuration shared by the HBase client, the file system and the job. */
  static HBaseConfiguration conf = new HBaseConfiguration();

  /** Job-configuration key under which the target table name is passed to the mapper. */
  public static final String TABLE = "table.name";

  /** Target table name; set by {@link MapClass#configure(JobConf)}. */
  static String tableName;

  /** Table handle used by the mapper; opened on the first call to {@code map}. */
  static HTable table = null;

  /** When true, {@link #main(String[])} ignores its arguments and uses the built-in defaults. */
  static boolean eclipseRun = false;

  /**
   * Mapper that stores one parsed access-log line per call as one row update,
   * keyed by the client IP and committed with the timestamp of the log entry.
   */
  public static class MapClass extends MapReduceBase implements Mapper {
    /** Reads the target table name from the job configuration (empty string if unset). */
    @Override
    public void configure(JobConf job) {
      tableName = job.get(TABLE, "");
    }

    /**
     * Parses {@code value} as an access-log line and writes it to the table.
     *
     * <p>A record that cannot be parsed or stored is reported on stderr and
     * skipped; it does not fail the task. Whenever an update was started but
     * not committed, it is aborted so that it is not left open on the shared
     * table handle.
     *
     * @param key      input key supplied by the framework; not used
     * @param value    one line of an access log
     * @param output   not used; all results go directly to the table
     * @param reporter not used
     * @throws IOException declared by the interface; failures for a single
     *                     record are handled inside this method
     */
    public void map(WritableComparable key, Text value,
        OutputCollector output, Reporter reporter) throws IOException {
      long lockId = 0L;
      boolean updateOpen = false;
      try {
        AccessLogParser log = new AccessLogParser(value.toString());

        if (table == null) {
          table = new HTable(conf, new Text(tableName));
        }

        lockId = table.startUpdate(new Text(log.getIp()));
        updateOpen = true;

        // NOTE(review): getBytes() uses the platform default charset. Left
        // unchanged because readers of this table may rely on it - confirm.
        table.put(lockId, new Text("http:protocol"), log.getProtocol().getBytes());
        table.put(lockId, new Text("http:method"), log.getMethod().getBytes());
        table.put(lockId, new Text("http:code"), log.getCode().getBytes());
        table.put(lockId, new Text("http:bytesize"), log.getByteSize().getBytes());
        table.put(lockId, new Text("http:agent"), log.getAgent().getBytes());

        // Cross reference: the "url:" column named after the URL holds the
        // referrer, and the "referrer:" column named after the referrer
        // holds the URL.
        table.put(lockId, new Text("url:" + log.getUrl()),
            log.getReferrer().getBytes());
        table.put(lockId, new Text("referrer:" + log.getReferrer()),
            log.getUrl().getBytes());

        table.commit(lockId, log.getTimestamp());
        updateOpen = false;
      } catch (ParseException e) {
        System.err.println("Skipping unparsable access_log line: " + value);
        e.printStackTrace();
      } catch (Exception e) {
        System.err.println("Failed to store access_log line: " + value);
        e.printStackTrace();
      } finally {
        if (updateOpen) {
          abortQuietly(lockId);
        }
      }
    }

    /** Aborts an update that was started but not committed; never throws. */
    private static void abortQuietly(long lockId) {
      try {
        table.abort(lockId);
      } catch (Exception e) {
        // Presumably the update was already ended by a failed commit; there
        // is nothing further to release. Verify against the HBase version in use.
        e.printStackTrace();
      }
    }
  }

  /**
   * Returns the paths of the entries that {@code fsm.listStatus(path)} reports.
   *
   * @param fsm  file system to query
   * @param path file or directory to list
   * @return one path per reported status; an empty array when the file system
   *         reports nothing (a {@code null} listing) for {@code path}
   * @throws IOException if the file system cannot be queried
   */
  static public Path[] listPaths(FileSystem fsm, Path path) throws IOException {
    FileStatus[] statuses = fsm.listStatus(path);
    if (statuses == null) {
      // Guard against a null listing instead of failing with a NullPointerException.
      return new Path[0];
    }
    Path[] paths = new Path[statuses.length];
    for (int i = 0; i < statuses.length; i++) {
      paths[i] = statuses[i].getPath();
    }
    return paths;
  }

  /**
   * Runs the map-only job that loads access-log files into {@code table}.
   *
   * <p>If {@code dir} is a file it is the only input. Otherwise the inputs are
   * the files directly inside {@code dir} plus the files directly inside its
   * immediate sub-directories; deeper levels are not scanned. The job output
   * directory {@code log/temp} is deleted afterwards, and the file system is
   * closed, whether or not the job succeeded.
   *
   * @param table name of the HBase table the mapper writes to
   * @param dir   access-log file, or directory containing access-log files
   * @throws IOException if {@code dir} does not exist, or the file system or
   *                     the job fails
   */
  public static void runMapReduce(String table, String dir) throws IOException {
    Path tempDir = new Path("log/temp");
    Path inputDir = new Path(dir);

    FileSystem fs = FileSystem.get(conf);
    try {
      if (!fs.exists(inputDir)) {
        throw new IOException("Input path does not exist: " + inputDir);
      }

      JobConf jobConf = new JobConf(conf, LogFetcher.class);
      jobConf.setJobName("apache log fetcher");
      jobConf.set(TABLE, table);

      if (fs.isFile(inputDir)) {
        jobConf.setInputPath(inputDir);
      } else {
        // listPaths() is this class's own helper, written when the code was
        // converted from Hadoop 0.16 to 0.17.
        Path[] entries = listPaths(fs, inputDir);
        for (Path entry : entries) {
          if (fs.isFile(entry)) {
            jobConf.addInputPath(entry);
            continue;
          }
          Path[] children = listPaths(fs, entry);
          for (Path child : children) {
            if (fs.isFile(child)) {
              jobConf.addInputPath(child);
            }
          }
        }
      }

      jobConf.setOutputPath(tempDir);
      jobConf.setMapperClass(MapClass.class);

      JobClient client = new JobClient(jobConf);
      ClusterStatus cluster = client.getClusterStatus();
      // NOTE(review): getMapTasks() presumably reports the tasks currently
      // running rather than the cluster capacity - confirm this is intended.
      jobConf.setNumMapTasks(cluster.getMapTasks());
      jobConf.setNumReduceTasks(0);

      try {
        JobClient.runJob(jobConf);
      } finally {
        // Remove the output directory even when the job fails, so that a
        // leftover directory does not get in the way of the next run.
        fs.delete(tempDir);
      }
    } finally {
      fs.close();
    }
  }

  /**
   * Creates {@code table} with the column families {@code http:},
   * {@code url:} and {@code referrer:} unless it already exists, printing
   * progress messages to stdout.
   *
   * @param table name of the table to create
   * @throws IOException if the HBase admin calls fail
   */
  public static void creatTable(String table) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists(new Text(table))) {
      System.out.println("1. " + table + " table creating ... please wait");
      HTableDescriptor tableDesc = new HTableDescriptor(table);
      tableDesc.addFamily(new HColumnDescriptor("http:"));
      tableDesc.addFamily(new HColumnDescriptor("url:"));
      tableDesc.addFamily(new HColumnDescriptor("referrer:"));
      admin.createTable(tableDesc);
    } else {
      System.out.println("1. " + table + " table already exists.");
    }
    System.out.println("2. access_log files fetching using map/reduce");
  }

  /**
   * Command-line entry point.
   *
   * <p>Expects the access-log file or directory as the first argument and
   * the table name as the second. When {@link #eclipseRun} is set, the
   * arguments are ignored and the defaults {@code apache-log} and
   * {@code log} are used.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} the table name
   * @throws IOException if table creation or the job fails
   */
  @SuppressWarnings("deprecation")
  public static void main(String[] args) throws IOException {
    String targetTable = "log";
    String dir = "apache-log";

    if (!eclipseRun) {
      if (args.length < 2) {
        System.out.println(
            "Usage: logfetcher <access_log file or directory> <table_name>");
        System.exit(1);
      }
      dir = args[0];
      targetTable = args[1];
    }

    creatTable(targetTable);
    runMapReduce(targetTable, dir);
  }
}