[[PageOutline]]

= Debugging a Hadoop Program =

== Original Program ==

{{{
#!java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordIndex {
  public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
      Text map_key = new Text();
      Text map_value = new Text();
      String line = value.toString();
      StringTokenizer st = new StringTokenizer(line.toLowerCase());
      while (st.hasMoreTokens()) {
        String word = st.nextToken();
        map_key.set(word);
        map_value.set(fileSplit.getPath().getName() + ":" + line);
        output.collect(map_key, map_value);
      }
    }
  }

  static public class wordindexR extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      String v = "";
      StringBuilder ret = new StringBuilder("\n");
      for (Text val : values) {
        v += val.toString().trim();
        if (v.length() > 0)
          ret.append(v + "\n");
      }
      output.collect((Text) key, new Text(ret.toString()));
    }
  }

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    // hard-coded arguments, used while debugging in the IDE
    String[] argv = { "/user/waue/input", "/user/waue/output-wi" };
    args = argv;

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length < 2) {
      System.out.println("hadoop jar WordIndex.jar <input> <output>");
      return;
    }

    Job job = new Job(conf, "word index");
    job.setJobName("word inverted index");
    job.setJarByClass(WordIndex.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(wordindexM.class);
    job.setReducerClass(wordindexR.class);
    job.setCombinerClass(wordindexR.class);
    FileInputFormat.setInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    long start = System.nanoTime();
    job.waitForCompletion(true);
    long time = System.nanoTime() - start;
    System.err.println(time * (1E-9) + " secs.");
  }
}
}}}

== Problem Encountered ==

{{{
#!text
10/01/18 20:52:39 INFO input.FileInputFormat: Total input paths to process : 2
10/01/18 20:52:39 INFO mapred.JobClient: Running job: job_201001181452_0038
10/01/18 20:52:40 INFO mapred.JobClient:  map 0% reduce 0%
10/01/18 20:52:50 INFO mapred.JobClient: Task Id : attempt_201001181452_0038_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
}}}

== Analysis ==

{{{
#!java
public static class wordindexM extends
    Mapper<LongWritable, Text, Text, Text> {
  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
}}}

The mapper extends the new `org.apache.hadoop.mapreduce.Mapper`, which lets us drop the old `extends MapReduceBase ... implements Mapper` declaration. However, the method the new API expects us to override is ''' public void map(LongWritable key, Text value, Context context) ''' rather than ''' public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) '''. Because our map has the wrong parameter list, it overrides nothing: the `OutputCollector`/`Reporter` version is never called, and the framework falls back to the base class's default map, which emits every input pair unchanged. The map output key is therefore the `LongWritable` byte offset instead of the `Text` key we declared with `setMapOutputKeyClass()`, which is why the job keeps telling us "Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable".
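To see why the fallback behaves this way, here is roughly what the base class does when nothing overrides its map. A minimal sketch of the default implementation in the 0.20 `org.apache.hadoop.mapreduce.Mapper` (simplified from the Hadoop source, not a complete class):

{{{
#!java
// Default map in org.apache.hadoop.mapreduce.Mapper (simplified sketch):
// it forwards the input pair as-is, so the LongWritable offset becomes
// the map output key unless a correctly-signed override exists.
protected void map(KEYIN key, VALUEIN value, Context context)
    throws IOException, InterruptedException {
  context.write((KEYOUT) key, (VALUEOUT) value);
}
}}}

Had the intended map been annotated with `@Override`, the compiler would have rejected the `OutputCollector`/`Reporter` signature on the spot, turning this silent runtime failure into a compile-time error.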
== Solution ==

Rewrite map to the signature the new API expects and use `Context` throughout. `Mapper.Context` already declares a `getInputSplit()` method, so `context` covers almost everything the old ''' OutputCollector and Reporter ''' provided: `context.write()` replaces `output.collect()`, and `context.getInputSplit()` replaces `reporter.getInputSplit()` (a sketch of the remaining Reporter-style calls appears at the end of this page). The same wrong-signature trap applies to `Reducer.reduce()`, whose `OutputCollector` version would compile but never run, so the reducer is converted to the `Context` form as well.

=== Complete Working Code ===

{{{
#!java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// note: the old-API imports (org.apache.hadoop.mapred.OutputCollector,
// org.apache.hadoop.mapred.Reporter) are gone
public class WordIndex {
  public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Context also hands us the input split, as Reporter did before
      FileSplit fileSplit = (FileSplit) context.getInputSplit();
      Text map_key = new Text();
      Text map_value = new Text();
      String line = value.toString();
      StringTokenizer st = new StringTokenizer(line.toLowerCase());
      while (st.hasMoreTokens()) {
        String word = st.nextToken();
        map_key.set(word);
        map_value.set(fileSplit.getPath().getName() + ":" + line);
        context.write(map_key, map_value);
      }
    }
  }

  static public class wordindexR extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // list each "filename:line" occurrence on its own line
      // (each value is appended once; the original version kept
      // re-appending a growing accumulator string)
      StringBuilder ret = new StringBuilder("\n");
      for (Text val : values) {
        String v = val.toString().trim();
        if (v.length() > 0)
          ret.append(v).append("\n");
      }
      context.write(key, new Text(ret.toString()));
    }
  }

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    // hard-coded arguments, used while debugging in the IDE
    String[] argv = { "/user/waue/input", "/user/waue/output-wi" };
    args = argv;

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length < 2) {
      System.out.println("hadoop jar WordIndex.jar <input> <output>");
      return;
    }

    Job job = new Job(conf, "word index");
    job.setJobName("word inverted index");
    job.setJarByClass(WordIndex.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(wordindexM.class);
    job.setReducerClass(wordindexR.class);
    job.setCombinerClass(wordindexR.class);
    FileInputFormat.setInputPaths(job, args[0]);
    // helper from the earlier HelloHadoopV2 example: removes the output
    // directory if it already exists, so the job can be re-run
    HelloHadoopV2.checkAndDelete(args[1], conf);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    long start = System.nanoTime();
    job.waitForCompletion(true);
    long time = System.nanoTime() - start;
    System.err.println(time * (1E-9) + " secs.");
  }
}
}}}
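`HelloHadoopV2.checkAndDelete()` comes from an earlier example and its source is not shown here. A minimal sketch of such a helper, assuming it merely deletes a pre-existing output directory (the class and method names are taken from the call above; the body is a hypothetical reconstruction):

{{{
#!java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HelloHadoopV2 {
  // hypothetical reconstruction: delete the output path if it already
  // exists, so re-running the job does not abort because the output
  // directory is already there
  public static void checkAndDelete(String out, Configuration conf)
      throws IOException {
    Path outputPath = new Path(out);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(outputPath)) {
      fs.delete(outputPath, true); // true = recursive delete
    }
  }
}
}}}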
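Finally, the Reporter-style calls promised in the solution above: `Context` also takes over progress reporting and counters. A minimal sketch under the 0.20 mapreduce API (the demo class and the counter group/name strings are made up for illustration):

{{{
#!java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ContextDemo extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.setStatus("at offset " + key.get());           // was reporter.setStatus(...)
    context.getCounter("WordIndex", "lines").increment(1); // was reporter.incrCounter(...)
    context.progress();                                    // was reporter.progress()
    context.write(new Text("demo"), value);                // was output.collect(...)
  }
}
}}}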