// Mapper, new API (org.apache.hadoop.mapreduce)
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the input line into whitespace-delimited tokens and emit (word, 1).
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
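
// A possible variation, not part of the original listing: the tokenizer above
// treats "Hadoop" and "hadoop." as different words. A sketch that normalizes
// case and strips punctuation before emitting (the class name
// NormalizedTokenizerMapper is illustrative):
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NormalizedTokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // Lowercase and drop non-alphanumeric characters so counts
            // are case- and punctuation-insensitive.
            String token = itr.nextToken().toLowerCase().replaceAll("[^a-z0-9]", "");
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, one);
            }
        }
    }
}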

// Mapper, old API (org.apache.hadoop.mapred)
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // Same tokenize-and-emit logic; output goes through an
        // OutputCollector instead of a Context.
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}
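
// The Reporter parameter is unused in the map method above. A hedged sketch of
// one thing it is for: incrementing user-defined counters that appear in the
// job's status output. The group and counter names below are made up for
// illustration, as is the class name CountingMap.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CountingMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // Hypothetical counter: tally input lines seen by this mapper.
        reporter.incrCounter("WordCount", "InputLines", 1);
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
    // In the new API the equivalent call is
    // context.getCounter("WordCount", "InputLines").increment(1);
}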
// Reducer, new API (org.apache.hadoop.mapreduce)
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for this word and emit (word, total).
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

// Reducer, old API (org.apache.hadoop.mapred)
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // The old API hands values in as an Iterator rather than an Iterable.
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}
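
// A quick way to sanity-check the word-count logic without a cluster. This
// harness is not part of the original listings; it just mimics the
// map -> shuffle -> reduce flow in plain Java.
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class WordCountSmokeTest {
    public static void main(String[] args) {
        String[] lines = { "the quick brown fox", "the lazy dog the fox" };

        // "Map" + "shuffle": emit (word, 1) and group the 1s by word,
        // as the framework's sort/shuffle phase would.
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                grouped.computeIfAbsent(itr.nextToken(), k -> new ArrayList<>()).add(1);
            }
        }

        // "Reduce": sum each word's values, exactly as the reducers above do.
        // (java.util.Map is fully qualified to avoid clashing with the
        // old-API Map class defined earlier.)
        for (java.util.Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            System.out.println(e.getKey() + "\t" + sum); // e.g. "the\t3"
        }
    }
}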

// Driver, new API (org.apache.hadoop.mapreduce)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    // On current Hadoop releases Job.getInstance(conf, "word count") is
    // preferred; this constructor dates from the 0.20/1.x era.
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // Reusing the reducer as a combiner is safe here because summing
    // integer counts is associative and commutative.
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

// Driver, old API (org.apache.hadoop.mapred)
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    // The old API sets input/output formats directly on the JobConf rather
    // than through the mapreduce.lib.* classes.
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    // runJob submits the job and blocks until it completes.
    JobClient.runJob(conf);
}
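
// A common alternative shape for the new-API driver (a sketch, not from the
// original listings): implement Tool and launch through ToolRunner, which
// wires up GenericOptionsParser handling for you. The class name
// WordCountDriver is illustrative.
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // By the time run() is called, ToolRunner has already stripped
        // generic options (-D, -files, ...) from args.
        if (args.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            return 2;
        }
        Job job = new Job(getConf(), "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new WordCountDriver(), args));
    }
}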