Debugging a Hadoop Program
Original program
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordIndex {

    public static class wordindexM extends
            Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            FileSplit fileSplit = (FileSplit) reporter.getInputSplit();

            Text map_key = new Text();
            Text map_value = new Text();
            String line = value.toString();
            StringTokenizer st = new StringTokenizer(line.toLowerCase());
            while (st.hasMoreTokens()) {
                String word = st.nextToken();
                map_key.set(word);
                map_value.set(fileSplit.getPath().getName() + ":" + line);
                output.collect(map_key, map_value);
            }
        }
    }

    static public class wordindexR extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String v = "";
            StringBuilder ret = new StringBuilder("\n");
            for (Text val : values) {
                v += val.toString().trim();
                if (v.length() > 0)
                    ret.append(v + "\n");
            }
            output.collect((Text) key, new Text(ret.toString()));
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        // debug using
        String[] argv = { "/user/waue/input", "/user/waue/output-wi" };
        args = argv;

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length < 2) {
            System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
            return;
        }

        Job job = new Job(conf, "word index");
        job.setJobName("word inverted index");
        job.setJarByClass(WordIndex.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(wordindexM.class);
        job.setReducerClass(wordindexR.class);
        job.setCombinerClass(wordindexR.class);

        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        long start = System.nanoTime();
        job.waitForCompletion(true);

        long time = System.nanoTime() - start;
        System.err.println(time * (1E-9) + " secs.");
    }
}
Problem encountered
10/01/18 20:52:39 INFO input.FileInputFormat: Total input paths to process : 2
10/01/18 20:52:39 INFO mapred.JobClient: Running job: job_201001181452_0038
10/01/18 20:52:40 INFO mapred.JobClient:  map 0% reduce 0%
10/01/18 20:52:50 INFO mapred.JobClient: Task Id : attempt_201001181452_0038_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
Diagnosis
public static class wordindexM extends
        Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
The mapper extends the new org.apache.hadoop.mapreduce.Mapper class, which lets you drop the old extends ... implements ... boilerplate. However, the map method this class expects you to override is

    public void map(LongWritable key, Text value, Context context)

and not

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)

Because the map method above takes OutputCollector and Reporter parameters, it does not override anything in org.apache.hadoop.mapreduce.Mapper, so its <Text, Text> output declaration never takes effect. Hadoop instead runs Mapper's default (identity) map implementation, which writes the input key and value straight back out, i.e. the equivalent of output.collect(longWritable, text), while the job declares that the map output key must be Text. That is why every map attempt fails with

    Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
Solution
Mapper.Context already declares a getInputSplit() method, so a single Context argument covers essentially everything the OutputCollector and Reporter were used for here.
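The sketch below is only illustrative: it shows how the calls that used to go through OutputCollector and Reporter map onto Context in a new-API mapper. The class name and the counter group/name strings are invented for this example.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ContextDemo extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // OutputCollector.collect(k, v)   ->  context.write(k, v)
        context.write(new Text("word"), value);
        // Reporter.getInputSplit()        ->  context.getInputSplit()
        FileSplit split = (FileSplit) context.getInputSplit();
        // Reporter.setStatus(msg)         ->  context.setStatus(msg)
        context.setStatus("processing " + split.getPath().getName());
        // Reporter.incrCounter(g, n, 1)   ->  context.getCounter(g, n).increment(1)
        context.getCounter("WordIndexDemo", "lines").increment(1);
    }
}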
Complete working code
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordIndex {

    public static class wordindexM extends
            Mapper<LongWritable, Text, Text, Text> {
        // new-API signature: Context replaces OutputCollector and Reporter
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();

            Text map_key = new Text();
            Text map_value = new Text();
            String line = value.toString();
            StringTokenizer st = new StringTokenizer(line.toLowerCase());
            while (st.hasMoreTokens()) {
                String word = st.nextToken();
                map_key.set(word);
                map_value.set(fileSplit.getPath().getName() + ":" + line);
                context.write(map_key, map_value);
            }
        }
    }

    static public class wordindexR extends Reducer<Text, Text, Text, Text> {
        // reduce also uses the new-API Context signature, for the same reason
        // as map: the old OutputCollector/Reporter form would not override
        // Reducer.reduce and would therefore never be called
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder ret = new StringBuilder("\n");
            for (Text val : values) {
                String v = val.toString().trim();
                if (v.length() > 0)
                    ret.append(v + "\n");
            }
            context.write(key, new Text(ret.toString()));
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        // for debugging: hard-code the input and output paths
        String[] argv = { "/user/waue/input", "/user/waue/output-wi" };
        args = argv;

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length < 2) {
            System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
            return;
        }

        Job job = new Job(conf, "word index");
        job.setJobName("word inverted index");
        job.setJarByClass(WordIndex.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(wordindexM.class);
        job.setReducerClass(wordindexR.class);
        job.setCombinerClass(wordindexR.class);

        FileInputFormat.setInputPaths(job, args[0]);
        HelloHadoopV2.checkAndDelete(args[1], conf);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        long start = System.nanoTime();
        job.waitForCompletion(true);

        long time = System.nanoTime() - start;
        System.err.println(time * (1E-9) + " secs.");
    }
}
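The fixed driver calls HelloHadoopV2.checkAndDelete, a helper from an earlier example in this series that is not reproduced here. Assuming it does nothing more than remove the output directory when it already exists (so rerunning the job does not abort with an "output directory already exists" error), a minimal stand-in could look like this sketch:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical stand-in for the helper used above: recursively delete the
// given HDFS path if it already exists, so the job can write its output there.
public class HelloHadoopV2 {
    public static boolean checkAndDelete(String dir, Configuration conf)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(dir);
        if (fs.exists(path)) {
            return fs.delete(path, true); // true = delete recursively
        }
        return false;
    }
}

With the hard-coded debug paths removed from main(), the job would then be launched as described by its own usage message, e.g. hadoop jar WordIndex.jar /user/waue/input /user/waue/output-wi.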