source: sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseSink.java @ 246

Last change on this file since 246 was 20, checked in by waue, 16 years ago

將改完的 hadoop 0.17版package 放來備份
目前繼續開發 hadoop 0.16 + hbase 1.3

File size: 2.5 KB
Line 
1/**
2 * Program: DemoHBaseSink.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwn
5 * Last Update Date: 07/02/2008
6 * Upgrade to 0.17
7 * Re-code from : Cloud9: A MapReduce Library for Hadoop
8 */
9/*
10 * Cloud9: A MapReduce Library for Hadoop
11 */
12
13package tw.org.nchc.demo;
14
15import java.io.IOException;
16import java.util.Iterator;
17
18import org.apache.hadoop.fs.Path;
19import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20import org.apache.hadoop.hbase.mapred.TableReduce;
21import org.apache.hadoop.io.LongWritable;
22import org.apache.hadoop.io.MapWritable;
23import org.apache.hadoop.io.Text;
24import org.apache.hadoop.mapred.JobClient;
25import org.apache.hadoop.mapred.JobConf;
26import org.apache.hadoop.mapred.OutputCollector;
27import org.apache.hadoop.mapred.Reporter;
28import org.apache.hadoop.mapred.lib.IdentityMapper;
29import org.apache.hadoop.mapred.lib.IdentityReducer;
30
31import tw.org.nchc.code.Convert;
32
33/**
34 *
35 */
36public class DemoHBaseSink {
37
38  private static class ReduceClass extends TableReduce<LongWritable, Text> {
39
40    // this is the column we're going to be writing
41    private static final Text col = new Text("default:text");
42
43    // this map holds the columns per row
44    private MapWritable map = new MapWritable();
45
46    public void reduce(LongWritable key, Iterator<Text> values,
47        OutputCollector<Text, MapWritable> output, Reporter reporter)
48        throws IOException {
49
50      // contents must be ImmutableBytesWritable
51      ImmutableBytesWritable bytes = new ImmutableBytesWritable(values
52          .next().getBytes());
53
54      // populate the current row
55      map.clear();
56      map.put(col, bytes);
57
58      // add the row with the key as the row id
59      output.collect(new Text(key.toString()), map);
60    }
61  }
62
63  private DemoHBaseSink() {
64  }
65
66  /**
67   * Runs the demo.
68   */
69  public static void main(String[] args) throws IOException {
70    String filename = "/shared/sample";
71
72    int mapTasks = 1;
73    int reduceTasks = 1;
74
75    JobConf conf = new JobConf(DemoHBaseSink.class);
76    conf.setJobName("wordcount");
77
78    // must initialize the TableReduce before running job
79    TableReduce.initJob("test", ReduceClass.class, conf);
80
81    conf.setNumMapTasks(mapTasks);
82    conf.setNumReduceTasks(reduceTasks);
83    // 0.16
84    // conf.setInputPath(new Path(filename));
85    Convert.setInputPath(conf, new Path(filename));
86
87    conf.setMapperClass(IdentityMapper.class);
88    conf.setCombinerClass(IdentityReducer.class);
89    conf.setReducerClass(ReduceClass.class);
90
91    JobClient.runJob(conf);
92  }
93}
Note: See TracBrowser for help on using the repository browser.