hadoop-sample-code: WordCount.java

File WordCount.java, 2.7 KB (added by waue, 16 years ago)
/*
 * MapReduce sample code: word count.
 */

5import java.io.IOException;
6import java.util.Iterator;
7import java.util.StringTokenizer;
8
9import org.apache.hadoop.fs.FileSystem;
10import org.apache.hadoop.fs.Path;
11import org.apache.hadoop.io.IntWritable;
12import org.apache.hadoop.io.LongWritable;
13import org.apache.hadoop.io.Text;
14import org.apache.hadoop.mapred.JobClient;
15import org.apache.hadoop.mapred.JobConf;
16import org.apache.hadoop.mapred.MapReduceBase;
17import org.apache.hadoop.mapred.Mapper;
18import org.apache.hadoop.mapred.OutputCollector;
19import org.apache.hadoop.mapred.Reducer;
20import org.apache.hadoop.mapred.Reporter;
21
22
23public class WordCount {
24
25  // mapper: emits (token, 1) for every word occurrence
26  private static class MapClass extends MapReduceBase implements
27      Mapper<LongWritable, Text, Text, IntWritable> {
28
29    // reuse objects to save overhead of object creation
30    private final static IntWritable one = new IntWritable(1);
31    private Text word = new Text();
32
33    public void map(LongWritable key, Text value,
34        OutputCollector<Text, IntWritable> output, Reporter reporter)
35        throws IOException {
36      String line = ((Text) value).toString();
37      StringTokenizer itr = new StringTokenizer(line);
38      while (itr.hasMoreTokens()) {
39        word.set(itr.nextToken());
40        output.collect(word, one);
41      }
42    }
43  }
44
45  // reducer: sums up all the counts
46  private static class ReduceClass extends MapReduceBase implements
47      Reducer<Text, IntWritable, Text, IntWritable> {
48
49    // reuse objects
50    private final static IntWritable SumValue = new IntWritable();
51
52    public void reduce(Text key, Iterator<IntWritable> values,
53        OutputCollector<Text, IntWritable> output, Reporter reporter)
54        throws IOException {
55      // sum up values
56      int sum = 0;
57      while (values.hasNext()) {
58        sum += values.next().get();
59      }
60      SumValue.set(sum);
61      output.collect(key, SumValue);
62    }
63  }
64
65 
66  /**
67   * Runs the demo.
68   */
69  public static void main(String[] args) throws IOException {
70    String filename = "/user/waue/test/132.txt";
71    String outputPath = "sample-counts";
72    int mapTasks = 20;
73    int reduceTasks = 1;
74
75    JobConf conf = new JobConf(WordCount.class);
76    conf.setJobName("wordcount");
77
78    conf.setNumMapTasks(mapTasks);
79    conf.setNumReduceTasks(reduceTasks);
80
81    conf.setInputPath(new Path(filename));
82    conf.setOutputKeyClass(Text.class);
83    conf.setOutputValueClass(IntWritable.class);
84    conf.setOutputPath(new Path(outputPath));
85
86    conf.setMapperClass(MapClass.class);
87    conf.setCombinerClass(ReduceClass.class);
88    conf.setReducerClass(ReduceClass.class);
89   
90    // Delete the output directory if it exists already
91    Path outputDir = new Path(outputPath);
92    FileSystem.get(conf).delete(outputDir);
93
94    JobClient.runJob(conf);
95  }
96}