source: sample/hadoop-0.16/tw/org/nchc/demo/DemoWordCountTuple.java @ 27

Last change on this file since 27 was 27, checked in by waue, 16 years ago

test!

File size: 5.5 KB
Line 
1/**
2 * Program: DemoWordCountTuple.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 07/02/2008
6 */
7/*
8 * Cloud9: A MapReduce Library for Hadoop
9 */
10
11package tw.org.nchc.demo;
12
13import java.io.IOException;
14import java.util.Iterator;
15import java.util.StringTokenizer;
16
17import org.apache.hadoop.fs.Path;
18import org.apache.hadoop.io.IntWritable;
19import org.apache.hadoop.io.LongWritable;
20import org.apache.hadoop.mapred.JobClient;
21import org.apache.hadoop.mapred.JobConf;
22import org.apache.hadoop.mapred.MapReduceBase;
23import org.apache.hadoop.mapred.Mapper;
24import org.apache.hadoop.mapred.OutputCollector;
25import org.apache.hadoop.mapred.Reducer;
26import org.apache.hadoop.mapred.Reporter;
27import org.apache.hadoop.mapred.SequenceFileInputFormat;
28import org.apache.hadoop.mapred.SequenceFileOutputFormat;
29
30import tw.org.nchc.tuple.Schema;
31import tw.org.nchc.tuple.Tuple;
32
33/**
34 * <p>
35 * Demo that illustrates the use of the tuple library ({@link Tuple} class).
36 * Input comes from Bible+Shakespeare sample collection, encoded as single-field
37 * tuples; see {@link DemoPackRecords}. Sample of final output:
38 * </p>
39 *
40 * <pre>
41 * ...
42 * (admirable, 0)    9
43 * (admirable, 1)    6
44 * (admiral, 0)      2
45 * (admiral, 1)      4
46 * (admiration, 0)  10
47 * (admiration, 1)   6
48 * (admire, 0)       5
49 * (admire, 1)       3
50 * (admired, 0)     12
51 * (admired, 1)      7
52 * ...
53 * </pre>
54 *
55 * <p>
56 * The first field of the key tuple contains a token, the second field indicates
57 * whether it was found on a even-length or odd-length line. The value is the
58 * count of the tuple occurrences in the collection. In the MapReduce cycle,
59 * output keys consist of tuples (Token, EvenOrOdd). The second field of the
60 * tuple indicates whether the token was found on a line with an even or an odd
61 * number of characters. Values consist of counts of tuple occurrences. Expected
62 * trace of the demo:
63 * </p>
64 *
65 * <pre>
66 * Map input records=156215
67 * Map output records=1734298
68 * Map input bytes=13118917
69 * Map output bytes=66214039
70 * Combine input records=1734298
71 * Combine output records=192045
72 * Reduce input groups=59225
73 * Reduce input records=192045
74 * Reduce output records=59225
75 * </pre>
76 *
77 * <p>
78 * Obviously, this isn't a particularly meaningful program, but does illustrate
79 * the use of the {@link Tuple} class.
80 * </p>
81 */
82public class DemoWordCountTuple {
83
84  // create the schema for the tuple that will serve as the key
85  private static final Schema KEY_SCHEMA = new Schema();
86
87  // define the schema statically
88  static {
89    KEY_SCHEMA.addField("Token", String.class, "");
90    KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
91  }
92
93  // mapper that emits tuple as the key, and value '1' for each occurrence
94  private static class MapClass extends MapReduceBase implements
95      Mapper<LongWritable, Tuple, Tuple, IntWritable> {
96
97    // define value '1' statically so we can reuse the object, i.e., avoid
98    // unnecessary object creation
99    private final static IntWritable one = new IntWritable(1);
100
101    // once again, reuse tuples if possible
102    private Tuple tupleOut = KEY_SCHEMA.instantiate();
103
104    public void map(LongWritable key, Tuple tupleIn,
105        OutputCollector<Tuple, IntWritable> output, Reporter reporter)
106        throws IOException {
107
108      // the input value is a tuple; get field 0
109      // see DemoPackRecords of how input SequenceFile is generated
110      String line = (String) tupleIn.get(0);
111      StringTokenizer itr = new StringTokenizer(line);
112      while (itr.hasMoreTokens()) {
113        String token = itr.nextToken();
114
115        // put new values into the tuple
116        tupleOut.set("Token", token);
117        tupleOut.set("EvenOrOdd", line.length() % 2);
118
119        // emit key-value pair
120        output.collect(tupleOut, one);
121      }
122    }
123  }
124
125  // reducer counts up tuple occurrences
126  private static class ReduceClass extends MapReduceBase implements
127      Reducer<Tuple, IntWritable, Tuple, IntWritable> {
128    private final static IntWritable SumValue = new IntWritable();
129
130    public synchronized void reduce(Tuple tupleKey,
131        Iterator<IntWritable> values,
132        OutputCollector<Tuple, IntWritable> output, Reporter reporter)
133        throws IOException {
134      // sum values
135      int sum = 0;
136      while (values.hasNext()) {
137        sum += values.next().get();
138      }
139
140      // keep original tuple key, emit sum of counts as value
141      SumValue.set(sum);
142      output.collect(tupleKey, SumValue);
143    }
144  }
145
146  // dummy constructor
147  private DemoWordCountTuple() {
148  }
149
150  /**
151   * Runs the demo.
152   */
153  public static void main(String[] args) throws IOException {
154    String inPath = "/shared/sample-input/bible+shakes.nopunc.packed";
155    String outputPath = "word-counts-tuple";
156    int numMapTasks = 20;
157    int numReduceTasks = 20;
158
159    JobConf conf = new JobConf(DemoWordCountTuple.class);
160    conf.setJobName("wordcount");
161
162    conf.setNumMapTasks(numMapTasks);
163    conf.setNumReduceTasks(numReduceTasks);
164    // 0.16
165    // conf.setInputPath(new Path(inPath));
166    Convert.setInputPath(conf, new Path(inPath));
167    conf.setInputFormat(SequenceFileInputFormat.class);
168
169    // conf.setOutputPath(new Path(outputPath));
170    Convert.setInputPath(conf, new Path(outputPath));
171    conf.setOutputKeyClass(Tuple.class);
172    conf.setOutputValueClass(IntWritable.class);
173    conf.setOutputFormat(SequenceFileOutputFormat.class);
174
175    conf.setMapperClass(MapClass.class);
176    conf.setCombinerClass(ReduceClass.class);
177    conf.setReducerClass(ReduceClass.class);
178
179    JobClient.runJob(conf);
180  }
181}
Note: See TracBrowser for help on using the repository browser.