Hadoop 進階課程
範例練習
說明
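此範例使用「執行緒」（Thread）同時執行兩個不同的 Hadoop job，並示範如何在程式中使用 conf.set 修改 Hadoop 環境設定，例如改為單機（local）模式：
conf.set("mapred.job.tracker", "local"); // for single-node (local) mode
conf.set("fs.default.name", "file:///"); // for single-node (local) mode
ThreadGo 的建構子中已保留這兩行（預設為註解），取消註解即可以單機模式執行。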
ThreadGo.java
package org.nchc.hadoop; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ThreadGo extends Thread { Configuration conf ; Job job ; long StartTime ; public ThreadGo() { conf = new Configuration(); // conf.set("mapred.job.tracker", "local"); // for single // conf.set("fs.default.name", "file:///"); // for single StartTime = System.currentTimeMillis(); } public void run() // throws Exception { try { System.err.println(conf.get("fs.default.name")); job.waitForCompletion(true); System.out.println("[final] "+ job.getJobName() +" run" + (System.currentTimeMillis() - StartTime) / 1E+3 +" secs."); } catch (IOException e) { System.err.println("[Error] " + job.getJobName() + " = IOException"); System.err.println(e.getMessage()); } catch (InterruptedException e) { System.err.println("[Error] " + job.getJobName() + " = InterruptedException" ); System.err.println(e.getMessage()); } catch (ClassNotFoundException e) { System.err.println("[Error] " + job.getJobName() + " ClassNotFoundException"); System.err.println(e.getMessage()); } } public void SetJob1(String input, String output) throws Exception { job = new Job(conf, "Th1_HelloWorld"); job.setJarByClass(HelloHadoopV2.class); job.setMapperClass(HelloMapperV2.class); job.setCombinerClass(HelloReducerV2.class); job.setReducerClass(HelloReducerV2.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, input); FileOutputFormat.setOutputPath(job, new Path(output)); } public void SetJob2(String input, String output) throws Exception { // JobConf conf = new JobConf(DBWordCount.class); job = new Job(conf, "Th2_WordCount"); 
job.setJarByClass(WordCount.class); job.setMapperClass(WordCount.TokenizerMapper.class); job.setCombinerClass(WordCount.IntSumReducer.class); job.setReducerClass(WordCount.IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); } public static void main(String[] args) throws Exception { long StartTime = System.currentTimeMillis(); ThreadGo thread1 = new ThreadGo(); thread1.SetJob1("./input1","./output1"); ThreadGo thread2 = new ThreadGo(); thread2.SetJob2("./input2","./output2"); thread1.start(); thread2.start(); System.out.println("[main] "+ (System.currentTimeMillis() - StartTime) / 1E+3 +" secs."); } }
Last modified 14 years ago
Last modified on Jul 20, 2011, 1:54:12 PM