Changes between Initial Version and Version 1 of waue/2010/0115


Ignore:
Timestamp:
Jan 19, 2010, 4:28:47 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0115

    v1 v1  
     1 = 將 wordcount2 改成 0.20 版 =
     2
     3 == 前言 ==
     4按照hadoop 0.20 官方網頁的 wordcount v2 .
     5[[http://hadoop.apache.org/common/docs/r0.20.1/mapred_tutorial.html#Example%3A+WordCount+v1.0 ]]
     6
     7最需要給的地方是 ''' " extends MapReduceBase implements Mapper" ''' 原因是在hadoop 0.20時,mapreducebase 此class已經被deprecated,
     8
     9因此應改寫如 ''' " extends Mapper" '''
     10
     11然而最主要不能改變的原因是,程式中很重要的功能 [http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/filecache/DistributedCache.html DistributedCache ]  以及 -Dwordcount.skip.patterns 等功能寫於 configure() 函數內。 此configure() 繼承自 MapReduceBase,
     12
     13因此若整個程式改成hadoop 0.20 的 " extends Mapper" ''',則有些功能將不知是否能使用
     14
     15
     16 == 原始程式碼 ==
     17
     18{{{
     19#!java
     20public class WordCountV2 extends Configured implements Tool {
     21
     22        public static class Map extends MapReduceBase implements
     23                        Mapper<LongWritable, Text, Text, IntWritable> {
     24
     25                static enum Counters {
     26                        INPUT_WORDS
     27                }
     28
     29                private final static IntWritable one = new IntWritable(1);
     30                private Text word = new Text();
     31
     32                private boolean caseSensitive = true;
     33                private Set<String> patternsToSkip = new HashSet<String>();
     34
     35                private long numRecords = 0;
     36                private String inputFile;
     37
     38                public void configure(JobConf job) {
     39                        caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
     40                        inputFile = job.get("map.input.file");
     41
     42                        if (job.getBoolean("wordcount.skip.patterns", false)) {
     43                                Path[] patternsFiles = new Path[0];
     44                                try {
     45                                        patternsFiles = DistributedCache.getLocalCacheFiles(job);
     46                                } catch (IOException ioe) {
     47                                        System.err
     48                                                        .println("Caught exception while getting cached files: "
     49                                                                        + StringUtils.stringifyException(ioe));
     50                                }
     51                                for (Path patternsFile : patternsFiles) {
     52                                        parseSkipFile(patternsFile);
     53                                }
     54                        }
     55                }
     56
     57                private void parseSkipFile(Path patternsFile) {
     58                        try {
     59                                BufferedReader fis = new BufferedReader(new FileReader(
     60                                                patternsFile.toString()));
     61                                String pattern = null;
     62                                while ((pattern = fis.readLine()) != null) {
     63                                        patternsToSkip.add(pattern);
     64                                }
     65                        } catch (IOException ioe) {
     66                                System.err
     67                                                .println("Caught exception while parsing the cached file '"
     68                                                                + patternsFile
     69                                                                + "' : "
     70                                                                + StringUtils.stringifyException(ioe));
     71                        }
     72                }
     73
     74                public void map(LongWritable key, Text value,
     75                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     76                                throws IOException {
     77                        String line = (caseSensitive) ? value.toString() : value.toString()
     78                                        .toLowerCase();
     79
     80                        for (String pattern : patternsToSkip) {
     81                                line = line.replaceAll(pattern, "");
     82                        }
     83
     84                        StringTokenizer tokenizer = new StringTokenizer(line);
     85                        while (tokenizer.hasMoreTokens()) {
     86                                word.set(tokenizer.nextToken());
     87                                output.collect(word, one);
     88                                reporter.incrCounter(Counters.INPUT_WORDS, 1);
     89                        }
     90
     91                        if ((++numRecords % 100) == 0) {
     92                                reporter.setStatus("Finished processing " + numRecords
     93                                                + " records " + "from the input file: " + inputFile);
     94                        }
     95                }
     96        }
     97
     98        public static class Reduce extends MapReduceBase implements
     99                        Reducer<Text, IntWritable, Text, IntWritable> {
     100                public void reduce(Text key, Iterator<IntWritable> values,
     101                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     102                                throws IOException {
     103                        int sum = 0;
     104                        while (values.hasNext()) {
     105                                sum += values.next().get();
     106                        }
     107                        output.collect(key, new IntWritable(sum));
     108                }
     109        }
     110
     111        public int run(String[] args) throws Exception {
     112
     113
     114                JobConf conf = new JobConf(getConf(), WordCount.class);
     115                conf.setJobName("wordcount");
     116
     117                conf.setOutputKeyClass(Text.class);
     118                conf.setOutputValueClass(IntWritable.class);
     119
     120                conf.setMapperClass(Map.class);
     121                conf.setCombinerClass(Reduce.class);
     122                conf.setReducerClass(Reduce.class);
     123
     124                conf.setInputFormat(TextInputFormat.class);
     125                conf.setOutputFormat(TextOutputFormat.class);
     126
     127                List<String> other_args = new ArrayList<String>();
     128                for (int i = 0; i < args.length; ++i) {
     129                        if ("-skip".equals(args[i])) {
     130                                DistributedCache
     131                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
     132                                conf.setBoolean("wordcount.skip.patterns", true);
     133                        } else {
     134                                other_args.add(args[i]);
     135                        }
     136                }
     137
     138                FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
     139                FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
     140                CheckAndDelete.checkAndDelete(other_args.get(1), conf);
     141                JobClient.runJob(conf);
     142                return 0;
     143        }
     144
     145        public static void main(String[] args) throws Exception {
     146                String[] argv = { "-Dwordcount.case.sensitive=false",
     147                                "/user/waue/input", "/user/waue/output-wc2", "-skip",
     148                                "/user/waue/patterns/patterns.txt" };
     149                args = argv;
     150                int res = ToolRunner.run(new Configuration(), new WordCountV2(), args);
     151                System.exit(res);
     152        }
     153}
     154}}}
     155
     156 == 原始程式碼 ==