wiki:waue/2010/0115-WordIndex

Word Index

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


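//  WordIndex
//  Description:
//    For every word, prints which file and which line it appears in.
//
//  How to test:
//    Run this program on a Hadoop 0.20 platform with:
//    ---------------------------
//    hadoop jar WordIndex.jar <input> <output>
//    ---------------------------
//
//  Notes:
//  1.  The source files are read from the HDFS path you specify as <input>.
//      The data must be uploaded to that HDFS directory beforehand, and the
//      directory may contain only files, not sub-directories.
//  2.  When the job finishes, the results are written to the HDFS path you
//      specify as <output>.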


public class WordIndex {
   public static class wordindexM extends
      Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      FileSplit fileSplit = (FileSplit) context.getInputSplit();
      
      Text map_key = new Text();
      Text map_value = new Text();
      String line = value.toString();
      StringTokenizer st = new StringTokenizer(line.toLowerCase());
      while (st.hasMoreTokens()) {
        String word = st.nextToken();
        map_key.set(word);
        map_value.set(fileSplit.getPath().getName() + ":" + line);
        context.write(map_key, map_value);
      }
    }
  }


  static public class wordindexR extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      String v = "";
      StringBuilder ret = new StringBuilder("\n");
      for (Text val : values) {
        v += val.toString().trim();
        if (v.length() > 0)
          ret.append(v + "\n");
      }

      output.collect((Text) key, new Text(ret.toString()));
    }
  }

  public static void main(String[] args) throws IOException,
      InterruptedException, ClassNotFoundException {
    // debug using
//    String[] argv = { "input", "output-wi" };
//    args = argv;

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length < 2) {
      System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
      return;
    }
    Job job = new Job(conf, "word index");
    job.setJobName("word inverted index");
    job.setJarByClass(WordIndex.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(wordindexM.class);
    job.setReducerClass(wordindexR.class);
    job.setCombinerClass(wordindexR.class);

    FileInputFormat.setInputPaths(job, args[0]);
    CheckAndDelete.checkAndDelete(args[1], conf);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    long start = System.nanoTime();

    job.waitForCompletion(true);

    long time = System.nanoTime() - start;
    System.err.println(time * (1E-9) + " secs.");
  }
}

Last modified 15 years ago Last modified on Jan 19, 2010, 7:51:19 PM