{{{
/**
 * Ensures the target HBase table exists, creating it with the three column
 * families this tool writes ("http:", "url:", "referrer:") if it does not.
 *
 * NOTE: the method name "creatTable" (sic) is part of the public interface
 * and is kept for compatibility with existing callers.
 *
 * @param table name of the HBase table to create or verify
 * @throws IOException if the HBase master cannot be reached or creation fails
 */
public static void creatTable(String table) throws IOException {
    // Warms HBase's per-configuration connection cache; the handle itself is
    // not used here. NOTE(review): the original stored it in an unused local
    // -- confirm the eager connection is actually needed before removing.
    HConnectionManager.getConnection(conf);
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists(new Text(table))) {
        System.out.println("1. " + table + " table creating ... please wait");
        HTableDescriptor tableDesc = new HTableDescriptor(table);
        tableDesc.addFamily(new HColumnDescriptor("http:"));
        tableDesc.addFamily(new HColumnDescriptor("url:"));
        tableDesc.addFamily(new HColumnDescriptor("referrer:"));
        admin.createTable(tableDesc);
    } else {
        System.out.println("1. " + table + " table already exists.");
    }
    System.out.println("2. access_log files fetching using map/reduce");
}
}}}

{{{
/**
 * Map-only task that parses one Apache access-log line per input record and
 * writes it into HBase, keyed by client IP, timestamped with the log entry's
 * own timestamp. No output is emitted to the MapReduce framework; rows go
 * straight to the table (see runMapReduce, which sets zero reducers).
 */
public static class MapClass extends MapReduceBase implements Mapper {

    /** Reads the destination table name placed in the job conf under TABLE. */
    @Override
    public void configure(JobConf job) {
        tableName = job.get(TABLE, "");
    }

    public void map(WritableComparable key, Text value,
            OutputCollector output, Reporter reporter) throws IOException {
        try {
            AccessLogParser log = new AccessLogParser(value.toString());
            // Lazily open one HTable per task JVM on first use.
            // NOTE(review): assumes single-threaded map execution -- the
            // null-check is not thread-safe; confirm if that ever changes.
            if (table == null) {
                table = new HTable(conf, new Text(tableName));
            }
            long lockId = table.startUpdate(new Text(log.getIp()));
            table.put(lockId, new Text("http:protocol"), log.getProtocol().getBytes());
            table.put(lockId, new Text("http:method"), log.getMethod().getBytes());
            table.put(lockId, new Text("http:code"), log.getCode().getBytes());
            table.put(lockId, new Text("http:bytesize"), log.getByteSize().getBytes());
            table.put(lockId, new Text("http:agent"), log.getAgent().getBytes());
            // URL and referrer are cross-indexed: each stored as a qualifier
            // in the other's column family.
            table.put(lockId, new Text("url:" + log.getUrl()), log.getReferrer().getBytes());
            table.put(lockId, new Text("referrer:" + log.getReferrer()), log.getUrl().getBytes());
            table.commit(lockId, log.getTimestamp());
        } catch (ParseException e) {
            // Deliberate best-effort: a malformed log line is reported and
            // skipped rather than failing the whole task.
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
}}}

{{{
/**
 * Configures and runs the map-only fetch job over the given log directory
 * (or single log file), then removes the throwaway MapReduce output dir.
 *
 * Input discovery accepts a single file, files directly under dir, and files
 * exactly one subdirectory deeper; anything nested further is ignored.
 *
 * @param table destination HBase table name, passed to tasks via the job conf
 * @param dir   path to an access_log file or a directory of them
 * @throws IOException on filesystem or job-submission failure
 */
public static void runMapReduce(String table, String dir) throws IOException {
    Path tempDir = new Path("log/temp");
    Path inputDir = new Path(dir);
    FileSystem fs = FileSystem.get(conf);

    JobConf jobConf = new JobConf(conf, LogFetcher.class);
    jobConf.setJobName("apache log fetcher");
    jobConf.set(TABLE, table);

    if (fs.isFile(inputDir)) {
        jobConf.setInputPath(inputDir);
    } else {
        // Fix: only list the directory when it IS a directory; the original
        // called fs.listPaths(inputDir) unconditionally and discarded the
        // result in the single-file case.
        Path[] in = fs.listPaths(inputDir);
        for (int i = 0; i < in.length; i++) {
            if (fs.isFile(in[i])) {
                jobConf.addInputPath(in[i]);
            } else {
                Path[] sub = fs.listPaths(in[i]);
                for (int j = 0; j < sub.length; j++) {
                    if (fs.isFile(sub[j])) {
                        jobConf.addInputPath(sub[j]);
                    }
                }
            }
        }
    }
    jobConf.setOutputPath(tempDir);
    jobConf.setMapperClass(MapClass.class);

    JobClient client = new JobClient(jobConf);
    ClusterStatus cluster = client.getClusterStatus();
    jobConf.setNumMapTasks(cluster.getMapTasks());
    // Map-only job: rows are written straight to HBase, so no reduce phase
    // and the MapReduce output directory is purely throwaway.
    jobConf.setNumReduceTasks(0);
    JobClient.runJob(jobConf);

    fs.delete(tempDir);
    // NOTE(review): closes the cached FileSystem for this conf; safe only
    // because this is a standalone tool's final step -- confirm if reused.
    fs.close();
}
}}}