import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class Hello { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> { public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { output.collect(key, value); } } public static class Reduce extends MapReduceBase implements Reducer<LongWritable, Text, LongWritable, Text> { Text ret = new Text(""); public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { while (values.hasNext()) { ret = values.next(); } output.collect(key, ret); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(Hello.class); conf.setJobName("Hello"); conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setMapOutputKeyClass(LongWritable.class); conf.setMapOutputValueClass(Text.class); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
Last modified 15 years ago
Last modified on Mar 3, 2010, 12:14:04 PM