source: sample/hadoop-0.16/tw/org/nchc/demo/DemoHBaseSink.java @ 27

Last change on this file since 27 was 27, checked in by waue, 16 years ago

test!

File size: 2.3 KB
Line 
1/**
2 * Program: DemoHBaseSink.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 07/02/2008
6 * Re-code from : Cloud9: A MapReduce Library for Hadoop
7 */
8/*
9 * Cloud9: A MapReduce Library for Hadoop
10 */
11
12package tw.org.nchc.demo;
13
14import java.io.IOException;
15import java.util.Iterator;
16
17import org.apache.hadoop.fs.Path;
18import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19import org.apache.hadoop.hbase.mapred.TableReduce;
20import org.apache.hadoop.io.LongWritable;
21import org.apache.hadoop.io.MapWritable;
22import org.apache.hadoop.io.Text;
23import org.apache.hadoop.mapred.JobClient;
24import org.apache.hadoop.mapred.JobConf;
25import org.apache.hadoop.mapred.OutputCollector;
26import org.apache.hadoop.mapred.Reporter;
27import org.apache.hadoop.mapred.lib.IdentityMapper;
28import org.apache.hadoop.mapred.lib.IdentityReducer;
29
30/**
31 *
32 */
33public class DemoHBaseSink {
34
35  private static class ReduceClass extends TableReduce<LongWritable, Text> {
36
37    // this is the column we're going to be writing
38    private static final Text col = new Text("default:text");
39
40    // this map holds the columns per row
41    private MapWritable map = new MapWritable();
42
43    public void reduce(LongWritable key, Iterator<Text> values,
44        OutputCollector<Text, MapWritable> output, Reporter reporter)
45        throws IOException {
46
47      // contents must be ImmutableBytesWritable
48      ImmutableBytesWritable bytes = new ImmutableBytesWritable(values
49          .next().getBytes());
50
51      // populate the current row
52      map.clear();
53      map.put(col, bytes);
54
55      // add the row with the key as the row id
56      output.collect(new Text(key.toString()), map);
57    }
58  }
59
60  private DemoHBaseSink() {
61  }
62
63  /**
64   * Runs the demo.
65   */
66  public static void main(String[] args) throws IOException {
67    String filename = "/shared/sample";
68
69    int mapTasks = 1;
70    int reduceTasks = 1;
71
72    JobConf conf = new JobConf(DemoHBaseSink.class);
73    conf.setJobName("wordcount");
74
75    // must initialize the TableReduce before running job
76    TableReduce.initJob("test", ReduceClass.class, conf);
77
78    conf.setNumMapTasks(mapTasks);
79    conf.setNumReduceTasks(reduceTasks);
80    conf.setInputPath(new Path(filename));
81    conf.setMapperClass(IdentityMapper.class);
82    conf.setCombinerClass(IdentityReducer.class);
83    conf.setReducerClass(ReduceClass.class);
84
85    JobClient.runJob(conf);
86  }
87}
Note: See TracBrowser for help on using the repository browser.