package datatable;

/*
 * Info :
 * http://trac.nchc.org.tw/cloud/wiki/waue/2012/0117
 */

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ProductEmp {

    public static class DtMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each input line carries the product id after a ':';
            // emit <product id, 1> for every sales record
            String s[] = value.toString().trim().split(":");
            if (s.length < 2)
                return; // skip malformed lines without a ':'
            String str = s[1];
            context.write(new Text(str), one);
        }
    }

    public static class DtReduce extends Reducer<Text, IntWritable, Text, Text> {
        public HashMap<String, Integer> hm_product_map = new HashMap<String, Integer>();
        public HashMap<String, String> hm_product_name_map = new HashMap<String, String>();
        private Configuration conf;

        protected void setup(Context context) throws IOException,
                InterruptedException {
            // put the product list into HashMaps for querying;
            // the configured value looks like id1=price1=name1,id2=price2=name2,...
            conf = context.getConfiguration();
            String str = conf.get("dt_product_list");
            String[] arr = str.split(",");
            String[] tmp_arr;
            for (String tmp : arr) {
                tmp_arr = tmp.split("=");
                hm_product_map.put(tmp_arr[0], Integer.valueOf(tmp_arr[1]));
                hm_product_name_map.put(tmp_arr[0], tmp_arr[2]);
            }
            System.err.println(hm_product_map.toString());
            System.err.println(hm_product_name_map.toString());
        }

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // <key, value> is <product id, list of 1s>
            int count = 0, price = 0, sales = 0;
            // sum the 1s of each key to get the quantity sold
            for (IntWritable val : values) {
                count += val.get();
            }
            // look up the unit price and name of this product id
            String id = key.toString().trim();
            String p_name = "";
            //// System.err.println("id=" + id + " <e> ");
            if (id != null) {
                price = (hm_product_map.get(id) == null) ? 0 : hm_product_map
                        .get(id);
                p_name = hm_product_name_map.get(id);
            }
            //// System.err.println("price=" + price + " <e> ");
            //// System.err.println("name=" + p_name + " <e> ");
            // sales = quantity x unit price
            if (count != 0 && price != 0)
                sales = count * price;
            // emit the result with the sales figure as the value
            Text t_info = new Text("['" + id + "','" + p_name + "',");
            Text t_result = new Text("'" + price + "','" + count + "','"
                    + sales + "'],");
            context.write(t_info, t_result);
        }
    }

    static public boolean checkAndDelete(String out, Configuration conf) {
        Path dstPath = new Path(out);
        try {
            FileSystem hdfs = dstPath.getFileSystem(conf);
            if (hdfs.exists(dstPath)) {
                hdfs.delete(dstPath, true);
            }
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    static boolean putToHdfs(String src, String dst, Configuration conf) {
        Path dstPath = new Path(dst);
        try {
            // get a FileSystem handle for HDFS operations
            FileSystem hdfs = dstPath.getFileSystem(conf);
            // upload the local data
            hdfs.copyFromLocalFile(false, new Path(src), new Path(dst));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    static public boolean deleteFolder(File dir) {
        File filelist[] = dir.listFiles();
        int listlen = filelist.length;
        for (int i = 0; i < listlen; i++) {
            if (filelist[i].isDirectory()) {
                deleteFolder(filelist[i]);
            } else {
                filelist[i].delete();
            }
        }
        return dir.delete();
    }

    public static boolean download_dt_js(String from, String dst,
            Configuration conf) {
        Path hdfs_result_path = new Path(from);
        Path local_result_path = new Path(dst);
        try {
            FileSystem hdfs = hdfs_result_path.getFileSystem(conf);
            if (hdfs.exists(hdfs_result_path)) {
                // copy the result from HDFS to the local file system
                // (the original used copyFromLocalFile here, which is the wrong direction)
                hdfs.copyToLocalFile(hdfs_result_path, local_result_path);
            }
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public static boolean transfer_dt_js(String from, String dst) {
        String b = null;
        String header = "var aDataSet = [";
        String footer = "];";
        File file = new File(dst);
        BufferedReader br;
        BufferedWriter bw;
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            bw = new BufferedWriter(new FileWriter(file));
            bw.write(header);
            FileReader fr = new FileReader(from);
            br = new BufferedReader(fr);
            while ((b = br.readLine()) != null) {
                //// System.out.println(b);
                bw.write(b);
                bw.newLine();
                bw.flush();
            }
            bw.write(footer);
            bw.close();
            br.close();
        } catch (Exception e) {
            System.err.println(e.getMessage());
            return false;
        }
        return true;
    }

    public static void main(String args[]) throws Exception {
        // (0) environment settings
        /*
        // for linux
        String local_store_product_list = "/tmp/ProductEmp/conf/store.txt";
        String local_input_dir = "/tmp/ProductEmp/input";
        String hdfs_input_dir = "/tmp/ProductEmp/hdfs";
        String hdfs_output_dir = "/tmp/ProductEmp/out";
        String local_download_out_file = "/tmp/ProductEmp/result/tmp.log";
        String local_db_file = "/tmp/ProductEmp/www-web/db.js";
        */
        // for windows
        String local_store_product_list = "C:\\hadoop4win\\tmp\\ProductEmp\\conf\\store.txt";
        String local_input_dir = "C:\\hadoop4win\\tmp\\ProductEmp\\input";
        String hdfs_input_dir = "C:\\hadoop4win\\tmp\\ProductEmp\\hdfs";
        String hdfs_output_dir = "C:\\hadoop4win\\tmp\\ProductEmp\\out";
        String local_download_out_file = "C:\\hadoop4win\\tmp\\ProductEmp\\result\\tmp.log";
        String local_db_file = "C:\\hadoop4win\\tmp\\ProductEmp\\www-web\\db.js";

        // (1) Hadoop configuration
        Configuration conf = new Configuration();
        // 1.1 run in local (standalone) test mode
        conf.set("mapred.job.tracker", "local"); // for single node
        conf.set("fs.default.name", "file:///"); // for single node

        // (2) read the product information
        // 2.1 store.txt fields: 0 = id, 1 = name, 2 = price
        String str, product_list = "";
        String[] arr;
        BufferedReader bfr = new BufferedReader(new FileReader(new File(
                local_store_product_list)));
        while ((str = bfr.readLine()) != null) {
            arr = str.split(";");
            // 2.2 build "id=price=name" entries
            product_list += arr[0].trim() + "=" + arr[2].trim() + "="
                    + arr[1].trim() + ",";
        }
        bfr.close();
        System.out.println("Configured product list: " + product_list);
        conf.set("dt_product_list", product_list);

        // (3) HDFS operations
        // 3.1 delete the old output/input directories on HDFS if they exist
        if (checkAndDelete(hdfs_output_dir, conf))
            System.out.println("Deleted old output directory: " + hdfs_output_dir);
        if (checkAndDelete(hdfs_input_dir, conf))
            System.out.println("Deleted old input directory: " + hdfs_input_dir);
        // 3.2 upload the local input to HDFS
        if (putToHdfs(local_input_dir, hdfs_input_dir, conf))
            System.out.println("Uploaded new input data to HDFS: "
                    + local_input_dir + " => " + hdfs_input_dir);

        // (4) Hadoop job setup
        Job job = new Job(conf, "Product Calculate");
        job.setJarByClass(ProductEmp.class);
        job.setMapperClass(DtMap.class);
        job.setReducerClass(DtReduce.class);
        // the map emits <Text, IntWritable>, the reduce emits <Text, Text>
        // (the original declared IntWritable as the final output value class,
        // which does not match what DtReduce writes)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(hdfs_input_dir));
        FileOutputFormat.setOutputPath(job, new Path(hdfs_output_dir));
        // 4.n submit the job and wait for completion
        if (job.waitForCompletion(true))
            System.out.println("Hadoop job completed");

        // (5) download the result
        if (download_dt_js(hdfs_output_dir + "/part-r-00000",
                local_download_out_file, conf)) {
            System.out.println("Downloaded the result from HDFS to local");
        } else {
            System.err.println("[failed] could not download the result from HDFS");
        }

        // (6) rewrite the result as db.js
        if (transfer_dt_js(local_download_out_file, local_db_file)) {
            System.out.println("Data format conversion done; copy "
                    + local_db_file + " to the web server");
        } else {
            System.err.println("[failed] could not convert the data to db.js");
        }
    }
}
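The expected file formats are not shown on the original page; the sketch below is inferred from the parsing code (store.txt is split on ';', input records on ':', and the reducer output is wrapped by transfer_dt_js), and the concrete ids, names, and prices are hypothetical.

    // store.txt -- one product per line, ';'-separated as id;name;price
    p01;apple;30
    p02;banana;20

    // input records -- DtMap keys on the token after ':', so each line
    // such as "emp01:p01" counts one sale of product p01
    emp01:p01
    emp02:p02
    emp01:p01

    // db.js -- transfer_dt_js wraps the reducer output (TextOutputFormat
    // separates key and value with a tab) into a JavaScript array, roughly:
    var aDataSet = [
    ['p01','apple',	'30','2','60'],
    ['p02','banana',	'20','1','20'],
    ];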