Changes between Version 2 and Version 3 of waue/2010/0115


Ignore:
Timestamp:
Jan 19, 2010, 5:14:19 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0115

    v2 v3  
    152152}}}
    153153
    154 
     154 == 修改 ==
     155
     156此程式碼用hadoop 0.20的api 將 deprecated 修正,然而卻出現以下錯誤
     157
     158{{{
     159#!text
     160
     161 Exception in thread "main" java.lang.NullPointerException
     162at WordCountV020.run(WordCountV020.java:127)
     163at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
     164at WordCountV020.main(WordCountV020.java:147)
     165
     166}}}
     167
     168=== 修改後原始碼 ===
     169
     170{{{
     171#!java
     172import java.io.BufferedReader;
     173import java.io.FileReader;
     174import java.io.IOException;
     175import java.util.ArrayList;
     176import java.util.HashSet;
     177import java.util.Iterator;
     178import java.util.List;
     179import java.util.Set;
     180import java.util.StringTokenizer;
     181
     182import org.apache.hadoop.conf.Configuration;
     183import org.apache.hadoop.conf.Configured;
     184import org.apache.hadoop.filecache.DistributedCache;
     185import org.apache.hadoop.fs.Path;
     186import org.apache.hadoop.io.IntWritable;
     187import org.apache.hadoop.io.LongWritable;
     188import org.apache.hadoop.io.Text;
     189import org.apache.hadoop.mapreduce.Job;
     190import org.apache.hadoop.mapreduce.Mapper;
     191import org.apache.hadoop.mapreduce.Reducer;
     192import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     193import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     194import org.apache.hadoop.util.StringUtils;
     195import org.apache.hadoop.util.Tool;
     196import org.apache.hadoop.util.ToolRunner;
     197
     198public class WordCountV020 extends Configured implements Tool {
     199
     200        static boolean caseSensitive = true;
     201        static Set<String> patternsToSkip = new HashSet<String>();
     202
     203        static void parseSkipFile(Path patternsFile) {
     204                try {
     205                        BufferedReader fis = new BufferedReader(new FileReader(patternsFile
     206                                        .toString()));
     207                        String pattern = null;
     208                        while ((pattern = fis.readLine()) != null) {
     209                                patternsToSkip.add(pattern);
     210                        }
     211                } catch (IOException ioe) {
     212                        System.err
     213                                        .println("Caught exception while parsing the cached file '"
     214                                                        + patternsFile + "' : "
     215                                                        + StringUtils.stringifyException(ioe));
     216                }
     217        }
     218
     219        public static class Map extends
     220                        Mapper<LongWritable, Text, Text, IntWritable> {
     221
     222                static enum Counters {
     223                        INPUT_WORDS
     224                }
     225
     226                private final static IntWritable one = new IntWritable(1);
     227                private Text word = new Text();
     228
     229                public void map(LongWritable key, Text value, Context context)
     230                                throws IOException, InterruptedException {
     231
     232                        String line = (caseSensitive) ? value.toString() : value.toString()
     233                                        .toLowerCase();
     234
     235                        for (String pattern : patternsToSkip) {
     236                                line = line.replaceAll(pattern, "");
     237                        }
     238
     239                        StringTokenizer tokenizer = new StringTokenizer(line);
     240                        while (tokenizer.hasMoreTokens()) {
     241                                word.set(tokenizer.nextToken());
     242                                context.write(word, one);
     243
     244                        }
     245
     246                }
     247        }
     248
     249        public static class Reduce extends
     250                        Reducer<Text, IntWritable, Text, IntWritable> {
     251                public void reduce(Text key, Iterator<IntWritable> values, Context context)
     252                                throws IOException, InterruptedException {
     253                        int sum = 0;
     254                        while (values.hasNext()) {
     255                                sum += values.next().get();
     256                        }
     257                        context.write(key, new IntWritable(sum));
     258                }
     259        }
     260
     261        public int run(String[] args) throws Exception {
     262
     263                Configuration conf = new Configuration();
     264                // 宣告job 取得conf 並設定名稱 Hadoop Hello World
     265                Job job = new Job(conf, "Hadoop Hello World");
     266                // 設定此運算的主程式
     267                job.setJarByClass(HelloHadoop.class);
     268
     269                job.setOutputKeyClass(Text.class);
     270                job.setOutputValueClass(IntWritable.class);
     271
     272                job.setMapperClass(Map.class);
     273                job.setCombinerClass(Reduce.class);
     274                job.setReducerClass(Reduce.class);
     275
     276                List<String> other_args = new ArrayList<String>();
     277                for (int i = 0; i < args.length; ++i) {
     278                        if ("-skip".equals(args[i])) {
     279                                DistributedCache
     280                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
     281                                conf.setBoolean("wordcount.skip.patterns", true);
     282                        } else {
     283                                other_args.add(args[i]);
     284                        }
     285                }
     286
     287                caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
     288
     289                if (conf.getBoolean("wordcount.skip.patterns", false)) {
     290                        Path[] patternsFiles = new Path[0];
     291                        try {
     292                                patternsFiles = DistributedCache.getLocalCacheFiles(conf);
     293                               
     294                        } catch (IOException ioe) {
     295                                System.err
     296                                                .println("Caught exception while getting cached files: "
     297                                                                + StringUtils.stringifyException(ioe));
     298                        }
     299                        for (Path patternsFile : patternsFiles) {
     300                                parseSkipFile(patternsFile);
     301                        }
     302                }
     303
     304                FileInputFormat.setInputPaths(job, new Path(other_args.get(0)));
     305                // 設定輸出路徑
     306                FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
     307
     308                CheckAndDelete.checkAndDelete(other_args.get(1), conf);
     309                job.waitForCompletion(true);
     310                return 0;
     311        }
     312
     313        public static void main(String[] args) throws Exception {
     314                String[] argv = { "-Dwordcount.case.sensitive=false",
     315                                "/user/waue/input", "/user/waue/output-v020", "-skip",
     316                                "/user/waue/patterns/patterns.txt" };
     317                args = argv;
     318                int res = ToolRunner
     319                                .run(new Configuration(), new WordCountV020(), args);
     320                System.exit(res);
     321        }
     322}
     323}}}