{{{
#!java

package datatable;

/*
 * Info:
 * http://trac.nchc.org.tw/cloud/wiki/waue/2012/0117
 */

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ProductEmp {

    public static class DtMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each input line on ":" and take the second field as
            // the product id, emitting <product id, 1> for counting.
            String s[] = value.toString().trim().split(":");
            String str = s[1];
            context.write(new Text(str), one);
        }
    }

    public static class DtReduce extends Reducer<Text, IntWritable, Text, Text> {

        public HashMap<String, Integer> hm_product_map = new HashMap<String, Integer>();
        public HashMap<String, String> hm_product_name_map = new HashMap<String, String>();
        private Configuration conf;

        protected void setup(Context context) throws IOException,
                InterruptedException {
            // Load the product list into HashMaps for lookups during reduce.
            conf = context.getConfiguration();

            String str = conf.get("dt_product_list");
            // Format: id1=price1=name1,id2=price2=name2,...
            String[] arr = str.split(",");
            String[] tmp_arr;
            for (String tmp : arr) {
                tmp_arr = tmp.split("=");
                hm_product_map.put(tmp_arr[0], new Integer(tmp_arr[1]));
                hm_product_name_map.put(tmp_arr[0], tmp_arr[2]);
            }
            System.err.println(hm_product_map.toString());
            System.err.println(hm_product_name_map.toString());
        }

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // <key, value> is <product id, list of (1 1 1 ... 1)>

            int count = 0, price = 0, sales = 0;
            // Sum the 1s for each key to get the quantity sold.
            for (IntWritable val : values) {
                count += val.get();
            }
            // Look up the unit price and name for this product id.
            String id = key.toString().trim();
            String p_name = "";
            if (id != null) {
                price = (hm_product_map.get(id) == null) ? 0 : hm_product_map
                        .get(id);
                p_name = hm_product_name_map.get(id);
            }

            // Sales = quantity x unit price
            if (count != 0 && price != 0)
                sales = count * price;

            // Emit the new <key, value> pair, formatted as one row of a
            // JavaScript array: ['id','name','price','count','sales'],
            Text t_info = new Text("['" + id + "','" + p_name + "',");
            Text t_result = new Text("'" + price + "','" + count + "','"
                    + sales + "'],");
            context.write(t_info, t_result);
        }
    }

    static public boolean checkAndDelete(String out, Configuration conf) {
        Path dstPath = new Path(out);
        try {
            FileSystem hdfs = dstPath.getFileSystem(conf);
            if (hdfs.exists(dstPath)) {
                hdfs.delete(dstPath, true);
            }
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    static boolean putToHdfs(String src, String dst, Configuration conf) {
        Path dstPath = new Path(dst);
        try {
            // Get a FileSystem handle for HDFS operations.
            FileSystem hdfs = dstPath.getFileSystem(conf);
            // Upload the local data to HDFS.
            hdfs.copyFromLocalFile(false, new Path(src), new Path(dst));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    static public boolean deleteFolder(File dir) {
        File filelist[] = dir.listFiles();
        int listlen = filelist.length;
        for (int i = 0; i < listlen; i++) {
            if (filelist[i].isDirectory()) {
                deleteFolder(filelist[i]);
            } else {
                filelist[i].delete();
            }
        }
        return dir.delete();
    }

    public static boolean download_dt_js(String from, String dst,
            Configuration conf) {

        Path hdfs_result_path = new Path(from);
        Path local_result_path = new Path(dst);
        try {
            FileSystem hdfs = hdfs_result_path.getFileSystem(conf);
            if (hdfs.exists(hdfs_result_path)) {
                // Download the result file from HDFS to the local path.
                hdfs.copyToLocalFile(hdfs_result_path, local_result_path);
            }
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    public static boolean transfer_dt_js(String from, String dst) {

        String b = null;
        String header = "var aDataSet = [";
        String footer = "];";
        File file = new File(dst);
        BufferedReader br;
        BufferedWriter bw;
        // Create the destination file if it does not exist yet.
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            // Wrap the MapReduce output in a JavaScript array declaration.
            bw = new BufferedWriter(new FileWriter(file));
            bw.write(header);
            FileReader fr = new FileReader(from);
            br = new BufferedReader(fr);
            while ((b = br.readLine()) != null) {
                bw.write(b);
                bw.newLine();
                bw.flush();
            }
            bw.write(footer);
            bw.close();
            br.close();
        } catch (Exception e) {
            System.err.println(e.getMessage());
            return false;
        }
        return true;
    }

    public static void main(String args[]) throws Exception {

        // (0) Environment settings
        String local_store_product_list = "/tmp/ProductEmp/conf/store.txt";
        String local_input_dir = "/tmp/ProductEmp/input";
        String hdfs_input_dir = "/tmp/ProductEmp/hdfs";
        String hdfs_output_dir = "/tmp/ProductEmp/out";
        String local_download_out_file = "/tmp/ProductEmp/result/tmp.log";
        String local_db_file = "/tmp/ProductEmp/www-web/db.js";

        /*
         * // Command-line argument handling, disabled for now.
         * String[] argc = { local_input_dir, hdfs_output_dir,
         * local_store_product_list }; args = argc;
         *
         * if (args.length != 3) { System.err
         * .println("Usage: hadoop jar ProductEmp.jar <input> <output> <store.txt>"
         * ); System.exit(2); }
         */

        // (1) Hadoop configuration
        Configuration conf = new Configuration();

        // 1.1 Run in standalone (local) test mode
        conf.set("mapred.job.tracker", "local"); // for single
        conf.set("fs.default.name", "file:///"); // for single

        // (2) Read the product information
        // 2.1 store.txt fields: 0. id ; 1. name ; 2. price
        String str, product_list = "";
        String[] arr;
        BufferedReader bfr = new BufferedReader(new FileReader(new File(
                local_store_product_list)));
        while ((str = bfr.readLine()) != null) {
            arr = str.split(";");

            // 2.2 Re-encode each product as id=price=name
            product_list += arr[0].trim() + "=" + arr[2].trim() + "="
                    + arr[1].trim() + ",";
        }
        System.out.println("Configured product list: " + product_list);
        conf.set("dt_product_list", product_list);

        // (3) HDFS operations
        // 3.1 If the old output/input directories exist on HDFS, delete them
        if (checkAndDelete(hdfs_output_dir, conf))
            System.out.println("Deleted old output directory: " + hdfs_output_dir);
        if (checkAndDelete(hdfs_input_dir, conf))
            System.out.println("Deleted old input data: " + hdfs_input_dir);

        // 3.2 Upload the local input data to HDFS
        if (putToHdfs(local_input_dir, hdfs_input_dir, conf))
            System.out.println("Uploaded new input data to HDFS: "
                    + local_input_dir + " => " + hdfs_input_dir);

        // (4) Hadoop job settings
        Job job = new Job(conf, "Product Calculate");
        job.setJarByClass(ProductEmp.class);
        job.setMapperClass(DtMap.class);
        job.setReducerClass(DtReduce.class);
        // The map emits <Text, IntWritable>; the reduce emits <Text, Text>.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(hdfs_input_dir));
        FileOutputFormat.setOutputPath(job, new Path(hdfs_output_dir));
        // 4.n Submit the job and wait for it to finish
        if (job.waitForCompletion(true))
            System.out.println("Hadoop job completed");

        // (5) Download the result
        if (download_dt_js(hdfs_output_dir + "/part-r-00000",
                local_download_out_file, conf)) {
            System.out.println("Downloaded the result from HDFS to local disk");
        } else {
            System.err.println("[Failed] Could not download the result from HDFS");
        }

        // (6) Rewrite the result as db.js
        if (transfer_dt_js(local_download_out_file, local_db_file)) {
            System.out.println("Data conversion finished; please put "
                    + local_db_file + " on the web server");
        } else {
            System.err.println("[Failed] Conversion to db.js failed");
        }

    }
}
}}}
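
A minimal sketch of sample data, assuming the formats implied by the parsing code above: each line of store.txt is "id;name;price", and the mapper keeps only the token after ":" in each input record as the product id. The helper class, file names, ids, names, and prices below are illustrative assumptions, not part of the original example.

{{{
#!java
package datatable;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

// Hypothetical helper: writes sample store.txt and input data under
// /tmp/ProductEmp so that ProductEmp.main() can be run locally.
public class ProductEmpSampleData {

    public static void main(String[] args) throws IOException {
        new File("/tmp/ProductEmp/conf").mkdirs();
        new File("/tmp/ProductEmp/input").mkdirs();
        new File("/tmp/ProductEmp/result").mkdirs();
        new File("/tmp/ProductEmp/www-web").mkdirs();

        // store.txt: id;name;price (ProductEmp.main splits on ";")
        FileWriter store = new FileWriter("/tmp/ProductEmp/conf/store.txt");
        store.write("101;apple;30\n");
        store.write("102;banana;10\n");
        store.close();

        // Input records: DtMap splits on ":" and keeps the second field
        // (the product id); the prefix stands in for any record label.
        FileWriter input = new FileWriter("/tmp/ProductEmp/input/sales.txt");
        input.write("emp01:101\n");
        input.write("emp02:101\n");
        input.write("emp01:102\n");
        input.close();
    }
}
}}}

With this data, the job should count two sales of product 101 and one of 102, and the generated db.js should contain rows such as ['101','apple', '30','2','60'], wrapped between "var aDataSet = [" and "];".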