wiki:ProductEmp

package datatable;

/*
 * Info :
 *    http://trac.nchc.org.tw/cloud/wiki/waue/2012/0117
 */
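
/*
 * Overview: this example reads a product list (store.txt, one "id;name;price"
 * line per product), runs a MapReduce job that counts how many times each
 * product id appears in the input logs, multiplies each count by the unit
 * price to get the sales amount, and finally rewrites the result into a
 * db.js file ("var aDataSet = [...];") for a web page.
 */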

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ProductEmp {

  public static class DtMap extends
      Mapper<LongWritable, Text, Text, IntWritable> {
    private IntWritable one = new IntWritable(1);

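    // each input log line is assumed to hold the product id as the field
    // after the first ':' (e.g., a hypothetical line "user01:101" yields
    // the pair <"101", 1>)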
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] s = value.toString().trim().split(":");
      if (s.length < 2)
        return; // skip malformed lines
      context.write(new Text(s[1]), one);
    }
  }

  public static class DtReduce extends Reducer<Text, IntWritable, Text, Text> {

    public HashMap<String, Integer> hm_product_map = new HashMap<String, Integer>();
    public HashMap<String, String> hm_product_name_map = new HashMap<String, String>();
    private Configuration conf;

    
    protected void setup(Context context) throws IOException,
        InterruptedException {
      // put the product list into HashMap for querying
      conf = context.getConfiguration();

      String str = conf.get("dt_product_list");
      // yields: id1=price1=name1,id2=price2=name2,...
      String[] arr = str.split(",");
      String[] tmp_arr;
      for (String tmp : arr) {
        tmp_arr = tmp.split("=");
        hm_product_map.put(tmp_arr[0], Integer.valueOf(tmp_arr[1]));
        hm_product_name_map.put(tmp_arr[0], tmp_arr[2]);
      }
      System.err.println(hm_product_map.toString());
      System.err.println(hm_product_name_map.toString());
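      // Hypothetical illustration (values not from the original page): with a
      // store.txt holding "101;apple;30" and "102;milk;50", dt_product_list is
      // "101=30=apple,102=50=milk,", so hm_product_map becomes {101=30, 102=50}
      // and hm_product_name_map becomes {101=apple, 102=milk}.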
    }

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      // the incoming <key, values> pair is <product id, list of 1s>

      int count = 0, price = 0, sales = 0;
      // sum the 1s for this key to get the quantity sold
      for (IntWritable val : values) {
        count += val.get();
      }
      // look up the unit price for this product id
      String id = key.toString().trim();
      String p_name = "";
      //// System.err.println("id=" + id + " <e> ");
      if (id != null) {
        price = (hm_product_map.get(id) == null) ? 0 : hm_product_map
            .get(id);
        p_name = (hm_product_name_map.get(id) == null) ? ""
            : hm_product_name_map.get(id);

      }
      //// System.err.println("price=" + price + " <e> ");
      //// System.err.println("name=" + p_name + " <e> ");

      // sales = quantity x unit price
      if (count != 0 && price != 0)
        sales = count * price;
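      // e.g. (hypothetical numbers) 4 log entries for a product priced at 30
      // give sales = 4 * 30 = 120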

      // the output value carries the price, quantity and sales amount

      Text t_info = new Text("[\'" + id + "\',\'" + p_name + "\',");
      Text t_result = new Text("\'" + price + "\',\'" + count + "\',\'"
          + sales + "\'],");
      // emit the new <key,value> pair
      context.write(t_info, t_result);
    }
  }

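  // delete the given HDFS path (recursively) if it already exists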
  static public boolean checkAndDelete(String out, Configuration conf) {
    Path dstPath = new Path(out);
    try {
      FileSystem hdfs = dstPath.getFileSystem(conf);
      if (hdfs.exists(dstPath)) {
        hdfs.delete(dstPath, true);
      }

    } catch (IOException e) {
      e.printStackTrace();
      return false;
    }
    return true;
  }

  static boolean putToHdfs(String src, String dst, Configuration conf) {
    Path dstPath = new Path(dst);
    try {
      // get a FileSystem handle for HDFS
      FileSystem hdfs = dstPath.getFileSystem(conf);
      // upload the local data
      hdfs.copyFromLocalFile(false, new Path(src), new Path(dst));

    } catch (IOException e) {
      e.printStackTrace();
      return false;
    }
    return true;
  }

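  // recursively delete a local directory and its contents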
  static public boolean deleteFolder(File dir) {
    File filelist[] = dir.listFiles();
    if (filelist == null)
      return dir.delete(); // not a directory, or not readable
    int listlen = filelist.length;
    for (int i = 0; i < listlen; i++) {
      if (filelist[i].isDirectory()) {
        deleteFolder(filelist[i]);
      } else {
        filelist[i].delete();
      }
    }
    return dir.delete();
  }

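  // download the MapReduce result file from HDFS to the local filesystem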
  public static boolean download_dt_js(String from, String dst,
      Configuration conf) {

    Path hdfs_result_path = new Path(from);
    Path local_result_path = new Path(dst);
    try {
      FileSystem hdfs = hdfs_result_path.getFileSystem(conf);
      if (hdfs.exists(hdfs_result_path)) {
        hdfs.copyToLocalFile(hdfs_result_path, local_result_path);

      }

    } catch (IOException e) {
      e.printStackTrace();
      return false;
    }
    return true;

  }

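  // wrap the downloaded result lines into a JavaScript array assignment
  // ("var aDataSet = [ ... ];") and write it to the db.js target file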
  public static boolean transfer_dt_js(String from, String dst) {

    String b = null;
    String header = "var aDataSet = [";
    String footer = "];";
    File file = new File(dst);
    BufferedReader br;
    BufferedWriter bw;
    if (!file.exists()) {
      try {
        file.createNewFile();

      } catch (IOException e) {
        e.printStackTrace();
      }
    }
    try {
      bw = new BufferedWriter(new FileWriter(file));
      bw.write(header);
      FileReader fr = new FileReader(from);
      br = new BufferedReader(fr);
      while ((b = br.readLine()) != null) {
        //// System.out.println(b);
        bw.write(b);
        bw.newLine();
        bw.flush();
      }
      bw.write(footer);
      bw.close();
      br.close();
    } catch (Exception e) {

      System.err.println(e.getMessage());
      return false;
    }
    return true;
  }
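
  // Hypothetical shape of the generated db.js (illustrative values only; the
  // default TextOutputFormat separates the reducer key and value with a tab):
  //   var aDataSet = [['101','apple',  '30','4','120'],
  //   ['102','milk',  '50','2','100'],
  //   ];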

  public static void main(String args[]) throws Exception {

    // (0) set up environment-specific paths

    /*  // for linux
      String local_store_product_list = "/tmp/ProductEmp/conf/store.txt";
      String local_input_dir = "/tmp/ProductEmp/input";
      String hdfs_input_dir = "/tmp/ProductEmp/hdfs";
      String hdfs_output_dir = "/tmp/ProductEmp/out";
      String local_download_out_file = "/tmp/ProductEmp/result/tmp.log";
      String local_db_file = "/tmp/ProductEmp/www-web/db.js";
  */
    // for windows
      String local_store_product_list = "C:\\hadoop4win\\tmp\\ProductEmp\\conf\\store.txt";
      String local_input_dir = "C:\\hadoop4win\\tmp\\ProductEmp\\input";
      String hdfs_input_dir = "C:\\hadoop4win\\tmp\\ProductEmp\\hdfs";
      String hdfs_output_dir = "C:\\hadoop4win\\tmp\\ProductEmp\\out";
      String local_download_out_file = "C:\\hadoop4win\\tmp\\ProductEmp\\result\\tmp.log";
      String local_db_file = "C:\\hadoop4win\\tmp\\ProductEmp\\www-web\\db.js";
    



    // (1) set up the Hadoop configuration
    Configuration conf = new Configuration();

    // 1.1 run in local (standalone) test mode
    conf.set("mapred.job.tracker", "local"); // for single
    conf.set("fs.default.name", "file:///"); // for single
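    // (assumption, not from the original page) to run on a real cluster you
    // would instead point fs.default.name at the NameNode, e.g.
    //   conf.set("fs.default.name", "hdfs://namenode:9000");
    // and mapred.job.tracker at the JobTracker, e.g.
    //   conf.set("mapred.job.tracker", "namenode:9001");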

    // (2) load the product information
    // 2.1 each line of store.txt: 0.id 1.name 2.price
    String str, product_list = "";
    String[] arr;
    BufferedReader bfr = new BufferedReader(new FileReader(new File(
        local_store_product_list)));
    while ((str = bfr.readLine()) != null) {
      arr = str.split(";");

      // 2.2 reassemble as id=price=name
      product_list += arr[0].trim() + "=" + arr[2].trim() + "="
          + arr[1].trim() + ",";
    }
    System.out.println("Product list: " + product_list);
    conf.set("dt_product_list", product_list);

    // (3) HDFS operations
    // 3.1 delete the old output and input directories on HDFS if they exist
    if (checkAndDelete(hdfs_output_dir, conf))
      System.out.println("Deleted old output directory: " + hdfs_output_dir);
    if (checkAndDelete(hdfs_input_dir, conf))
      System.out.println("Deleted old input data: " + hdfs_input_dir);

    // 3.2 upload the local input to HDFS
    if (putToHdfs(local_input_dir, hdfs_input_dir, conf))
      System.out.println("Uploaded new input data to HDFS: " + local_input_dir
          + " => " + hdfs_input_dir);

    // (4) configure the Hadoop job
    Job job = new Job(conf, "Product Calculate");
    job.setJarByClass(ProductEmp.class);
    job.setMapperClass(DtMap.class);
    job.setReducerClass(DtReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(hdfs_input_dir));
    FileOutputFormat.setOutputPath(job, new Path(hdfs_output_dir));
    // 4.n submit the job and wait for it to finish
    if (job.waitForCompletion(true))
      System.out.println("Hadoop job completed");
    
    // (5) download the result
    if (download_dt_js(hdfs_output_dir + "/part-r-00000",
        local_download_out_file, conf)) {
      System.out.println("Downloaded result from HDFS to local");
    } else {
      System.err.println("[FAILED] could not download the result from HDFS");
    }

    // (6) rewrite the result into the web data file
    if (transfer_dt_js(local_download_out_file, local_db_file)) {
      System.out.println("Data conversion finished; deploy " + local_db_file
          + " to the web server");
    } else {
      System.err.println("[FAILED] conversion to the db.js data file failed");
    }

  }
}






