Changes between Initial Version and Version 1 of waue/2011/0425_Itri2Count


Ignore:
Timestamp:
Apr 25, 2011, 4:19:29 PM (13 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2011/0425_Itri2Count

    v1 v1  
     1{{{
     2#!java
     3
     4package itri;
     5
     6// TRI serial program number 2
     7// 0. after Itri1LoadFile
     8// 1. build several .txt files to /tmp/income/
     9//       txt format is as:
     10//              <any>:store:product:<other>
     11//              xxx:T01:P4:000
     12//              ooo:T02:P1:bbs
     13//              oo:T03:P1:0sf0
     14// 2. put /tmp/itri/income to hdfs: /user/xxxx/income
     15// 3. run it
     16//
     17
     18import java.io.IOException;
     19
     20import org.apache.hadoop.conf.Configuration;
     21import org.apache.hadoop.fs.Path;
     22import org.apache.hadoop.hbase.client.Put;
     23import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
     24import org.apache.hadoop.hbase.mapreduce.TableReducer;
     25import org.apache.hadoop.hbase.util.Bytes;
     26import org.apache.hadoop.io.IntWritable;
     27import org.apache.hadoop.io.LongWritable;
     28import org.apache.hadoop.io.Text;
     29import org.apache.hadoop.mapreduce.Job;
     30import org.apache.hadoop.mapreduce.Mapper;
     31import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     32import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     33
     34public class Itri2Count {
     35        public static class HtMap extends
     36                        Mapper<LongWritable, Text, Text, IntWritable> {
     37                private IntWritable one = new IntWritable(1);
     38                public void map(LongWritable key, Text value, Context context)
     39                                throws IOException, InterruptedException {
     40                        String s[] = value.toString().trim().split(":");
     41                        // xxx:T01:P4:oooo => T01@P4
     42                        String str = s[1] + "@" + s[2];
     43                        context.write(new Text(str), one);
     44                }
     45        }
     46
     47        public static class HtReduce extends
     48                        TableReducer<Text, IntWritable, LongWritable> {
     49                public void reduce(Text key, Iterable<IntWritable> values,
     50                                Context context) throws IOException, InterruptedException {
     51                        int sum = 0;
     52                        for (IntWritable i : values) {
     53                                sum += i.get();
     54                        }
     55                        // T01@P4 => ( row = T01 , Column= Turnover:P4 )
     56                        String[] str = (key.toString()).split("@");
     57                        byte[] row = (str[0]).getBytes();
     58                        byte[] family = Bytes.toBytes("Turnover");
     59                        byte[] qualifier = (str[1]).getBytes();
     60                        byte[] summary = Bytes.toBytes(String.valueOf(sum));
     61                        Put put = new Put(row);
     62                        put.add(family, qualifier, summary );
     63                        context.write(new LongWritable(), put);
     64                }
     65        }
     66
     67        public static void main(String args[]) throws Exception {
     68                // debug
     69
     70                String input = "income";
     71
     72                String tablename = "itri";
     73
     74                Configuration conf = new Configuration();
     75
     76                conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
     77
     78                Job job = new Job(conf, "Count to itri");
     79
     80                job.setJarByClass(Itri2Count.class);
     81
     82                job.setMapperClass(HtMap.class);
     83                job.setReducerClass(HtReduce.class);
     84
     85                job.setMapOutputKeyClass(Text.class);
     86                job.setMapOutputValueClass(IntWritable.class);
     87
     88                job.setInputFormatClass(TextInputFormat.class);
     89
     90                job.setOutputFormatClass(TableOutputFormat.class);
     91
     92                FileInputFormat.addInputPath(job, new Path(input));
     93
     94                System.exit(job.waitForCompletion(true) ? 0 : 1);
     95        }
     96}
     97
     98}}}