wiki:waue/2009/0724


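The methods on this page use the pre-0.20 Hadoop mapred API and the early (0.1-era) HBase client, and they refer to several class-level names that are not shown here. Below is a minimal sketch of the imports and the enclosing class skeleton they appear to assume; the field names (conf, TABLE, tableName, table), the TABLE key value, and the exact HBase package paths are inferred from how the methods use them, not taken from the original source.

 import java.io.IOException;
 import java.text.ParseException;

 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;

 // In the 0.1-era HBase releases this code targets, the client classes lived
 // directly under org.apache.hadoop.hbase; later releases moved them to
 // org.apache.hadoop.hbase.client, so adjust these imports to your version.
 import org.apache.hadoop.hbase.HBaseAdmin;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConnection;
 import org.apache.hadoop.hbase.HConnectionManager;
 import org.apache.hadoop.hbase.HTable;
 import org.apache.hadoop.hbase.HTableDescriptor;

 public class LogFetcher {
   // The names and values below are assumptions inferred from the methods on this page.
   static HBaseConfiguration conf = new HBaseConfiguration(); // shared HBase/Hadoop configuration
   static final String TABLE = "logfetcher.table";            // job-conf key for the table name (hypothetical value)
   static String tableName = null;                            // set in MapClass.configure()
   static HTable table = null;                                // opened lazily inside MapClass.map()

   // creatTable(), MapClass and runMapReduce() below belong inside this class.
 }
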
 // Create the target table with three column families ("http:", "url:",
 // "referrer:") if it does not already exist.
 public static void creatTable(String table) throws IOException {
    HConnection conn = HConnectionManager.getConnection(conf); // warm up the shared connection (handle not used further)
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists(new Text(table))) {
      System.out.println("1. " + table + " table creating ... please wait");
      HTableDescriptor tableDesc = new HTableDescriptor(table);
      tableDesc.addFamily(new HColumnDescriptor("http:"));
      tableDesc.addFamily(new HColumnDescriptor("url:"));
      tableDesc.addFamily(new HColumnDescriptor("referrer:"));
      admin.createTable(tableDesc);
    } else {
      System.out.println("1. " + table + " table already exists.");
    }
    System.out.println("2. access_log files fetching using map/reduce");
  }
 // Mapper: parses each access_log line and writes it straight into HBase with
 // the old row-lock API (startUpdate / put / commit). The row key is the client
 // IP; the OutputCollector is never used because no reducer runs.
 public static class MapClass extends MapReduceBase implements
      Mapper<WritableComparable, Text, Text, Writable> {

    @Override
    public void configure(JobConf job) {
      // pick up the table name that runMapReduce() stored in the job conf
      tableName = job.get(TABLE, "");
    }

    public void map(WritableComparable key, Text value,
        OutputCollector<Text, Writable> output, Reporter reporter)
        throws IOException {
      try {
        AccessLogParser log = new AccessLogParser(value.toString());
        if (table == null)
          table = new HTable(conf, new Text(tableName)); // open the table lazily, once per task
        long lockId = table.startUpdate(new Text(log.getIp()));
        table.put(lockId, new Text("http:protocol"), log.getProtocol().getBytes());
        table.put(lockId, new Text("http:method"), log.getMethod().getBytes());
        table.put(lockId, new Text("http:code"), log.getCode().getBytes());
        table.put(lockId, new Text("http:bytesize"), log.getByteSize().getBytes());
        table.put(lockId, new Text("http:agent"), log.getAgent().getBytes());
        table.put(lockId, new Text("url:" + log.getUrl()), log.getReferrer().getBytes());
        table.put(lockId, new Text("referrer:" + log.getReferrer()), log.getUrl().getBytes());
        // commit the whole row, using the request time as the cell timestamp
        table.commit(lockId, log.getTimestamp());
      } catch (ParseException e) {
        e.printStackTrace(); // skip lines that do not parse
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
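
The mapper depends on an AccessLogParser helper that is not included on this page. The class below is a hypothetical stand-in, reconstructed only from the accessors the mapper calls: it parses one Apache combined-format access_log line with a regular expression and exposes the fields as strings plus a millisecond timestamp. Treat it as an illustration of the expected interface, not the original parser.

 // Hypothetical parser sketch: one Apache combined-log-format line in,
 // individual fields out. Field set matches exactly what MapClass.map() reads.
 public class AccessLogParser {
   // host ident user [date] "method url protocol" status bytes "referrer" "agent"
   private static final java.util.regex.Pattern LINE = java.util.regex.Pattern.compile(
       "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\S+) \"([^\"]*)\" \"([^\"]*)\"$");

   private final String ip, method, url, protocol, code, byteSize, referrer, agent;
   private final long timestamp;

   public AccessLogParser(String line) throws java.text.ParseException {
     java.util.regex.Matcher m = LINE.matcher(line);
     if (!m.matches()) {
       throw new java.text.ParseException("unparsable access_log line: " + line, 0);
     }
     ip = m.group(1);
     // e.g. 10/Oct/2000:13:55:36 -0700
     timestamp = new java.text.SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", java.util.Locale.US)
         .parse(m.group(2)).getTime();
     method = m.group(3);
     url = m.group(4);
     protocol = m.group(5);
     code = m.group(6);
     byteSize = m.group(7);
     referrer = m.group(8);
     agent = m.group(9);
   }

   public String getIp()       { return ip; }
   public String getMethod()   { return method; }
   public String getUrl()      { return url; }
   public String getProtocol() { return protocol; }
   public String getCode()     { return code; }
   public String getByteSize() { return byteSize; }
   public String getReferrer() { return referrer; }
   public String getAgent()    { return agent; }
   public long getTimestamp()  { return timestamp; }
 }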

 // Configure and run the map-only job over a single access_log file, a
 // directory of files, or a directory of directories (one level deep).
 public static void runMapReduce(String table, String dir) throws IOException {
    Path tempDir = new Path("log/temp");  // dummy output dir; the real output goes to HBase
    Path inputDir = new Path(dir);
    FileSystem fs = FileSystem.get(conf);
    JobConf jobConf = new JobConf(conf, LogFetcher.class);
    jobConf.setJobName("apache log fetcher");
    jobConf.set(TABLE, table);  // hand the table name to MapClass.configure()
    Path[] in = fs.listPaths(inputDir);
    if (fs.isFile(inputDir)) {
      jobConf.setInputPath(inputDir);
    } else {
      for (int i = 0; i < in.length; i++) {
        if (fs.isFile(in[i])) {
          jobConf.addInputPath(in[i]);
        } else {
          // one level of subdirectories: add every plain file inside
          Path[] sub = fs.listPaths(in[i]);
          for (int j = 0; j < sub.length; j++) {
            if (fs.isFile(sub[j])) {
              jobConf.addInputPath(sub[j]);
            }
          }
        }
      }
    }
    jobConf.setOutputPath(tempDir);
    jobConf.setMapperClass(MapClass.class);

    JobClient client = new JobClient(jobConf);
    ClusterStatus cluster = client.getClusterStatus();
    jobConf.setNumMapTasks(cluster.getMapTasks());
    jobConf.setNumReduceTasks(0);  // map-only: rows are written directly from the mapper

    JobClient.runJob(jobConf);
    fs.delete(tempDir);  // the placeholder output directory is not needed
    fs.close();
  }
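
No driver is shown on this page; below is a minimal sketch of how the two methods might be wired together. The argument handling and usage text are assumptions, not part of the original.

  // Hypothetical entry point: args[0] = HBase table name,
  // args[1] = HDFS path of an access_log file or a directory of files.
  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      System.out.println("Usage: LogFetcher <table-name> <access_log-path>");
      System.exit(1);
    }
    creatTable(args[0]);
    runMapReduce(args[0], args[1]);
  }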