Changes between Version 6 and Version 7 of NCHCCloudCourse100928_4_EXM5
- Timestamp: Aug 2, 2011, 4:36:03 PM
NCHCCloudCourse100928_4_EXM5
{{{
#!text
WordCountV20
Description:
	Counts word frequencies, with added options for case-insensitive matching and for filtering out symbols [fully ported to the Hadoop 0.20 API]

How to test:
	Run this program on a Hadoop 0.20 platform:
	---------------------------
	hadoop jar WordCountV2.jar "/home/nchc/input" "/home/nchc/output-wc2" "-c" "-skip" "/home/nchc/patterns/patterns.txt"
	---------------------------

Notes:
1.	The <input> and <output> paths are set inside the program to the local paths "/home/nchc/input" and "/home/nchc/output-wc2"
2.	To test the skip feature, create a file "/home/nchc/patterns/patterns.txt" with the following content (one pattern per line, each escaped with a leading \)
	\.
	\,
	\!
3.	To test case-insensitive counting, pass the -c argument (-c means "ignore case")
4.	Note how DistributedCache, setup(), and the conf values are handed off between main, the mapper, and setup (see the sketch below)
}}}
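Note 4 above highlights the hand-off of configuration values between main(), the mapper, and setup(): the driver records the command-line switches on the job's Configuration, and setup() reads the same keys back through context.getConfiguration(). The fragment below is only a sketch of that round trip, reusing the property names from the full listing that follows; the class name is hypothetical and it is not part of the example itself.

{{{
#!java
import org.apache.hadoop.conf.Configuration;

// Hypothetical stand-alone demo of the conf round trip used by WordCountV20.
public class ConfRoundTripSketch {
	public static void main(String[] args) {
		// Driver side: main() stores the switches on the Configuration.
		Configuration conf = new Configuration();
		conf.setBoolean("wordcount.case.sensitive", false); // the "-c" switch
		conf.setBoolean("wordcount.skip.patterns", true);   // the "-skip" switch

		// Mapper side: setup() reads the same keys back; in the real job it does
		// so via context.getConfiguration(), here we read from conf directly.
		System.out.println(conf.getBoolean("wordcount.case.sensitive", true));
		System.out.println(conf.getBoolean("wordcount.skip.patterns", false));
	}
}
}}}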
{{{
#!java

package org.nchc.hadoop;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

public class WordCountV20 {

	public static class Map extends
			Mapper<LongWritable, Text, Text, IntWritable> {
		static enum Counters {
			INPUT_WORDS
		}

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		private boolean caseSensitive = true;
		private Set<String> patternsToSkip = new HashSet<String>();

		private void parseSkipFile(Path patternsFile) {
			try {
				BufferedReader fis = new BufferedReader(new FileReader(
						patternsFile.toString()));
				String pattern = null;
				while ((pattern = fis.readLine()) != null) {
					patternsToSkip.add(pattern);
				}
			} catch (IOException ioe) {
				System.err
						.println("Caught exception while parsing the cached file '"
								+ patternsFile
								+ "' : "
								+ StringUtils.stringifyException(ioe));
			}
		}

		@Override
		public void setup(Context context) {
			Configuration conf = context.getConfiguration();
			caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);

			if (conf.getBoolean("wordcount.skip.patterns", false)) {
				Path[] patternsFiles = new Path[0];
				try {
					patternsFiles = DistributedCache.getLocalCacheFiles(conf);
				} catch (IOException ioe) {
					System.err
							.println("Caught exception while getting cached files: "
									+ StringUtils.stringifyException(ioe));
				}
				for (Path patternsFile : patternsFiles) {
					parseSkipFile(patternsFile);
				}
			}
		}

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = (caseSensitive) ? value.toString() : value
					.toString().toLowerCase();

			for (String pattern : patternsToSkip) {
				line = line.replaceAll(pattern, "");
			}

			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class Reduce extends
			Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		@Override
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception {
		String[] argv = { "/home/nchc/input", "/home/nchc/output-wc2", "-c",
				"-skip", "/home/nchc/patterns/patterns.txt" };

		args = argv;
		Configuration conf = new Configuration();
		conf.set("mapred.job.tracker", "local"); // run in local (single-node) mode
		conf.set("fs.default.name", "file:///"); // use the local file system

		if (args.length < 2) {
			System.err
					.println("Usage: hadoop jar WordCount.jar <input> <output> [-c] [-skip <path>]");
			System.exit(2);
		}

		for (int i = 0; i < args.length; ++i) {
			if ("-skip".equals(args[i])) {
				DistributedCache
						.addCacheFile(new Path(args[++i]).toUri(), conf);
				conf.setBoolean("wordcount.skip.patterns", true);
			}
			if ("-c".equals(args[i])) {
				conf.setBoolean("wordcount.case.sensitive", false);
			}
		}

		CheckAndDelete.checkAndDelete(args[1], conf);
		Job job = new Job(conf, "Word Count");
		job.setJarByClass(WordCountV20.class);
		job.setMapperClass(Map.class);
		job.setCombinerClass(Reduce.class);
		job.setReducerClass(Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

}}}

[wiki:wc2 v.0.20]
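The listing calls CheckAndDelete.checkAndDelete(args[1], conf), a small helper used elsewhere in this course to clear the output directory before the job runs; its source is not shown on this page. Below is a minimal sketch of what such a helper might look like, assuming it only needs to delete the output path if it already exists; it is a stand-in, not the course's actual implementation.

{{{
#!java
package org.nchc.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckAndDelete {
	// Delete the given path (recursively) if it already exists, so that
	// FileOutputFormat does not abort the job with "output directory exists".
	public static void checkAndDelete(String dir, Configuration conf)
			throws IOException {
		Path path = new Path(dir);
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(path)) {
			fs.delete(path, true);
		}
	}
}
}}}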