Version 5 (modified by waue, 13 years ago) (diff) |
---|
Hadoop 進階課程
範例
說明
HelloHadoopV3 說明: 此程式碼再利用了 HelloHadoopV2 的 map , reduce 檔,並且 自動將檔案上傳到hdfs上運算並自動取回結果,還有 提示訊息 、參數輸入 與 印出運算時間 的功能 測試方法: 將此程式運作在hadoop 0.20 平台上,執行: --------------------------- hadoop jar HelloHadoopV3.jar /home/$yourname/input /home/$yourname/output-hh3 --------------------------- 注意: 1. 第一個輸入的參數是在local 的 輸入資料夾,請確認此資料夾內有資料並無子目錄 2. 第二個輸入的參數是在local 的 運算結果資料夾,由程式產生不用事先建立,若有請刪除之
HelloHadoopV3.java
package org.nchc.hadoop; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class HelloHadoopV3 { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // eclipse using // String[] argv = {"/home/hadoop/input","/home/hadoop/output-hh3"}; // args = argv; String hdfs_input = "HH3_input"; String hdfs_output = "HH3_output"; Configuration conf = new Configuration(); // 宣告取得參數 String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); // 如果參數數量不為2 則印出提示訊息 if (otherArgs.length != 2) { System.err .println("Usage: hadoop jar HelloHadoopV3.jar <local_input> <local_output>"); System.exit(2); } Job job = new Job(conf, "Hadoop Hello World"); job.setJarByClass(HelloHadoopV3.class); // set map and reduce class job.setMapperClass(HelloMapperV2.class); job.setCombinerClass(HelloReducerV2.class); job.setReducerClass(HelloReducerV2.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 用 checkAndDelete 函式防止overhead的錯誤 CheckAndDelete.checkAndDelete(hdfs_input, conf); CheckAndDelete.checkAndDelete(hdfs_output, conf); // 放檔案到hdfs PutToHdfs.putToHdfs(args[0], hdfs_input, conf); // 設定hdfs 的輸入輸出來源路定 FileInputFormat.addInputPath(job, new Path(hdfs_input)); FileOutputFormat.setOutputPath(job, new Path(hdfs_output)); long start = System.nanoTime(); job.waitForCompletion(true); // 把hdfs的結果取下 GetFromHdfs.getFromHdfs(hdfs_output, args[1], conf); boolean status = job.waitForCompletion(true); // 計算時間 if (status) { System.err.println("Integrate Alert Job Finished !"); long time = System.nanoTime() - start; System.err.println(time * (1E-9) + " secs."); } else { System.err.println("Integrate Alert Job Failed !"); System.exit(1); } } }