|   | 1 |  = Word Count V2 = | 
                  
                          |   | 2 |  | 
                  
                          |   | 3 | {{{ | 
                  
                          |   | 4 | #!java | 
                  
                          |   | 5 | import java.io.BufferedReader; | 
                  
                          |   | 6 | import java.io.FileReader; | 
                  
                          |   | 7 | import java.io.IOException; | 
                  
                          |   | 8 | import java.util.ArrayList; | 
                  
                          |   | 9 | import java.util.HashSet; | 
                  
                          |   | 10 | import java.util.Iterator; | 
                  
                          |   | 11 | import java.util.List; | 
                  
                          |   | 12 | import java.util.Set; | 
                  
                          |   | 13 | import java.util.StringTokenizer; | 
                  
                          |   | 14 |  | 
                  
                          |   | 15 | import org.apache.hadoop.conf.Configuration; | 
                  
                          |   | 16 | import org.apache.hadoop.conf.Configured; | 
                  
                          |   | 17 | import org.apache.hadoop.filecache.DistributedCache; | 
                  
                          |   | 18 | import org.apache.hadoop.fs.Path; | 
                  
                          |   | 19 | import org.apache.hadoop.io.IntWritable; | 
                  
                          |   | 20 | import org.apache.hadoop.io.LongWritable; | 
                  
                          |   | 21 | import org.apache.hadoop.io.Text; | 
                  
                          |   | 22 | import org.apache.hadoop.mapred.FileInputFormat; | 
                  
                          |   | 23 | import org.apache.hadoop.mapred.FileOutputFormat; | 
                  
                          |   | 24 | import org.apache.hadoop.mapred.JobClient; | 
                  
                          |   | 25 | import org.apache.hadoop.mapred.JobConf; | 
                  
                          |   | 26 | import org.apache.hadoop.mapred.MapReduceBase; | 
                  
                          |   | 27 | import org.apache.hadoop.mapred.Mapper; | 
                  
                          |   | 28 | import org.apache.hadoop.mapred.OutputCollector; | 
                  
                          |   | 29 | import org.apache.hadoop.mapred.Reducer; | 
                  
                          |   | 30 | import org.apache.hadoop.mapred.Reporter; | 
                  
                          |   | 31 | import org.apache.hadoop.mapred.TextInputFormat; | 
                  
                          |   | 32 | import org.apache.hadoop.mapred.TextOutputFormat; | 
                  
                          |   | 33 | import org.apache.hadoop.util.GenericOptionsParser; | 
                  
                          |   | 34 | import org.apache.hadoop.util.StringUtils; | 
                  
                          |   | 35 | import org.apache.hadoop.util.Tool; | 
                  
                          |   | 36 | import org.apache.hadoop.util.ToolRunner; | 
                  
                          |   | 37 |  | 
                  
                          |   | 38 | //WordCountV2 | 
                  
                          |   | 39 | //說明:  | 
                  
                          |   | 40 | //      用於字數統計,並且增加略過大小寫辨識、符號篩除等功能 | 
                  
                          |   | 41 | // | 
                  
                          |   | 42 | //測試方法: | 
                  
                          |   | 43 | //      將此程式運作在hadoop 0.20 平台上,執行: | 
                  
                          |   | 44 | //      --------------------------- | 
                  
                          |   | 45 | //      hadoop jar WordCountV2.jar -Dwordcount.case.sensitive=false \ | 
                  
                          |   | 46 | //      <input> <output> -skip patterns/patterns.txt | 
                  
                          |   | 47 | //      --------------------------- | 
                  
                          |   | 48 | // | 
                  
                          |   | 49 | //注意: | 
                  
                          |   | 50 | //1.    在hdfs 上來源檔案的路徑為 你所指定的 <input> | 
                  
                          |   | 51 | //      請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾 | 
                  
                          |   | 52 | //2.    運算完後,程式將執行結果放在hdfs 的輸出路徑為 你所指定的 <output> | 
                  
                          |   | 53 | //3.    請建立一個資料夾 pattern 並在裡面放置pattern.txt,內容如下(一行一個,前置提示符號\) | 
                  
                          |   | 54 | //                      \. | 
                  
                          |   | 55 | //              \, | 
                  
                          |   | 56 | //              \! | 
                  
                          |   | 57 |  | 
                  
                          |   | 58 | @SuppressWarnings("deprecation") | 
                  
                          |   | 59 | public class WordCountV2 extends Configured implements Tool { | 
                  
                          |   | 60 |  | 
                  
                          |   | 61 |         public static class Map extends MapReduceBase implements | 
                  
                          |   | 62 |                         Mapper<LongWritable, Text, Text, IntWritable> { | 
                  
                          |   | 63 |  | 
                  
                          |   | 64 |                 static enum Counters { | 
                  
                          |   | 65 |                         INPUT_WORDS | 
                  
                          |   | 66 |                 } | 
                  
                          |   | 67 |  | 
                  
                          |   | 68 |                 private final static IntWritable one = new IntWritable(1); | 
                  
                          |   | 69 |                 private Text word = new Text(); | 
                  
                          |   | 70 |  | 
                  
                          |   | 71 |                 private boolean caseSensitive = true; | 
                  
                          |   | 72 |                 private Set<String> patternsToSkip = new HashSet<String>(); | 
                  
                          |   | 73 |  | 
                  
                          |   | 74 |                 private long numRecords = 0; | 
                  
                          |   | 75 |                 private String inputFile; | 
                  
                          |   | 76 |  | 
                  
                          |   | 77 |                 public void configure(JobConf job) { | 
                  
                          |   | 78 |                         caseSensitive = job.getBoolean("wordcount.case.sensitive", true); | 
                  
                          |   | 79 |                         inputFile = job.get("map.input.file"); | 
                  
                          |   | 80 |  | 
                  
                          |   | 81 |                         if (job.getBoolean("wordcount.skip.patterns", false)) { | 
                  
                          |   | 82 |                                 Path[] patternsFiles = new Path[0]; | 
                  
                          |   | 83 |                                 try { | 
                  
                          |   | 84 |                                         patternsFiles = DistributedCache.getLocalCacheFiles(job); | 
                  
                          |   | 85 |                                 } catch (IOException ioe) { | 
                  
                          |   | 86 |                                         System.err | 
                  
                          |   | 87 |                                                         .println("Caught exception while getting cached files: " | 
                  
                          |   | 88 |                                                                         + StringUtils.stringifyException(ioe)); | 
                  
                          |   | 89 |                                 } | 
                  
                          |   | 90 |                                 for (Path patternsFile : patternsFiles) { | 
                  
                          |   | 91 |                                         parseSkipFile(patternsFile); | 
                  
                          |   | 92 |                                 } | 
                  
                          |   | 93 |                         } | 
                  
                          |   | 94 |                 } | 
                  
                          |   | 95 |  | 
                  
                          |   | 96 |                 private void parseSkipFile(Path patternsFile) { | 
                  
                          |   | 97 |                         try { | 
                  
                          |   | 98 |                                 BufferedReader fis = new BufferedReader(new FileReader( | 
                  
                          |   | 99 |                                                 patternsFile.toString())); | 
                  
                          |   | 100 |                                 String pattern = null; | 
                  
                          |   | 101 |                                 while ((pattern = fis.readLine()) != null) { | 
                  
                          |   | 102 |                                         patternsToSkip.add(pattern); | 
                  
                          |   | 103 |                                 } | 
                  
                          |   | 104 |                         } catch (IOException ioe) { | 
                  
                          |   | 105 |                                 System.err | 
                  
                          |   | 106 |                                                 .println("Caught exception while parsing the cached file '" | 
                  
                          |   | 107 |                                                                 + patternsFile | 
                  
                          |   | 108 |                                                                 + "' : " | 
                  
                          |   | 109 |                                                                 + StringUtils.stringifyException(ioe)); | 
                  
                          |   | 110 |                         } | 
                  
                          |   | 111 |                 } | 
                  
                          |   | 112 |  | 
                  
                          |   | 113 |                 public void map(LongWritable key, Text value, | 
                  
                          |   | 114 |                                 OutputCollector<Text, IntWritable> output, Reporter reporter) | 
                  
                          |   | 115 |                                 throws IOException { | 
                  
                          |   | 116 |                         String line = (caseSensitive) ? value.toString() : value.toString() | 
                  
                          |   | 117 |                                         .toLowerCase(); | 
                  
                          |   | 118 |  | 
                  
                          |   | 119 |                         for (String pattern : patternsToSkip) { | 
                  
                          |   | 120 |                                 line = line.replaceAll(pattern, ""); | 
                  
                          |   | 121 |                         } | 
                  
                          |   | 122 |  | 
                  
                          |   | 123 |                         StringTokenizer tokenizer = new StringTokenizer(line); | 
                  
                          |   | 124 |                         while (tokenizer.hasMoreTokens()) { | 
                  
                          |   | 125 |                                 word.set(tokenizer.nextToken()); | 
                  
                          |   | 126 |                                 output.collect(word, one); | 
                  
                          |   | 127 |                                 reporter.incrCounter(Counters.INPUT_WORDS, 1); | 
                  
                          |   | 128 |                         } | 
                  
                          |   | 129 |  | 
                  
                          |   | 130 |                         if ((++numRecords % 100) == 0) { | 
                  
                          |   | 131 |                                 reporter.setStatus("Finished processing " + numRecords | 
                  
                          |   | 132 |                                                 + " records " + "from the input file: " + inputFile); | 
                  
                          |   | 133 |                         } | 
                  
                          |   | 134 |                 } | 
                  
                          |   | 135 |         } | 
                  
                          |   | 136 |  | 
                  
                          |   | 137 |         public static class Reduce extends MapReduceBase implements | 
                  
                          |   | 138 |                         Reducer<Text, IntWritable, Text, IntWritable> { | 
                  
                          |   | 139 |                 public void reduce(Text key, Iterator<IntWritable> values, | 
                  
                          |   | 140 |                                 OutputCollector<Text, IntWritable> output, Reporter reporter) | 
                  
                          |   | 141 |                                 throws IOException { | 
                  
                          |   | 142 |                         int sum = 0; | 
                  
                          |   | 143 |                         while (values.hasNext()) { | 
                  
                          |   | 144 |                                 sum += values.next().get(); | 
                  
                          |   | 145 |                         } | 
                  
                          |   | 146 |                         output.collect(key, new IntWritable(sum)); | 
                  
                          |   | 147 |                 } | 
                  
                          |   | 148 |         } | 
                  
                          |   | 149 |  | 
                  
                          |   | 150 |         public int run(String[] args) throws Exception { | 
                  
                          |   | 151 |  | 
                  
                          |   | 152 |                 JobConf conf = new JobConf(getConf(), WordCount.class); | 
                  
                          |   | 153 |                 conf.setJobName("wordcount"); | 
                  
                          |   | 154 |                 String[] otherArgs = new GenericOptionsParser(conf, args) | 
                  
                          |   | 155 |                                 .getRemainingArgs(); | 
                  
                          |   | 156 |                 if (otherArgs.length < 2) { | 
                  
                          |   | 157 |                         System.out.println("WordCountV2 [-Dwordcount.case.sensitive=<false|true>] \\ "); | 
                  
                          |   | 158 |                         System.out.println("            <inDir> <outDir> [-skip Pattern_file]"); | 
                  
                          |   | 159 |                         return 0; | 
                  
                          |   | 160 |                 } | 
                  
                          |   | 161 |                 conf.setOutputKeyClass(Text.class); | 
                  
                          |   | 162 |                 conf.setOutputValueClass(IntWritable.class); | 
                  
                          |   | 163 |  | 
                  
                          |   | 164 |                 conf.setMapperClass(Map.class); | 
                  
                          |   | 165 |                 conf.setCombinerClass(Reduce.class); | 
                  
                          |   | 166 |                 conf.setReducerClass(Reduce.class); | 
                  
                          |   | 167 |  | 
                  
                          |   | 168 |                 conf.setInputFormat(TextInputFormat.class); | 
                  
                          |   | 169 |                 conf.setOutputFormat(TextOutputFormat.class); | 
                  
                          |   | 170 |  | 
                  
                          |   | 171 |                 List<String> other_args = new ArrayList<String>(); | 
                  
                          |   | 172 |                 for (int i = 0; i < args.length; ++i) { | 
                  
                          |   | 173 |                         if ("-skip".equals(args[i])) { | 
                  
                          |   | 174 |                                 DistributedCache | 
                  
                          |   | 175 |                                                 .addCacheFile(new Path(args[++i]).toUri(), conf); | 
                  
                          |   | 176 |                                 conf.setBoolean("wordcount.skip.patterns", true); | 
                  
                          |   | 177 |                         } else { | 
                  
                          |   | 178 |                                 other_args.add(args[i]); | 
                  
                          |   | 179 |                         } | 
                  
                          |   | 180 |                 } | 
                  
                          |   | 181 |  | 
                  
                          |   | 182 |                 FileInputFormat.setInputPaths(conf, new Path(other_args.get(0))); | 
                  
                          |   | 183 |                 FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); | 
                  
                          |   | 184 |                 CheckAndDelete.checkAndDelete(other_args.get(1), conf); | 
                  
                          |   | 185 |                 JobClient.runJob(conf); | 
                  
                          |   | 186 |                 return 0; | 
                  
                          |   | 187 |         } | 
                  
                          |   | 188 |  | 
                  
                          |   | 189 |         public static void main(String[] args) throws Exception { | 
                  
                          |   | 190 | //              String[] argv = { "-Dwordcount.case.sensitive=false", "input", | 
                  
                          |   | 191 | //                              "output-wc2", "-skip", "patterns/patterns.txt" }; | 
                  
                          |   | 192 | //              args = argv; | 
                  
                          |   | 193 |  | 
                  
                          |   | 194 |                 int res = ToolRunner.run(new Configuration(), new WordCountV2(), args); | 
                  
                          |   | 195 |                 System.exit(res); | 
                  
                          |   | 196 |         } | 
                  
                          |   | 197 | } | 
                  
                          |   | 198 |  | 
                  
                          |   | 199 | }}} |