source: sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCountTuple2.java @ 20

Last change on this file since 20 was 20, checked in by waue, 16 years ago

將改完的 hadoop 0.17版package 放來備份
目前繼續開發 hadoop 0.16 + hbase 1.3

File size: 4.2 KB
Line 
1/**
2 * Program: DemoWordCountTuple2.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 07/02/2008
6 * Upgrade to 0.17
7 */
8/*
9 * Cloud9: A MapReduce Library for Hadoop
10 */
11
12package tw.org.nchc.demo;
13
14import java.io.IOException;
15import java.util.Iterator;
16
17import org.apache.hadoop.fs.Path;
18import org.apache.hadoop.io.IntWritable;
19import org.apache.hadoop.io.LongWritable;
20import org.apache.hadoop.io.Text;
21import org.apache.hadoop.mapred.JobClient;
22import org.apache.hadoop.mapred.JobConf;
23import org.apache.hadoop.mapred.MapReduceBase;
24import org.apache.hadoop.mapred.Mapper;
25import org.apache.hadoop.mapred.OutputCollector;
26import org.apache.hadoop.mapred.Reducer;
27import org.apache.hadoop.mapred.Reporter;
28import org.apache.hadoop.mapred.SequenceFileInputFormat;
29import org.apache.hadoop.mapred.SequenceFileOutputFormat;
30
31import tw.org.nchc.code.Convert;
32import tw.org.nchc.tuple.ListWritable;
33import tw.org.nchc.tuple.Schema;
34import tw.org.nchc.tuple.Tuple;
35
36/**
37 * <p>
38 * Demo that illustrates the use of the tuple library ({@link Tuple} and
39 * {@link ListWritable} class). Input comes from Bible+Shakespeare sample
40 * collection, encoded with {@link DemoPackRecords2}. Otherwise, this demo is
41 * exactly the same as {@link DemoWordCountTuple}.
42 * </p>
43 */
44public class DemoWordCountTuple2 {
45
46  // create the schema for the tuple that will serve as the key
47  private static final Schema KEY_SCHEMA = new Schema();
48
49  // define the schema statically
50  static {
51    KEY_SCHEMA.addField("Token", String.class, "");
52    KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
53  }
54
55  // mapper that emits tuple as the key, and value '1' for each occurrence
56  private static class MapClass extends MapReduceBase implements
57      Mapper<LongWritable, Tuple, Tuple, IntWritable> {
58
59    // define value '1' statically so we can reuse the object, i.e., avoid
60    // unnecessary object creation
61    private final static IntWritable one = new IntWritable(1);
62
63    // once again, reuse tuples if possible
64    private Tuple tupleOut = KEY_SCHEMA.instantiate();
65
66    public void map(LongWritable key, Tuple tupleIn,
67        OutputCollector<Tuple, IntWritable> output, Reporter reporter)
68        throws IOException {
69
70      @SuppressWarnings("unchecked")
71      ListWritable<Text> list = (ListWritable<Text>) tupleIn.get(1);
72
73      for (int i = 0; i < list.size(); i++) {
74        Text t = (Text) list.get(i);
75
76        String token = t.toString();
77
78        // put new values into the tuple
79        tupleOut.set("Token", token);
80        tupleOut.set("EvenOrOdd", ((Integer) tupleIn.get(0)) % 2);
81
82        // emit key-value pair
83        output.collect(tupleOut, one);
84      }
85    }
86  }
87
88  // reducer counts up tuple occurrences
89  private static class ReduceClass extends MapReduceBase implements
90      Reducer<Tuple, IntWritable, Tuple, IntWritable> {
91    private final static IntWritable SumValue = new IntWritable();
92
93    public synchronized void reduce(Tuple tupleKey,
94        Iterator<IntWritable> values,
95        OutputCollector<Tuple, IntWritable> output, Reporter reporter)
96        throws IOException {
97      // sum values
98      int sum = 0;
99      while (values.hasNext()) {
100        sum += values.next().get();
101      }
102
103      // keep original tuple key, emit sum of counts as value
104      SumValue.set(sum);
105      output.collect(tupleKey, SumValue);
106    }
107  }
108
109  // dummy constructor
110  private DemoWordCountTuple2() {
111  }
112
113  /**
114   * Runs the demo.
115   */
116  public static void main(String[] args) throws IOException {
117    String inPath = "/shared/sample-input/bible+shakes.nopunc.packed2";
118    String outputPath = "word-counts2-tuple";
119    int numMapTasks = 20;
120    int numReduceTasks = 20;
121
122    JobConf conf = new JobConf(DemoWordCountTuple2.class);
123    conf.setJobName("wordcount");
124
125    conf.setNumMapTasks(numMapTasks);
126    conf.setNumReduceTasks(numReduceTasks);
127   
128    // 0.16
129//    conf.setInputPath(new Path(inPath));
130    Convert.setInputPath(conf,new Path(inPath));
131    conf.setInputFormat(SequenceFileInputFormat.class);
132    // 0.16
133//    conf.setOutputPath(new Path(outputPath));
134    Convert.setInputPath(conf, new Path(outputPath));
135   
136    conf.setOutputKeyClass(Tuple.class);
137    conf.setOutputValueClass(IntWritable.class);
138    conf.setOutputFormat(SequenceFileOutputFormat.class);
139
140    conf.setMapperClass(MapClass.class);
141    conf.setCombinerClass(ReduceClass.class);
142    conf.setReducerClass(ReduceClass.class);
143
144    JobClient.runJob(conf);
145  }
146}
Note: See TracBrowser for help on using the repository browser.