source: sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCountTuple.java

Last change on this file was 20, checked in by waue, 16 years ago

將改完的 hadoop 0.17版package 放來備份
目前繼續開發 hadoop 0.16 + hbase 1.3

File size: 5.5 KB
Line 
1/**
2 * Program: DemoWordCountTuple.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 07/02/2008
6 * Upgrade to 0.17
7 */
8/*
9 * Cloud9: A MapReduce Library for Hadoop
10 */
11
12package tw.org.nchc.demo;
13
14import java.io.IOException;
15import java.util.Iterator;
16import java.util.StringTokenizer;
17
18import org.apache.hadoop.fs.Path;
19import org.apache.hadoop.io.IntWritable;
20import org.apache.hadoop.io.LongWritable;
21import org.apache.hadoop.mapred.JobClient;
22import org.apache.hadoop.mapred.JobConf;
23import org.apache.hadoop.mapred.MapReduceBase;
24import org.apache.hadoop.mapred.Mapper;
25import org.apache.hadoop.mapred.OutputCollector;
26import org.apache.hadoop.mapred.Reducer;
27import org.apache.hadoop.mapred.Reporter;
28import org.apache.hadoop.mapred.SequenceFileInputFormat;
29import org.apache.hadoop.mapred.SequenceFileOutputFormat;
30
31import tw.org.nchc.code.Convert;
32import tw.org.nchc.tuple.Schema;
33import tw.org.nchc.tuple.Tuple;
34
35/**
36 * <p>
37 * Demo that illustrates the use of the tuple library ({@link Tuple} class).
38 * Input comes from Bible+Shakespeare sample collection, encoded as single-field
39 * tuples; see {@link DemoPackRecords}. Sample of final output:
40 * </p>
41 *
42 * <pre>
43 * ...
44 * (admirable, 0)    9
45 * (admirable, 1)    6
46 * (admiral, 0)      2
47 * (admiral, 1)      4
48 * (admiration, 0)  10
49 * (admiration, 1)   6
50 * (admire, 0)       5
51 * (admire, 1)       3
52 * (admired, 0)     12
53 * (admired, 1)      7
54 * ...
55 * </pre>
56 *
57 * <p>
58 * The first field of the key tuple contains a token, the second field indicates
59 * whether it was found on a even-length or odd-length line. The value is the
60 * count of the tuple occurrences in the collection. In the MapReduce cycle,
61 * output keys consist of tuples (Token, EvenOrOdd). The second field of the
62 * tuple indicates whether the token was found on a line with an even or an odd
63 * number of characters. Values consist of counts of tuple occurrences. Expected
64 * trace of the demo:
65 * </p>
66 *
67 * <pre>
68 * Map input records=156215
69 * Map output records=1734298
70 * Map input bytes=13118917
71 * Map output bytes=66214039
72 * Combine input records=1734298
73 * Combine output records=192045
74 * Reduce input groups=59225
75 * Reduce input records=192045
76 * Reduce output records=59225
77 * </pre>
78 *
79 * <p>
80 * Obviously, this isn't a particularly meaningful program, but does illustrate
81 * the use of the {@link Tuple} class.
82 * </p>
83 */
84public class DemoWordCountTuple {
85
86  // create the schema for the tuple that will serve as the key
87  private static final Schema KEY_SCHEMA = new Schema();
88
89  // define the schema statically
90  static {
91    KEY_SCHEMA.addField("Token", String.class, "");
92    KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
93  }
94
95  // mapper that emits tuple as the key, and value '1' for each occurrence
96  private static class MapClass extends MapReduceBase implements
97      Mapper<LongWritable, Tuple, Tuple, IntWritable> {
98
99    // define value '1' statically so we can reuse the object, i.e., avoid
100    // unnecessary object creation
101    private final static IntWritable one = new IntWritable(1);
102
103    // once again, reuse tuples if possible
104    private Tuple tupleOut = KEY_SCHEMA.instantiate();
105
106    public void map(LongWritable key, Tuple tupleIn,
107        OutputCollector<Tuple, IntWritable> output, Reporter reporter)
108        throws IOException {
109
110      // the input value is a tuple; get field 0
111      // see DemoPackRecords of how input SequenceFile is generated
112      String line = (String) tupleIn.get(0);
113      StringTokenizer itr = new StringTokenizer(line);
114      while (itr.hasMoreTokens()) {
115        String token = itr.nextToken();
116
117        // put new values into the tuple
118        tupleOut.set("Token", token);
119        tupleOut.set("EvenOrOdd", line.length() % 2);
120
121        // emit key-value pair
122        output.collect(tupleOut, one);
123      }
124    }
125  }
126
127  // reducer counts up tuple occurrences
128  private static class ReduceClass extends MapReduceBase implements
129      Reducer<Tuple, IntWritable, Tuple, IntWritable> {
130    private final static IntWritable SumValue = new IntWritable();
131
132    public synchronized void reduce(Tuple tupleKey,
133        Iterator<IntWritable> values,
134        OutputCollector<Tuple, IntWritable> output, Reporter reporter)
135        throws IOException {
136      // sum values
137      int sum = 0;
138      while (values.hasNext()) {
139        sum += values.next().get();
140      }
141
142      // keep original tuple key, emit sum of counts as value
143      SumValue.set(sum);
144      output.collect(tupleKey, SumValue);
145    }
146  }
147
148  // dummy constructor
149  private DemoWordCountTuple() {
150  }
151
152  /**
153   * Runs the demo.
154   */
155  public static void main(String[] args) throws IOException {
156    String inPath = "/shared/sample-input/bible+shakes.nopunc.packed";
157    String outputPath = "word-counts-tuple";
158    int numMapTasks = 20;
159    int numReduceTasks = 20;
160
161    JobConf conf = new JobConf(DemoWordCountTuple.class);
162    conf.setJobName("wordcount");
163
164    conf.setNumMapTasks(numMapTasks);
165    conf.setNumReduceTasks(numReduceTasks);
166    // 0.16
167    // conf.setInputPath(new Path(inPath));
168    Convert.setInputPath(conf, new Path(inPath));
169    conf.setInputFormat(SequenceFileInputFormat.class);
170
171    // conf.setOutputPath(new Path(outputPath));
172    Convert.setInputPath(conf, new Path(outputPath));
173    conf.setOutputKeyClass(Tuple.class);
174    conf.setOutputValueClass(IntWritable.class);
175    conf.setOutputFormat(SequenceFileOutputFormat.class);
176
177    conf.setMapperClass(MapClass.class);
178    conf.setCombinerClass(ReduceClass.class);
179    conf.setReducerClass(ReduceClass.class);
180
181    JobClient.runJob(conf);
182  }
183}
Note: See TracBrowser for help on using the repository browser.