package itri;
// ITRI series, program number 2
// 0. Run this after Itri1LoadFile.
// 1. Create several .txt files in /tmp/income/.
//    Each line has the format:
//      <any>:store:product:<other>
//    for example:
//      xxx:T01:P4:000
//      ooo:T02:P1:bbs
//      oo:T03:P1:0sf0
// 2. Put /tmp/itri/income into HDFS as /user/xxxx/income.
// 3. Run this program.
//
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class Itri2Count {
public static class HtMap extends
Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String s[] = value.toString().trim().split(":");
// xxx:T01:P4:oooo => T01@P4
String str = s[1] + "@" + s[2];
context.write(new Text(str), one);
}
}
public static class HtReduce extends
TableReducer<Text, IntWritable, LongWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable i : values) {
sum += i.get();
}
// T01@P4 => ( row = T01 , Column= Turnover:P4 )
String[] str = (key.toString()).split("@");
byte[] row = (str[0]).getBytes();
byte[] family = Bytes.toBytes("Turnover");
byte[] qualifier = (str[1]).getBytes();
byte[] summary = Bytes.toBytes(String.valueOf(sum));
Put put = new Put(row);
put.add(family, qualifier, summary );
context.write(new LongWritable(), put);
}
}
public static void main(String args[]) throws Exception {
// debug
String input = "income";
String tablename = "itri";
Configuration conf = new Configuration();
conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
Job job = new Job(conf, "Count to itri");
job.setJarByClass(Itri2Count.class);
job.setMapperClass(HtMap.class);
job.setReducerClass(HtReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(input));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}