package itri;
// ITRI series, program number 3.
// 0. Run this after Itri2Count.
// 1. Run it as a MapReduce job.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
/**
 * MapReduce job over the HBase table "itri".
 *
 * <p>For every scanned row the mapper multiplies the {@code Products:Pn}
 * value by the {@code Turnover:Pn} value for n = 1..4 and adds the products
 * up. The reducer totals those per-row sums for each {@code Detail:Locate}
 * value and writes the total back to the same table, in row
 * {@code "T" + locate}, column {@code Turnover:Sum}, encoded as a decimal
 * string.
 */
public class Itri3CalculateMR {

    /** Table that is both scanned and written to. */
    private static final String TABLE_NAME = "itri";

    /** Number of product columns (P1..Pn) read from each row. */
    private static final int PRODUCT_COUNT = 4;

    /** Counter group used for rows the mapper has to skip. */
    private static final String COUNTER_GROUP = "Itri3CalculateMR";

    // Column coordinates are encoded once instead of once per record.
    private static final byte[] FAMILY_DETAIL = Bytes.toBytes("Detail");
    private static final byte[] FAMILY_PRODUCTS = Bytes.toBytes("Products");
    private static final byte[] FAMILY_TURNOVER = Bytes.toBytes("Turnover");
    private static final byte[] QUALIFIER_LOCATE = Bytes.toBytes("Locate");
    private static final byte[] QUALIFIER_SUM = Bytes.toBytes("Sum");

    /**
     * Returns the qualifier bytes of the product column for a zero-based
     * index, i.e. index 0 maps to "P1".
     */
    private static byte[] productQualifier(int index) {
        return Bytes.toBytes("P" + (index + 1));
    }

    /**
     * Emits one {@code ("T" + locate, rowSum)} pair per scanned row, where
     * {@code rowSum} is the sum over n of {@code Products:Pn * Turnover:Pn}.
     */
    public static class HtMap extends TableMapper<Text, Text> {

        /**
         * Computes the sum for a single table row (overrides the framework's
         * {@code map}).
         *
         * @param key     row key supplied by the table input format (unused)
         * @param value   cells of the scanned row
         * @param context task context used to emit the pair
         * @throws NumberFormatException if a Products or Turnover cell does
         *                               not hold a decimal integer
         */
        public void map(ImmutableBytesWritable key, Result value,
                Context context) throws IOException, InterruptedException {
            String locate = Bytes.toString(
                    value.getValue(FAMILY_DETAIL, QUALIFIER_LOCATE));
            if (locate == null) {
                // Without a location there is no meaningful output row; the
                // previous behaviour merged all such rows under "Tnull".
                context.getCounter(COUNTER_GROUP, "ROWS_WITHOUT_LOCATE")
                        .increment(1);
                return;
            }

            // long, so that the product of two ints cannot wrap around.
            long sum = 0;
            for (int i = 0; i < PRODUCT_COUNT; i++) {
                byte[] qualifier = productQualifier(i);
                String productValue = Bytes.toString(
                        value.getValue(FAMILY_PRODUCTS, qualifier));
                if (productValue == null) {
                    continue;
                }
                String turnoverValue = Bytes.toString(
                        value.getValue(FAMILY_TURNOVER, qualifier));
                // A missing Turnover cell counts as zero.
                long turnover = (turnoverValue == null)
                        ? 0L : Integer.parseInt(turnoverValue);
                sum += (long) Integer.parseInt(productValue) * turnover;
            }
            context.write(new Text("T" + locate),
                    new Text(String.valueOf(sum)));
        }
    }

    /**
     * Adds up the per-row sums of one key and stores the total in
     * {@code Turnover:Sum} of the row named by the key.
     */
    public static class HtReduce extends TableReducer<Text, Text, Text> {

        /**
         * Totals the values of one key (overrides the framework's
         * {@code reduce}).
         *
         * @param key     output row key, as emitted by {@link HtMap}
         * @param values  decimal per-row sums emitted for that key
         * @param context task context used to emit the {@link Put}
         * @throws NumberFormatException if a value is not a decimal integer
         */
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Numeric addition: concatenating the strings would turn "100"
            // and "200" into "100200" whenever a key has several values.
            long total = 0;
            for (Text partial : values) {
                total += Long.parseLong(partial.toString());
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            // Stored as a decimal string, same encoding as before.
            put.add(FAMILY_TURNOVER, QUALIFIER_SUM,
                    Bytes.toBytes(String.valueOf(total)));
            context.write(new Text(), put);
        }
    }

    /**
     * Configures and runs the job, then exits with status 0 on success and
     * 1 on failure.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[]) throws Exception {
        // Restrict the scan to exactly the columns the mapper reads.
        Scan myScan = new Scan();
        myScan.addColumn(FAMILY_DETAIL, QUALIFIER_LOCATE);
        for (int i = 0; i < PRODUCT_COUNT; i++) {
            byte[] qualifier = productQualifier(i);
            myScan.addColumn(FAMILY_PRODUCTS, qualifier);
            myScan.addColumn(FAMILY_TURNOVER, qualifier);
        }

        Configuration conf = new Configuration();
        Job job = new Job(conf, "Calculating ");
        job.setJarByClass(Itri3CalculateMR.class);
        job.setMapperClass(HtMap.class);
        job.setReducerClass(HtReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(TableInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        TableMapReduceUtil.initTableMapperJob(TABLE_NAME, myScan, HtMap.class,
                Text.class, Text.class, job);
        TableMapReduceUtil.initTableReducerJob(TABLE_NAME, HtReduce.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}