Changes between Initial Version and Version 1 of waue/2010/0115-WordIndex


Ignore:
Timestamp:
Jan 19, 2010, 7:51:19 PM (15 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0115-WordIndex

    v1 v1  
     1 = Word Index  =
     2
     3{{{
     4#!java
     5import java.io.IOException;
     6import java.util.StringTokenizer;
     7
     8import org.apache.hadoop.conf.Configuration;
     9import org.apache.hadoop.fs.Path;
     10import org.apache.hadoop.io.LongWritable;
     11import org.apache.hadoop.io.Text;
     12import org.apache.hadoop.mapred.OutputCollector;
     13import org.apache.hadoop.mapred.Reporter;
     14import org.apache.hadoop.mapreduce.Job;
     15import org.apache.hadoop.mapreduce.Mapper;
     16import org.apache.hadoop.mapreduce.Reducer;
     17import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     18import org.apache.hadoop.mapreduce.lib.input.FileSplit;
     19import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     20import org.apache.hadoop.util.GenericOptionsParser;
     21
     22
     23//      WordIndex
     24//      說明:
     25//              將每個字出於哪個檔案,那一行印出來
     26//
     27//      測試方法:
     28//              將此程式運作在hadoop 0.20 平台上,執行:
     29//              ---------------------------
     30//              hadoop jar WordIndex.jar <input> <output>
     31//              ---------------------------
     32//
     33//      注意:
     34//1.    在hdfs 上來源檔案的路徑為 你所指定的 <input>
     35//請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾
     36//2.    運算完後,程式將執行結果放在hdfs 的輸出路徑為 你所指定的 <output>
     37
     38
     39public class WordIndex {
     40         public static class wordindexM extends
     41                        Mapper<LongWritable, Text, Text, Text> {
     42                public void map(LongWritable key, Text value, Context context)
     43                                throws IOException, InterruptedException {
     44
     45                        FileSplit fileSplit = (FileSplit) context.getInputSplit();
     46                       
     47                        Text map_key = new Text();
     48                        Text map_value = new Text();
     49                        String line = value.toString();
     50                        StringTokenizer st = new StringTokenizer(line.toLowerCase());
     51                        while (st.hasMoreTokens()) {
     52                                String word = st.nextToken();
     53                                map_key.set(word);
     54                                map_value.set(fileSplit.getPath().getName() + ":" + line);
     55                                context.write(map_key, map_value);
     56                        }
     57                }
     58        }
     59
     60
     61        static public class wordindexR extends Reducer<Text, Text, Text, Text> {
     62
     63                public void reduce(Text key, Iterable<Text> values,
     64                                OutputCollector<Text, Text> output, Reporter reporter)
     65                                throws IOException {
     66                        String v = "";
     67                        StringBuilder ret = new StringBuilder("\n");
     68                        for (Text val : values) {
     69                                v += val.toString().trim();
     70                                if (v.length() > 0)
     71                                        ret.append(v + "\n");
     72                        }
     73
     74                        output.collect((Text) key, new Text(ret.toString()));
     75                }
     76        }
     77
     78        public static void main(String[] args) throws IOException,
     79                        InterruptedException, ClassNotFoundException {
     80                // debug using
     81//              String[] argv = { "input", "output-wi" };
     82//              args = argv;
     83
     84                Configuration conf = new Configuration();
     85                String[] otherArgs = new GenericOptionsParser(conf, args)
     86                                .getRemainingArgs();
     87                if (otherArgs.length < 2) {
     88                        System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
     89                        return;
     90                }
     91                Job job = new Job(conf, "word index");
     92                job.setJobName("word inverted index");
     93                job.setJarByClass(WordIndex.class);
     94
     95                job.setMapOutputKeyClass(Text.class);
     96                job.setMapOutputValueClass(Text.class);
     97                job.setOutputKeyClass(Text.class);
     98                job.setOutputValueClass(Text.class);
     99
     100                job.setMapperClass(wordindexM.class);
     101                job.setReducerClass(wordindexR.class);
     102                job.setCombinerClass(wordindexR.class);
     103
     104                FileInputFormat.setInputPaths(job, args[0]);
     105                CheckAndDelete.checkAndDelete(args[1], conf);
     106                FileOutputFormat.setOutputPath(job, new Path(args[1]));
     107
     108                long start = System.nanoTime();
     109
     110                job.waitForCompletion(true);
     111
     112                long time = System.nanoTime() - start;
     113                System.err.println(time * (1E-9) + " secs.");
     114        }
     115}
     116
     117}}}