wiki:waue/2011/0425_Itri2Count
package itri;

// ITRI serial program number 2
// 0. run after Itri1LoadFile
// 1. build several .txt files under /tmp/income/
//    each line has the format:
//    <any>:store:product:<other>
//    e.g.
//    xxx:T01:P4:000
//    ooo:T02:P1:bbs
//    oo:T03:P1:0sf0
// 2. put /tmp/income into HDFS as /user/xxxx/income
// 3. run this job
//
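// Example commands for steps 2 and 3 (illustrative; the jar name and local
// path are assumptions, adjust them to your setup):
//   $ hadoop fs -put /tmp/income income       # step 2: upload input to HDFS
//   $ hadoop jar itri.jar itri.Itri2Count     # step 3: submit this job
//   hbase> scan 'itri'                        # afterwards: inspect the counts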

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
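import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;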
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class Itri2Count {
  public static class HtMap extends
      Mapper<LongWritable, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // input line: <any>:store:product:<other>, e.g. xxx:T01:P4:000
      String[] s = value.toString().trim().split(":");
      if (s.length < 3) {
        return; // skip malformed lines
      }
      // emit store@product (e.g. T01@P4) with a count of one
      context.write(new Text(s[1] + "@" + s[2]), one);
    }
  }
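
  // With the sample input above, the map phase emits pairs such as
  // ("T01@P4", 1), ("T02@P1", 1), ("T03@P1", 1); pairs with the same
  // key are then summed per store/product in HtReduce below.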

  public static class HtReduce extends
      TableReducer<Text, IntWritable, LongWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      // sum all counts collected for this store@product key
      int sum = 0;
      for (IntWritable i : values) {
        sum += i.get();
      }
      // "T01@P4" => ( row = T01, column = Turnover:P4 )
      String[] str = key.toString().split("@");
      byte[] row = Bytes.toBytes(str[0]);
      byte[] family = Bytes.toBytes("Turnover");
      byte[] qualifier = Bytes.toBytes(str[1]);
      byte[] summary = Bytes.toBytes(String.valueOf(sum));
      Put put = new Put(row);
      put.add(family, qualifier, summary);
      // TableOutputFormat ignores the output key; only the Put matters
      context.write(new LongWritable(), put);
    }
  }
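
  // Optional helper: a minimal sketch against the HBase 0.90-era admin API
  // (HBaseAdmin / HTableDescriptor / HColumnDescriptor), not part of the
  // original flow, which assumes Itri1LoadFile already created the table.
  // It creates the output table with the "Turnover" family when the table
  // does not exist; call it before submitting the job if you skipped step 0.
  public static void createTableIfMissing(Configuration conf, String tablename)
      throws IOException {
    HBaseAdmin admin = new HBaseAdmin(conf);
    if (!admin.tableExists(tablename)) {
      HTableDescriptor desc = new HTableDescriptor(tablename);
      desc.addFamily(new HColumnDescriptor("Turnover"));
      admin.createTable(desc);
    }
  }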

  public static void main(String[] args) throws Exception {
    String input = "income";    // HDFS input directory
    String tablename = "itri";  // HBase output table

    // pick up hbase-site.xml from the classpath so the job can reach HBase
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);

    Job job = new Job(conf, "Count to itri");
    job.setJarByClass(Itri2Count.class);

    job.setMapperClass(HtMap.class);
    job.setReducerClass(HtReduce.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TableOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(input));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Last modified on Apr 25, 2011, 4:19:29 PM