source: sample/WordCount.java @ 7

Last change on this file since 7 was 7, checked in by waue, 16 years ago

6/12 modify

File size: 2.8 KB
Line 
1/*
2 * Map/reduce sample code: counts how often each word occurs in the input text.
3 */
4package tw.org.nchc.demo;
5
6import java.io.IOException;
7import java.util.Iterator;
8import java.util.StringTokenizer;
9
10import org.apache.hadoop.fs.FileSystem;
11import org.apache.hadoop.fs.Path;
12import org.apache.hadoop.io.IntWritable;
13import org.apache.hadoop.io.LongWritable;
14import org.apache.hadoop.io.Text;
15import org.apache.hadoop.mapred.JobClient;
16import org.apache.hadoop.mapred.JobConf;
17import org.apache.hadoop.mapred.MapReduceBase;
18import org.apache.hadoop.mapred.Mapper;
19import org.apache.hadoop.mapred.OutputCollector;
20import org.apache.hadoop.mapred.Reducer;
21import org.apache.hadoop.mapred.Reporter;
22
23
24public class WordCount {
25
26  // mapper: emits (token, 1) for every word occurrence
27  private static class MapClass extends MapReduceBase implements
28      Mapper<LongWritable, Text, Text, IntWritable> {
29
30    // reuse objects to save overhead of object creation
31    private final static IntWritable one = new IntWritable(1);
32    private Text word = new Text();
33
34    public void map(LongWritable key, Text value,
35        OutputCollector<Text, IntWritable> output, Reporter reporter)
36        throws IOException {
37      String line = ((Text) value).toString();
38      StringTokenizer itr = new StringTokenizer(line);
39      while (itr.hasMoreTokens()) {
40        word.set(itr.nextToken());
41        output.collect(word, one);
42      }
43    }
44  }
45
46  // reducer: sums up all the counts
47  private static class ReduceClass extends MapReduceBase implements
48      Reducer<Text, IntWritable, Text, IntWritable> {
49
50    // reuse objects
51    private final static IntWritable SumValue = new IntWritable();
52
53    public void reduce(Text key, Iterator<IntWritable> values,
54        OutputCollector<Text, IntWritable> output, Reporter reporter)
55        throws IOException {
56      // sum up values
57      int sum = 0;
58      while (values.hasNext()) {
59        sum += values.next().get();
60      }
61      SumValue.set(sum);
62      output.collect(key, SumValue);
63    }
64  }
65
66 
67  /**
68   * Runs the demo.
69   */
70  public static void main(String[] args) throws IOException {
71    String filename = "/user/waue/input/";
72    String outputPath = "sample-counts";
73    int mapTasks = 20;
74    int reduceTasks = 1;
75
76    JobConf conf = new JobConf(WordCount.class);
77    conf.setJobName("wordcount");
78
79    conf.setNumMapTasks(mapTasks);
80    conf.setNumReduceTasks(reduceTasks);
81
82    conf.setInputPath(new Path(filename));
83    conf.setOutputKeyClass(Text.class);
84    conf.setOutputValueClass(IntWritable.class);
85    conf.setOutputPath(new Path(outputPath));
86
87    conf.setMapperClass(MapClass.class);
88    conf.setCombinerClass(ReduceClass.class);
89    conf.setReducerClass(ReduceClass.class);
90   
91    // Delete the output directory if it exists already
92    Path outputDir = new Path(outputPath);
93    FileSystem.get(conf).delete(outputDir);
94
95    JobClient.runJob(conf);
96  }
97}
Note: See TracBrowser for help on using the repository browser.