Changes between Initial Version and Version 1 of waue/2010/0419


Ignore:
Timestamp:
Apr 19, 2010, 4:09:38 PM (15 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0419

    v1 v1  
     1
     2{{{
     3#!java
     4import java.io.BufferedReader;
     5import java.io.FileReader;
     6import java.io.IOException;
     7
     8import java.util.ArrayList;
     9import java.util.HashSet;
     10import java.util.List;
     11import java.util.Set;
     12import java.util.StringTokenizer;
     13
     14import org.apache.hadoop.conf.Configuration;
     15import org.apache.hadoop.conf.Configured;
     16import org.apache.hadoop.filecache.DistributedCache;
     17import org.apache.hadoop.fs.Path;
     18import org.apache.hadoop.io.IntWritable;
     19import org.apache.hadoop.io.LongWritable;
     20import org.apache.hadoop.io.Text;
     21import org.apache.hadoop.mapreduce.Counter;
     22import org.apache.hadoop.mapreduce.Job;
     23import org.apache.hadoop.mapreduce.Mapper;
     24import org.apache.hadoop.mapreduce.Reducer;
     25import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     26import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     27import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     28import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     29import org.apache.hadoop.util.StringUtils;
     30import org.apache.hadoop.util.Tool;
     31import org.apache.hadoop.util.ToolRunner;
     32
     33public class WordCount2for020 extends Configured implements Tool {
     34        // 問題一: configuration 所設定的參數值沒有傳遞到map裡
     35        // 問題二: inputFile 也沒用到
     36        // 問題三:由於configuration 的問題,以致 DistributedCache 也無法取得正確的LocalCacheFiles
     37        public static class Map extends
     38                        Mapper<LongWritable, Text, Text, IntWritable> {
     39
     40                static enum Counters {
     41                        INPUT_WORDS
     42                }
     43
     44                private final static IntWritable one = new IntWritable(1);
     45                private Text word = new Text();
     46
     47                private boolean caseSensitive = true;
     48                private Set<String> patternsToSkip = new HashSet<String>();
     49
     50                private long numRecords = 0;
     51                private String inputFile;
     52
     53                public void setup(Context context) {
     54                        System.err.println("yes yes yes !!! setup work");
     55
     56                        Configuration conf = context.getConfiguration();
     57
     58                        caseSensitive = conf.getBoolean("wordcount.case.sensitive", false);
     59
     60                        // inputFile = conf.get("map.input.file","");
     61                        // inputFile = "/user/waue/patterns";
     62                        // 此處 inputFile 之後就在也沒用到,因此在這裡處理一下
     63
     64                        boolean wsp = conf.getBoolean("wordcount.skip.patterns", true);
     65
     66                        System.err.println("caseSensitive = " + caseSensitive);
     67                        System.err.println("wordcount.skip.patterns = " + wsp);
     68
     69                        if (wsp) {
     70                                Path[] patternsFiles = new Path[0];
     71
     72                                try {
     73                                        patternsFiles = DistributedCache.getLocalCacheFiles(conf);
     74
     75                                } catch (IOException ioe) {
     76                                        System.err
     77                                                        .println("Caught exception while getting cached files: "
     78                                                                        + StringUtils.stringifyException(ioe));
     79                                }
     80                                for (Path patternsFile : patternsFiles) {
     81                                        parseSkipFile(patternsFile);
     82                                        System.err.println("parseSkipFile = " + patternsFile);
     83                                }
     84                        }
     85                }
     86
     87                private void parseSkipFile(Path patternsFile) {
     88                        try {
     89                                BufferedReader fis = new BufferedReader(new FileReader(
     90                                                patternsFile.toString()));
     91                                String pattern = null;
     92                                while ((pattern = fis.readLine()) != null) {
     93
     94                                        patternsToSkip.add(pattern);
     95                                }
     96                        } catch (IOException ioe) {
     97                                System.err
     98                                                .println("Caught exception while parsing the cached file '"
     99                                                                + patternsFile
     100                                                                + "' : "
     101                                                                + StringUtils.stringifyException(ioe));
     102                        }
     103                }
     104
     105                public void map(LongWritable key, Text value, Context context)
     106                                throws IOException, InterruptedException {
     107                        String line = (caseSensitive) ? value.toString() : value.toString()
     108                                        .toLowerCase();
     109
     110                        for (String pattern : patternsToSkip) {
     111                                line = line.replaceAll(pattern, "");
     112                        }
     113
     114                        StringTokenizer tokenizer = new StringTokenizer(line);
     115                        while (tokenizer.hasMoreTokens()) {
     116                                word.set(tokenizer.nextToken());
     117                                context.write(word, one);
     118                                Counter count = context.getCounter(Counters.INPUT_WORDS);
     119                                count.increment(1);
     120                        }
     121
     122                        if ((++numRecords % 100) == 0) {
     123                                context.setStatus("Finished processing " + numRecords
     124                                                + " records " + "from the input file: " + inputFile);
     125                        }
     126                }
     127        }
     128
     129        public static class Reduce extends
     130                        Reducer<Text, IntWritable, Text, IntWritable> {
     131                /* on V0.20, we should use Iterable to replace Iterator */
     132                // public void reduce(Text key, Iterator<IntWritable> values,
     133
     134                public void reduce(Text key, Iterable<IntWritable> values,
     135                                Context context) throws IOException, InterruptedException {
     136                        int sum = 0;
     137                        /* these three line only be used on V0.19 nor V0.20 */
     138                        // while (values.hasNext()) {
     139                        // sum += values.next().get();
     140                        // }
     141
     142                        for (IntWritable val : values) {
     143                                sum += val.get();
     144                        }
     145                        context.write(key, new IntWritable(sum));
     146                }
     147        }
     148
     149        public int run(String[] args) throws Exception {
     150                Configuration conf = new Configuration();
     151
     152                Job job = new Job(conf);
     153                job.setJarByClass(WordCount2for020.class);
     154                job.setJobName("wordcount");
     155
     156                job.setMapOutputKeyClass(Text.class);
     157                job.setMapOutputValueClass(IntWritable.class);
     158                job.setOutputKeyClass(Text.class);
     159                job.setOutputValueClass(IntWritable.class);
     160
     161                job.setMapperClass(Map.class);
     162                job.setCombinerClass(Reduce.class);
     163                job.setReducerClass(Reduce.class);
     164
     165                job.setInputFormatClass(TextInputFormat.class);
     166                job.setOutputFormatClass(TextOutputFormat.class);
     167
     168                List<String> other_args = new ArrayList<String>();
     169                for (int i = 0; i < args.length; ++i) {
     170                        if ("-skip".equals(args[i])) {
     171
     172                                DistributedCache
     173                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
     174
     175                                System.err.println("cache file = " + args[i]);
     176
     177                                conf.setBoolean("wordcount.skip.patterns", true);
     178                                System.err.println("wordcount.skip.patterns = true");
     179                        } else {
     180                                other_args.add(args[i]);
     181                        }
     182                }
     183                // conf.set("mapred.job.tracker", "local");
     184                // conf.set("fs.default.name", "file:///");
     185
     186                FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
     187                FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
     188
     189                job.waitForCompletion(true);
     190
     191                return 0;
     192        }
     193
     194        public static void main(String[] args) throws Exception {
     195                String[] argv = { "-Dwordcount.case.sensitive=false",
     196                                "/user/waue/text_input", "/user/waue/output-v020", "-skip",
     197                                "/user/waue/patterns/patterns.txt" };
     198                args = argv;
     199                int res = ToolRunner.run(new Configuration(), new WordCount2for020(),
     200                                args);
     201                System.exit(res);
     202        }
     203}
     204}}}
     205
     206 = 問題 =
     207 * -skip 參數、以及 case.sensitive 的參數無用
     208
     209 = 問題研判 =
     210{{{
     211#!text
     212        // 問題一: configuration 所設定的參數值沒有傳遞到map裡
     213        // 問題二: inputFile 也沒用到
     214        // 問題三:由於configuration 的問題,以致 DistributedCache 也無法取得正確的LocalCacheFiles
     215}}}
     216
     217{{{
     21810/04/19 16:02:58 WARN conf.Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
     219cache file = /user/waue/patterns/patterns.txt
     220wordcount.skip.patterns = true
     22110/04/19 16:02:59 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
     22210/04/19 16:02:59 INFO input.FileInputFormat: Total input paths to process : 4
     22310/04/19 16:02:59 INFO mapred.JobClient: Running job: job_201004190849_0011
     22410/04/19 16:03:00 INFO mapred.JobClient:  map 0% reduce 0%
     22510/04/19 16:03:10 INFO mapred.JobClient: Task Id : attempt_201004190849_0011_m_000000_0, Status : FAILED
     226java.lang.NullPointerException
     227        at WordCount2for020$Map.setup(WordCount2for020.java:79)
     228        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
     229        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:583)
     230        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
     231        at org.apache.hadoop.mapred.Child.main(Child.java:170)
     232}}}