Changes between Initial Version and Version 1 of waue/2010/0115-WordCountV2


Ignore:
Timestamp:
Jan 19, 2010, 7:49:55 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0115-WordCountV2

    v1 v1  
     1 = Word Count V2 =
     2
     3{{{
     4#!java
     5import java.io.BufferedReader;
     6import java.io.FileReader;
     7import java.io.IOException;
     8import java.util.ArrayList;
     9import java.util.HashSet;
     10import java.util.Iterator;
     11import java.util.List;
     12import java.util.Set;
     13import java.util.StringTokenizer;
     14
     15import org.apache.hadoop.conf.Configuration;
     16import org.apache.hadoop.conf.Configured;
     17import org.apache.hadoop.filecache.DistributedCache;
     18import org.apache.hadoop.fs.Path;
     19import org.apache.hadoop.io.IntWritable;
     20import org.apache.hadoop.io.LongWritable;
     21import org.apache.hadoop.io.Text;
     22import org.apache.hadoop.mapred.FileInputFormat;
     23import org.apache.hadoop.mapred.FileOutputFormat;
     24import org.apache.hadoop.mapred.JobClient;
     25import org.apache.hadoop.mapred.JobConf;
     26import org.apache.hadoop.mapred.MapReduceBase;
     27import org.apache.hadoop.mapred.Mapper;
     28import org.apache.hadoop.mapred.OutputCollector;
     29import org.apache.hadoop.mapred.Reducer;
     30import org.apache.hadoop.mapred.Reporter;
     31import org.apache.hadoop.mapred.TextInputFormat;
     32import org.apache.hadoop.mapred.TextOutputFormat;
     33import org.apache.hadoop.util.GenericOptionsParser;
     34import org.apache.hadoop.util.StringUtils;
     35import org.apache.hadoop.util.Tool;
     36import org.apache.hadoop.util.ToolRunner;
     37
     38//WordCountV2
     39//說明:
     40//      用於字數統計,並且增加略過大小寫辨識、符號篩除等功能
     41//
     42//測試方法:
     43//      將此程式運作在hadoop 0.20 平台上,執行:
     44//      ---------------------------
     45//      hadoop jar WordCountV2.jar -Dwordcount.case.sensitive=false \
     46//      <input> <output> -skip patterns/patterns.txt
     47//      ---------------------------
     48//
     49//注意:
     50//1.    在hdfs 上來源檔案的路徑為 你所指定的 <input>
     51//      請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾
     52//2.    運算完後,程式將執行結果放在hdfs 的輸出路徑為 你所指定的 <output>
     53//3.    請建立一個資料夾 pattern 並在裡面放置pattern.txt,內容如下(一行一個,前置提示符號\)
     54//                      \.
     55//              \,
     56//              \!
     57
     58@SuppressWarnings("deprecation")
     59public class WordCountV2 extends Configured implements Tool {
     60
     61        public static class Map extends MapReduceBase implements
     62                        Mapper<LongWritable, Text, Text, IntWritable> {
     63
     64                static enum Counters {
     65                        INPUT_WORDS
     66                }
     67
     68                private final static IntWritable one = new IntWritable(1);
     69                private Text word = new Text();
     70
     71                private boolean caseSensitive = true;
     72                private Set<String> patternsToSkip = new HashSet<String>();
     73
     74                private long numRecords = 0;
     75                private String inputFile;
     76
     77                public void configure(JobConf job) {
     78                        caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
     79                        inputFile = job.get("map.input.file");
     80
     81                        if (job.getBoolean("wordcount.skip.patterns", false)) {
     82                                Path[] patternsFiles = new Path[0];
     83                                try {
     84                                        patternsFiles = DistributedCache.getLocalCacheFiles(job);
     85                                } catch (IOException ioe) {
     86                                        System.err
     87                                                        .println("Caught exception while getting cached files: "
     88                                                                        + StringUtils.stringifyException(ioe));
     89                                }
     90                                for (Path patternsFile : patternsFiles) {
     91                                        parseSkipFile(patternsFile);
     92                                }
     93                        }
     94                }
     95
     96                private void parseSkipFile(Path patternsFile) {
     97                        try {
     98                                BufferedReader fis = new BufferedReader(new FileReader(
     99                                                patternsFile.toString()));
     100                                String pattern = null;
     101                                while ((pattern = fis.readLine()) != null) {
     102                                        patternsToSkip.add(pattern);
     103                                }
     104                        } catch (IOException ioe) {
     105                                System.err
     106                                                .println("Caught exception while parsing the cached file '"
     107                                                                + patternsFile
     108                                                                + "' : "
     109                                                                + StringUtils.stringifyException(ioe));
     110                        }
     111                }
     112
     113                public void map(LongWritable key, Text value,
     114                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     115                                throws IOException {
     116                        String line = (caseSensitive) ? value.toString() : value.toString()
     117                                        .toLowerCase();
     118
     119                        for (String pattern : patternsToSkip) {
     120                                line = line.replaceAll(pattern, "");
     121                        }
     122
     123                        StringTokenizer tokenizer = new StringTokenizer(line);
     124                        while (tokenizer.hasMoreTokens()) {
     125                                word.set(tokenizer.nextToken());
     126                                output.collect(word, one);
     127                                reporter.incrCounter(Counters.INPUT_WORDS, 1);
     128                        }
     129
     130                        if ((++numRecords % 100) == 0) {
     131                                reporter.setStatus("Finished processing " + numRecords
     132                                                + " records " + "from the input file: " + inputFile);
     133                        }
     134                }
     135        }
     136
     137        public static class Reduce extends MapReduceBase implements
     138                        Reducer<Text, IntWritable, Text, IntWritable> {
     139                public void reduce(Text key, Iterator<IntWritable> values,
     140                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     141                                throws IOException {
     142                        int sum = 0;
     143                        while (values.hasNext()) {
     144                                sum += values.next().get();
     145                        }
     146                        output.collect(key, new IntWritable(sum));
     147                }
     148        }
     149
     150        public int run(String[] args) throws Exception {
     151
     152                JobConf conf = new JobConf(getConf(), WordCount.class);
     153                conf.setJobName("wordcount");
     154                String[] otherArgs = new GenericOptionsParser(conf, args)
     155                                .getRemainingArgs();
     156                if (otherArgs.length < 2) {
     157                        System.out.println("WordCountV2 [-Dwordcount.case.sensitive=<false|true>] \\ ");
     158                        System.out.println("            <inDir> <outDir> [-skip Pattern_file]");
     159                        return 0;
     160                }
     161                conf.setOutputKeyClass(Text.class);
     162                conf.setOutputValueClass(IntWritable.class);
     163
     164                conf.setMapperClass(Map.class);
     165                conf.setCombinerClass(Reduce.class);
     166                conf.setReducerClass(Reduce.class);
     167
     168                conf.setInputFormat(TextInputFormat.class);
     169                conf.setOutputFormat(TextOutputFormat.class);
     170
     171                List<String> other_args = new ArrayList<String>();
     172                for (int i = 0; i < args.length; ++i) {
     173                        if ("-skip".equals(args[i])) {
     174                                DistributedCache
     175                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
     176                                conf.setBoolean("wordcount.skip.patterns", true);
     177                        } else {
     178                                other_args.add(args[i]);
     179                        }
     180                }
     181
     182                FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
     183                FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
     184                CheckAndDelete.checkAndDelete(other_args.get(1), conf);
     185                JobClient.runJob(conf);
     186                return 0;
     187        }
     188
     189        public static void main(String[] args) throws Exception {
     190//              String[] argv = { "-Dwordcount.case.sensitive=false", "input",
     191//                              "output-wc2", "-skip", "patterns/patterns.txt" };
     192//              args = argv;
     193
     194                int res = ToolRunner.run(new Configuration(), new WordCountV2(), args);
     195                System.exit(res);
     196        }
     197}
     198
     199}}}