Changes between Version 1 and Version 2 of NCHCCloudCourse100929_4_HBEX7


Ignore:
Timestamp:
Sep 28, 2010, 8:17:16 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • NCHCCloudCourse100929_4_HBEX7

    v1 v2  
    3333{{{
    3434#!java
     35
    3536package org.nchc.hbase;
    36 
    37 // LoadHBaseMapper
    38 //說明:
    39 //      此程式碼將HBase的資料取出來,再將結果塞回hdfs上
    40 //
    41 //運算方法:
    42 //      將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar
    43 //  執行:
    44 //      ---------------------------
    45 //      hadoop jar XX.jar LoadHBaseMapper <hdfs_output>
    46 //      ---------------------------
    47 //
    48 //結果:
    49 // $ hadoop fs -cat <hdfs_output>/part-r-00000
    50 // ---------------------------
    51 //      54 30 31        GunLong
    52 //      54 30 32        Esing
    53 //      54 30 33        SunDon
    54 //      54 30 34        StarBucks
    55 // ---------------------------
    56 //注意:
    57 //1.    請注意hbase 上必須要有 table, 並且已經有資料
    58 //2.    運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內
    59 //              請注意 沒有 <hdfs_output> 資料夾
    6037
    6138import java.io.IOException;
     
    7754
    7855public class LoadHBaseMapper {
    79         public static class HtMap extends TableMapper<Text, Text> {
     56  public static class HtMap extends TableMapper<Text, Text> {
    8057
    81                 public void map(ImmutableBytesWritable key, Result value,
    82                                 Context context) throws IOException, InterruptedException {
     58    public void map(ImmutableBytesWritable key, Result value,
     59        Context context) throws IOException, InterruptedException {
     60//      content:count
     61      String res = Bytes.toString(value.getValue(Bytes.toBytes("content"),
     62          Bytes.toBytes("count")));
    8363
    84                         String res = Bytes.toString(value.getValue(Bytes.toBytes("Detail"),
    85                                         Bytes.toBytes("Name")));
     64      context.write(new Text(key.toString()), new Text(res));
    8665
    87                         context.write(new Text(key.toString()), new Text(res));
     66    }
     67  }
    8868
    89                 }
    90         }
     69  // TableReducer<KEYIN,VALUEIN,KEYOUT>
     70  // 原本為 TableReducer<Text, IntWritable, NullWritable >
     71  // 但在此改成 LongWritable 也可以
     72  // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
     73  public static class HtReduce extends Reducer<Text, Text, Text, Text> {
    9174
    92         // TableReducer<KEYIN,VALUEIN,KEYOUT>
    93         // 原本為 TableReducer<Text, IntWritable, NullWritable >
    94         // 但在此改成 LongWritable 也可以
    95         // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
    96         public static class HtReduce extends Reducer<Text, Text, Text, Text> {
     75    public void reduce(Text key, Iterable<Text> values, Context context)
     76        throws IOException, InterruptedException {
    9777
    98                 public void reduce(Text key, Iterable<Text> values, Context context)
    99                                 throws IOException, InterruptedException {
     78      String str = new String("");
     79      Text final_key = new Text(key);
     80      Text final_value = new Text();
     81      // 將key值相同的values,透過 && 符號分隔之
     82      for (Text tmp : values) {
     83        str += tmp.toString();
     84      }
     85      final_value.set(str);
     86      context.write(final_key, final_value);
     87    }
     88  }
    10089
    101                         String str = new String("");
    102                         Text final_key = new Text(key);
    103                         Text final_value = new Text();
    104                         // 將key值相同的values,透過 && 符號分隔之
    105                         for (Text tmp : values) {
    106                                 str += tmp.toString();
    107                         }
    108                         final_value.set(str);
    109                         context.write(final_key, final_value);
    110                 }
    111         }
     90  public static void main(String args[]) throws Exception {
     91    // debug
     92//    String[] argv = { "wordcount", "output-lhm" };
     93//    args = argv;
    11294
    113         public static void main(String args[]) throws Exception {
    114                 // debug
    115                 String[] argv = { "output-lhm" };
    116                 args = argv;
    117                 String input = args[0];
     95    String tablename = args[0] ;
    11896
    119                 String tablename = "tsmc";
     97    Configuration conf = new Configuration();
    12098
    121                 Configuration conf = new Configuration();
     99    Job job = new Job(conf, tablename + " hbase data to hdfs");
     100    job.setJarByClass(LoadHBaseMapper.class);
    122101
    123                 Job job = new Job(conf, tablename + " hbase data to hdfs");
    124                 job.setJarByClass(LoadHBaseMapper.class);
     102    // // 設定哪張hbase的table為輸入
     103    conf.set(TableInputFormat.INPUT_TABLE, tablename);
     104    Scan myScan = new Scan("".getBytes(), "12".getBytes());
     105    myScan.addColumn("content:count".getBytes());
    125106
    126                 // // 設定哪張hbase的table為輸入
    127                 conf.set(TableInputFormat.INPUT_TABLE, tablename);
    128                 Scan myScan = new Scan("".getBytes(), "12".getBytes());
    129                 myScan.addColumn("Detail:Name".getBytes());
     107    // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定
     108    TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class,
     109        Text.class, Text.class, job);
    130110
    131                 // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定
    132                 TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class,
    133                                 Text.class, Text.class, job);
     111    job.setMapperClass(HtMap.class);
     112    job.setReducerClass(HtReduce.class);
    134113
    135                 job.setMapperClass(HtMap.class);
    136                 job.setReducerClass(HtReduce.class);
     114    job.setMapOutputKeyClass(Text.class);
     115    job.setMapOutputValueClass(Text.class);
    137116
    138                 job.setMapOutputKeyClass(Text.class);
    139                 job.setMapOutputValueClass(Text.class);
     117    job.setInputFormatClass(TableInputFormat.class);
     118    job.setOutputFormatClass(TextOutputFormat.class);
    140119
    141                 job.setInputFormatClass(TableInputFormat.class);
    142                 job.setOutputFormatClass(TextOutputFormat.class);
     120    job.setOutputKeyClass(Text.class);
     121    job.setOutputValueClass(Text.class);
    143122
    144                 job.setOutputKeyClass(Text.class);
    145                 job.setOutputValueClass(Text.class);
     123    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    146124
    147                 FileOutputFormat.setOutputPath(job, new Path(input));
     125    System.exit(job.waitForCompletion(true) ? 0 : 1);
     126  }
     127}
    148128
    149                 System.exit(job.waitForCompletion(true) ? 0 : 1);
    150         }
    151 }
    152129}}}
    153130