wiki:NCHCCloudCourse100928_4_EXM8


Description

This example uses threads to run two different Hadoop jobs at the same time,
and uses conf.set within the program to modify the Hadoop configuration, for example
(a minimal local-mode sketch follows this snippet):
  conf.set("mapred.job.tracker", "local"); // for single
  conf.set("fs.default.name", "file:///"); // for single

ThreadGo.java

package org.nchc.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ThreadGo extends Thread {
  Configuration conf;  // configuration for the job run by this thread
  Job job;             // the Hadoop job this thread waits on
  long StartTime;      // wall-clock time when this ThreadGo was created
  public ThreadGo() {
    conf = new Configuration();
//    conf.set("mapred.job.tracker", "local"); // for single
//    conf.set("fs.default.name", "file:///"); // for single
    StartTime = System.currentTimeMillis();
  }
  
  public void run() // throws Exception
  {
      try {
        System.err.println(conf.get("fs.default.name"));
        job.waitForCompletion(true);
        System.out.println("[final] "+ job.getJobName() +" run" + (System.currentTimeMillis() - StartTime) / 1E+3 +" secs.");

      } catch (IOException e) {
        System.err.println("[Error] " + job.getJobName() + ": IOException");
        System.err.println(e.getMessage());
      } catch (InterruptedException e) {
        System.err.println("[Error] " + job.getJobName() + ": InterruptedException");
        System.err.println(e.getMessage());
      } catch (ClassNotFoundException e) {
        System.err.println("[Error] " + job.getJobName() + ": ClassNotFoundException");
        System.err.println(e.getMessage());
      }
  }

  // Job 1: reuses the mapper/reducer classes from the HelloHadoopV2 example
  public void SetJob1(String input, String output) throws Exception {
    job = new Job(conf, "Th1_HelloWorld");
    job.setJarByClass(HelloHadoopV2.class);
    job.setMapperClass(HelloMapperV2.class);

    job.setCombinerClass(HelloReducerV2.class);
    job.setReducerClass(HelloReducerV2.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, input);
    FileOutputFormat.setOutputPath(job, new Path(output));
  }

  // Job 2: reuses the mapper/reducer classes from the WordCount example
  public void SetJob2(String input, String output) throws Exception {
    job = new Job(conf, "Th2_WordCount");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);
    job.setCombinerClass(WordCount.IntSumReducer.class);
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(input));
    FileOutputFormat.setOutputPath(job, new Path(output));
  }

  public static void main(String[] args) throws Exception {
    
    long StartTime = System.currentTimeMillis();
    ThreadGo thread1 = new ThreadGo();
    thread1.SetJob1("./input1","./output1");
    
    ThreadGo thread2 = new ThreadGo();
    thread2.SetJob2("./input2","./output2");

    thread1.start();
    thread2.start();
    // start() returns immediately; without join() this line only measures setup time,
    // not how long the two jobs take to finish
    System.out.println("[main] " + (System.currentTimeMillis() - StartTime) / 1E+3 + " secs.");

  }
}
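
If the goal is to time both jobs rather than just launching them, main() can wait for the two threads with Thread.join(). The following is only a sketch of that variant (it assumes the same ./input1 and ./input2 paths exist); it is not part of the original example.

  public static void main(String[] args) throws Exception {
    long StartTime = System.currentTimeMillis();
    ThreadGo thread1 = new ThreadGo();
    thread1.SetJob1("./input1", "./output1");
    ThreadGo thread2 = new ThreadGo();
    thread2.SetJob2("./input2", "./output2");

    thread1.start();
    thread2.start();
    thread1.join(); // block until the first job's thread has finished
    thread2.join(); // block until the second job's thread has finished
    System.out.println("[main] both jobs done in "
        + (System.currentTimeMillis() - StartTime) / 1E+3 + " secs.");
  }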