Changes between Version 6 and Version 7 of waue/2010/0118


Ignore:
Timestamp:
Jan 19, 2010, 11:51:09 AM (13 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0118

    v6 v7  
    113113}}}
    114114
    115  * 目前認為是
     115 * 解決
    116116
    117117{{{
     
    127127
    128128不過它預設搭配的map 實做方法是 ''' "public void map(LongWritable key, Text value, Context context) " ''' 而非
     129
    129130'''  "public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) " '''
    130131
     132原因是因為 使用 org.apache.hadoop.mapreduce.Mapper 卻使用  OutputCollector<Text, Text> output, Reporter reporter
     133
     134會造成''' OutputCollector<Text, Text>''' 的<Text, Text>無法作用,因此   output type 會以為是要跟 input type 一致
     135
     136所以程式認定 output.collect( longWritable, Text );  但我們覺得輸出應該是  output.collect( Text , Text ) 因此不斷告訴我們
     137
     138" Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable "
    131139
    132140 * 解決辦法
    133141
    134 
     142Mapper.Context 就有宣告了getInputSplit();方法,因此使用 context 幾乎可以滿足之前的 ''' OutputCollector 與 Reporter '''
     143
     144{{{
     145#!java
     146public class WordIndex {
     147         public static class wordindexM extends
     148                        Mapper<LongWritable, Text, Text, Text> {
     149                public void map(LongWritable key, Text value, Context context)
     150                                throws IOException, InterruptedException {
     151
     152                        FileSplit fileSplit = (FileSplit) context.getInputSplit();
     153                       
     154                        Text map_key = new Text();
     155                        Text map_value = new Text();
     156                        String line = value.toString();
     157                        StringTokenizer st = new StringTokenizer(line.toLowerCase());
     158                        while (st.hasMoreTokens()) {
     159                                String word = st.nextToken();
     160                                map_key.set(word);
     161                                map_value.set(fileSplit.getPath().getName() + ":" + line);
     162                                context.write(map_key, map_value);
     163                        }
     164                }
     165        }
     166
     167
     168        static public class wordindexR extends Reducer<Text, Text, Text, Text> {
     169
     170                public void reduce(Text key, Iterable<Text> values,
     171                                OutputCollector<Text, Text> output, Reporter reporter)
     172                                throws IOException {
     173                        String v = "";
     174                        StringBuilder ret = new StringBuilder("\n");
     175                        for (Text val : values) {
     176                                v += val.toString().trim();
     177                                if (v.length() > 0)
     178                                        ret.append(v + "\n");
     179                        }
     180
     181                        output.collect((Text) key, new Text(ret.toString()));
     182                }
     183        }
     184
     185        public static void main(String[] args) throws IOException,
     186                        InterruptedException, ClassNotFoundException {
     187                // debug using
     188                String[] argv = { "/user/waue/input", "/user/waue/output-wi" };
     189                args = argv;
     190
     191                Configuration conf = new Configuration();
     192                String[] otherArgs = new GenericOptionsParser(conf, args)
     193                                .getRemainingArgs();
     194                if (otherArgs.length < 2) {
     195                        System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
     196                        return;
     197                }
     198                Job job = new Job(conf, "word index");
     199                job.setJobName("word inverted index");
     200                job.setJarByClass(WordIndex.class);
     201
     202                job.setMapOutputKeyClass(Text.class);
     203                job.setMapOutputValueClass(Text.class);
     204                job.setOutputKeyClass(Text.class);
     205                job.setOutputValueClass(Text.class);
     206
     207                job.setMapperClass(wordindexM.class);
     208                job.setReducerClass(wordindexR.class);
     209                job.setCombinerClass(wordindexR.class);
     210
     211                FileInputFormat.setInputPaths(job, args[0]);
     212                HelloHadoopV2.checkAndDelete(args[1], conf);
     213                FileOutputFormat.setOutputPath(job, new Path(args[1]));
     214
     215                long start = System.nanoTime();
     216
     217                job.waitForCompletion(true);
     218
     219                long time = System.nanoTime() - start;
     220                System.err.println(time * (1E-9) + " secs.");
     221        }
     222}
     223
     224}}}