wiki:waue/2011/0425_Itri3CalculateMR

Version 1 (modified by waue, 14 years ago) (diff)

--

package itri;

//ITRI serial program number 3 
//0. after Itri2Count 
//1. run it by MapReduce


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class Itri3CalculateMR {
  public static class HtMap extends TableMapper<Text, Text> {

    public void map(ImmutableBytesWritable key, Result value,
        Context context) throws IOException, InterruptedException {

      String row = Bytes.toString(value.getValue(Bytes.toBytes("Detail"),
          Bytes.toBytes("Locate")));

      int sum = 0;

      for (int i = 0; i < 4; i++) {
        String v = Bytes.toString(value.getValue(Bytes
            .toBytes("Products"), Bytes.toBytes("P" + (i + 1))));
        String c = Bytes.toString(value.getValue(Bytes
            .toBytes("Turnover"), Bytes.toBytes("P" + (i + 1))));
        if (v != null ) {
          if(c == null) c="0";
          System.err.println("p=" + v);
          System.err.println("c=" + c);

          sum += Integer.parseInt(v) * Integer.parseInt(c);
          System.err.println("T" + row + ":" + "p[" + i + "]*" + "c["
              + i + "] => " + v + "*" + c + "+="
              + (sum));
          
        }

      }

      context.write(new Text("T" + row), new Text(String.valueOf(sum)));

    }
  }


  public static class HtReduce extends TableReducer<Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      String sum = "";
      for (Text i : values) {
        sum += i.toString();
      }

      Put put = new Put(Bytes.toBytes(key.toString()));

      put.add(Bytes.toBytes("Turnover"), Bytes.toBytes("Sum"), Bytes
          .toBytes(sum));

      context.write(new Text(), put);

    }
  }

  public static void main(String args[]) throws Exception {

    String tablename = "itri";

    Scan myScan = new Scan();
    myScan.addColumn("Detail:Locate".getBytes());
    myScan.addColumn("Products:P1".getBytes());
    myScan.addColumn("Products:P2".getBytes());
    myScan.addColumn("Products:P3".getBytes());
    myScan.addColumn("Products:P4".getBytes());
    myScan.addColumn("Turnover:P1".getBytes());
    myScan.addColumn("Turnover:P2".getBytes());
    myScan.addColumn("Turnover:P3".getBytes());
    myScan.addColumn("Turnover:P4".getBytes());

    Configuration conf = new Configuration();

    Job job = new Job(conf, "Calculating ");

    job.setJarByClass(Itri3CalculateMR.class);

    job.setMapperClass(HtMap.class);
    job.setReducerClass(HtReduce.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setInputFormatClass(TableInputFormat.class);
    job.setOutputFormatClass(TableOutputFormat.class);

    TableMapReduceUtil.initTableMapperJob(tablename, myScan, HtMap.class,
        Text.class, Text.class, job);
    TableMapReduceUtil.initTableReducerJob(tablename, HtReduce.class, job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }
}