source: sample/hadoop-0.16/tw/org/nchc/code/WordCountFromHBase.java @ 77

Last change on this file since 77 was 27, checked in by waue, 16 years ago

test!

File size: 6.2 KB
Line 
1/**
2 * Program: WordCountFromHBase.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 07/02/2008
6 */
7
8/**
9 * Purpose :
10 *  Count the words read from HBase, then store the result in the Hadoop file system
11 *
12 * HowToUse :
13 *  Make sure the Hadoop file system is running and HBase holds the correct data.
14 *  It is suggested to run WordCountIntoHBase first.
15 *  Finally, modify the setup parameters below and run this program.
16 *
17 * Check Result:
18 * 
19 *  Inspect http://localhost:50070 with a web browser
20 */
21
22package tw.org.nchc.code;
23
24import java.io.IOException;
25import java.util.Iterator;
26import java.util.StringTokenizer;
27
28import org.apache.hadoop.fs.FileSystem;
29import org.apache.hadoop.fs.Path;
30import org.apache.hadoop.hbase.HStoreKey;
31import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32import org.apache.hadoop.hbase.mapred.TableInputFormat;
33import org.apache.hadoop.hbase.mapred.TableMap;
34import org.apache.hadoop.io.IntWritable;
35import org.apache.hadoop.io.MapWritable;
36import org.apache.hadoop.io.Text;
37import org.apache.hadoop.mapred.JobClient;
38import org.apache.hadoop.mapred.JobConf;
39import org.apache.hadoop.mapred.MapReduceBase;
40import org.apache.hadoop.mapred.OutputCollector;
41import org.apache.hadoop.mapred.Reducer;
42import org.apache.hadoop.mapred.Reporter;
43@SuppressWarnings("unused")
44
45public class WordCountFromHBase {
46  /* setup parameters */
47  // set the output path
48  static String outputPath = "counts2";
49
50  // org.apache.hadoop.hbase.mapred.TableMap<K,V>  \
51  // TableMap<K extends org.apache.hadoop.io.WritableComparable, \
52  //  V extends org.apache.hadoop.io.Writable> \
53  // Scan an HBase table to sort by a specified sort column. \
54  // If the column does not exist, the record is not passed to Reduce.;
55  private static class MapClass extends TableMap<Text, IntWritable> {
56
57    // set one as (IntWritable)1
58    private final static IntWritable one = new IntWritable(1);
59    // set column
60    private final static Text textcol = new Text(WordCountIntoHBase.colstr);
61    private Text word = new Text();   
62    // TableMap is a interface, map is a abstract method. now, we should \
63    //  inprement map() at here, format is : \
64    // map(HStoreKey key, MapWritable value,  \
65    //  OutputCollector<K,V> output, Reporter reporter) ;
66        // Call a user defined function on a single HBase record, \ 
67    //  represented by a key and its associated record value. ;
68    public void map(HStoreKey key, MapWritable cols,
69        OutputCollector<Text, IntWritable> output, Reporter reporter)
70        throws IOException {
71      //
72      // The first get() is : Writable <- get(Object key) \
73      //  get in interface Map<Writable,Writable>  ;
74      // Use ImmutableBytesWritable to downcast Writable \
75      // The second get() is : byte[] <- get() \
76      //  Get the data from the BytesWritable. ;
77      // Text.decode is parse UTF-8 code to a String ;
78      // per "line" is per row data in HTable
79      String line = Text.decode( ((ImmutableBytesWritable) cols.get(textcol) )
80          .get() );
81     
82      //let us know what is "line"
83      /*
84      RandomAccessFile raf =
85        new RandomAccessFile("/home/waue/mr-result.txt","rw");
86      raf.seek(raf.length()); // move pointer to end
87      raf.write(("\n"+line).getBytes());
88      raf.close();
89      *///end
90      // the result is the contents of merged files "
91     
92      //StringTokenizer will divide a line into a word 
93      StringTokenizer itr = new StringTokenizer(line);
94      // set every word as one
95      while (itr.hasMoreTokens()) {
96        // nextToken will return this value in String and point to next \
97        // Text.set() = Set to contain the contents of a string.
98        word.set(itr.nextToken()); 
99        // OutputCollector.collect = collect(K key, V value) \
100        //  Adds a key/value pair to the output.
101        output.collect(word, one);
102      }
103    }
104  }
105
106  // reducer: sums up all the counts
107  private static class ReduceClass extends MapReduceBase implements
108      Reducer<Text, IntWritable, Text, IntWritable> {
109
110    // reuse objects
111    private final static IntWritable SumValue = new IntWritable();
112   
113    // this sample's reduce() format is the same as map() \
114    //  reduce is a method waiting for implement \
115    //  four type in this sample is (Text , Iterator<IntWritable>, \
116    //    OutputCollector<Text, IntWritable> , Reporter ) ;
117    public void reduce(Text key, Iterator<IntWritable> values,
118        OutputCollector<Text, IntWritable> output, Reporter reporter)
119        throws IOException {
120      // sum up value
121      int sum = 0;
122      // "key" is word , "value" is sum
123      // why values.hasNext(), not key.hasNext()
124      while (values.hasNext()) { 
125        // next() will return this value and pointer to next event \
126        //  IntWritable.get() will transfer IntWritable to Int
127        sum += values.next().get(); 
128      }
129      // IntWritable.set(int) will transfer Int to IntWritable
130      SumValue.set(sum);
131      // hense we set outputPath in main, the output.collect will put
132      //  data in Hadoop
133      output.collect(key, SumValue);
134    }
135  }
136
137  private WordCountFromHBase() {
138  }
139
140  /**
141   * Runs the demo.
142   */
143  public static void main(String[] args) throws IOException {
144   
145
146    int mapTasks = 1;
147    int reduceTasks = 1;
148    // initialize job;
149    JobConf conf = new JobConf(WordCountFromHBase.class);
150    // TableMap.initJob will build HBase code \
151    //  "org.apache.hadoop.hbase.mapred.TableMap".initJob \
152    //  (Table_name,column_string,Which_class_will_use,job_configure);
153    TableMap.initJob(WordCountIntoHBase.Table_Name,
154        WordCountIntoHBase.colstr, MapClass.class, conf);
155    conf.setJobName(WordCountIntoHBase.Table_Name + "store");
156    conf.setNumMapTasks(mapTasks);
157    conf.setNumReduceTasks(reduceTasks);
158   
159    //Set the key class for the job output data.
160    conf.setOutputKeyClass(Text.class);
161    //Set the value class for job outputs.
162    conf.setOutputValueClass(IntWritable.class);
163    // MapperClass,CombinerClass,ReducerClass are essential
164    conf.setMapperClass(MapClass.class);
165    conf.setCombinerClass(ReduceClass.class);
166    conf.setReducerClass(ReduceClass.class);
167    // input is Hbase format => TableInputFormat
168    conf.setInputFormat(TableInputFormat.class);
169    conf.setOutputPath(new Path(outputPath));
170//     delete the old path with the same name
171    FileSystem.get(conf).delete(new Path(outputPath));
172    JobClient.runJob(conf);
173  }
174}
Note: See TracBrowser for help on using the repository browser.