| 1 | {{{ |
| 2 | #!html |
| 3 | <div style="text-align: center; color:#151B8D"><big style="font-weight: bold;"><big><big> |
| 4 | HBase 進階課程 |
| 5 | </big></big></big></div> <div style="text-align: center; color:#7E2217"><big style="font-weight: bold;"><big> |
| 6 | 程式範例練習 |
| 7 | </big></big></div> |
| 8 | }}} |
| 9 | |
| 10 | [wiki:NCHCCloudCourse100929_4_HBEX6 上一關 < ] 最後一關 [wiki:NCHCCloudCourse100928 >> 回課程大綱] |
| 11 | |
| 12 | = 範例七: LoadHBaseMapper = |
| 13 | |
| 14 | == 說明: == |
| 15 | 此程式碼將HBase的資料取出來,再將結果塞回hdfs上 |
| 16 | |
| 17 | == 結果: == |
| 18 | {{{ |
| 19 | $ hadoop fs -cat <hdfs_output>/part-r-00000 |
| 20 | |
| 21 | 54 30 31 GunLong |
| 22 | 54 30 32 Esing |
| 23 | 54 30 33 SunDon |
| 24 | 54 30 34 StarBucks |
| 25 | }}} |
| 26 | == 注意: == |
| 27 | 1. 請注意hbase 上必須要有 table, 並且已經有資料 |
| 28 | |
| 29 | 2. 運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內 |
| 30 | 請注意 沒有 <hdfs_output> 資料夾 |
| 31 | |
| 32 | |
| 33 | {{{ |
| 34 | #!java |
| 35 | package org.nchc.hbase; |
| 36 | |
| 37 | // LoadHBaseMapper |
| 38 | //說明: |
| 39 | // 此程式碼將HBase的資料取出來,再將結果塞回hdfs上 |
| 40 | // |
| 41 | //運算方法: |
| 42 | // 將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar |
| 43 | // 執行: |
| 44 | // --------------------------- |
| 45 | // hadoop jar XX.jar LoadHBaseMapper <hdfs_output> |
| 46 | // --------------------------- |
| 47 | // |
| 48 | //結果: |
| 49 | // $ hadoop fs -cat <hdfs_output>/part-r-00000 |
| 50 | // --------------------------- |
| 51 | // 54 30 31 GunLong |
| 52 | // 54 30 32 Esing |
| 53 | // 54 30 33 SunDon |
| 54 | // 54 30 34 StarBucks |
| 55 | // --------------------------- |
| 56 | //注意: |
| 57 | //1. 請注意hbase 上必須要有 table, 並且已經有資料 |
| 58 | //2. 運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內 |
| 59 | // 請注意 沒有 <hdfs_output> 資料夾 |
| 60 | |
| 61 | import java.io.IOException; |
| 62 | |
| 63 | import org.apache.hadoop.conf.Configuration; |
| 64 | import org.apache.hadoop.fs.Path; |
| 65 | import org.apache.hadoop.hbase.client.Result; |
| 66 | import org.apache.hadoop.hbase.client.Scan; |
| 67 | import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| 68 | import org.apache.hadoop.hbase.mapreduce.TableInputFormat; |
| 69 | import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; |
| 70 | import org.apache.hadoop.hbase.mapreduce.TableMapper; |
| 71 | import org.apache.hadoop.hbase.util.Bytes; |
| 72 | import org.apache.hadoop.io.Text; |
| 73 | import org.apache.hadoop.mapreduce.Job; |
| 74 | import org.apache.hadoop.mapreduce.Reducer; |
| 75 | import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| 76 | import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; |
| 77 | |
| 78 | public class LoadHBaseMapper { |
| 79 | public static class HtMap extends TableMapper<Text, Text> { |
| 80 | |
| 81 | public void map(ImmutableBytesWritable key, Result value, |
| 82 | Context context) throws IOException, InterruptedException { |
| 83 | |
| 84 | String res = Bytes.toString(value.getValue(Bytes.toBytes("Detail"), |
| 85 | Bytes.toBytes("Name"))); |
| 86 | |
| 87 | context.write(new Text(key.toString()), new Text(res)); |
| 88 | |
| 89 | } |
| 90 | } |
| 91 | |
| 92 | // TableReducer<KEYIN,VALUEIN,KEYOUT> |
| 93 | // 原本為 TableReducer<Text, IntWritable, NullWritable > |
| 94 | // 但在此改成 LongWritable 也可以 |
| 95 | // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可 |
| 96 | public static class HtReduce extends Reducer<Text, Text, Text, Text> { |
| 97 | |
| 98 | public void reduce(Text key, Iterable<Text> values, Context context) |
| 99 | throws IOException, InterruptedException { |
| 100 | |
| 101 | String str = new String(""); |
| 102 | Text final_key = new Text(key); |
| 103 | Text final_value = new Text(); |
| 104 | // 將key值相同的values,透過 && 符號分隔之 |
| 105 | for (Text tmp : values) { |
| 106 | str += tmp.toString(); |
| 107 | } |
| 108 | final_value.set(str); |
| 109 | context.write(final_key, final_value); |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | public static void main(String args[]) throws Exception { |
| 114 | // debug |
| 115 | String[] argv = { "output-lhm" }; |
| 116 | args = argv; |
| 117 | String input = args[0]; |
| 118 | |
| 119 | String tablename = "tsmc"; |
| 120 | |
| 121 | Configuration conf = new Configuration(); |
| 122 | |
| 123 | Job job = new Job(conf, tablename + " hbase data to hdfs"); |
| 124 | job.setJarByClass(LoadHBaseMapper.class); |
| 125 | |
| 126 | // // 設定哪張hbase的table為輸入 |
| 127 | conf.set(TableInputFormat.INPUT_TABLE, tablename); |
| 128 | Scan myScan = new Scan("".getBytes(), "12".getBytes()); |
| 129 | myScan.addColumn("Detail:Name".getBytes()); |
| 130 | |
| 131 | // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定 |
| 132 | TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class, |
| 133 | Text.class, Text.class, job); |
| 134 | |
| 135 | job.setMapperClass(HtMap.class); |
| 136 | job.setReducerClass(HtReduce.class); |
| 137 | |
| 138 | job.setMapOutputKeyClass(Text.class); |
| 139 | job.setMapOutputValueClass(Text.class); |
| 140 | |
| 141 | job.setInputFormatClass(TableInputFormat.class); |
| 142 | job.setOutputFormatClass(TextOutputFormat.class); |
| 143 | |
| 144 | job.setOutputKeyClass(Text.class); |
| 145 | job.setOutputValueClass(Text.class); |
| 146 | |
| 147 | FileOutputFormat.setOutputPath(job, new Path(input)); |
| 148 | |
| 149 | System.exit(job.waitForCompletion(true) ? 0 : 1); |
| 150 | } |
| 151 | } |
| 152 | }}} |
| 153 | |
| 154 | |