
Debugging a Hadoop Program

Original program

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordIndex {
   public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    // NOTE: this OutputCollector/Reporter signature is the bug diagnosed below
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

      FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
      Text map_key = new Text();
      Text map_value = new Text();
      String line = value.toString();
      StringTokenizer st = new StringTokenizer(line.toLowerCase());
      while (st.hasMoreTokens()) {
        String word = st.nextToken();
        map_key.set(word);
        map_value.set(fileSplit.getPath().getName() + ":" + line);
        output.collect(map_key, map_value);
      }
    }
  }


  static public class wordindexR extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      String v = "";
      StringBuilder ret = new StringBuilder("\n");
      for (Text val : values) {
        v += val.toString().trim();
        if (v.length() > 0)
          ret.append(v + "\n");
      }

      output.collect((Text) key, new Text(ret.toString()));
    }
  }

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    // for debugging: hard-code the input/output paths (overrides command-line args)
    String[] argv = { "/user/waue/input", "/user/waue/output-wi" };
    args = argv;

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length < 2) {
      System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
      return;
    }
    Job job = new Job(conf, "word index");
    job.setJobName("word inverted index");
    job.setJarByClass(WordIndex.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(wordindexM.class);
    job.setReducerClass(wordindexR.class);
    job.setCombinerClass(wordindexR.class);

    FileInputFormat.setInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    long start = System.nanoTime();

    job.waitForCompletion(true);

    long time = System.nanoTime() - start;
    System.err.println(time * (1E-9) + " secs.");
  }
}

Problem encountered

10/01/18 20:52:39 INFO input.FileInputFormat: Total input paths to process : 2
10/01/18 20:52:39 INFO mapred.JobClient: Running job: job_201001181452_0038
10/01/18 20:52:40 INFO mapred.JobClient:  map 0% reduce 0%
10/01/18 20:52:50 INFO mapred.JobClient: Task Id : attempt_201001181452_0038_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable

Diagnosis

   public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

Using the new org.apache.hadoop.mapreduce.Mapper lets us drop the old "extends ... implements ..." boilerplate of the mapred API.

However, the map method it expects us to override is "public void map(LongWritable key, Text value, Context context)", not

"public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)".

Because our map method keeps the OutputCollector/Reporter parameters while extending org.apache.hadoop.mapreduce.Mapper, it never actually overrides Mapper.map; it is just an unused overload that the framework never calls.

The framework therefore falls back to the default (identity) map, which passes each input pair straight through, so the map output key is the LongWritable file offset instead of the Text key we configured.

In effect the job emits (LongWritable, Text) from the map while we declared the map output as (Text, Text), which is why it keeps reporting

" Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable "

Solution

Mapper.Context already declares a getInputSplit() method, so the context parameter covers nearly everything we previously got from OutputCollector and Reporter.
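
Beyond getInputSplit(), Context also takes over OutputCollector's collect() and most of Reporter's other duties (status messages, counters). A small mapper sketch showing the old-to-new call mapping; the class name and counter group/name below are just placeholders:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ContextDemoMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // reporter.getInputSplit()             -> context.getInputSplit()
    String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    // reporter.setStatus(...)              -> context.setStatus(...)
    context.setStatus("processing " + fileName);
    // reporter.incrCounter(group, name, 1) -> context.getCounter(group, name).increment(1)
    context.getCounter("ContextDemo", "input lines").increment(1);
    // output.collect(key, value)           -> context.write(key, value)
    context.write(new Text(fileName), value);
  }
}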

Complete working code

// Imports are the same as in the original listing, minus the old-API
// OutputCollector and Reporter, which are no longer needed.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordIndex {
   public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      FileSplit fileSplit = (FileSplit) context.getInputSplit();
      
      Text map_key = new Text();
      Text map_value = new Text();
      String line = value.toString();
      StringTokenizer st = new StringTokenizer(line.toLowerCase());
      while (st.hasMoreTokens()) {
        String word = st.nextToken();
        map_key.set(word);
        map_value.set(fileSplit.getPath().getName() + ":" + line);
        context.write(map_key, map_value);
      }
    }
  }


  static public class wordindexR extends Reducer<Text, Text, Text, Text> {

    // Apply the same fix here: take a Context instead of OutputCollector and
    // Reporter, otherwise this method would not override Reducer.reduce and
    // the default identity reduce would run instead.
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      StringBuilder ret = new StringBuilder("\n");
      for (Text val : values) {
        // list each non-empty value (file:line) on its own line
        String v = val.toString().trim();
        if (v.length() > 0)
          ret.append(v).append("\n");
      }

      context.write(key, new Text(ret.toString()));
    }
  }

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    // for debugging: hard-code the input/output paths (overrides command-line args)
    String[] argv = { "/user/waue/input", "/user/waue/output-wi" };
    args = argv;

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length < 2) {
      System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
      return;
    }
    Job job = new Job(conf, "word index");
    job.setJobName("word inverted index");
    job.setJarByClass(WordIndex.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(wordindexM.class);
    job.setReducerClass(wordindexR.class);
    job.setCombinerClass(wordindexR.class);

    FileInputFormat.setInputPaths(job, args[0]);
    // checkAndDelete() comes from the earlier HelloHadoopV2 example; it removes
    // the output directory if it already exists so the job can be re-run.
    HelloHadoopV2.checkAndDelete(args[1], conf);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    long start = System.nanoTime();

    job.waitForCompletion(true);

    long time = System.nanoTime() - start;
    System.err.println(time * (1E-9) + " secs.");
  }
}
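
The call to HelloHadoopV2.checkAndDelete(args[1], conf) is not reproduced here; it clears the output directory before the job runs, so re-running the job does not stop with an "output directory already exists" error. A hypothetical stand-alone version of such a helper might look like:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsHelper {
  // Hypothetical stand-in for HelloHadoopV2.checkAndDelete():
  // if the path already exists on HDFS, delete it recursively.
  public static boolean checkAndDelete(String uri, Configuration conf)
      throws IOException {
    Path path = new Path(uri);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(path)) {
      return fs.delete(path, true); // true = delete directories recursively
    }
    return false;
  }
}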
