Word Index
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// WordIndex
// Description:
//   Build an inverted index: for every word, list which file it came from
//   and the line it appears on.
//
// How to test:
//   Run this program on the Hadoop 0.20 platform:
//   ---------------------------
//   hadoop jar WordIndex.jar <input> <output>
//   ---------------------------
//
// Notes:
// 1. The source files are read from the HDFS path you give as <input>.
//    Upload your data into that HDFS directory first; the directory may
//    contain only files, not sub-directories.
// 2. When the job finishes, the result is written to the HDFS path you
//    give as <output>.
public class WordIndex {

  public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // The input split tells us which file this line came from.
      FileSplit fileSplit = (FileSplit) context.getInputSplit();

      Text map_key = new Text();
      Text map_value = new Text();
      String line = value.toString();
      StringTokenizer st = new StringTokenizer(line.toLowerCase());
      while (st.hasMoreTokens()) {
        // Emit (word, "filename:line") for every word on this line.
        String word = st.nextToken();
        map_key.set(word);
        map_value.set(fileSplit.getPath().getName() + ":" + line);
        context.write(map_key, map_value);
      }
    }
  }

  public static class wordindexR extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // Collect every "filename:line" entry for this word, one per line.
      StringBuilder ret = new StringBuilder("\n");
      for (Text val : values) {
        String v = val.toString().trim();
        if (v.length() > 0)
          ret.append(v).append("\n");
      }
      context.write(key, new Text(ret.toString()));
    }
  }

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    // For local debugging:
    // String[] argv = { "input", "output-wi" };
    // args = argv;
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length < 2) {
      System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
      return;
    }

    Job job = new Job(conf, "word index");
    job.setJobName("word inverted index");
    job.setJarByClass(WordIndex.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(wordindexM.class);
    job.setReducerClass(wordindexR.class);
    // The reducer is not reused as a combiner: it emits a newline-joined
    // block per key, which a second pass through the reducer would mangle.
    // job.setCombinerClass(wordindexR.class);

    FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
    // CheckAndDelete is a helper class from this tutorial that removes the
    // output directory if it already exists.
    CheckAndDelete.checkAndDelete(otherArgs[1], conf);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    long start = System.nanoTime();
    job.waitForCompletion(true);
    long time = System.nanoTime() - start;
    System.err.println(time * (1E-9) + " secs.");
  }
}
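For reference, a run might look like the sketch below. The file names data1.txt and data2.txt and their contents are hypothetical, only to illustrate the output format: because each value the reducer writes begins with a newline, the "filename:line" entries for a word appear on the lines following the word itself.

# Upload sample files to the HDFS input directory
# (hypothetical contents: data1.txt holds "Hello Hadoop",
#  data2.txt holds "hello world").
hadoop fs -mkdir input
hadoop fs -put data1.txt data2.txt input

# Run the job.
hadoop jar WordIndex.jar input output

# Inspect the result; for the word "hello" the output would
# contain roughly:
hadoop fs -cat output/part-r-00000
...
hello
data1.txt:Hello Hadoop
data2.txt:hello world
...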