source: sample/hadoop-0.16/tw/org/nchc/demo/DemoWordCountTuple2.java

Last change on this file was 28, checked in by waue, 16 years ago

resolve 0.17 ->0.16 problem . all? not sure ! XD

File size: 4.2 KB
Line 
1/**
2 * Program: DemoWordCountTuple2.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 07/02/2008
6 */
7/*
8 * Cloud9: A MapReduce Library for Hadoop
9 */
10
11package tw.org.nchc.demo;
12
13import java.io.IOException;
14import java.util.Iterator;
15
16import org.apache.hadoop.fs.Path;
17import org.apache.hadoop.io.IntWritable;
18import org.apache.hadoop.io.LongWritable;
19import org.apache.hadoop.io.Text;
20import org.apache.hadoop.mapred.JobClient;
21import org.apache.hadoop.mapred.JobConf;
22import org.apache.hadoop.mapred.MapReduceBase;
23import org.apache.hadoop.mapred.Mapper;
24import org.apache.hadoop.mapred.OutputCollector;
25import org.apache.hadoop.mapred.Reducer;
26import org.apache.hadoop.mapred.Reporter;
27import org.apache.hadoop.mapred.SequenceFileInputFormat;
28import org.apache.hadoop.mapred.SequenceFileOutputFormat;
29
30import tw.org.nchc.tuple.ListWritable;
31import tw.org.nchc.tuple.Schema;
32import tw.org.nchc.tuple.Tuple;
33
34/**
35 * <p>
36 * Demo that illustrates the use of the tuple library ({@link Tuple} and
37 * {@link ListWritable} class). Input comes from Bible+Shakespeare sample
38 * collection, encoded with {@link DemoPackRecords2}. Otherwise, this demo is
39 * exactly the same as {@link DemoWordCountTuple}.
40 * </p>
41 */
42public class DemoWordCountTuple2 {
43
44  // create the schema for the tuple that will serve as the key
45  private static final Schema KEY_SCHEMA = new Schema();
46
47  // define the schema statically
48  static {
49    KEY_SCHEMA.addField("Token", String.class, "");
50    KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
51  }
52
53  // mapper that emits tuple as the key, and value '1' for each occurrence
54  private static class MapClass extends MapReduceBase implements
55      Mapper<LongWritable, Tuple, Tuple, IntWritable> {
56
57    // define value '1' statically so we can reuse the object, i.e., avoid
58    // unnecessary object creation
59    private final static IntWritable one = new IntWritable(1);
60
61    // once again, reuse tuples if possible
62    private Tuple tupleOut = KEY_SCHEMA.instantiate();
63
64    public void map(LongWritable key, Tuple tupleIn,
65        OutputCollector<Tuple, IntWritable> output, Reporter reporter)
66        throws IOException {
67
68      @SuppressWarnings("unchecked")
69      ListWritable<Text> list = (ListWritable<Text>) tupleIn.get(1);
70
71      for (int i = 0; i < list.size(); i++) {
72        Text t = (Text) list.get(i);
73
74        String token = t.toString();
75
76        // put new values into the tuple
77        tupleOut.set("Token", token);
78        tupleOut.set("EvenOrOdd", ((Integer) tupleIn.get(0)) % 2);
79
80        // emit key-value pair
81        output.collect(tupleOut, one);
82      }
83    }
84  }
85
86  // reducer counts up tuple occurrences
87  private static class ReduceClass extends MapReduceBase implements
88      Reducer<Tuple, IntWritable, Tuple, IntWritable> {
89    private final static IntWritable SumValue = new IntWritable();
90
91    public synchronized void reduce(Tuple tupleKey,
92        Iterator<IntWritable> values,
93        OutputCollector<Tuple, IntWritable> output, Reporter reporter)
94        throws IOException {
95      // sum values
96      int sum = 0;
97      while (values.hasNext()) {
98        sum += values.next().get();
99      }
100
101      // keep original tuple key, emit sum of counts as value
102      SumValue.set(sum);
103      output.collect(tupleKey, SumValue);
104    }
105  }
106
107  // dummy constructor
108  private DemoWordCountTuple2() {
109  }
110
111  /**
112   * Runs the demo.
113   */
114  public static void main(String[] args) throws IOException {
115    String inPath = "/shared/sample-input/bible+shakes.nopunc.packed2";
116    String outputPath = "word-counts2-tuple";
117    int numMapTasks = 20;
118    int numReduceTasks = 20;
119
120    JobConf conf = new JobConf(DemoWordCountTuple2.class);
121    conf.setJobName("wordcount");
122
123    conf.setNumMapTasks(numMapTasks);
124    conf.setNumReduceTasks(numReduceTasks);
125   
126    // 0.16
127    conf.setInputPath(new Path(inPath));
128//    Convert.setInputPath(conf,new Path(inPath));
129    conf.setInputFormat(SequenceFileInputFormat.class);
130    // 0.16
131    conf.setOutputPath(new Path(outputPath));
132    //    Convert.setInputPath(conf, new Path(outputPath));
133
134   
135    conf.setOutputKeyClass(Tuple.class);
136    conf.setOutputValueClass(IntWritable.class);
137    conf.setOutputFormat(SequenceFileOutputFormat.class);
138
139    conf.setMapperClass(MapClass.class);
140    conf.setCombinerClass(ReduceClass.class);
141    conf.setReducerClass(ReduceClass.class);
142
143    JobClient.runJob(conf);
144  }
145}
Note: See TracBrowser for help on using the repository browser.