source: sample/WordCount.java @ 11

Last change on this file since 11 was 9, checked in by waue, 16 years ago

comment

File size: 3.6 KB
RevLine 
[9]1/**
2 * Program: WordCount.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 06/13/2008
[7]6 */
[9]7
8/**
9 * Purpose :
10 *  Count how often each word occurs in the text files under a Hadoop input path and write the (word, count) results to a Hadoop output directory
11 *
12 * HowToUse :
13 *  Make sure Hadoop file system is running correctly.
14 *  Put text files in the directory "/local_src/input"
15 *  You can use the following command to upload "/local_src/input" to the HDFS input dir
16 *    $ bin/hadoop dfs -put /local_src/input input
17 *  Then set the filepath value in the no-argument constructor to your input directory and run this code.
18 * 
19 *
20 * Check Result:
21 *  inspect http://localhost:50070 with a web browser
22 */
[8]23package tw.org.nchc.code;
[7]24
25import java.io.IOException;
26import java.util.Iterator;
27import java.util.StringTokenizer;
28
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.io.IntWritable;
32import org.apache.hadoop.io.LongWritable;
33import org.apache.hadoop.io.Text;
34import org.apache.hadoop.mapred.JobClient;
35import org.apache.hadoop.mapred.JobConf;
36import org.apache.hadoop.mapred.MapReduceBase;
37import org.apache.hadoop.mapred.Mapper;
38import org.apache.hadoop.mapred.OutputCollector;
39import org.apache.hadoop.mapred.Reducer;
40import org.apache.hadoop.mapred.Reporter;
41
42
43public class WordCount {
[9]44  private String filepath;
45  private String outputPath;
46 
47  public WordCount(){
48    filepath = "/user/waue/input/";
49    outputPath = "counts1";
50  }
51  public WordCount(String path,String output){
52    filepath = path;
53    outputPath = output;
54  }
[7]55  // mapper: emits (token, 1) for every word occurrence
56  private static class MapClass extends MapReduceBase implements
57      Mapper<LongWritable, Text, Text, IntWritable> {
58
59    // reuse objects to save overhead of object creation
60    private final static IntWritable one = new IntWritable(1);
61    private Text word = new Text();
62
63    public void map(LongWritable key, Text value,
64        OutputCollector<Text, IntWritable> output, Reporter reporter)
65        throws IOException {
66      String line = ((Text) value).toString();
67      StringTokenizer itr = new StringTokenizer(line);
68      while (itr.hasMoreTokens()) {
69        word.set(itr.nextToken());
70        output.collect(word, one);
71      }
72    }
73  }
74
75  // reducer: sums up all the counts
76  private static class ReduceClass extends MapReduceBase implements
77      Reducer<Text, IntWritable, Text, IntWritable> {
78
79    // reuse objects
80    private final static IntWritable SumValue = new IntWritable();
81
82    public void reduce(Text key, Iterator<IntWritable> values,
83        OutputCollector<Text, IntWritable> output, Reporter reporter)
84        throws IOException {
85      // sum up values
86      int sum = 0;
87      while (values.hasNext()) {
88        sum += values.next().get();
89      }
90      SumValue.set(sum);
91      output.collect(key, SumValue);
92    }
93  }
94
95 
96  /**
97   * Runs the demo.
98   */
99  public static void main(String[] args) throws IOException {
[9]100    WordCount wc = new WordCount();
101   
102    int mapTasks = 1;
[7]103    int reduceTasks = 1;
104    JobConf conf = new JobConf(WordCount.class);
105    conf.setJobName("wordcount");
106
107    conf.setNumMapTasks(mapTasks);
108    conf.setNumReduceTasks(reduceTasks);
109
[9]110    conf.setInputPath(new Path(wc.filepath));
[7]111    conf.setOutputKeyClass(Text.class);
112    conf.setOutputValueClass(IntWritable.class);
[9]113    conf.setOutputPath(new Path(wc.outputPath));
[7]114
115    conf.setMapperClass(MapClass.class);
116    conf.setCombinerClass(ReduceClass.class);
117    conf.setReducerClass(ReduceClass.class);
118   
119    // Delete the output directory if it exists already
[9]120    Path outputDir = new Path(wc.outputPath);
[7]121    FileSystem.get(conf).delete(outputDir);
122
123    JobClient.runJob(conf);
124  }
125}
Note: See TracBrowser for help on using the repository browser.