Changes between Initial Version and Version 1 of waue/2011/0426_4_7


Ignore:
Timestamp:
Apr 25, 2011, 3:55:59 PM (13 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2011/0426_4_7

    v1 v1  
     1{{{
     2#!html
     3<div style="text-align: center; color:#151B8D"><big style="font-weight: bold;"><big><big>
     4ITRI HBase 進階課程
     5</big></big></big></div> <div style="text-align: center; color:#7E2217"><big style="font-weight: bold;"><big>
     6HBase 範例
     7</big></big></div>
     8}}}
     9
     10[wiki:waue/2011/0426_4_6 上一關 < ] 最後一關 [wiki:waue/2011/0426 >> 回課程大綱]
     11
     12 = 範例七: LoadHBaseMapper =
     13
     14 == 說明:  ==
     15        此程式碼將HBase的資料取出來,再將結果塞回hdfs上
     16{{{
     17$ bin/hadoop jar ItriMenu.jar LoadHBaseMapper output
     18}}}
     19
     20== 注意: ==
     211.      請注意之前已經run 過 範例六
     22
     232.      運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內
     24                請注意 沒有 <hdfs_output> 資料夾
     25
     26
     27{{{
     28#!java
     29
     30package itri;
     31
     32// LoadHBaseMapper
     33//說明:
     34//      此程式碼將HBase的資料取出來,再將結果塞回hdfs上
     35//
     36//運算方法:
     37//      將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar
     38//  執行:
     39//      ---------------------------
     40//      hadoop jar XX.jar LoadHBaseMapper <hdfs_output>
     41//      ---------------------------
     42//
     43//結果:
     44// $ hadoop fs -cat <hdfs_output>/part-r-00000
     45// ---------------------------
     46//      54 30 31        GunLong
     47//      54 30 32        Esing
     48//      54 30 33        SunDon
     49//      54 30 34        StarBucks
     50// ---------------------------
     51//注意:
     52//1.    請注意hbase 上必須要有 table, 並且已經有資料
     53//2.    運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內
     54//              請注意 沒有 <hdfs_output> 資料夾
     55
     56import java.io.IOException;
     57
     58import org.apache.hadoop.conf.Configuration;
     59import org.apache.hadoop.fs.Path;
     60import org.apache.hadoop.hbase.client.Result;
     61import org.apache.hadoop.hbase.client.Scan;
     62import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     63import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
     64import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
     65import org.apache.hadoop.hbase.mapreduce.TableMapper;
     66import org.apache.hadoop.hbase.util.Bytes;
     67import org.apache.hadoop.io.Text;
     68import org.apache.hadoop.mapreduce.Job;
     69import org.apache.hadoop.mapreduce.Reducer;
     70import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     71import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     72import org.apache.hadoop.util.GenericOptionsParser;
     73
     74public class LoadHBaseMapper {
     75        public static class HtMap extends TableMapper<Text, Text> {
     76
     77                public void map(ImmutableBytesWritable key, Result value,
     78                                Context context) throws IOException, InterruptedException {
     79
     80                        String word = Bytes.toString(value.getValue(Bytes.toBytes("content"),
     81                                        Bytes.toBytes("word")));
     82                        String count = Bytes.toString(value.getValue(Bytes.toBytes("content"),
     83                                        Bytes.toBytes("count")));
     84                       
     85                        context.write(new Text(key.toString()), new Text(word+"="+count));
     86
     87                }
     88        }
     89
     90        // TableReducer<KEYIN,VALUEIN,KEYOUT>
     91        // 原本為 TableReducer<Text, IntWritable, NullWritable >
     92        // 但在此改成 LongWritable 也可以
     93        // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
     94        public static class HtReduce extends Reducer<Text, Text, Text, Text> {
     95
     96                public void reduce(Text key, Iterable<Text> values, Context context)
     97                                throws IOException, InterruptedException {
     98
     99                        String str = new String("");
     100                        Text final_key = new Text(key);
     101                        Text final_value = new Text();
     102                        // 將key值相同的values,透過 && 符號分隔之
     103                        for (Text tmp : values) {
     104                                str += tmp.toString();
     105                        }
     106                        final_value.set(str);
     107                        context.write(final_key, final_value);
     108                }
     109        }
     110
     111        public static void main(String argv[]) throws Exception {
     112//              String[] argc = {"output-0426"}; argv=argc;
     113                Configuration conf = new Configuration();
     114                String[] otherArgs = new GenericOptionsParser(conf, argv)
     115                                .getRemainingArgs();
     116                if (otherArgs.length < 1) {
     117                        System.out.println("LoadHBaseMapper <hdfsOutDir>");
     118                        return;
     119                }
     120                String tablename = "wordcounthbase";
     121                String output = otherArgs[0];
     122
     123                Job job = new Job(conf, tablename + " hbase data to hdfs");
     124                job.setJarByClass(LoadHBaseMapper.class);
     125
     126                // // 設定哪張hbase的table為輸入
     127                conf.set(TableInputFormat.INPUT_TABLE, tablename);
     128                Scan myScan = new Scan("".getBytes(), "12".getBytes());
     129                myScan.addColumn("content:count".getBytes());
     130                myScan.addColumn("content:word".getBytes());
     131                // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定
     132                TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class,
     133                                Text.class, Text.class, job);
     134
     135                job.setMapperClass(HtMap.class);
     136                job.setReducerClass(HtReduce.class);
     137
     138                job.setMapOutputKeyClass(Text.class);
     139                job.setMapOutputValueClass(Text.class);
     140
     141                job.setInputFormatClass(TableInputFormat.class);
     142                job.setOutputFormatClass(TextOutputFormat.class);
     143
     144                job.setOutputKeyClass(Text.class);
     145                job.setOutputValueClass(Text.class);
     146
     147                FileOutputFormat.setOutputPath(job, new Path(output));
     148
     149                System.exit(job.waitForCompletion(true) ? 0 : 1);
     150        }
     151}
     152
     153
     154}}}
     155
     156 * 執行結果
     157
     158觀察 /user/hadoop/output-lhm/part-r-00000 檔案的結果