jazz/Hadoop_Lab6: WordCount2.java

File WordCount2.java, 4.2 KB (added by waue, 15 years ago)
Line 
1import java.io.*;
2import java.util.*;
3
4import org.apache.hadoop.fs.Path;
5import org.apache.hadoop.filecache.DistributedCache;
6import org.apache.hadoop.conf.*;
7import org.apache.hadoop.io.*;
8import org.apache.hadoop.mapred.*;
9import org.apache.hadoop.util.*;
10
11public class WordCount2 extends Configured implements Tool {
12
13   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
14
15     static enum Counters { INPUT_WORDS }
16
17     private final static IntWritable one = new IntWritable(1);
18     private Text word = new Text();
19
20     private boolean caseSensitive = true;
21     private Set<String> patternsToSkip = new HashSet<String>();
22
23     private long numRecords = 0;
24     private String inputFile;
25
26     public void configure(JobConf job) {
27       caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
28       inputFile = job.get("map.input.file");
29
30       if (job.getBoolean("wordcount.skip.patterns", false)) {
31         Path[] patternsFiles = new Path[0];
32         try {
33           patternsFiles = DistributedCache.getLocalCacheFiles(job);
34         } catch (IOException ioe) {
35           System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
36         }
37         for (Path patternsFile : patternsFiles) {
38           parseSkipFile(patternsFile);
39         }
40       }
41     }
42
43     private void parseSkipFile(Path patternsFile) {
44       try {
45         BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
46         String pattern = null;
47         while ((pattern = fis.readLine()) != null) {
48           patternsToSkip.add(pattern);
49         }
50       } catch (IOException ioe) {
51         System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
52       }
53     }
54
55     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
56       String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
57
58       for (String pattern : patternsToSkip) {
59         line = line.replaceAll(pattern, "");
60       }
61
62       StringTokenizer tokenizer = new StringTokenizer(line);
63       while (tokenizer.hasMoreTokens()) {
64         word.set(tokenizer.nextToken());
65         output.collect(word, one);
66         reporter.incrCounter(Counters.INPUT_WORDS, 1);
67       }
68
69       if ((++numRecords % 100) == 0) {
70         reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
71       }
72     }
73   }
74
75   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
76     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
77       int sum = 0;
78       while (values.hasNext()) {
79         sum += values.next().get();
80       }
81       output.collect(key, new IntWritable(sum));
82     }
83   }
84
85   public int run(String[] args) throws Exception {
86     JobConf conf = new JobConf(getConf(), WordCount2.class);
87     conf.setJobName("wordcount");
88
89     conf.setOutputKeyClass(Text.class);
90     conf.setOutputValueClass(IntWritable.class);
91
92     conf.setMapperClass(Map.class);
93     conf.setCombinerClass(Reduce.class);
94     conf.setReducerClass(Reduce.class);
95
96     conf.setInputFormat(TextInputFormat.class);
97     conf.setOutputFormat(TextOutputFormat.class);
98
99     List<String> other_args = new ArrayList<String>();
100     for (int i=0; i < args.length; ++i) {
101       if ("-skip".equals(args[i])) {
102         DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
103         conf.setBoolean("wordcount.skip.patterns", true);
104       } else {
105         other_args.add(args[i]);
106       }
107     }
108
109     FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
110     FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
111
112     JobClient.runJob(conf);
113     return 0;
114   }
115
116   public static void main(String[] args) throws Exception {
117     int res = ToolRunner.run(new Configuration(), new WordCount2(), args);
118     System.exit(res);
119   }
120}