36 | | |
37 | | // LoadHBaseMapper |
38 | | //說明: |
39 | | // 此程式碼將HBase的資料取出來,再將結果塞回hdfs上 |
40 | | // |
41 | | //運算方法: |
42 | | // 將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar |
43 | | // 執行: |
44 | | // --------------------------- |
45 | | // hadoop jar XX.jar LoadHBaseMapper <hdfs_output> |
46 | | // --------------------------- |
47 | | // |
48 | | //結果: |
49 | | // $ hadoop fs -cat <hdfs_output>/part-r-00000 |
50 | | // --------------------------- |
51 | | // 54 30 31 GunLong |
52 | | // 54 30 32 Esing |
53 | | // 54 30 33 SunDon |
54 | | // 54 30 34 StarBucks |
55 | | // --------------------------- |
56 | | //注意: |
57 | | //1. 請注意hbase 上必須要有 table, 並且已經有資料 |
58 | | //2. 運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內 |
59 | | // 請注意 沒有 <hdfs_output> 資料夾 |
81 | | public void map(ImmutableBytesWritable key, Result value, |
82 | | Context context) throws IOException, InterruptedException { |
| 58 | public void map(ImmutableBytesWritable key, Result value, |
| 59 | Context context) throws IOException, InterruptedException { |
| 60 | // content:count |
| 61 | String res = Bytes.toString(value.getValue(Bytes.toBytes("content"), |
| 62 | Bytes.toBytes("count"))); |
89 | | } |
90 | | } |
| 69 | // TableReducer<KEYIN,VALUEIN,KEYOUT> |
| 70 | // 原本為 TableReducer<Text, IntWritable, NullWritable > |
| 71 | // 但在此改成 LongWritable 也可以 |
| 72 | // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可 |
| 73 | public static class HtReduce extends Reducer<Text, Text, Text, Text> { |
92 | | // TableReducer<KEYIN,VALUEIN,KEYOUT> |
93 | | // 原本為 TableReducer<Text, IntWritable, NullWritable > |
94 | | // 但在此改成 LongWritable 也可以 |
95 | | // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可 |
96 | | public static class HtReduce extends Reducer<Text, Text, Text, Text> { |
| 75 | public void reduce(Text key, Iterable<Text> values, Context context) |
| 76 | throws IOException, InterruptedException { |
98 | | public void reduce(Text key, Iterable<Text> values, Context context) |
99 | | throws IOException, InterruptedException { |
| 78 | String str = new String(""); |
| 79 | Text final_key = new Text(key); |
| 80 | Text final_value = new Text(); |
| 81 | // 將key值相同的values,透過 && 符號分隔之 |
| 82 | for (Text tmp : values) { |
| 83 | str += tmp.toString(); |
| 84 | } |
| 85 | final_value.set(str); |
| 86 | context.write(final_key, final_value); |
| 87 | } |
| 88 | } |
101 | | String str = new String(""); |
102 | | Text final_key = new Text(key); |
103 | | Text final_value = new Text(); |
104 | | // 將key值相同的values,透過 && 符號分隔之 |
105 | | for (Text tmp : values) { |
106 | | str += tmp.toString(); |
107 | | } |
108 | | final_value.set(str); |
109 | | context.write(final_key, final_value); |
110 | | } |
111 | | } |
| 90 | public static void main(String args[]) throws Exception { |
| 91 | // debug |
| 92 | // String[] argv = { "wordcount", "output-lhm" }; |
| 93 | // args = argv; |
126 | | // // 設定哪張hbase的table為輸入 |
127 | | conf.set(TableInputFormat.INPUT_TABLE, tablename); |
128 | | Scan myScan = new Scan("".getBytes(), "12".getBytes()); |
129 | | myScan.addColumn("Detail:Name".getBytes()); |
| 107 | // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定 |
| 108 | TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class, |
| 109 | Text.class, Text.class, job); |