package tsmc; //TSMC serial program number 3 //0. after TSMC2Count //1. run it by MapReduce import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; public class TSMC3CalculateMR { public static class HtMap extends TableMapper<Text, Text> { public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { String row = Bytes.toString(value.getValue(Bytes.toBytes("Detail"), Bytes.toBytes("Locate"))); int sum = 0; for (int i = 0; i < 4; i++) { String v = Bytes.toString(value.getValue(Bytes .toBytes("Products"), Bytes.toBytes("P" + (i + 1)))); String c = Bytes.toString(value.getValue(Bytes .toBytes("Turnover"), Bytes.toBytes("P" + (i + 1)))); if (v != null ) { if(c == null) c="0"; System.err.println("p=" + v); System.err.println("c=" + c); sum += Integer.parseInt(v) * Integer.parseInt(c); System.err.println("T" + row + ":" + "p[" + i + "]*" + "c[" + i + "] => " + v + "*" + c + "+=" + (sum)); } } context.write(new Text("T" + row), new Text(String.valueOf(sum))); } } public static class HtReduce extends TableReducer<Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String sum = ""; for (Text i : values) { sum += i.toString(); } Put put = new Put(Bytes.toBytes(key.toString())); put.add(Bytes.toBytes("Turnover"), Bytes.toBytes("Sum"), Bytes .toBytes(sum)); context.write(new Text(), put); } } public static void main(String args[]) throws Exception { String tablename = "tsmc"; Scan myScan = new Scan(); myScan.addColumn("Detail:Locate".getBytes()); myScan.addColumn("Products:P1".getBytes()); myScan.addColumn("Products:P2".getBytes()); myScan.addColumn("Products:P3".getBytes()); myScan.addColumn("Products:P4".getBytes()); myScan.addColumn("Turnover:P1".getBytes()); myScan.addColumn("Turnover:P2".getBytes()); myScan.addColumn("Turnover:P3".getBytes()); myScan.addColumn("Turnover:P4".getBytes()); Configuration conf = new Configuration(); Job job = new Job(conf, "Calculating "); job.setJarByClass(TSMC3CalculateMR.class); job.setMapperClass(HtMap.class); job.setReducerClass(HtReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(TableInputFormat.class); job.setOutputFormatClass(TableOutputFormat.class); TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class, Text.class, Text.class, job); TableMapReduceUtil.initTableReducerJob(tablename, HtReduce.class, job); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Last modified 15 years ago
Last modified on Feb 3, 2010, 9:57:19 PM