/**
 * Program: WordCount.java
 * Editor: Waue Chen
 * From : NCHC, Taiwan
 * Last Update Date: 07/02/2008
 * Upgrade to 0.17
 */
/**
 * Purpose :
 * Store the result of WordCount.java from HBase to the Hadoop file system
 *
 * HowToUse :
 * Make sure the Hadoop file system is running correctly.
 * Put text files in the directory "/local_src/input"
 * You can use this instruction to upload "/local_src/input" to the HDFS input dir:
 * $ bin/hadoop dfs -put /local_src/input input
 * Then modify the filepath parameter in the constructor so it is correct, and run this code.
 *
 * Check Result:
 * Inspect http://localhost:50070 with a web browser
 */
package tw.org.nchc.code;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCount {
	private String filepath;

	private String outputPath;

	public WordCount() {
		filepath = "/user/waue/input/";
		outputPath = "counts1";
	}

	public WordCount(String path, String output) {
		filepath = path;
		outputPath = output;
	}

	// mapper: emits (token, 1) for every word occurrence
	private static class MapClass extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, IntWritable> {
		// reuse objects to save the overhead of object creation
		private final static IntWritable one = new IntWritable(1);

		private Text word = new Text();

		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			String line = value.toString();
			StringTokenizer itr = new StringTokenizer(line);
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				output.collect(word, one);
			}
		}
	}

	// reducer: sums up all the counts
	private static class ReduceClass extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {
		// reuse objects
		private final static IntWritable SumValue = new IntWritable();

		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			// sum up values
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			SumValue.set(sum);
			output.collect(key, SumValue);
		}
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {
		WordCount wc = new WordCount();

		int mapTasks = 1;
		int reduceTasks = 1;
		JobConf conf = new JobConf(WordCount.class);
		conf.setJobName("wordcount");
		conf.setNumMapTasks(mapTasks);
		conf.setNumReduceTasks(reduceTasks);

		// 0.16
		// conf.setInputPath(new Path(wc.filepath));
		Convert.setInputPath(conf, new Path(wc.filepath));
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);

		// 0.16
		// conf.setOutputPath(new Path(wc.outputPath));
		Convert.setOutputPath(conf, new Path(wc.outputPath));

		conf.setMapperClass(MapClass.class);
		conf.setCombinerClass(ReduceClass.class);
		conf.setReducerClass(ReduceClass.class);

		// Delete the output directory if it exists already
		Path outputDir = new Path(wc.outputPath);
		// 0.16
		FileSystem.get(conf).delete(outputDir, true);

		JobClient.runJob(conf);
	}
}
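/*
 * Example invocation, as a sketch only: the jar name "wordcount.jar" is an
 * assumption (package the compiled class however your build does it), and
 * "counts1" / "part-00000" follow from the default constructor's output path
 * and the single reduce task configured above.
 *
 *   $ bin/hadoop dfs -put /local_src/input input
 *   $ bin/hadoop jar wordcount.jar tw.org.nchc.code.WordCount
 *   $ bin/hadoop dfs -cat counts1/part-00000
 */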