{{{
#!java

package datatable;

/*
 * Info:
 * http://trac.nchc.org.tw/cloud/wiki/waue/2012/0117
 */
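
/*
 * Overview of the code below: the job counts how many times each
 * product id appears in the input, multiplies each count by the unit
 * price read from store.txt, and rewrites the result as a JavaScript
 * data file (db.js) for a web page.
 */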

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ProductEmp {

    public static class DtMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);

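        // Each input line is assumed to contain "<prefix>:<productId>"
        // (inferred from the split on ":" below); the product id becomes
        // the map output key with a count of 1.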
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] s = value.toString().trim().split(":");
            if (s.length < 2) {
                return; // skip malformed lines without a ":" separator
            }
            context.write(new Text(s[1]), one);
        }
    }

    public static class DtReduce extends Reducer<Text, IntWritable, Text, Text> {

        // lookup tables built in setup(): product id -> unit price,
        // and product id -> product name
        public HashMap<String, Integer> hm_product_map = new HashMap<String, Integer>();
        public HashMap<String, String> hm_product_name_map = new HashMap<String, String>();
        private Configuration conf;

        protected void setup(Context context) throws IOException,
                InterruptedException {
            // load the product list into HashMaps for lookup
            conf = context.getConfiguration();

            String str = conf.get("dt_product_list");
            // the value looks like: id1=price1=name1,id2=price2=name2,...
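            // e.g. "1=30=Apple,2=50=Banana" (hypothetical sample values)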
            String[] arr = str.split(",");
            String[] tmp_arr;
            for (String tmp : arr) {
                tmp_arr = tmp.split("=");
                hm_product_map.put(tmp_arr[0], Integer.parseInt(tmp_arr[1]));
                hm_product_name_map.put(tmp_arr[0], tmp_arr[2]);
            }
            // debug output: dump the lookup tables
            System.err.println(hm_product_map.toString());
            System.err.println(hm_product_name_map.toString());
        }

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // input <key, value> is <product id, list of 1s>

            int count = 0, price = 0, sales = 0;
            // sum each key's 1s to get the quantity sold
            for (IntWritable val : values) {
                count += val.get();
            }
            // look up the unit price and name for this product id
            String id = key.toString().trim();
            String p_name = "";
            if (hm_product_map.containsKey(id)) {
                price = hm_product_map.get(id);
            }
            if (hm_product_name_map.containsKey(id)) {
                p_name = hm_product_name_map.get(id);
            }

            // sales = quantity * unit price
            if (count != 0 && price != 0)
                sales = count * price;

            // emit one JavaScript array row:
            // ['id','name','price','count','sales'],
            Text t_info = new Text("['" + id + "','" + p_name + "',");
            Text t_result = new Text("'" + price + "','" + count + "','"
                    + sales + "'],");
            // write the new <key, value> pair
            context.write(t_info, t_result);
        }
    }

    // delete the given HDFS path if it exists; return whether anything
    // was actually removed
    static public boolean checkAndDelete(String out, Configuration conf) {
        Path dstPath = new Path(out);
        try {
            FileSystem hdfs = dstPath.getFileSystem(conf);
            if (hdfs.exists(dstPath)) {
                hdfs.delete(dstPath, true);
                return true;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    static boolean putToHdfs(String src, String dst, Configuration conf) {
        Path dstPath = new Path(dst);
        try {
            // get a FileSystem handle for HDFS operations
            FileSystem hdfs = dstPath.getFileSystem(conf);
            // upload (false = keep the local source file)
            hdfs.copyFromLocalFile(false, new Path(src), new Path(dst));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    // recursively delete a local folder (helper, unused in main)
    static public boolean deleteFolder(File dir) {
        File[] filelist = dir.listFiles();
        if (filelist != null) {
            for (File f : filelist) {
                if (f.isDirectory()) {
                    deleteFolder(f);
                } else {
                    f.delete();
                }
            }
        }
        return dir.delete();
    }

    public static boolean download_dt_js(String from, String dst,
            Configuration conf) {

        Path hdfs_result_path = new Path(from);
        Path local_result_path = new Path(dst);
        try {
            FileSystem hdfs = hdfs_result_path.getFileSystem(conf);
            if (hdfs.exists(hdfs_result_path)) {
                // copy the result from HDFS down to the local filesystem
                hdfs.copyToLocalFile(hdfs_result_path, local_result_path);
            }
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public static boolean transfer_dt_js(String from, String dst) {

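        // wrap the downloaded MapReduce output rows in a JavaScript
        // array literal so a web page can load the result as db.js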
        String b = null;
        String header = "var aDataSet = [";
        String footer = "];";
        File file = new File(dst);
        BufferedReader br;
        BufferedWriter bw;
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            bw = new BufferedWriter(new FileWriter(file));
            bw.write(header);
            FileReader fr = new FileReader(from);
            br = new BufferedReader(fr);
            while ((b = br.readLine()) != null) {
                bw.write(b);
                bw.newLine();
                bw.flush();
            }
            bw.write(footer);
            bw.close();
            br.close();
        } catch (Exception e) {
            System.err.println(e.getMessage());
            return false;
        }
        return true;
    }

    public static void main(String args[]) throws Exception {

        // (0) local and HDFS paths
        String local_store_product_list = "/tmp/ProductEmp/conf/store.txt";
        String local_input_dir = "/tmp/ProductEmp/input";
        String hdfs_input_dir = "/tmp/ProductEmp/hdfs";
        String hdfs_output_dir = "/tmp/ProductEmp/out";
        String local_download_out_file = "/tmp/ProductEmp/result/tmp.log";
        String local_db_file = "/tmp/ProductEmp/www-web/db.js";
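        // store.txt is parsed below as one "id;name;price" record per
        // line, e.g. "1;Apple;30" (hypothetical sample values)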

        /*
         * // command-line argument handling, currently disabled:
         * String[] argc = { local_input_dir, hdfs_output_dir,
         * local_store_product_list }; args = argc;
         *
         * if (args.length != 3) { System.err
         * .println("Usage: hadoop jar ProductEmp.jar <input> <output> <store.txt>"
         * ); System.exit(2); }
         */

        // (1) Hadoop configuration
        Configuration conf = new Configuration();

        // 1.1 run in local (standalone) mode for testing
        conf.set("mapred.job.tracker", "local"); // for single-node mode
        conf.set("fs.default.name", "file:///"); // for single-node mode

        // (2) read the product list
        // 2.1 store.txt fields: 0=id, 1=name, 2=price
        String str, product_list = "";
        String[] arr;
        BufferedReader bfr = new BufferedReader(new FileReader(new File(
                local_store_product_list)));
        while ((str = bfr.readLine()) != null) {
            arr = str.split(";");

            // 2.2 repack as id=price=name
            product_list += arr[0].trim() + "=" + arr[2].trim() + "="
                    + arr[1].trim() + ",";
        }
        bfr.close();
        System.out.println("Configured product list: " + product_list);
        conf.set("dt_product_list", product_list);

        // (3) HDFS operations
        // 3.1 delete the old result/input directories on HDFS if they exist
        if (checkAndDelete(hdfs_output_dir, conf))
            System.out.println("Removed old output: " + hdfs_output_dir);
        if (checkAndDelete(hdfs_input_dir, conf))
            System.out.println("Removed old input: " + hdfs_input_dir);

        // 3.2 upload the local input to HDFS
        if (putToHdfs(local_input_dir, hdfs_input_dir, conf))
            System.out.println("Uploaded new input to HDFS: "
                    + local_input_dir + " => " + hdfs_input_dir);

        // (4) job setup
        Job job = new Job(conf, "Product Calculate");
        job.setJarByClass(ProductEmp.class);
        job.setMapperClass(DtMap.class);
        job.setReducerClass(DtReduce.class);
        // the map output value (IntWritable) differs from the reduce
        // output value (Text), so both pairs must be declared
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(hdfs_input_dir));
        FileOutputFormat.setOutputPath(job, new Path(hdfs_output_dir));
        // 4.n submit the job and wait for completion
        if (job.waitForCompletion(true))
            System.out.println("Hadoop job finished");

        // (5) download the result from HDFS
        if (download_dt_js(hdfs_output_dir + "/part-r-00000",
                local_download_out_file, conf)) {
            System.out.println("Downloaded result from HDFS to local");
        } else {
            System.err.println("[FAILED] could not download result from HDFS");
        }

        // (6) rewrite as JavaScript
        if (transfer_dt_js(local_download_out_file, local_db_file)) {
            System.out.println("Format conversion finished; copy "
                    + local_db_file + " to the web server");
        } else {
            System.err.println("[FAILED] could not convert to db.js");
        }
    }
}
}}}