Changes between Initial Version and Version 1 of NCHCCloudCourse100928_4_EXM3_sol


Ignore:
Timestamp:
Jul 21, 2011, 5:45:47 PM (13 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • NCHCCloudCourse100928_4_EXM3_sol

    v1 v1  
     1{{{
     2#!text
     3
     4package org.nchc.hadoop;
     5import java.io.IOException;
     6
     7import org.apache.hadoop.conf.Configuration;
     8import org.apache.hadoop.fs.Path;
     9import org.apache.hadoop.io.Text;
     10import org.apache.hadoop.mapreduce.Job;
     11import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     12import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     13import org.apache.hadoop.util.GenericOptionsParser;
     14
     15public class HelloHadoopV3 {
     16
     17  public static void main(String[] args) throws IOException,
     18      InterruptedException, ClassNotFoundException {
     19   
     20    // eclipse using
     21//    String[] argv = {"/home/hadoop/input","/home/hadoop/output-hh3"};
     22//    args = argv;
     23   
     24    String hdfs_input = "HH3_input";
     25    String hdfs_output = "HH3_output";
     26     
     27    Configuration conf = new Configuration();
     28    // 宣告取得參數
     29    String[] otherArgs = new GenericOptionsParser(conf, args)
     30        .getRemainingArgs();
     31    // 如果參數數量不為2 則印出提示訊息
     32    if (otherArgs.length != 2) {
     33      System.err
     34          .println("Usage: hadoop jar HelloHadoopV3.jar <local_input> <local_output>");
     35      System.exit(2);
     36    }
     37    Job job = new Job(conf, "Hadoop Hello World");
     38    job.setJarByClass(HelloHadoopV3.class);
     39    // set map and reduce class
     40    job.setMapperClass(); // 輸入正確的mapper
     41    job.setCombinerClass(); // 輸入正確的 class
     42    job.setReducerClass(); // 輸入正確的 reducer
     43
     44    job.setMapOutputKeyClass(Text.class);
     45    job.setMapOutputValueClass(Text.class);
     46
     47    job.setOutputKeyClass(Text.class);
     48    job.setOutputValueClass(Text.class);
     49   
     50   
     51    // 用  checkAndDelete 函式防止overhead的錯誤
     52    CheckAndDelete.checkAndDelete(hdfs_input, conf);
     53    CheckAndDelete.checkAndDelete(hdfs_output, conf);
     54   
     55    // 放檔案到hdfs
     56    PutToHdfs.putToHdfs(args[0], hdfs_input, conf);
     57   
     58    // 設定hdfs 的輸入輸出來源路定
     59    FileInputFormat.addInputPath(job, new Path(hdfs_input));
     60    FileOutputFormat.setOutputPath(job, new Path(hdfs_output));
     61
     62   
     63    long start = System.nanoTime();
     64
     65    job.waitForCompletion(true);
     66   
     67    // 把hdfs的結果取下
     68    GetFromHdfs.getFromHdfs(hdfs_output, args[1], conf);
     69
     70    boolean status = job.waitForCompletion(true);
     71    // 計算時間
     72    if (status) {
     73      System.err.println("Integrate Alert Job Finished !");
     74      long time = System.nanoTime() - start;
     75      System.err.println(time * (1E-9) + " secs.");
     76
     77    } else {
     78      System.err.println("Integrate Alert Job Failed !");
     79      System.exit(1);
     80    }
     81  }
     82}
     83
     84
     85}}}