Changes between Initial Version and Version 1 of NCHCCloudCourse100929_4_HBEX7


Ignore:
Timestamp:
Sep 27, 2010, 6:27:40 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • NCHCCloudCourse100929_4_HBEX7

    v1 v1  
     1{{{
     2#!html
     3<div style="text-align: center; color:#151B8D"><big style="font-weight: bold;"><big><big>
     4HBase 進階課程
     5</big></big></big></div> <div style="text-align: center; color:#7E2217"><big style="font-weight: bold;"><big>
     6程式範例練習
     7</big></big></div>
     8}}}
     9
     10[wiki:NCHCCloudCourse100929_4_HBEX6 上一關 < ] 最後一關 [wiki:NCHCCloudCourse100928 >> 回課程大綱]
     11
     12 = 範例七: LoadHBaseMapper =
     13
     14 == 說明:  ==
     15        此程式碼將HBase的資料取出來,再將結果塞回hdfs上
     16
     17 == 結果: ==
     18{{{
     19 $ hadoop fs -cat <hdfs_output>/part-r-00000
     20
     21        54 30 31        GunLong
     22        54 30 32        Esing
     23        54 30 33        SunDon
     24        54 30 34        StarBucks
     25}}}
     26== 注意: ==
     271.      請注意hbase 上必須要有 table, 並且已經有資料
     28
     292.      運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內
     30                請注意 沒有 <hdfs_output> 資料夾
     31
     32
     33{{{
     34#!java
     35package org.nchc.hbase;
     36
     37// LoadHBaseMapper
     38//說明:
     39//      此程式碼將HBase的資料取出來,再將結果塞回hdfs上
     40//
     41//運算方法:
     42//      將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar
     43//  執行:
     44//      ---------------------------
     45//      hadoop jar XX.jar LoadHBaseMapper <hdfs_output>
     46//      ---------------------------
     47//
     48//結果:
     49// $ hadoop fs -cat <hdfs_output>/part-r-00000
     50// ---------------------------
     51//      54 30 31        GunLong
     52//      54 30 32        Esing
     53//      54 30 33        SunDon
     54//      54 30 34        StarBucks
     55// ---------------------------
     56//注意:
     57//1.    請注意hbase 上必須要有 table, 並且已經有資料
     58//2.    運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內
     59//              請注意 沒有 <hdfs_output> 資料夾
     60
     61import java.io.IOException;
     62
     63import org.apache.hadoop.conf.Configuration;
     64import org.apache.hadoop.fs.Path;
     65import org.apache.hadoop.hbase.client.Result;
     66import org.apache.hadoop.hbase.client.Scan;
     67import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     68import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
     69import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
     70import org.apache.hadoop.hbase.mapreduce.TableMapper;
     71import org.apache.hadoop.hbase.util.Bytes;
     72import org.apache.hadoop.io.Text;
     73import org.apache.hadoop.mapreduce.Job;
     74import org.apache.hadoop.mapreduce.Reducer;
     75import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     76import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     77
     78public class LoadHBaseMapper {
     79        public static class HtMap extends TableMapper<Text, Text> {
     80
     81                public void map(ImmutableBytesWritable key, Result value,
     82                                Context context) throws IOException, InterruptedException {
     83
     84                        String res = Bytes.toString(value.getValue(Bytes.toBytes("Detail"),
     85                                        Bytes.toBytes("Name")));
     86
     87                        context.write(new Text(key.toString()), new Text(res));
     88
     89                }
     90        }
     91
     92        // TableReducer<KEYIN,VALUEIN,KEYOUT>
     93        // 原本為 TableReducer<Text, IntWritable, NullWritable >
     94        // 但在此改成 LongWritable 也可以
     95        // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
     96        public static class HtReduce extends Reducer<Text, Text, Text, Text> {
     97
     98                public void reduce(Text key, Iterable<Text> values, Context context)
     99                                throws IOException, InterruptedException {
     100
     101                        String str = new String("");
     102                        Text final_key = new Text(key);
     103                        Text final_value = new Text();
     104                        // 將key值相同的values,透過 && 符號分隔之
     105                        for (Text tmp : values) {
     106                                str += tmp.toString();
     107                        }
     108                        final_value.set(str);
     109                        context.write(final_key, final_value);
     110                }
     111        }
     112
     113        public static void main(String args[]) throws Exception {
     114                // debug
     115                String[] argv = { "output-lhm" };
     116                args = argv;
     117                String input = args[0];
     118
     119                String tablename = "tsmc";
     120
     121                Configuration conf = new Configuration();
     122
     123                Job job = new Job(conf, tablename + " hbase data to hdfs");
     124                job.setJarByClass(LoadHBaseMapper.class);
     125
     126                // // 設定哪張hbase的table為輸入
     127                conf.set(TableInputFormat.INPUT_TABLE, tablename);
     128                Scan myScan = new Scan("".getBytes(), "12".getBytes());
     129                myScan.addColumn("Detail:Name".getBytes());
     130
     131                // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定
     132                TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class,
     133                                Text.class, Text.class, job);
     134
     135                job.setMapperClass(HtMap.class);
     136                job.setReducerClass(HtReduce.class);
     137
     138                job.setMapOutputKeyClass(Text.class);
     139                job.setMapOutputValueClass(Text.class);
     140
     141                job.setInputFormatClass(TableInputFormat.class);
     142                job.setOutputFormatClass(TextOutputFormat.class);
     143
     144                job.setOutputKeyClass(Text.class);
     145                job.setOutputValueClass(Text.class);
     146
     147                FileOutputFormat.setOutputPath(job, new Path(input));
     148
     149                System.exit(job.waitForCompletion(true) ? 0 : 1);
     150        }
     151}
     152}}}
     153
     154