Hadoop Advanced Course
Example Exercises
Description
WordIndex
Description: for every word, print which file and which line it appears in.
How to test: run this program on the Hadoop 0.20 platform, executing:
---------------------------
hadoop jar WordIndex.jar <input> <output>
---------------------------
Notes:
1. The source files are read from the HDFS path you specify as <input>. You must first upload data into this HDFS directory, and the directory may contain only files, not subdirectories.
2. When the computation finishes, the program writes its results to the HDFS path you specify as <output>.
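As an illustration (a hypothetical run; the file name a.txt and its content are invented for this example), suppose <input> contains a single file a.txt whose only line is "hello hadoop". The job then emits, for each word, the word followed by a newline-separated list of file:line entries, roughly:
---------------------------
hadoop
a.txt:hello hadoop

hello
a.txt:hello hadoop
---------------------------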
WordIndex.java
package org.nchc.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordIndex {

  public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // The input split tells us which file this line came from.
      FileSplit fileSplit = (FileSplit) context.getInputSplit();
      Text map_key = new Text();
      Text map_value = new Text();
      String line = value.toString();
      StringTokenizer st = new StringTokenizer(line.toLowerCase());
      while (st.hasMoreTokens()) {
        // Emit (word, "filename:line") for every word on the line.
        String word = st.nextToken();
        map_key.set(word);
        map_value.set(fileSplit.getPath().getName() + ":" + line);
        context.write(map_key, map_value);
      }
    }
  }

  public static class wordindexR extends Reducer<Text, Text, Text, Text> {
    // The new (org.apache.hadoop.mapreduce) API requires this signature with
    // a Context; a reduce method taking OutputCollector/Reporter is never
    // invoked and the framework silently falls back to the identity reduce.
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // Collect every "filename:line" posting for this word, one per line.
      StringBuilder ret = new StringBuilder("\n");
      for (Text val : values) {
        String v = val.toString().trim();
        if (v.length() > 0)
          ret.append(v + "\n");
      }
      context.write(key, new Text(ret.toString()));
    }
  }

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    // debug using
    // String[] argv = { "/user/hadoop/input", "/user/hadoop/output-wi" };
    // args = argv;
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length < 2) {
      System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
      return;
    }

    Job job = new Job(conf, "word index");
    job.setJobName("word inverted index");
    job.setJarByClass(WordIndex.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(wordindexM.class);
    job.setReducerClass(wordindexR.class);
    // Concatenating postings is associative, so the reducer can double as a
    // combiner to shrink the shuffle.
    job.setCombinerClass(wordindexR.class);

    // Use the arguments left over after GenericOptionsParser, not the raw
    // args, so generic options (-D, -fs, ...) are handled correctly.
    FileInputFormat.setInputPaths(job, otherArgs[0]);
    // Remove a pre-existing output directory so the job does not abort.
    CheckAndDelete.checkAndDelete(otherArgs[1], conf);
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    long start = System.nanoTime();
    job.waitForCompletion(true);
    long time = System.nanoTime() - start;
    System.err.println(time * (1E-9) + " secs.");
  }
}
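The main method calls CheckAndDelete.checkAndDelete(...), a helper class from the same course package that is not listed on this page. A minimal sketch of such a helper, assuming its job is simply to delete the output path if it already exists (so re-running the job does not fail with an existing-output error):

package org.nchc.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckAndDelete {
  // Hypothetical reconstruction of the course's CheckAndDelete helper:
  // recursively delete the given HDFS path if it exists.
  public static boolean checkAndDelete(String uri, Configuration conf)
      throws IOException {
    Path path = new Path(uri);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(path)) {
      return fs.delete(path, true); // true = recursive delete
    }
    return true;
  }
}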
Last modified on Sep 29, 2010, 10:09:27 AM