Changes between Initial Version and Version 1 of waue/2009/0724


Ignore:
Timestamp:
Jul 24, 2009, 10:37:26 AM (15 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2009/0724

    v1 v1  
     1{{{
     2
     3 public static void creatTable(String table) throws IOException{
     4            HConnection conn = HConnectionManager.getConnection(conf);
     5            HBaseAdmin admin = new HBaseAdmin(conf);
     6            if(!admin.tableExists(new Text(table))){
     7              System.out.println("1. " + table + " table creating ... please wait");
     8              HTableDescriptor tableDesc = new HTableDescriptor(table);
     9              tableDesc.addFamily(new HColumnDescriptor("http:"));
     10              tableDesc.addFamily(new HColumnDescriptor("url:"));
     11              tableDesc.addFamily(new HColumnDescriptor("referrer:"));
     12              admin.createTable(tableDesc);
     13            } else {
     14              System.out.println("1. " + table + " table already exists.");
     15            }
     16            System.out.println("2. access_log files fetching using map/reduce");
     17  }
     18}}}
     19
     20
     21{{{
     22 public static class MapClass extends MapReduceBase implements
     23      Mapper<WritableComparable, Text, Text, Writable> {
     24 
     25    @Override
     26    public void configure(JobConf job) {
     27      tableName = job.get(TABLE, "");
     28    }
     29 
     30    public void map(WritableComparable key, Text value,
     31        OutputCollector<Text, Writable> output, Reporter reporter)
     32        throws IOException {
     33      try {
     34         AccessLogParser log = new AccessLogParser(value.toString());
     35        if(table==null)
     36                table = new HTable(conf, new Text(tableName));
     37        long lockId = table.startUpdate(new Text(log.getIp()));
     38        table.put(lockId, new Text("http:protocol"), log.getProtocol().getBytes());
     39        table.put(lockId, new Text("http:method"), log.getMethod().getBytes());
     40        table.put(lockId, new Text("http:code"), log.getCode().getBytes());
     41        table.put(lockId, new Text("http:bytesize"), log.getByteSize().getBytes());
     42        table.put(lockId, new Text("http:agent"), log.getAgent().getBytes());
     43        table.put(lockId, new Text("url:" + log.getUrl()), log.getReferrer().getBytes());
     44        table.put(lockId, new Text("referrer:" + log.getReferrer()), log.getUrl().getBytes());
     45 
     46        table.commit(lockId, log.getTimestamp());
     47      } catch (ParseException e) {
     48        e.printStackTrace();
     49      } catch (Exception e) {
     50        e.printStackTrace();
     51      }
     52    }
     53  }
     54
     55}}}
     56
     57{{{
     58 public static void runMapReduce(String table,String dir) throws IOException{
     59          Path tempDir = new Path("log/temp");
     60          Path InputDir = new Path(dir);
     61          FileSystem fs = FileSystem.get(conf);
     62          JobConf jobConf = new JobConf(conf, LogFetcher.class);
     63          jobConf.setJobName("apache log fetcher");
     64          jobConf.set(TABLE, table);
     65          Path[] in = fs.listPaths(InputDir);
     66          if (fs.isFile(InputDir)) {
     67              jobConf.setInputPath(InputDir);
     68          } else {
     69              for (int i = 0; i < in.length; i++) {
     70                if (fs.isFile(in[i])) {
     71                  jobConf.addInputPath(in[i]);
     72                } else {
     73                  Path[] sub = fs.listPaths(in[i]);
     74                  for (int j = 0; j < sub.length; j++) {
     75                    if (fs.isFile(sub[j])) {
     76                      jobConf.addInputPath(sub[j]);
     77                    }
     78                  }
     79                }
     80              }
     81            }
     82            jobConf.setOutputPath(tempDir);
     83            jobConf.setMapperClass(MapClass.class);
     84 
     85            JobClient client = new JobClient(jobConf);
     86            ClusterStatus cluster = client.getClusterStatus();
     87            jobConf.setNumMapTasks(cluster.getMapTasks());
     88            jobConf.setNumReduceTasks(0);
     89 
     90            JobClient.runJob(jobConf);
     91            fs.delete(tempDir);
     92            fs.close();
     93  }
     94
     95}}}