Changes between Initial Version and Version 1 of ProductEmp


Ignore:
Timestamp:
Jan 16, 2012, 9:10:52 PM (12 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • ProductEmp

    v1 v1  
     1{{{
     2#!java
     3
     4
     5package datatable;
     6
     7/*
     8 * Info :
     9 *              http://trac.nchc.org.tw/cloud/wiki/waue/2012/0117
     10 */
     11
     12import java.io.BufferedReader;
     13import java.io.BufferedWriter;
     14import java.io.File;
     15import java.io.FileReader;
     16import java.io.FileWriter;
     17import java.io.IOException;
     18import java.util.HashMap;
     19
     20import org.apache.hadoop.conf.Configuration;
     21import org.apache.hadoop.fs.FileSystem;
     22import org.apache.hadoop.fs.Path;
     23import org.apache.hadoop.io.IntWritable;
     24import org.apache.hadoop.io.LongWritable;
     25import org.apache.hadoop.io.Text;
     26import org.apache.hadoop.mapreduce.Job;
     27import org.apache.hadoop.mapreduce.Mapper;
     28import org.apache.hadoop.mapreduce.Reducer;
     29import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     30import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     31
     32public class ProductEmp {
     33
     34        public static class DtMap extends
     35                        Mapper<LongWritable, Text, Text, IntWritable> {
     36                private IntWritable one = new IntWritable(1);
     37
     38                public void map(LongWritable key, Text value, Context context)
     39                                throws IOException, InterruptedException {
     40                        String s[] = value.toString().trim().split(":");
     41                        String str = s[1];
     42                        context.write(new Text(str), one);
     43                }
     44        }
     45
     46        public static class DtReduce extends Reducer<Text, IntWritable, Text, Text> {
     47
     48                public HashMap<String, Integer> hm_product_map = new HashMap<String, Integer>();
     49                public HashMap<String, String> hm_product_name_map = new HashMap<String, String>();
     50                private Configuration conf;
     51
     52               
     53                protected void setup(Context context) throws IOException,
     54                                InterruptedException {
     55                        // put the product list into HashMap for querying
     56                        conf = context.getConfiguration();
     57
     58                        String str = conf.get("dt_product_list");
     59                        // 取得到 編號1=價格1=名稱1, 編號2=價格2=名稱2...
     60                        String[] arr = str.split(",");
     61                        String[] tmp_arr;
     62                        for (String tmp : arr) {
     63                                tmp_arr = tmp.split("=");
     64                                hm_product_map.put(tmp_arr[0], new Integer(tmp_arr[1]));
     65                                hm_product_name_map.put(tmp_arr[0], tmp_arr[2]);
     66                        }
     67                        System.err.println(hm_product_map.toString());
     68                        System.err.println(hm_product_name_map.toString());
     69                }
     70
     71                public void reduce(Text key, Iterable<IntWritable> values,
     72                                Context context) throws IOException, InterruptedException {
     73                        // <key,value> 為 <編號, 串列(111..1)>
     74
     75                        int count = 0, price = 0, sales = 0;
     76                        // 取得每個 key 的(111..1)計數,相加,得到 數量
     77                        for (IntWritable val : values) {
     78                                count += val.get();
     79                        }
     80                        // 取得編號的單價
     81                        String id = key.toString().trim();
     82                        String p_name = "";
     83                        //// System.err.println("id=" + id + " <e> ");
     84                        if (id != null) {
     85                                price = (hm_product_map.get(id) == null) ? 0 : hm_product_map
     86                                                .get(id);
     87                                p_name = hm_product_name_map.get(id);
     88
     89                        }
     90                        //// System.err.println("price=" + price + " <e> ");
     91                        //// System.err.println("name=" + p_name + " <e> ");
     92
     93                        // 銷售額 = 數量 X 單價
     94                        if (count != 0 && price != 0)
     95                                sales = count * price;
     96
     97                        // 輸出的value 設定為 銷售額
     98
     99                        Text t_info = new Text("[\'" + id + "\',\'" + p_name + "\',");
     100                        Text t_result = new Text("\'" + price + "\',\'" + count + "\',\'"
     101                                        + sales + "\'],");
     102                        // 輸出新的 <key,value>
     103                        context.write(t_info, t_result);
     104                }
     105        }
     106
     107        static public boolean checkAndDelete(String out, Configuration conf) {
     108                Path dstPath = new Path(out);
     109                try {
     110                        FileSystem hdfs = dstPath.getFileSystem(conf);
     111                        if (hdfs.exists(dstPath)) {
     112                                hdfs.delete(dstPath, true);
     113                        }
     114
     115                } catch (IOException e) {
     116                        e.printStackTrace();
     117                        return false;
     118                }
     119                return true;
     120        }
     121
     122        static boolean putToHdfs(String src, String dst, Configuration conf) {
     123                Path dstPath = new Path(dst);
     124                try {
     125                        // 產生操作hdfs的物件
     126                        FileSystem hdfs = dstPath.getFileSystem(conf);
     127                        // 上傳
     128                        hdfs.copyFromLocalFile(false, new Path(src), new Path(dst));
     129
     130                } catch (IOException e) {
     131                        e.printStackTrace();
     132                        return false;
     133                }
     134                return true;
     135        }
     136
     137        static public boolean deleteFolder(File dir) {
     138                File filelist[] = dir.listFiles();
     139                int listlen = filelist.length;
     140                for (int i = 0; i < listlen; i++) {
     141                        if (filelist[i].isDirectory()) {
     142                                deleteFolder(filelist[i]);
     143                        } else {
     144                                filelist[i].delete();
     145                        }
     146                }
     147                return dir.delete();
     148        }
     149
     150        public static boolean download_dt_js(String from, String dst,
     151                        Configuration conf) {
     152
     153                Path hdfs_result_path = new Path(from);
     154                Path local_result_path = new Path(dst);
     155                try {
     156                        FileSystem hdfs = hdfs_result_path.getFileSystem(conf);
     157                        if (hdfs.exists(hdfs_result_path)) {
     158                                hdfs.copyFromLocalFile(hdfs_result_path, local_result_path);
     159
     160                        }
     161
     162                } catch (IOException e) {
     163                        e.printStackTrace();
     164                        return false;
     165                }
     166                return true;
     167
     168        }
     169
     170        public static boolean transfer_dt_js(String from, String dst) {
     171
     172                String b = null;
     173                String header = "var aDataSet = [";
     174                String footer = "];";
     175                File file = new File(dst);
     176                BufferedReader br;
     177                BufferedWriter bw;
     178                if (!file.exists() != false) {
     179                        try {
     180                                file.createNewFile();
     181
     182                        } catch (IOException e) {
     183                                e.printStackTrace();
     184                        }
     185                }
     186                try {
     187                        bw = new BufferedWriter(new FileWriter(file));
     188                        bw.write(header);
     189                        FileReader fr = new FileReader(from);
     190                        br = new BufferedReader(fr);
     191                        while ((b = br.readLine()) != null) {
     192                                //// System.out.println(b);
     193                                bw.write(b);
     194                                bw.newLine();
     195                                bw.flush();
     196                        }
     197                        bw.write(footer);
     198                        bw.close();
     199                        br.close();
     200                } catch (Exception e) {
     201
     202                        System.err.println(e.getMessage());
     203                        return false;
     204                }
     205                return true;
     206        }
     207
     208        public static void main(String args[]) throws Exception {
     209
     210                // (0). 設定系統環境參數
     211                String local_store_product_list = "/tmp/ProductEmp/conf/store.txt";
     212                String local_input_dir = "/tmp/ProductEmp/input";
     213                String hdfs_input_dir = "/tmp/ProductEmp/hdfs";
     214                String hdfs_output_dir = "/tmp/ProductEmp/out";
     215                String local_download_out_file = "/tmp/ProductEmp/result/tmp.log";
     216                String local_db_file = "/tmp/ProductEmp/www-web/db.js";
     217
     218                /*
     219                 * // 參數設定功能,暫不開放 String[] argc = { local_input_dir, hdfs_output_dir,
     220                 * local_store_product_list }; args = argc;
     221                 *
     222                 * if (args.length != 3) { System.err
     223                 * .println("Usage: hadoop jar ProductEmp.jar <input> <output> <store.txt>"
     224                 * ); System.exit(2); }
     225                 */
     226
     227                // (1). 設定hadoop 參數
     228                Configuration conf = new Configuration();
     229
     230                // 1.1 設定成單機測試模式
     231                conf.set("mapred.job.tracker", "local"); // for single
     232                conf.set("fs.default.name", "file:///"); // for single
     233
     234                // (2) 取得商品資訊
     235                // 2.1 : 0.編號 1.名稱 2.價格
     236                String str, product_list = "";
     237                String[] arr;
     238                BufferedReader bfr = new BufferedReader(new FileReader(new File(
     239                                local_store_product_list)));
     240                while ((str = bfr.readLine()) != null) {
     241                        arr = str.split(";");
     242
     243                        // 2.2 編號=價格=名稱
     244                        product_list += arr[0].trim() + "=" + arr[2].trim() + "="
     245                                        + arr[1].trim() + ",";
     246                }
     247                System.out.println("您設定的商品清單:" + product_list);
     248                conf.set("dt_product_list", product_list);
     249
     250                // (3) hdfs 操作
     251                // 3.1 檢查hdfs 上的結果目錄是否存在,已經在的話刪除之
     252                if (checkAndDelete(hdfs_output_dir, conf))
     253                        System.out.println("已刪除 舊的輸出結果 :" + hdfs_output_dir);
     254                if (checkAndDelete(hdfs_input_dir, conf))
     255                        System.out.println("已刪除 舊的輸入資料 :" + hdfs_input_dir);
     256
     257                // 3.2 將 local input 送上 hdfs
     258                if (putToHdfs(local_input_dir, hdfs_input_dir, conf))
     259                        System.out.println("已將 新的輸入資料 送上 hdfs :" + local_input_dir + " => "
     260                                        + hdfs_input_dir);
     261
     262                // (4) hadoop 運算設定
     263                Job job = new Job(conf, "Product Calculate");
     264                job.setJarByClass(ProductEmp.class);
     265                job.setMapperClass(DtMap.class);
     266                job.setReducerClass(DtReduce.class);
     267                job.setOutputKeyClass(Text.class);
     268                job.setOutputValueClass(IntWritable.class);
     269                FileInputFormat.addInputPath(job, new Path(hdfs_input_dir));
     270                FileOutputFormat.setOutputPath(job, new Path(hdfs_output_dir));
     271                // 4.n 送出運算指令
     272                if ( job.waitForCompletion(true) )
     273                        System.out.println("hadoop 運算完成");
     274               
     275                // (5) 下載
     276                if ( download_dt_js(hdfs_output_dir + "/part-r-00000",
     277                                local_download_out_file, conf) ){
     278                        System.out.println("從 hdfs 下載到 local 完成");
     279                }else{
     280                        System.err.println("[失敗] 從 hdfs 下載到 local 失敗");
     281                }
     282                       
     283                // (6) 改寫
     284                if (  transfer_dt_js(local_download_out_file, local_db_file)  ){
     285                        System.out.println("轉換資料格式完成,請將" + local_db_file + "放到 網頁伺服器");
     286                }else{
     287                        System.err.println("[失敗] 轉換成database失敗");
     288                }
     289
     290        }
     291}
     292
     293
     294
     295
     296
     297
     298
     299}}}