source: sample/hadoop-0.16/WordCount2.java @ 24

Last change on this file since 24 was 24, checked in by waue, 16 years ago

upload test

File size: 4.4 KB
Line 
1import java.io.BufferedReader;
2import java.io.FileReader;
3import java.io.IOException;
4import java.util.HashSet;
5import java.util.Iterator;
6import java.util.Set;
7import java.util.StringTokenizer;
8
9import org.apache.hadoop.conf.Configured;
10import org.apache.hadoop.filecache.DistributedCache;
11import org.apache.hadoop.fs.FileSystem;
12import org.apache.hadoop.fs.Path;
13import org.apache.hadoop.io.IntWritable;
14import org.apache.hadoop.io.LongWritable;
15import org.apache.hadoop.io.Text;
16import org.apache.hadoop.mapred.FileInputFormat;
17import org.apache.hadoop.mapred.FileOutputFormat;
18import org.apache.hadoop.mapred.JobClient;
19import org.apache.hadoop.mapred.JobConf;
20import org.apache.hadoop.mapred.MapReduceBase;
21import org.apache.hadoop.mapred.Mapper;
22import org.apache.hadoop.mapred.OutputCollector;
23import org.apache.hadoop.mapred.Reducer;
24import org.apache.hadoop.mapred.Reporter;
25import org.apache.hadoop.util.StringUtils;
26
27public class WordCount2 extends Configured {
28
29  public static class Map extends MapReduceBase implements
30      Mapper<LongWritable, Text, Text, IntWritable> {
31
32    static enum Counters {
33      INPUT_WORDS
34    }
35
36    private final static IntWritable one = new IntWritable(1);
37
38    private Text word = new Text();
39
40    private boolean caseSensitive = true;
41
42    private Set<String> patternsToSkip = new HashSet<String>();
43
44    private long numRecords = 0;
45
46    private String inputFile;
47
48    public void configure(JobConf job) {
49      caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
50      inputFile = job.get("map.input.file");
51
52      if (job.getBoolean("wordcount.skip.patterns", false)) {
53        Path[] patternsFiles = new Path[0];
54        try {
55          patternsFiles = DistributedCache.getLocalCacheFiles(job);
56        } catch (IOException ioe) {
57          System.err
58              .println("Caught exception while getting cached files: "
59                  + StringUtils.stringifyException(ioe));
60        }
61        for (Path patternsFile : patternsFiles) {
62          parseSkipFile(patternsFile);
63        }
64      }
65    }
66
67    private void parseSkipFile(Path patternsFile) {
68      try {
69        BufferedReader fis = new BufferedReader(new FileReader(
70            patternsFile.toString()));
71        String pattern = null;
72        while ((pattern = fis.readLine()) != null) {
73          patternsToSkip.add(pattern);
74        }
75      } catch (IOException ioe) {
76        System.err
77            .println("Caught exception while parsing the cached file '"
78                + patternsFile
79                + "' : "
80                + StringUtils.stringifyException(ioe));
81      }
82    }
83
84    public void map(LongWritable key, Text value,
85        OutputCollector<Text, IntWritable> output, Reporter reporter)
86        throws IOException {
87      String line = (caseSensitive) ? value.toString() : value.toString()
88          .toLowerCase();
89
90      for (String pattern : patternsToSkip) {
91        line = line.replaceAll(pattern, "");
92      }
93
94      StringTokenizer tokenizer = new StringTokenizer(line);
95      while (tokenizer.hasMoreTokens()) {
96        word.set(tokenizer.nextToken());
97        output.collect(word, one);
98        reporter.incrCounter(Counters.INPUT_WORDS, 1);
99      }
100
101      if ((++numRecords % 100) == 0) {
102        reporter.setStatus("Finished processing " + numRecords
103            + " records " + "from the input file: " + inputFile);
104      }
105    }
106  }
107
108  public static class Reduce extends MapReduceBase implements
109      Reducer<Text, IntWritable, Text, IntWritable> {
110    public void reduce(Text key, Iterator<IntWritable> values,
111        OutputCollector<Text, IntWritable> output, Reporter reporter)
112        throws IOException {
113      int sum = 0;
114      while (values.hasNext()) {
115        sum += values.next().get();
116      }
117      output.collect(key, new IntWritable(sum));
118    }
119  }
120
121  public static void main(String[] args) throws IOException {
122    String filename = "/user/waue/input/";
123    String outputPath = "sample-counts";
124    int mapTasks = 20;
125    int reduceTasks = 1;
126
127    JobConf conf = new JobConf(WordCount2.class);
128    conf.setJobName("wordcount");
129
130    conf.setNumMapTasks(mapTasks);
131    conf.setNumReduceTasks(reduceTasks);
132
133//    conf.setInputPath(new Path(filename));
134    FileInputFormat.setInputPaths(conf,new Path(filename));
135    conf.setOutputKeyClass(Text.class);
136    conf.setOutputValueClass(IntWritable.class);
137   
138//    conf.setOutputPath(new Path(outputPath));
139    FileOutputFormat.setOutputPath( conf, new Path(filename));
140
141
142    conf.setMapperClass(Map.class);
143    conf.setCombinerClass(Reduce.class);
144    conf.setReducerClass(Reduce.class);
145
146    // Delete the output directory if it exists already
147    Path outputDir = new Path(outputPath);
148    FileSystem.get(conf).delete(outputDir,true);
149    JobClient.runJob(conf);
150  }
151}
Note: See TracBrowser for help on using the repository browser.