Changes between Version 1 and Version 2 of waue/2010/0204-07


Ignore:
Timestamp:
Feb 3, 2010, 9:54:37 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0204-07

    v1 v2  
    2727{{{
    2828#!java
     29package tsmc;
    2930
     31// LoadHBaseMapper
     32//說明:
     33//      此程式碼將HBase的資料取出來,再將結果塞回hdfs上
     34//
     35//運算方法:
     36//      將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar
     37//  執行:
     38//      ---------------------------
     39//      hadoop jar XX.jar LoadHBaseMapper <hdfs_output>
     40//      ---------------------------
     41//
     42//結果:
     43// $ hadoop fs -cat <hdfs_output>/part-r-00000
     44// ---------------------------
     45//      54 30 31        GunLong
     46//      54 30 32        Esing
     47//      54 30 33        SunDon
     48//      54 30 34        StarBucks
     49// ---------------------------
     50//注意:
     51//1.    請注意hbase 上必須要有 table, 並且已經有資料
     52//2.    運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內
     53//              請注意 沒有 <hdfs_output> 資料夾
     54
     55import java.io.IOException;
     56
     57import org.apache.hadoop.conf.Configuration;
     58import org.apache.hadoop.fs.Path;
     59import org.apache.hadoop.hbase.client.Result;
     60import org.apache.hadoop.hbase.client.Scan;
     61import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     62import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
     63import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
     64import org.apache.hadoop.hbase.mapreduce.TableMapper;
     65import org.apache.hadoop.hbase.util.Bytes;
     66import org.apache.hadoop.io.Text;
     67import org.apache.hadoop.mapreduce.Job;
     68import org.apache.hadoop.mapreduce.Reducer;
     69import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     70import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     71
     72public class LoadHBaseMapper {
     73        public static class HtMap extends TableMapper<Text, Text> {
     74
     75                public void map(ImmutableBytesWritable key, Result value,
     76                                Context context) throws IOException, InterruptedException {
     77
     78                        String res = Bytes.toString(value.getValue(Bytes.toBytes("Detail"),
     79                                        Bytes.toBytes("Name")));
     80
     81                        context.write(new Text(key.toString()), new Text(res));
     82
     83                }
     84        }
     85
     86        // TableReducer<KEYIN,VALUEIN,KEYOUT>
     87        // 原本為 TableReducer<Text, IntWritable, NullWritable >
     88        // 但在此改成 LongWritable 也可以
     89        // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
     90        public static class HtReduce extends Reducer<Text, Text, Text, Text> {
     91
     92                public void reduce(Text key, Iterable<Text> values, Context context)
     93                                throws IOException, InterruptedException {
     94
     95                        String str = new String("");
     96                        Text final_key = new Text(key);
     97                        Text final_value = new Text();
     98                        // 將key值相同的values,透過 && 符號分隔之
     99                        for (Text tmp : values) {
     100                                str += tmp.toString();
     101                        }
     102                        final_value.set(str);
     103                        context.write(final_key, final_value);
     104                }
     105        }
     106
     107        public static void main(String args[]) throws Exception {
     108                // debug
     109                String[] argv = { "output-lhm" };
     110                args = argv;
     111                String input = args[0];
     112
     113                String tablename = "tsmc";
     114
     115                Configuration conf = new Configuration();
     116
     117                Job job = new Job(conf, tablename + " hbase data to hdfs");
     118                job.setJarByClass(LoadHBaseMapper.class);
     119
     120                // // 設定哪張hbase的table為輸入
     121                conf.set(TableInputFormat.INPUT_TABLE, tablename);// 用此方法會遇到
     122                                                                                                                        // java.lang.IllegalArgumentException:
     123                                                                                                                        // string cannot be
     124                                                                                                                        // null
     125                Scan myScan = new Scan("".getBytes(), "12".getBytes());
     126                myScan.addColumn("Detail:Name".getBytes());
     127
     128                // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定
     129                TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class,
     130                                Text.class, Text.class, job);
     131
     132                job.setMapperClass(HtMap.class);
     133                job.setReducerClass(HtReduce.class);
     134
     135                job.setMapOutputKeyClass(Text.class);
     136                job.setMapOutputValueClass(Text.class);
     137
     138                job.setInputFormatClass(TableInputFormat.class);
     139                job.setOutputFormatClass(TextOutputFormat.class);
     140
     141                job.setOutputKeyClass(Text.class);
     142                job.setOutputValueClass(Text.class);
     143
     144                FileOutputFormat.setOutputPath(job, new Path(input));
     145
     146                System.exit(job.waitForCompletion(true) ? 0 : 1);
     147        }
     148}
    30149}}}