| | 1 | {{{ |
| | 2 | #!html |
| | 3 | <div style="text-align: center; color:#151B8D"><big style="font-weight: bold;"><big><big> |
| | 4 | ITRI HBase 進階課程 |
| | 5 | </big></big></big></div> <div style="text-align: center; color:#7E2217"><big style="font-weight: bold;"><big> |
| | 6 | HBase 範例 |
| | 7 | </big></big></div> |
| | 8 | }}} |
| | 9 | |
| | 10 | [wiki:waue/2011/0426_4_6 上一關 < ] 最後一關 [wiki:waue/2011/0426 >> 回課程大綱] |
| | 11 | |
| | 12 | = 範例七: LoadHBaseMapper = |
| | 13 | |
| | 14 | == 說明: == |
| | 15 | 此程式碼將HBase的資料取出來,再將結果塞回hdfs上 |
| | 16 | {{{ |
| | 17 | $ bin/hadoop jar ItriMenu.jar LoadHBaseMapper output |
| | 18 | }}} |
| | 19 | |
| | 20 | == 注意: == |
| | 21 | 1. 請注意:執行本範例前,必須先執行過範例六,HBase 上才會有本範例所需的資料表與資料 |
| | 22 | |
| | 23 | 2. 運算完後,程式會將執行結果放在你指定的 hdfs <hdfs_output> 目錄內 |
| | 24 | 執行前請確認 <hdfs_output> 資料夾尚未存在,否則程式會發生錯誤 |
| | 25 | |
| | 26 | |
| | 27 | {{{ |
| | 28 | #!java |
| | 29 | |
| | 30 | package itri; |
| | 31 | |
| | 32 | // LoadHBaseMapper |
| | 33 | //說明: |
| | 34 | // 此程式碼將HBase的資料取出來,再將結果塞回hdfs上 |
| | 35 | // |
| | 36 | //運算方法: |
| | 37 | // 將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar |
| | 38 | // 執行: |
| | 39 | // --------------------------- |
| | 40 | // hadoop jar XX.jar LoadHBaseMapper <hdfs_output> |
| | 41 | // --------------------------- |
| | 42 | // |
| | 43 | //結果: |
| | 44 | // $ hadoop fs -cat <hdfs_output>/part-r-00000 |
| | 45 | // --------------------------- |
| | 46 | // 54 30 31 GunLong |
| | 47 | // 54 30 32 Esing |
| | 48 | // 54 30 33 SunDon |
| | 49 | // 54 30 34 StarBucks |
| | 50 | // --------------------------- |
| | 51 | //注意: |
| | 52 | //1. 請注意hbase 上必須要有 table, 並且已經有資料 |
| | 53 | //2. 運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內 |
| | 54 | // 請注意 沒有 <hdfs_output> 資料夾 |
| | 55 | |
| | 56 | import java.io.IOException; |
| | 57 | |
| | 58 | import org.apache.hadoop.conf.Configuration; |
| | 59 | import org.apache.hadoop.fs.Path; |
| | 60 | import org.apache.hadoop.hbase.client.Result; |
| | 61 | import org.apache.hadoop.hbase.client.Scan; |
| | 62 | import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| | 63 | import org.apache.hadoop.hbase.mapreduce.TableInputFormat; |
| | 64 | import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; |
| | 65 | import org.apache.hadoop.hbase.mapreduce.TableMapper; |
| | 66 | import org.apache.hadoop.hbase.util.Bytes; |
| | 67 | import org.apache.hadoop.io.Text; |
| | 68 | import org.apache.hadoop.mapreduce.Job; |
| | 69 | import org.apache.hadoop.mapreduce.Reducer; |
| | 70 | import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| | 71 | import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; |
| | 72 | import org.apache.hadoop.util.GenericOptionsParser; |
| | 73 | |
| | 74 | public class LoadHBaseMapper { |
| | 75 | public static class HtMap extends TableMapper<Text, Text> { |
| | 76 | |
| | 77 | public void map(ImmutableBytesWritable key, Result value, |
| | 78 | Context context) throws IOException, InterruptedException { |
| | 79 | |
| | 80 | String word = Bytes.toString(value.getValue(Bytes.toBytes("content"), |
| | 81 | Bytes.toBytes("word"))); |
| | 82 | String count = Bytes.toString(value.getValue(Bytes.toBytes("content"), |
| | 83 | Bytes.toBytes("count"))); |
| | 84 | |
| | 85 | context.write(new Text(key.toString()), new Text(word+"="+count)); |
| | 86 | |
| | 87 | } |
| | 88 | } |
| | 89 | |
| | 90 | // TableReducer<KEYIN,VALUEIN,KEYOUT> |
| | 91 | // 原本為 TableReducer<Text, IntWritable, NullWritable > |
| | 92 | // 但在此改成 LongWritable 也可以 |
| | 93 | // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可 |
| | 94 | public static class HtReduce extends Reducer<Text, Text, Text, Text> { |
| | 95 | |
| | 96 | public void reduce(Text key, Iterable<Text> values, Context context) |
| | 97 | throws IOException, InterruptedException { |
| | 98 | |
| | 99 | String str = new String(""); |
| | 100 | Text final_key = new Text(key); |
| | 101 | Text final_value = new Text(); |
| | 102 | // 將key值相同的values,透過 && 符號分隔之 |
| | 103 | for (Text tmp : values) { |
| | 104 | str += tmp.toString(); |
| | 105 | } |
| | 106 | final_value.set(str); |
| | 107 | context.write(final_key, final_value); |
| | 108 | } |
| | 109 | } |
| | 110 | |
| | 111 | public static void main(String argv[]) throws Exception { |
| | 112 | // String[] argc = {"output-0426"}; argv=argc; |
| | 113 | Configuration conf = new Configuration(); |
| | 114 | String[] otherArgs = new GenericOptionsParser(conf, argv) |
| | 115 | .getRemainingArgs(); |
| | 116 | if (otherArgs.length < 1) { |
| | 117 | System.out.println("LoadHBaseMapper <hdfsOutDir>"); |
| | 118 | return; |
| | 119 | } |
| | 120 | String tablename = "wordcounthbase"; |
| | 121 | String output = otherArgs[0]; |
| | 122 | |
| | 123 | Job job = new Job(conf, tablename + " hbase data to hdfs"); |
| | 124 | job.setJarByClass(LoadHBaseMapper.class); |
| | 125 | |
| | 126 | // // 設定哪張hbase的table為輸入 |
| | 127 | conf.set(TableInputFormat.INPUT_TABLE, tablename); |
| | 128 | Scan myScan = new Scan("".getBytes(), "12".getBytes()); |
| | 129 | myScan.addColumn("content:count".getBytes()); |
| | 130 | myScan.addColumn("content:word".getBytes()); |
| | 131 | // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定 |
| | 132 | TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class, |
| | 133 | Text.class, Text.class, job); |
| | 134 | |
| | 135 | job.setMapperClass(HtMap.class); |
| | 136 | job.setReducerClass(HtReduce.class); |
| | 137 | |
| | 138 | job.setMapOutputKeyClass(Text.class); |
| | 139 | job.setMapOutputValueClass(Text.class); |
| | 140 | |
| | 141 | job.setInputFormatClass(TableInputFormat.class); |
| | 142 | job.setOutputFormatClass(TextOutputFormat.class); |
| | 143 | |
| | 144 | job.setOutputKeyClass(Text.class); |
| | 145 | job.setOutputValueClass(Text.class); |
| | 146 | |
| | 147 | FileOutputFormat.setOutputPath(job, new Path(output)); |
| | 148 | |
| | 149 | System.exit(job.waitForCompletion(true) ? 0 : 1); |
| | 150 | } |
| | 151 | } |
| | 152 | |
| | 153 | |
| | 154 | }}} |
| | 155 | |
| | 156 | * 執行結果 |
| | 157 | |
| | 158 | 觀察你指定的 <hdfs_output> 目錄(例如 /user/hadoop/output)內 part-r-00000 檔案的結果 |