說明:
此範例使用「執行緒」同時執行兩個不同的 Hadoop job,
並示範如何在程式中以 conf.set 修改 Hadoop 環境設定;例如加入下列兩行,即可改為單機(local)模式執行:
conf.set("mapred.job.tracker", "local"); // for single
conf.set("fs.default.name", "file:///"); // for single
package org.nchc.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Runs a single Hadoop MapReduce job on its own thread so that several jobs
 * can be submitted side by side from one JVM.
 *
 * <p>Usage: construct an instance, configure it with {@link #SetJob1} or
 * {@link #SetJob2}, then call {@link #start()}. {@link #main} shows two
 * instances running at the same time.
 */
public class ThreadGo extends Thread {
    /** Hadoop configuration from which the job is created. */
    Configuration conf;

    /** The job this thread runs; {@code null} until SetJob1 or SetJob2 is called. */
    Job job;

    /** Wall-clock time in milliseconds at which this object was constructed. */
    long StartTime;

    /**
     * Creates the thread with a default {@link Configuration} and records the
     * construction time, which is later used as the start of the reported
     * run time (so the figure includes job set-up, not only execution).
     */
    public ThreadGo() {
        conf = new Configuration();
        // To run in single-process (local) mode, enable the two lines below:
        // conf.set("mapred.job.tracker", "local"); // for single
        // conf.set("fs.default.name", "file:///"); // for single
        StartTime = System.currentTimeMillis();
    }

    /**
     * Submits the configured job, blocks until it finishes, and prints the
     * elapsed time since construction. Checked exceptions from Hadoop are
     * reported on stderr rather than propagated, because {@code Thread.run}
     * cannot throw them.
     *
     * @throws IllegalStateException if no job was configured before the
     *         thread was started
     */
    @Override
    public void run() {
        if (job == null) {
            // Fail with an explanation instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    "No job configured: call SetJob1 or SetJob2 before start()");
        }
        // Read the name once, while the job is still only defined, so the
        // error paths below never have to query a job in a failed state.
        final String jobName = job.getJobName();
        try {
            System.err.println(conf.get("fs.default.name"));
            boolean succeeded = job.waitForCompletion(true);
            if (!succeeded) {
                // Without this, a failed job is indistinguishable from a successful one.
                System.err.println("[Error] " + jobName + " = job failed");
            }
            System.out.println("[final] "+ jobName +" run" + (System.currentTimeMillis() - StartTime) / 1E+3 +" secs.");
        } catch (IOException e) {
            System.err.println("[Error] " + jobName + " = IOException");
            System.err.println(e.getMessage());
        } catch (InterruptedException e) {
            System.err.println("[Error] " + jobName + " = InterruptedException" );
            System.err.println(e.getMessage());
            // Restore the interrupt flag so anything observing this thread can see it.
            Thread.currentThread().interrupt();
        } catch (ClassNotFoundException e) {
            System.err.println("[Error] " + jobName + " ClassNotFoundException");
            System.err.println(e.getMessage());
        }
    }

    /**
     * Configures this thread to run the "Th1_HelloWorld" job, built from
     * {@code HelloHadoopV2}, {@code HelloMapperV2} and {@code HelloReducerV2}
     * (the reducer class is also used as the combiner).
     *
     * @param input  input path specification, passed as-is to
     *               {@code FileInputFormat.setInputPaths}
     * @param output output directory for the job
     * @throws Exception if the job cannot be created or configured
     */
    public void SetJob1(String input, String output) throws Exception {
        job = new Job(conf, "Th1_HelloWorld");
        job.setJarByClass(HelloHadoopV2.class);

        job.setMapperClass(HelloMapperV2.class);
        job.setCombinerClass(HelloReducerV2.class);
        job.setReducerClass(HelloReducerV2.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, new Path(output));
    }

    /**
     * Configures this thread to run the "Th2_WordCount" job, built from
     * {@code WordCount} and its nested {@code TokenizerMapper} and
     * {@code IntSumReducer} (the reducer class is also used as the combiner).
     *
     * @param input  input path for the job
     * @param output output directory for the job
     * @throws Exception if the job cannot be created or configured
     */
    public void SetJob2(String input, String output) throws Exception {
        job = new Job(conf, "Th2_WordCount");
        job.setJarByClass(WordCount.class);

        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
    }

    /**
     * Demo entry point: configures two jobs on separate threads and starts
     * both. The "[main]" line reports only the time spent setting up and
     * launching the threads; it is printed without waiting for either job,
     * and each thread prints its own "[final]" line when its job ends.
     *
     * @param args unused
     * @throws Exception if either job cannot be configured
     */
    public static void main(String[] args) throws Exception {
        long StartTime = System.currentTimeMillis();

        ThreadGo thread1 = new ThreadGo();
        thread1.SetJob1("./input1","./output1");

        ThreadGo thread2 = new ThreadGo();
        thread2.SetJob2("./input2","./output2");

        thread1.start();
        thread2.start();

        System.out.println("[main] "+ (System.currentTimeMillis() - StartTime) / 1E+3 +" secs.");
    }
}