source: sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseSource.java @ 20

Last change on this file since 20 was 20, checked in by waue, 16 years ago

將改完的 hadoop 0.17版package 放來備份
目前繼續開發 hadoop 0.16 + hbase 1.3

File size: 3.2 KB
Line 
1/**
2 * Program: DemoHBaseSource.java
3 * Editor: Waue Chen
4 * From: NCHC, Taiwan
5 * Last Update Date: 07/02/2008
6 * Upgraded to Hadoop 0.17
7 * Adapted from: Cloud9: A MapReduce Library for Hadoop
8 */
9
10package tw.org.nchc.demo;
11
12import java.io.IOException;
13import java.util.Iterator;
14import java.util.StringTokenizer;
15
16import org.apache.hadoop.fs.Path;
17import org.apache.hadoop.hbase.HStoreKey;
18import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19import org.apache.hadoop.hbase.mapred.TableInputFormat;
20import org.apache.hadoop.hbase.mapred.TableMap;
21import org.apache.hadoop.io.IntWritable;
22import org.apache.hadoop.io.MapWritable;
23import org.apache.hadoop.io.Text;
24import org.apache.hadoop.mapred.JobClient;
25import org.apache.hadoop.mapred.JobConf;
26import org.apache.hadoop.mapred.MapReduceBase;
27import org.apache.hadoop.mapred.OutputCollector;
28import org.apache.hadoop.mapred.Reducer;
29import org.apache.hadoop.mapred.Reporter;
30
31import tw.org.nchc.code.Convert;
32
33/**
34 *
35 */
36public class DemoHBaseSource {
37
38  // mapper: emits (token, 1) for every word occurrence
39  private static class MapClass extends TableMap<Text, IntWritable> {
40
41    // reuse objects to save overhead of object creation
42    private final static IntWritable one = new IntWritable(1);
43    private final static Text textcol = new Text("default:text");
44    private Text word = new Text();
45
46    public void map(HStoreKey key, MapWritable cols,
47        OutputCollector<Text, IntWritable> output, Reporter reporter)
48        throws IOException {
49
50      String line = Text.decode(((ImmutableBytesWritable) cols
51          .get(textcol)).get());
52
53      StringTokenizer itr = new StringTokenizer(line);
54      while (itr.hasMoreTokens()) {
55        word.set(itr.nextToken());
56        output.collect(word, one);
57      }
58    }
59  }
60
61  // reducer: sums up all the counts
62  private static class ReduceClass extends MapReduceBase implements
63      Reducer<Text, IntWritable, Text, IntWritable> {
64
65    // reuse objects
66    private final static IntWritable SumValue = new IntWritable();
67
68    public void reduce(Text key, Iterator<IntWritable> values,
69        OutputCollector<Text, IntWritable> output, Reporter reporter)
70        throws IOException {
71      // sum up values
72      int sum = 0;
73      while (values.hasNext()) {
74        sum += values.next().get();
75      }
76      SumValue.set(sum);
77      output.collect(key, SumValue);
78    }
79  }
80
81  private DemoHBaseSource() {
82  }
83
84  /**
85   * Runs the demo.
86   */
87  public static void main(String[] args) throws IOException {
88    String outputPath = "sample-counts2";
89
90    int mapTasks = 1;
91    int reduceTasks = 1;
92
93    JobConf conf = new JobConf(DemoHBaseSource.class);
94
95    TableMap.initJob("test", "default:text", MapClass.class, conf);
96
97    conf.setJobName("wordcount");
98
99    conf.setNumMapTasks(mapTasks);
100    conf.setNumReduceTasks(reduceTasks);
101
102    conf.setInputFormat(TableInputFormat.class);
103
104    conf.setOutputKeyClass(Text.class);
105    conf.setOutputValueClass(IntWritable.class);
106    //0.16
107//    conf.setOutputPath(new Path(outputPath));
108    Convert.setOutputPath(conf,new Path(outputPath));
109   
110    conf.setMapperClass(MapClass.class);
111    conf.setCombinerClass(ReduceClass.class);
112    conf.setReducerClass(ReduceClass.class);
113
114    JobClient.runJob(conf);
115  }
116}
Note: See TracBrowser for help on using the repository browser.