public static void creatTable(String table) throws IOException{
HConnection conn = HConnectionManager.getConnection(conf);
HBaseAdmin admin = new HBaseAdmin(conf);
if(!admin.tableExists(new Text(table))){
System.out.println("1. " + table + " table creating ... please wait");
HTableDescriptor tableDesc = new HTableDescriptor(table);
tableDesc.addFamily(new HColumnDescriptor("http:"));
tableDesc.addFamily(new HColumnDescriptor("url:"));
tableDesc.addFamily(new HColumnDescriptor("referrer:"));
admin.createTable(tableDesc);
} else {
System.out.println("1. " + table + " table already exists.");
}
System.out.println("2. access_log files fetching using map/reduce");
}
public static class MapClass extends MapReduceBase implements
Mapper<WritableComparable, Text, Text, Writable> {
@Override
public void configure(JobConf job) {
tableName = job.get(TABLE, "");
}
public void map(WritableComparable key, Text value,
OutputCollector<Text, Writable> output, Reporter reporter)
throws IOException {
try {
AccessLogParser log = new AccessLogParser(value.toString());
if(table==null)
table = new HTable(conf, new Text(tableName));
long lockId = table.startUpdate(new Text(log.getIp()));
table.put(lockId, new Text("http:protocol"), log.getProtocol().getBytes());
table.put(lockId, new Text("http:method"), log.getMethod().getBytes());
table.put(lockId, new Text("http:code"), log.getCode().getBytes());
table.put(lockId, new Text("http:bytesize"), log.getByteSize().getBytes());
table.put(lockId, new Text("http:agent"), log.getAgent().getBytes());
table.put(lockId, new Text("url:" + log.getUrl()), log.getReferrer().getBytes());
table.put(lockId, new Text("referrer:" + log.getReferrer()), log.getUrl().getBytes());
table.commit(lockId, log.getTimestamp());
} catch (ParseException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void runMapReduce(String table,String dir) throws IOException{
Path tempDir = new Path("log/temp");
Path InputDir = new Path(dir);
FileSystem fs = FileSystem.get(conf);
JobConf jobConf = new JobConf(conf, LogFetcher.class);
jobConf.setJobName("apache log fetcher");
jobConf.set(TABLE, table);
Path[] in = fs.listPaths(InputDir);
if (fs.isFile(InputDir)) {
jobConf.setInputPath(InputDir);
} else {
for (int i = 0; i < in.length; i++) {
if (fs.isFile(in[i])) {
jobConf.addInputPath(in[i]);
} else {
Path[] sub = fs.listPaths(in[i]);
for (int j = 0; j < sub.length; j++) {
if (fs.isFile(sub[j])) {
jobConf.addInputPath(sub[j]);
}
}
}
}
}
jobConf.setOutputPath(tempDir);
jobConf.setMapperClass(MapClass.class);
JobClient client = new JobClient(jobConf);
ClusterStatus cluster = client.getClusterStatus();
jobConf.setNumMapTasks(cluster.getMapTasks());
jobConf.setNumReduceTasks(0);
JobClient.runJob(jobConf);
fs.delete(tempDir);
fs.close();
}