Changes between Initial Version and Version 1 of NCHCCloudCourse100929_4_HBEX6


Ignore:
Timestamp:
Sep 27, 2010, 6:27:01 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • NCHCCloudCourse100929_4_HBEX6

    v1 v1  
     1{{{
     2#!html
     3<div style="text-align: center; color:#151B8D"><big style="font-weight: bold;"><big><big>
     4HBase 進階課程
     5</big></big></big></div> <div style="text-align: center; color:#7E2217"><big style="font-weight: bold;"><big>
     6程式範例練習
     7</big></big></div>
     8}}}
     9
     10[wiki:NCHCCloudCourse100929_4_HBEX5 上一關 < ] 第六關 [wiki:NCHCCloudCourse100929_4_HBEX7 > 下一關]
     11
     12 = 範例六:WordCountHBase =
     13
     14 == 說明:  ==
     15        此程式碼將輸入路徑的檔案內的字串取出做字數統計,再將結果塞回HTable內
     16
     17 == 結果: ==
     18{{{
     19$ hbase shell
     20        > scan 'wordcount'
     21        ROW     COLUMN+CELL
     22        am      column=content:count, timestamp=1264406245488, value=1
     23        chen    column=content:count, timestamp=1264406245488, value=1
     24        hi,     column=content:count, timestamp=1264406245488, value=2
     25  ......(略)
     26}}}
     27 == 注意: ==
     28
     291.      在hdfs 上來源檔案的路徑為 "/user/$YOUR_NAME/input"
     30
     31        請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾
     32
     332.      運算完後,程式將執行結果放在hbase的wordcount資料表內
     34
     35
     36 == 程式碼 ==
     37
     38{{{
     39#!java
     40package org.nchc.hbase;
     41
     42import java.io.IOException;
     43
     44import org.apache.hadoop.conf.Configuration;
     45import org.apache.hadoop.fs.Path;
     46import org.apache.hadoop.hbase.client.Put;
     47import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
     48import org.apache.hadoop.hbase.mapreduce.TableReducer;
     49import org.apache.hadoop.hbase.util.Bytes;
     50import org.apache.hadoop.io.IntWritable;
     51import org.apache.hadoop.io.LongWritable;
     52import org.apache.hadoop.io.Text;
     53import org.apache.hadoop.mapreduce.Job;
     54import org.apache.hadoop.mapreduce.Mapper;
     55import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     56import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     57
     58public class CountToHBaseReducer {
     59        public static class HtMap extends
     60                        Mapper<LongWritable, Text, Text, IntWritable> {
     61                private IntWritable one = new IntWritable(1);
     62
     63                public void map(LongWritable key, Text value, Context context)
     64                                throws IOException, InterruptedException {
     65                        // 輸入的字串先轉換成小寫再用空白區隔
     66                        String s[] = value.toString().toLowerCase().trim().split(" ");
     67                        for (String m : s) {
     68                                // 寫入到輸出串流
     69                                context.write(new Text(m), one);
     70                        }
     71                }
     72        }
     73
     74        // TableReducer<KEYIN,VALUEIN,KEYOUT>
     75        // 原本為 TableReducer<Text, IntWritable, NullWritable >
     76        // 但在此改成 LongWritable 也可以
     77        // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
     78        public static class HtReduce extends
     79                        TableReducer<Text, IntWritable, LongWritable> {
     80
     81                public void reduce(Text key, Iterable<IntWritable> values,
     82                                Context context) throws IOException, InterruptedException {
     83                        int sum = 0;
     84                        for (IntWritable i : values) {
     85                                sum += i.get();
     86                        }
     87
     88                        // org.apache.hadoop.hbase.client.Put
     89                        // Used to perform Put operations for a single row.
     90                        // new Put(byte[] row)
     91                        Put put = new Put(Bytes.toBytes(key.toString()));
     92
     93                        // add(byte[] family, byte[] qualifier, byte[] value)
     94                        // 在main設定output format class 為 TableOutputFormat
     95                        // TableReducer 內有定義 output Key class 必須為 Put 或 Delete
     96                        put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes
     97                                        .toBytes(String.valueOf(sum)));
     98
     99                        // NullWritable.get(): Returns the single instance of this class.
     100                        // NullWritable.write(): Serialize the fields of this object to out.
     101                        context.write(new LongWritable(), put);
     102                        // context.write(NullWritable.get(), put)
     103                }
     104        }
     105
     106        public static void main(String args[]) throws Exception {
     107                // debug
     108                String[] argv = { "/user/waue/input" };
     109                args = argv;
     110                String input = args[0];
     111
     112                String tablename = "wordcount";
     113                String family = "content";
     114
     115                Configuration conf = new Configuration();
     116                // OUTPUT_TABLE = "hbase.mapred.outputtable"
     117                // conf.set 用於設定 如 core-site.xml 的 name 與 value
     118                // 告訴程式 hbase.mapred.outputtable --> wordcount
     119                conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
     120                // 建立hbase 的table 否則沒先建立會出錯
     121                CreateTable.createHBaseTable(tablename, family);
     122
     123                Job job = new Job(conf, "WordCount table with " + input);
     124
     125                job.setJarByClass(CountToHBaseReducer.class);
     126
     127                job.setMapperClass(HtMap.class);
     128                job.setReducerClass(HtReduce.class);
     129                // 此範例的輸出為 <Text,IntWritable> 因此其實可以省略宣告
     130                // set{Map|Reduce}Output{Key|Value}Class()
     131                job.setMapOutputKeyClass(Text.class);
     132                job.setMapOutputValueClass(IntWritable.class);
     133                // InputFormat 只有一個子介面
     134                // FileInputFormat <-(SequenceFileInputFormat,TextInputFormat)
     135                // 其中TextInputFormat 最常用 ,預設輸入為 LongWritable,Text
     136                // 另外HBase 則設計了一個子類別 TableInputFormat
     137                job.setInputFormatClass(TextInputFormat.class);
     138                // TAbleOutputFormat
     139                // 宣告此行則可使 reduce 輸出為 HBase 的table
     140                job.setOutputFormatClass(TableOutputFormat.class);
     141
     142                // 原本設定輸入檔案為 Config.setInputPath(Path) 卻改為
     143                // FileInputFormat.addInputPath(Job, Path()) 的設計,
     144                // 猜測應該是考慮某些檔案操作並不需要跑mapreduce的Job,因此提到外面
     145                FileInputFormat.addInputPath(job, new Path(input));
     146
     147                System.exit(job.waitForCompletion(true) ? 0 : 1);
     148        }
     149}
     150}}}
     151
     152 == 參考: ==
     153
     1541.程式碼改編於: http://blog.ring.idv.tw/comment.ser?i=337
     155
     1562.hbase 運作 mapreduce 程式的方法參考於:http://wiki.apache.org/hadoop/Hbase/MapReduce
     157