package datatable;
/*
* Info :
* http://trac.nchc.org.tw/cloud/wiki/waue/2012/0117
*/
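/*
 * Overview:
 * ProductEmp is a single-node MapReduce example that computes per-product sales
 * totals from colon-delimited sales logs.
 *   (1) main() reads the product list from store.txt (fields: id;name;price) and
 *       passes it to the job via the "dt_product_list" configuration property.
 *   (2) DtMap emits <productId, 1> for each log line; the product id is taken from
 *       the second ':'-separated field (nothing else about the line layout is assumed).
 *   (3) DtReduce sums the counts, looks up the unit price, and emits one
 *       ['id','name','price','count','sales'], row per product.
 *   (4) The result is downloaded from HDFS and wrapped as db.js
 *       ("var aDataSet = [...]") for the accompanying web page.
 */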
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ProductEmp {
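/**
 * Mapper: for each input line, split on ':' and emit <productId, 1>.
 * Only the position of the product id (the second ':'-separated field) is
 * assumed about the input layout, e.g. "<some-field>:<productId>".
 */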
public static class DtMap extends
Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// The product id is assumed to be the second ':'-separated field; skip malformed lines
String s[] = value.toString().trim().split(":");
if (s.length < 2)
return;
context.write(new Text(s[1]), one);
}
}
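/**
 * Reducer: sums the 1s per product id to get the quantity sold, looks up the
 * unit price and name from the "dt_product_list" property
 * (format: id=price=name,id=price=name,...), and writes one
 * ['id','name','price','count','sales'], row split across the output key and value.
 */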
public static class DtReduce extends Reducer<Text, IntWritable, Text, Text> {
public HashMap<String, Integer> hm_product_map = new HashMap<String, Integer>();
public HashMap<String, String> hm_product_name_map = new HashMap<String, String>();
private Configuration conf;
protected void setup(Context context) throws IOException,
InterruptedException {
// put the product list into HashMap for querying
conf = context.getConfiguration();
String str = conf.get("dt_product_list");
// dt_product_list format: id1=price1=name1,id2=price2=name2,...
String[] arr = str.split(",");
String[] tmp_arr;
for (String tmp : arr) {
tmp_arr = tmp.split("=");
hm_product_map.put(tmp_arr[0], Integer.valueOf(tmp_arr[1]));
hm_product_name_map.put(tmp_arr[0], tmp_arr[2]);
}
System.err.println(hm_product_map.toString());
System.err.println(hm_product_name_map.toString());
}
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// <key,value> is <product id, list of 1s (1,1,...,1)>
int count = 0, price = 0, sales = 0;
// Sum the 1s for this key to get the quantity sold
for (IntWritable val : values) {
count += val.get();
}
// Look up the unit price for this product id
String id = key.toString().trim();
String p_name = "";
//// System.err.println("id=" + id + " <e> ");
if (id != null) {
price = (hm_product_map.get(id) == null) ? 0 : hm_product_map.get(id);
// Guard against product ids that are missing from the product list
p_name = (hm_product_name_map.get(id) == null) ? "" : hm_product_name_map.get(id);
}
//// System.err.println("price=" + price + " <e> ");
//// System.err.println("name=" + p_name + " <e> ");
// sales = quantity * unit price
if (count != 0 && price != 0)
sales = count * price;
// The output value carries the price, quantity, and sales amount
Text t_info = new Text("[\'" + id + "\',\'" + p_name + "\',");
Text t_result = new Text("\'" + price + "\',\'" + count + "\',\'"
+ sales + "\'],");
// Emit the new <key,value>
context.write(t_info, t_result);
}
}
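/**
 * Deletes the given HDFS path (recursively) if it exists; returns false only
 * when the delete attempt throws an IOException.
 */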
static public boolean checkAndDelete(String out, Configuration conf) {
Path dstPath = new Path(out);
try {
FileSystem hdfs = dstPath.getFileSystem(conf);
if (hdfs.exists(dstPath)) {
hdfs.delete(dstPath, true);
}
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
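/**
 * Copies a local file or directory to HDFS, keeping the local copy; returns
 * false if the upload throws an IOException.
 */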
static boolean putToHdfs(String src, String dst, Configuration conf) {
Path dstPath = new Path(dst);
try {
// Obtain a FileSystem object for HDFS operations
FileSystem hdfs = dstPath.getFileSystem(conf);
// Upload, keeping the local copy
hdfs.copyFromLocalFile(false, new Path(src), dstPath);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
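/**
 * Recursively deletes a local directory tree (utility method; not called from main()).
 */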
static public boolean deleteFolder(File dir) {
File filelist[] = dir.listFiles();
// listFiles() returns null if dir does not exist or is not a directory
if (filelist != null) {
for (File f : filelist) {
if (f.isDirectory()) {
deleteFolder(f);
} else {
f.delete();
}
}
}
return dir.delete();
}
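/**
 * Downloads the MapReduce result file (e.g. part-r-00000) from HDFS to the
 * local file system so it can be rewritten as a JavaScript data file.
 */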
public static boolean download_dt_js(String from, String dst,
Configuration conf) {
Path hdfs_result_path = new Path(from);
Path local_result_path = new Path(dst);
try {
FileSystem hdfs = hdfs_result_path.getFileSystem(conf);
if (hdfs.exists(hdfs_result_path)) {
// Copy the result from HDFS down to the local file system
hdfs.copyToLocalFile(hdfs_result_path, local_result_path);
} else {
return false;
}
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
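/**
 * Wraps the downloaded result in "var aDataSet = [ ... ];" so it can be served
 * as db.js. Each input line is already an ['id','name','price','count','sales'],
 * fragment produced by the reducer.
 */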
public static boolean transfer_dt_js(String from, String dst) {
String b = null;
String header = "var aDataSet = [";
String footer = "];";
File file = new File(dst);
BufferedReader br;
BufferedWriter bw;
if (!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
}
try {
bw = new BufferedWriter(new FileWriter(file));
bw.write(header);
FileReader fr = new FileReader(from);
br = new BufferedReader(fr);
while ((b = br.readLine()) != null) {
//// System.out.println(b);
bw.write(b);
bw.newLine();
bw.flush();
}
bw.write(footer);
bw.close();
br.close();
} catch (Exception e) {
System.err.println(e.getMessage());
return false;
}
return true;
}
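/**
 * Driver: runs the whole pipeline in local mode against fixed paths under
 * /tmp/ProductEmp (store.txt product list, local input logs, HDFS staging
 * directories, the downloaded result, and the generated db.js).
 */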
public static void main(String args[]) throws Exception {
// (0) Local/system path settings
String local_store_product_list = "/tmp/ProductEmp/conf/store.txt";
String local_input_dir = "/tmp/ProductEmp/input";
String hdfs_input_dir = "/tmp/ProductEmp/hdfs";
String hdfs_output_dir = "/tmp/ProductEmp/out";
String local_download_out_file = "/tmp/ProductEmp/result/tmp.log";
String local_db_file = "/tmp/ProductEmp/www-web/db.js";
/*
 * // Command-line argument handling, currently disabled:
 * String[] argc = { local_input_dir, hdfs_output_dir, local_store_product_list };
 * args = argc;
 *
 * if (args.length != 3) {
 * System.err.println("Usage: hadoop jar ProductEmp.jar <input> <output> <store.txt>");
 * System.exit(2);
 * }
 */
// (1) Configure Hadoop parameters
Configuration conf = new Configuration();
// 1.1 Single-machine (local) test mode
conf.set("mapred.job.tracker", "local"); // run MapReduce locally in a single JVM
conf.set("fs.default.name", "file:///"); // use the local file system instead of HDFS
// (2) Load product information
// 2.1 store.txt fields: 0=id, 1=name, 2=price
String str, product_list = "";
String[] arr;
BufferedReader bfr = new BufferedReader(new FileReader(new File(
local_store_product_list)));
while ((str = bfr.readLine()) != null) {
arr = str.split(";");
// 2.2 Re-encode as id=price=name
product_list += arr[0].trim() + "=" + arr[2].trim() + "="
+ arr[1].trim() + ",";
}
System.out.println("您設定的商品清單:" + product_list);
conf.set("dt_product_list", product_list);
// (3) HDFS operations
// 3.1 If the old result/input directories already exist on HDFS, delete them
if (checkAndDelete(hdfs_output_dir, conf))
System.out.println("Deleted old output results: " + hdfs_output_dir);
if (checkAndDelete(hdfs_input_dir, conf))
System.out.println("Deleted old input data: " + hdfs_input_dir);
// 3.2 Upload the local input to HDFS
if (putToHdfs(local_input_dir, hdfs_input_dir, conf))
System.out.println("Uploaded new input data to HDFS: " + local_input_dir + " => "
+ hdfs_input_dir);
// (4) Hadoop job configuration
Job job = new Job(conf, "Product Calculate");
job.setJarByClass(ProductEmp.class);
job.setMapperClass(DtMap.class);
job.setReducerClass(DtReduce.class);
// Map output is <Text, IntWritable>; the reducer emits <Text, Text>
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(hdfs_input_dir));
FileOutputFormat.setOutputPath(job, new Path(hdfs_output_dir));
// 4.n Submit the job and wait for it to finish
if ( job.waitForCompletion(true) )
System.out.println("Hadoop job completed");
// (5) Download the result
if ( download_dt_js(hdfs_output_dir + "/part-r-00000",
local_download_out_file, conf) ){
System.out.println("Downloaded the result from HDFS to local");
}else{
System.err.println("[FAILED] Could not download the result from HDFS to local");
}
// (6) Rewrite the result as a JavaScript data file
if ( transfer_dt_js(local_download_out_file, local_db_file) ){
System.out.println("Data format conversion complete; copy " + local_db_file + " to the web server");
}else{
System.err.println("[FAILED] Conversion to the db.js data file failed");
}
}
}