Changes between Version 1 and Version 2 of waue/2009/0724


Ignore:
Timestamp:
Jul 24, 2009, 3:28:06 PM (15 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2009/0724

    v1 v2  
    1 {{{
     1[http://hadoop.apache.org/hbase/docs/r0.19.3/api/index.html hbase 的用法]
    22
    3  public static void creatTable(String table) throws IOException{
    4             HConnection conn = HConnectionManager.getConnection(conf);
    5             HBaseAdmin admin = new HBaseAdmin(conf);
    6             if(!admin.tableExists(new Text(table))){
    7               System.out.println("1. " + table + " table creating ... please wait");
    8               HTableDescriptor tableDesc = new HTableDescriptor(table);
    9               tableDesc.addFamily(new HColumnDescriptor("http:"));
    10               tableDesc.addFamily(new HColumnDescriptor("url:"));
    11               tableDesc.addFamily(new HColumnDescriptor("referrer:"));
    12               admin.createTable(tableDesc);
    13             } else {
    14               System.out.println("1. " + table + " table already exists.");
    15             }
    16             System.out.println("2. access_log files fetching using map/reduce");
    17   }
    18 }}}
    19 
    20 
    21 {{{
    22  public static class MapClass extends MapReduceBase implements
    23       Mapper<WritableComparable, Text, Text, Writable> {
    24  
    25     @Override
    26     public void configure(JobConf job) {
    27       tableName = job.get(TABLE, "");
    28     }
    29  
    30     public void map(WritableComparable key, Text value,
    31         OutputCollector<Text, Writable> output, Reporter reporter)
    32         throws IOException {
    33       try {
    34          AccessLogParser log = new AccessLogParser(value.toString());
    35         if(table==null)
    36                 table = new HTable(conf, new Text(tableName));
    37         long lockId = table.startUpdate(new Text(log.getIp()));
    38         table.put(lockId, new Text("http:protocol"), log.getProtocol().getBytes());
    39         table.put(lockId, new Text("http:method"), log.getMethod().getBytes());
    40         table.put(lockId, new Text("http:code"), log.getCode().getBytes());
    41         table.put(lockId, new Text("http:bytesize"), log.getByteSize().getBytes());
    42         table.put(lockId, new Text("http:agent"), log.getAgent().getBytes());
    43         table.put(lockId, new Text("url:" + log.getUrl()), log.getReferrer().getBytes());
    44         table.put(lockId, new Text("referrer:" + log.getReferrer()), log.getUrl().getBytes());
    45  
    46         table.commit(lockId, log.getTimestamp());
    47       } catch (ParseException e) {
    48         e.printStackTrace();
    49       } catch (Exception e) {
    50         e.printStackTrace();
    51       }
    52     }
    53   }
    54 
    55 }}}
    56 
    57 {{{
    58  public static void runMapReduce(String table,String dir) throws IOException{
    59           Path tempDir = new Path("log/temp");
    60           Path InputDir = new Path(dir);
    61           FileSystem fs = FileSystem.get(conf);
    62           JobConf jobConf = new JobConf(conf, LogFetcher.class);
    63           jobConf.setJobName("apache log fetcher");
    64           jobConf.set(TABLE, table);
    65           Path[] in = fs.listPaths(InputDir);
    66           if (fs.isFile(InputDir)) {
    67               jobConf.setInputPath(InputDir);
    68           } else {
    69               for (int i = 0; i < in.length; i++) {
    70                 if (fs.isFile(in[i])) {
    71                   jobConf.addInputPath(in[i]);
    72                 } else {
    73                   Path[] sub = fs.listPaths(in[i]);
    74                   for (int j = 0; j < sub.length; j++) {
    75                     if (fs.isFile(sub[j])) {
    76                       jobConf.addInputPath(sub[j]);
    77                     }
    78                   }
    79                 }
    80               }
    81             }
    82             jobConf.setOutputPath(tempDir);
    83             jobConf.setMapperClass(MapClass.class);
    84  
    85             JobClient client = new JobClient(jobConf);
    86             ClusterStatus cluster = client.getClusterStatus();
    87             jobConf.setNumMapTasks(cluster.getMapTasks());
    88             jobConf.setNumReduceTasks(0);
    89  
    90             JobClient.runJob(jobConf);
    91             fs.delete(tempDir);
    92             fs.close();
    93   }
    94 
    95 }}}
     3[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapred/package-summary.html#examples hbase + mapreduce 用法]