|  | 1 | {{{ | 
                          |  | 2 | #!html | 
                          |  | 3 | <div style="text-align: center; color:#151B8D"><big style="font-weight: bold;"><big><big> | 
                          |  | 4 | Hadoop 進階課程 | 
                          |  | 5 | </big></big></big></div> <div style="text-align: center; color:#7E2217"><big style="font-weight: bold;"><big> | 
                          |  | 6 | 範例練習 | 
                          |  | 7 | </big></big></div> | 
                          |  | 8 | }}} | 
                          |  | 9 |  | 
                          |  | 10 | [wiki:NCHCCloudCourse100928_4_EXM4 上一關 < ] 第五關 [wiki:NCHCCloudCourse100928_4_EXM6 > 下一關] | 
                          |  | 11 |  | 
                          |  | 12 |  | 
                          |  | 13 | {{{ | 
                          |  | 14 | #!java | 
                          |  | 15 | package org.nchc.hadoop; | 
                          |  | 16 | import java.io.BufferedReader; | 
                          |  | 17 | import java.io.FileReader; | 
                          |  | 18 | import java.io.IOException; | 
                          |  | 19 | import java.util.ArrayList; | 
                          |  | 20 | import java.util.HashSet; | 
                          |  | 21 | import java.util.Iterator; | 
                          |  | 22 | import java.util.List; | 
                          |  | 23 | import java.util.Set; | 
                          |  | 24 | import java.util.StringTokenizer; | 
                          |  | 25 |  | 
                          |  | 26 | import org.apache.hadoop.conf.Configuration; | 
                          |  | 27 | import org.apache.hadoop.conf.Configured; | 
                          |  | 28 | import org.apache.hadoop.filecache.DistributedCache; | 
                          |  | 29 | import org.apache.hadoop.fs.Path; | 
                          |  | 30 | import org.apache.hadoop.io.IntWritable; | 
                          |  | 31 | import org.apache.hadoop.io.LongWritable; | 
                          |  | 32 | import org.apache.hadoop.io.Text; | 
                          |  | 33 | import org.apache.hadoop.mapred.FileInputFormat; | 
                          |  | 34 | import org.apache.hadoop.mapred.FileOutputFormat; | 
                          |  | 35 | import org.apache.hadoop.mapred.JobClient; | 
                          |  | 36 | import org.apache.hadoop.mapred.JobConf; | 
                          |  | 37 | import org.apache.hadoop.mapred.MapReduceBase; | 
                          |  | 38 | import org.apache.hadoop.mapred.Mapper; | 
                          |  | 39 | import org.apache.hadoop.mapred.OutputCollector; | 
                          |  | 40 | import org.apache.hadoop.mapred.Reducer; | 
                          |  | 41 | import org.apache.hadoop.mapred.Reporter; | 
                          |  | 42 | import org.apache.hadoop.mapred.TextInputFormat; | 
                          |  | 43 | import org.apache.hadoop.mapred.TextOutputFormat; | 
                          |  | 44 | import org.apache.hadoop.util.GenericOptionsParser; | 
                          |  | 45 | import org.apache.hadoop.util.StringUtils; | 
                          |  | 46 | import org.apache.hadoop.util.Tool; | 
                          |  | 47 | import org.apache.hadoop.util.ToolRunner; | 
                          |  | 48 |  | 
                          |  | 49 | //WordCountV2 | 
                          |  | 50 | //說明: | 
                          |  | 51 | //      用於字數統計,並且增加略過大小寫辨識、符號篩除等功能 | 
                          |  | 52 | // | 
                          |  | 53 | //測試方法: | 
                          |  | 54 | //      將此程式運作在hadoop 0.20 平台上,執行: | 
                          |  | 55 | //      --------------------------- | 
                          |  | 56 | //      hadoop jar WordCountV2.jar -Dwordcount.case.sensitive=false \ | 
                          |  | 57 | //      <input> <output> -skip patterns/patterns.txt | 
                          |  | 58 | //      --------------------------- | 
                          |  | 59 | // | 
                          |  | 60 | //注意: | 
                          |  | 61 | //1.    在hdfs 上來源檔案的路徑為 你所指定的 <input> | 
                          |  | 62 | //      請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾 | 
                          |  | 63 | //2.    運算完後,程式將執行結果放在hdfs 的輸出路徑為 你所指定的 <output> | 
                          |  | 64 | //3.    請建立一個資料夾 patterns 並在裡面放置 patterns.txt,內容如下(一行一個,前置提示符號\) | 
                          |  | 65 | //              \. | 
                          |  | 66 | //              \, | 
                          |  | 67 | //              \! | 
                          |  | 68 |  | 
                          |  | 69 | @SuppressWarnings("deprecation") | 
                          |  | 70 | public class WordCountV2 extends Configured implements Tool { | 
                          |  | 71 |  | 
                          |  | 72 | public static class Map extends MapReduceBase implements | 
                          |  | 73 | Mapper<LongWritable, Text, Text, IntWritable> { | 
                          |  | 74 |  | 
                          |  | 75 | static enum Counters { | 
                          |  | 76 | INPUT_WORDS | 
                          |  | 77 | } | 
                          |  | 78 |  | 
                          |  | 79 | private final static IntWritable one = new IntWritable(1); | 
                          |  | 80 | private Text word = new Text(); | 
                          |  | 81 |  | 
                          |  | 82 | private boolean caseSensitive = true; | 
                          |  | 83 | private Set<String> patternsToSkip = new HashSet<String>(); | 
                          |  | 84 |  | 
                          |  | 85 | private long numRecords = 0; | 
                          |  | 86 | private String inputFile; | 
                          |  | 87 |  | 
                          |  | 88 | public void configure(JobConf job) { | 
                          |  | 89 | caseSensitive = job.getBoolean("wordcount.case.sensitive", true); | 
                          |  | 90 | inputFile = job.get("map.input.file"); | 
                          |  | 91 |  | 
                          |  | 92 | if (job.getBoolean("wordcount.skip.patterns", false)) { | 
                          |  | 93 | Path[] patternsFiles = new Path[0]; | 
                          |  | 94 | try { | 
                          |  | 95 | patternsFiles = DistributedCache.getLocalCacheFiles(job); | 
                          |  | 96 | } catch (IOException ioe) { | 
                          |  | 97 | System.err | 
                          |  | 98 | .println("Caught exception while getting cached files: " | 
                          |  | 99 | + StringUtils.stringifyException(ioe)); | 
                          |  | 100 | } | 
                          |  | 101 | for (Path patternsFile : patternsFiles) { | 
                          |  | 102 | parseSkipFile(patternsFile); | 
                          |  | 103 | } | 
                          |  | 104 | } | 
                          |  | 105 | } | 
                          |  | 106 |  | 
                          |  | 107 | private void parseSkipFile(Path patternsFile) { | 
                          |  | 108 | try { | 
                          |  | 109 | BufferedReader fis = new BufferedReader(new FileReader( | 
                          |  | 110 | patternsFile.toString())); | 
                          |  | 111 | String pattern = null; | 
                          |  | 112 | while ((pattern = fis.readLine()) != null) { | 
                          |  | 113 | patternsToSkip.add(pattern); | 
                          |  | 114 | } | 
                          |  | 115 | } catch (IOException ioe) { | 
                          |  | 116 | System.err | 
                          |  | 117 | .println("Caught exception while parsing the cached file '" | 
                          |  | 118 | + patternsFile | 
                          |  | 119 | + "' : " | 
                          |  | 120 | + StringUtils.stringifyException(ioe)); | 
                          |  | 121 | } | 
                          |  | 122 | } | 
                          |  | 123 |  | 
                          |  | 124 | public void map(LongWritable key, Text value, | 
                          |  | 125 | OutputCollector<Text, IntWritable> output, Reporter reporter) | 
                          |  | 126 | throws IOException { | 
                          |  | 127 | String line = (caseSensitive) ? value.toString() : value.toString() | 
                          |  | 128 | .toLowerCase(); | 
                          |  | 129 |  | 
                          |  | 130 | for (String pattern : patternsToSkip) { | 
                          |  | 131 | line = line.replaceAll(pattern, ""); | 
                          |  | 132 | } | 
                          |  | 133 |  | 
                          |  | 134 | StringTokenizer tokenizer = new StringTokenizer(line); | 
                          |  | 135 | while (tokenizer.hasMoreTokens()) { | 
                          |  | 136 | word.set(tokenizer.nextToken()); | 
                          |  | 137 | output.collect(word, one); | 
                          |  | 138 | reporter.incrCounter(Counters.INPUT_WORDS, 1); | 
                          |  | 139 | } | 
                          |  | 140 |  | 
                          |  | 141 | if ((++numRecords % 100) == 0) { | 
                          |  | 142 | reporter.setStatus("Finished processing " + numRecords | 
                          |  | 143 | + " records " + "from the input file: " + inputFile); | 
                          |  | 144 | } | 
                          |  | 145 | } | 
                          |  | 146 | } | 
                          |  | 147 |  | 
                          |  | 148 | public static class Reduce extends MapReduceBase implements | 
                          |  | 149 | Reducer<Text, IntWritable, Text, IntWritable> { | 
                          |  | 150 | public void reduce(Text key, Iterator<IntWritable> values, | 
                          |  | 151 | OutputCollector<Text, IntWritable> output, Reporter reporter) | 
                          |  | 152 | throws IOException { | 
                          |  | 153 | int sum = 0; | 
                          |  | 154 | while (values.hasNext()) { | 
                          |  | 155 | sum += values.next().get(); | 
                          |  | 156 | } | 
                          |  | 157 | output.collect(key, new IntWritable(sum)); | 
                          |  | 158 | } | 
                          |  | 159 | } | 
                          |  | 160 |  | 
                          |  | 161 | public int run(String[] args) throws Exception { | 
                          |  | 162 |  | 
                          |  | 163 | JobConf conf = new JobConf(getConf(), WordCount.class); | 
                          |  | 164 | conf.setJobName("wordcount"); | 
                          |  | 165 | String[] otherArgs = new GenericOptionsParser(conf, args) | 
                          |  | 166 | .getRemainingArgs(); | 
                          |  | 167 | if (otherArgs.length < 2) { | 
                          |  | 168 | System.out.println("WordCountV2 [-Dwordcount.case.sensitive=<false|true>] \\ "); | 
                          |  | 169 | System.out.println("            <inDir> <outDir> [-skip Pattern_file]"); | 
                          |  | 170 | return 0; | 
                          |  | 171 | } | 
                          |  | 172 | conf.setOutputKeyClass(Text.class); | 
                          |  | 173 | conf.setOutputValueClass(IntWritable.class); | 
                          |  | 174 |  | 
                          |  | 175 | conf.setMapperClass(Map.class); | 
                          |  | 176 | conf.setCombinerClass(Reduce.class); | 
                          |  | 177 | conf.setReducerClass(Reduce.class); | 
                          |  | 178 |  | 
                          |  | 179 | conf.setInputFormat(TextInputFormat.class); | 
                          |  | 180 | conf.setOutputFormat(TextOutputFormat.class); | 
                          |  | 181 |  | 
                          |  | 182 | List<String> other_args = new ArrayList<String>(); | 
                          |  | 183 | for (int i = 0; i < args.length; ++i) { | 
                          |  | 184 | if ("-skip".equals(args[i])) { | 
                          |  | 185 | DistributedCache | 
                          |  | 186 | .addCacheFile(new Path(args[++i]).toUri(), conf); | 
                          |  | 187 | conf.setBoolean("wordcount.skip.patterns", true); | 
                          |  | 188 | } else { | 
                          |  | 189 | other_args.add(args[i]); | 
                          |  | 190 | } | 
                          |  | 191 | } | 
                          |  | 192 |  | 
                          |  | 193 | FileInputFormat.setInputPaths(conf, new Path(other_args.get(0))); | 
                          |  | 194 | FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); | 
                          |  | 195 | CheckAndDelete.checkAndDelete(other_args.get(1), conf); | 
                          |  | 196 | JobClient.runJob(conf); | 
                          |  | 197 | return 0; | 
                          |  | 198 | } | 
                          |  | 199 |  | 
                          |  | 200 | public static void main(String[] args) throws Exception { | 
                          |  | 201 | //              String[] argv = { "-Dwordcount.case.sensitive=false", "/user/hadooper/input", | 
                          |  | 202 | //                              "/user/hadooper/output-wc2", "-skip", "/user/hadooper/patterns/patterns.txt" }; | 
                          |  | 203 | //              args = argv; | 
                          |  | 204 |  | 
                          |  | 205 | int res = ToolRunner.run(new Configuration(), new WordCountV2(), args); | 
                          |  | 206 | System.exit(res); | 
                          |  | 207 | } | 
                          |  | 208 | } | 
                          |  | 209 |  | 
                          |  | 210 | }}} | 
                          |  | 211 |  | 
                          |  | 212 |  |