Changes between Version 1 and Version 2 of waue/2010/0204-06


Ignore:
Timestamp:
Feb 3, 2010, 9:53:49 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0204-06

    v1 v2  
    77 == 結果: ==
    88{{{
    9         $ hbase shell
     9$ hbase shell
    1010        > scan 'wordcount'
    1111        ROW             COLUMN+CELL
    1212        am              column=content:count, timestamp=1264406245488, value=1
    13   chen  column=content:count, timestamp=1264406245488, value=1
     13        chen    column=content:count, timestamp=1264406245488, value=1
    1414        hi,             column=content:count, timestamp=1264406245488, value=2
    1515  ......(略)
     
    3131{{{
    3232#!java
     33package tsmc;
    3334
     35import java.io.IOException;
     36
     37import org.apache.hadoop.conf.Configuration;
     38import org.apache.hadoop.fs.Path;
     39import org.apache.hadoop.hbase.client.Put;
     40import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
     41import org.apache.hadoop.hbase.mapreduce.TableReducer;
     42import org.apache.hadoop.hbase.util.Bytes;
     43import org.apache.hadoop.io.IntWritable;
     44import org.apache.hadoop.io.LongWritable;
     45import org.apache.hadoop.io.Text;
     46import org.apache.hadoop.mapreduce.Job;
     47import org.apache.hadoop.mapreduce.Mapper;
     48import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     49import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     50
     51public class CountToHBaseReducer {
     52        public static class HtMap extends
     53                        Mapper<LongWritable, Text, Text, IntWritable> {
     54                private IntWritable one = new IntWritable(1);
     55
     56                public void map(LongWritable key, Text value, Context context)
     57                                throws IOException, InterruptedException {
     58                        // 輸入的字串先轉換成小寫再用空白區隔
     59                        String s[] = value.toString().toLowerCase().trim().split(" ");
     60                        for (String m : s) {
     61                                // 寫入到輸出串流
     62                                context.write(new Text(m), one);
     63                        }
     64                }
     65        }
     66
     67        // TableReducer<KEYIN,VALUEIN,KEYOUT>
     68        // 原本為 TableReducer<Text, IntWritable, NullWritable >
     69        // 但在此改成 LongWritable 也可以
     70        // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
     71        public static class HtReduce extends
     72                        TableReducer<Text, IntWritable, LongWritable> {
     73
     74                public void reduce(Text key, Iterable<IntWritable> values,
     75                                Context context) throws IOException, InterruptedException {
     76                        int sum = 0;
     77                        for (IntWritable i : values) {
     78                                sum += i.get();
     79                        }
     80
     81                        // org.apache.hadoop.hbase.client.Put
     82                        // Used to perform Put operations for a single row.
     83                        // new Put(byte[] row)
     84                        Put put = new Put(Bytes.toBytes(key.toString()));
     85
     86                        // add(byte[] family, byte[] qualifier, byte[] value)
     87                        // 在main設定output format class 為 TableOutputFormat
     88                        // TableReducer 內有定義 output Key class 必須為 Put 或 Delete
     89                        put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes
     90                                        .toBytes(String.valueOf(sum)));
     91
     92                        // NullWritable.get(): Returns the single instance of this class.
     93                        // NullWritable.write(): Serialize the fields of this object to out.
     94                        context.write(new LongWritable(), put);
     95                        // context.write(NullWritable.get(), put)
     96                }
     97        }
     98
     99        public static void main(String args[]) throws Exception {
     100                // debug
     101                String[] argv = { "/user/waue/input" };
     102                args = argv;
     103                String input = args[0];
     104
     105                String tablename = "wordcount";
     106                String family = "content";
     107
     108                Configuration conf = new Configuration();
     109                // OUTPUT_TABLE = "hbase.mapred.outputtable"
     110                // conf.set 用於設定 如 core-site.xml 的 name 與 value
     111                // 告訴程式 hbase.mapred.outputtable --> wordcount
     112                conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
     113                // 建立hbase 的table 否則沒先建立會出錯
     114                CreateTable.createHBaseTable(tablename, family);
     115
     116                Job job = new Job(conf, "WordCount table with " + input);
     117
     118                job.setJarByClass(CountToHBaseReducer.class);
     119
     120                job.setMapperClass(HtMap.class);
     121                job.setReducerClass(HtReduce.class);
     122                // 此範例的輸出為 <Text,IntWritable> 因此其實可以省略宣告
     123                // set{Map|Reduce}Output{Key|Value}Class()
     124                job.setMapOutputKeyClass(Text.class);
     125                job.setMapOutputValueClass(IntWritable.class);
     126                // InputFormat 只有一個子介面
     127                // FileInputFormat <-(SequenceFileInputFormat,TextInputFormat)
     128                // 其中TextInputFormat 最常用 ,預設輸入為 LongWritable,Text
     129                // 另外HBase 則設計了一個子類別 TableInputFormat
     130                job.setInputFormatClass(TextInputFormat.class);
     131                // TAbleOutputFormat
     132                // 宣告此行則可使 reduce 輸出為 HBase 的table
     133                job.setOutputFormatClass(TableOutputFormat.class);
     134
     135                // 原本設定輸入檔案為 Config.setInputPath(Path) 卻改為
     136                // FileInputFormat.addInputPath(Job, Path()) 的設計,
     137                // 猜測應該是考慮某些檔案操作並不需要跑mapreduce的Job,因此提到外面
     138                FileInputFormat.addInputPath(job, new Path(input));
     139
     140                System.exit(job.waitForCompletion(true) ? 0 : 1);
     141        }
     142}
    34143}}}