wiki:waue/2010/0204-07

Version 3 (modified by waue, 15 years ago) (diff)

--

範例七: LoadHBaseMapper

說明:

此程式碼將HBase的資料取出來,再將結果塞回hdfs上

運算方法:

將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar

執行:

測試檔案 tsmcHBase_100203.jar

	bin/hadoop jar tsmcHBase_100203.jar LoadHBaseMapper <hdfs_output>

結果:

 $ hadoop fs -cat <hdfs_output>/part-r-00000

	54 30 31	GunLong
	54 30 32	Esing
	54 30 33	SunDon
	54 30 34	StarBucks

注意:

  1. 請注意hbase 上必須要有 table, 並且已經有資料
  1. 運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內

請注意 沒有 <hdfs_output> 資料夾

package tsmc;

// LoadHBaseMapper
//說明: 
//  此程式碼將HBase的資料取出來,再將結果塞回hdfs上
//
//運算方法:
//  將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar 
//  執行:
//  ---------------------------
//  hadoop jar XX.jar LoadHBaseMapper <hdfs_output>
//  ---------------------------
//
//結果:
// $ hadoop fs -cat <hdfs_output>/part-r-00000
// ---------------------------
//  54 30 31  GunLong
//  54 30 32  Esing
//  54 30 33  SunDon
//  54 30 34  StarBucks
// ---------------------------
//注意:
//1.  請注意hbase 上必須要有 table, 並且已經有資料
//2.  運算完後,程式將執行結果放在你指定 hdfs的<hdfs_output> 內
//    請注意 沒有 <hdfs_output> 資料夾

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LoadHBaseMapper {
  public static class HtMap extends TableMapper<Text, Text> {

    public void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {

      String res = Bytes.toString(value.getValue(Bytes.toBytes("Detail"),
          Bytes.toBytes("Name")));

      context.write(new Text(key.toString()), new Text(res));

    }
  }

  // TableReducer<KEYIN,VALUEIN,KEYOUT>
  // 原本為 TableReducer<Text, IntWritable, NullWritable >
  // 但在此改成 LongWritable 也可以
  // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
  public static class HtReduce extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

      String str = new String("");
      Text final_key = new Text(key);
      Text final_value = new Text();
      // 將key值相同的values,透過 && 符號分隔之
      for (Text tmp : values) {
        str += tmp.toString();
      }
      final_value.set(str);
      context.write(final_key, final_value);
    }
  }

  public static void main(String args[]) throws Exception {
    // debug
    String[] argv = { "output-lhm" };
    args = argv;
    String input = args[0];

    String tablename = "tsmc";

    Configuration conf = new Configuration();

    Job job = new Job(conf, tablename + " hbase data to hdfs");
    job.setJarByClass(LoadHBaseMapper.class);

    // // 設定哪張hbase的table為輸入
    conf.set(TableInputFormat.INPUT_TABLE, tablename);
    Scan myScan = new Scan("".getBytes(), "12".getBytes());
    myScan.addColumn("Detail:Name".getBytes());

    // 先用 TableMapReduceUtil 的 initTableMapperJob 與 initTableReducerJob 來做設定
    TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class,
        Text.class, Text.class, job);

    job.setMapperClass(HtMap.class);
    job.setReducerClass(HtReduce.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setInputFormatClass(TableInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileOutputFormat.setOutputPath(job, new Path(input));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}