Changes between Initial Version and Version 1 of NCHCCloudCourse100928_4_EXM5


Ignore:
Timestamp:
Sep 24, 2010, 6:31:47 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • NCHCCloudCourse100928_4_EXM5

    v1 v1  
     1{{{
     2#!html
     3<div style="text-align: center; color:#151B8D"><big style="font-weight: bold;"><big><big>
     4Hadoop 進階課程
     5</big></big></big></div> <div style="text-align: center; color:#7E2217"><big style="font-weight: bold;"><big>
     6範例練習
     7</big></big></div>
     8}}}
     9
     10[wiki:NCHCCloudCourse100928_4_EXM4 上一關 < ] 第五關 [wiki:NCHCCloudCourse100928_4_EXM6 > 下一關]
     11
     12
     13{{{
     14#!java
     15package org.nchc.hadoop;
     16import java.io.BufferedReader;
     17import java.io.FileReader;
     18import java.io.IOException;
     19import java.util.ArrayList;
     20import java.util.HashSet;
     21import java.util.Iterator;
     22import java.util.List;
     23import java.util.Set;
     24import java.util.StringTokenizer;
     25
     26import org.apache.hadoop.conf.Configuration;
     27import org.apache.hadoop.conf.Configured;
     28import org.apache.hadoop.filecache.DistributedCache;
     29import org.apache.hadoop.fs.Path;
     30import org.apache.hadoop.io.IntWritable;
     31import org.apache.hadoop.io.LongWritable;
     32import org.apache.hadoop.io.Text;
     33import org.apache.hadoop.mapred.FileInputFormat;
     34import org.apache.hadoop.mapred.FileOutputFormat;
     35import org.apache.hadoop.mapred.JobClient;
     36import org.apache.hadoop.mapred.JobConf;
     37import org.apache.hadoop.mapred.MapReduceBase;
     38import org.apache.hadoop.mapred.Mapper;
     39import org.apache.hadoop.mapred.OutputCollector;
     40import org.apache.hadoop.mapred.Reducer;
     41import org.apache.hadoop.mapred.Reporter;
     42import org.apache.hadoop.mapred.TextInputFormat;
     43import org.apache.hadoop.mapred.TextOutputFormat;
     44import org.apache.hadoop.util.GenericOptionsParser;
     45import org.apache.hadoop.util.StringUtils;
     46import org.apache.hadoop.util.Tool;
     47import org.apache.hadoop.util.ToolRunner;
     48
     49//WordCountV2
     50//說明:
     51//      用於字數統計,並且增加略過大小寫辨識、符號篩除等功能
     52//
     53//測試方法:
     54//      將此程式運作在hadoop 0.20 平台上,執行:
     55//      ---------------------------
     56//      hadoop jar WordCountV2.jar -Dwordcount.case.sensitive=false \
     57//      <input> <output> -skip patterns/patterns.txt
     58//      ---------------------------
     59//
     60//注意:
     61//1.    在hdfs 上來源檔案的路徑為 你所指定的 <input>
     62//      請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾
     63//2.    運算完後,程式將執行結果放在hdfs 的輸出路徑為 你所指定的 <output>
     64//3.    請建立一個資料夾 pattern 並在裡面放置pattern.txt,內容如下(一行一個,前置提示符號\)
     65//                      \.
     66//              \,
     67//              \!
     68
     69@SuppressWarnings("deprecation")
     70public class WordCountV2 extends Configured implements Tool {
     71
     72        public static class Map extends MapReduceBase implements
     73                        Mapper<LongWritable, Text, Text, IntWritable> {
     74
     75                static enum Counters {
     76                        INPUT_WORDS
     77                }
     78
     79                private final static IntWritable one = new IntWritable(1);
     80                private Text word = new Text();
     81
     82                private boolean caseSensitive = true;
     83                private Set<String> patternsToSkip = new HashSet<String>();
     84
     85                private long numRecords = 0;
     86                private String inputFile;
     87
     88                public void configure(JobConf job) {
     89                        caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
     90                        inputFile = job.get("map.input.file");
     91
     92                        if (job.getBoolean("wordcount.skip.patterns", false)) {
     93                                Path[] patternsFiles = new Path[0];
     94                                try {
     95                                        patternsFiles = DistributedCache.getLocalCacheFiles(job);
     96                                } catch (IOException ioe) {
     97                                        System.err
     98                                                        .println("Caught exception while getting cached files: "
     99                                                                        + StringUtils.stringifyException(ioe));
     100                                }
     101                                for (Path patternsFile : patternsFiles) {
     102                                        parseSkipFile(patternsFile);
     103                                }
     104                        }
     105                }
     106
     107                private void parseSkipFile(Path patternsFile) {
     108                        try {
     109                                BufferedReader fis = new BufferedReader(new FileReader(
     110                                                patternsFile.toString()));
     111                                String pattern = null;
     112                                while ((pattern = fis.readLine()) != null) {
     113                                        patternsToSkip.add(pattern);
     114                                }
     115                        } catch (IOException ioe) {
     116                                System.err
     117                                                .println("Caught exception while parsing the cached file '"
     118                                                                + patternsFile
     119                                                                + "' : "
     120                                                                + StringUtils.stringifyException(ioe));
     121                        }
     122                }
     123
     124                public void map(LongWritable key, Text value,
     125                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     126                                throws IOException {
     127                        String line = (caseSensitive) ? value.toString() : value.toString()
     128                                        .toLowerCase();
     129
     130                        for (String pattern : patternsToSkip) {
     131                                line = line.replaceAll(pattern, "");
     132                        }
     133
     134                        StringTokenizer tokenizer = new StringTokenizer(line);
     135                        while (tokenizer.hasMoreTokens()) {
     136                                word.set(tokenizer.nextToken());
     137                                output.collect(word, one);
     138                                reporter.incrCounter(Counters.INPUT_WORDS, 1);
     139                        }
     140
     141                        if ((++numRecords % 100) == 0) {
     142                                reporter.setStatus("Finished processing " + numRecords
     143                                                + " records " + "from the input file: " + inputFile);
     144                        }
     145                }
     146        }
     147
     148        public static class Reduce extends MapReduceBase implements
     149                        Reducer<Text, IntWritable, Text, IntWritable> {
     150                public void reduce(Text key, Iterator<IntWritable> values,
     151                                OutputCollector<Text, IntWritable> output, Reporter reporter)
     152                                throws IOException {
     153                        int sum = 0;
     154                        while (values.hasNext()) {
     155                                sum += values.next().get();
     156                        }
     157                        output.collect(key, new IntWritable(sum));
     158                }
     159        }
     160
     161        public int run(String[] args) throws Exception {
     162
     163                JobConf conf = new JobConf(getConf(), WordCount.class);
     164                conf.setJobName("wordcount");
     165                String[] otherArgs = new GenericOptionsParser(conf, args)
     166                                .getRemainingArgs();
     167                if (otherArgs.length < 2) {
     168                        System.out.println("WordCountV2 [-Dwordcount.case.sensitive=<false|true>] \\ ");
     169                        System.out.println("            <inDir> <outDir> [-skip Pattern_file]");
     170                        return 0;
     171                }
     172                conf.setOutputKeyClass(Text.class);
     173                conf.setOutputValueClass(IntWritable.class);
     174
     175                conf.setMapperClass(Map.class);
     176                conf.setCombinerClass(Reduce.class);
     177                conf.setReducerClass(Reduce.class);
     178
     179                conf.setInputFormat(TextInputFormat.class);
     180                conf.setOutputFormat(TextOutputFormat.class);
     181
     182                List<String> other_args = new ArrayList<String>();
     183                for (int i = 0; i < args.length; ++i) {
     184                        if ("-skip".equals(args[i])) {
     185                                DistributedCache
     186                                                .addCacheFile(new Path(args[++i]).toUri(), conf);
     187                                conf.setBoolean("wordcount.skip.patterns", true);
     188                        } else {
     189                                other_args.add(args[i]);
     190                        }
     191                }
     192
     193                FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
     194                FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
     195                CheckAndDelete.checkAndDelete(other_args.get(1), conf);
     196                JobClient.runJob(conf);
     197                return 0;
     198        }
     199
     200        public static void main(String[] args) throws Exception {
     201//              String[] argv = { "-Dwordcount.case.sensitive=false", "/user/hadooper/input",
     202//                              "/user/hadooper/output-wc2", "-skip", "/user/hadooper/patterns/patterns.txt" };
     203//              args = argv;
     204
     205                int res = ToolRunner.run(new Configuration(), new WordCountV2(), args);
     206                System.exit(res);
     207        }
     208}
     209
     210}}}
     211
     212