ITRI HBase 進階課程
HBase 範例
範例七: LoadHBaseMapper
說明:
此程式碼將HBase的資料取出來,再將結果塞回hdfs上
$ bin/hadoop jar ItriMenu.jar LoadHBaseMapper output
注意:
- 請注意之前已經run 過 範例六
- 運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內
請注意 沒有 <hdfs_output> 資料夾
package itri; // LoadHBaseMapper //說明: // 此程式碼將HBase的資料取出來,再將結果塞回hdfs上 // //運算方法: // 將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar // 執行: // --------------------------- // hadoop jar XX.jar LoadHBaseMapper <hdfs_output> // --------------------------- // //結果: // $ hadoop fs -cat <hdfs_output>/part-r-00000 //注意: //1. 請注意hbase 上必須要有 table, 並且已經有資料 //2. 運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內 // 請注意 沒有 <hdfs_output> 資料夾 import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class LoadHBaseMapper { public static class HtMap extends TableMapper<Text, Text> { public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { String word = Bytes.toString(value.getValue(Bytes.toBytes("content"), Bytes.toBytes("word"))); String count = Bytes.toString(value.getValue(Bytes.toBytes("content"), Bytes.toBytes("count"))); context.write(new Text(key.toString()), new Text(word+"="+count)); } } // TableReducer<KEYIN,VALUEIN,KEYOUT> // 原本為 TableReducer<Text, IntWritable, NullWritable > // 但在此改成 LongWritable 也可以 // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可 public static class HtReduce extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String str = new String(""); Text final_key = new Text(key); Text final_value = new Text(); // 將key值相同的values,透過 && 符號分隔之 for (Text tmp : values) { str += tmp.toString(); } final_value.set(str); context.write(final_key, final_value); } } public static void main(String argv[]) throws Exception { // String[] argc = {"output-0426"}; argv=argc; Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, argv) .getRemainingArgs(); if (otherArgs.length < 1) { System.out.println("LoadHBaseMapper <hdfsOutDir>"); return; } String tablename = "wordcounthbase"; String output = otherArgs[0]; Job job = new Job(conf, tablename + " hbase data to hdfs"); job.setJarByClass(LoadHBaseMapper.class); // // 設定哪張hbase的table為輸入 conf.set(TableInputFormat.INPUT_TABLE, tablename); Scan myScan = new Scan("".getBytes(), "12".getBytes()); myScan.addColumn("content:count".getBytes()); myScan.addColumn("content:word".getBytes()); // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定 TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class, Text.class, Text.class, job); job.setMapperClass(HtMap.class); job.setReducerClass(HtReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(TableInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileOutputFormat.setOutputPath(job, new Path(output)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
- 執行結果
觀察 /user/hadoop/output-lhm/part-r-00000 檔案的結果
Last modified 14 years ago
Last modified on Apr 25, 2011, 3:58:51 PM