source: sample/WordCountFromHBase.java @ 11

Last change on this file since 11 was 9, checked in by waue, 16 years ago

comment

File size: 6.3 KB
Line 
1/**
2 * Program: WordCountFromHBase.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 06/13/2008
6 */
7
8/**
9 * Purpose :
10 *  Word counting from Hbase then store result in Hadoop file system
11 *
12 * HowToUse :
13 *  Make sure the Hadoop file system is running and HBase holds the correct data.
14 *  Suggest to run WordCountIntoHBase first.
15 *  finally, modify these setup parameters and run.
16 *
17 * Check Result:
18 * 
19 *  inspect http://localhost:50070 with a web browser
20 */
21
22package tw.org.nchc.code;
23
24import java.io.IOException;
25import java.util.Iterator;
26import java.util.StringTokenizer;
27import java.io.FileOutputStream;
28import java.io.File;
29import java.io.RandomAccessFile;
30import org.apache.hadoop.fs.FileSystem;
31import org.apache.hadoop.fs.Path;
32import org.apache.hadoop.hbase.HStoreKey;
33import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34import org.apache.hadoop.hbase.mapred.TableInputFormat;
35import org.apache.hadoop.hbase.mapred.TableMap;
36import org.apache.hadoop.io.IntWritable;
37import org.apache.hadoop.io.MapWritable;
38import org.apache.hadoop.io.Text;
39import org.apache.hadoop.mapred.JobClient;
40import org.apache.hadoop.mapred.JobConf;
41import org.apache.hadoop.mapred.MapReduceBase;
42import org.apache.hadoop.mapred.OutputCollector;
43import org.apache.hadoop.mapred.Reducer;
44import org.apache.hadoop.mapred.Reporter;
45@SuppressWarnings("unused")
46
47public class WordCountFromHBase {
48  /* setup parameters */
49  // set the output path
50  static String outputPath = "counts2";
51
52  // org.apache.hadoop.hbase.mapred.TableMap<K,V>  \
53  // TableMap<K extends org.apache.hadoop.io.WritableComparable, \
54  //  V extends org.apache.hadoop.io.Writable> \
55  // Scan an HBase table to sort by a specified sort column. \
56  // If the column does not exist, the record is not passed to Reduce.;
57  private static class MapClass extends TableMap<Text, IntWritable> {
58
59    // set one as (IntWritable)1
60    private final static IntWritable one = new IntWritable(1);
61    // set column
62    private final static Text textcol = new Text(WordCountIntoHBase.colstr);
63    private Text word = new Text();   
64    // TableMap is a interface, map is a abstract method. now, we should \
65    //  inprement map() at here, format is : \
66    // map(HStoreKey key, MapWritable value,  \
67    //  OutputCollector<K,V> output, Reporter reporter) ;
68        // Call a user defined function on a single HBase record, \ 
69    //  represented by a key and its associated record value. ;
70    public void map(HStoreKey key, MapWritable cols,
71        OutputCollector<Text, IntWritable> output, Reporter reporter)
72        throws IOException {
73      //
74      // The first get() is : Writable <- get(Object key) \
75      //  get in interface Map<Writable,Writable>  ;
76      // Use ImmutableBytesWritable to downcast Writable \
77      // The second get() is : byte[] <- get() \
78      //  Get the data from the BytesWritable. ;
79      // Text.decode is parse UTF-8 code to a String ;
80      // per "line" is per row data in HTable
81      String line = Text.decode( ((ImmutableBytesWritable) cols.get(textcol) )
82          .get() );
83     
84      //let us know what is "line"
85      /*
86      RandomAccessFile raf =
87        new RandomAccessFile("/home/waue/mr-result.txt","rw");
88      raf.seek(raf.length()); // move pointer to end
89      raf.write(("\n"+line).getBytes());
90      raf.close();
91      *///end
92      // the result is the contents of merged files "
93     
94      //StringTokenizer will divide a line into a word 
95      StringTokenizer itr = new StringTokenizer(line);
96      // set every word as one
97      while (itr.hasMoreTokens()) {
98        // nextToken will return this value in String and point to next \
99        // Text.set() = Set to contain the contents of a string.
100        word.set(itr.nextToken()); 
101        // OutputCollector.collect = collect(K key, V value) \
102        //  Adds a key/value pair to the output.
103        output.collect(word, one);
104      }
105    }
106  }
107
108  // reducer: sums up all the counts
109  private static class ReduceClass extends MapReduceBase implements
110      Reducer<Text, IntWritable, Text, IntWritable> {
111
112    // reuse objects
113    private final static IntWritable SumValue = new IntWritable();
114   
115    // this sample's reduce() format is the same as map() \
116    //  reduce is a method waiting for implement \
117    //  four type in this sample is (Text , Iterator<IntWritable>, \
118    //    OutputCollector<Text, IntWritable> , Reporter ) ;
119    public void reduce(Text key, Iterator<IntWritable> values,
120        OutputCollector<Text, IntWritable> output, Reporter reporter)
121        throws IOException {
122      // sum up value
123      int sum = 0;
124      // "key" is word , "value" is sum
125      // why values.hasNext(), not key.hasNext()
126      while (values.hasNext()) { 
127        // next() will return this value and pointer to next event \
128        //  IntWritable.get() will transfer IntWritable to Int
129        sum += values.next().get(); 
130      }
131      // IntWritable.set(int) will transfer Int to IntWritable
132      SumValue.set(sum);
133      // hense we set outputPath in main, the output.collect will put
134      //  data in Hadoop
135      output.collect(key, SumValue);
136    }
137  }
138
139  private WordCountFromHBase() {
140  }
141
142  /**
143   * Runs the demo.
144   */
145  public static void main(String[] args) throws IOException {
146   
147
148    int mapTasks = 1;
149    int reduceTasks = 1;
150    // initialize job;
151    JobConf conf = new JobConf(WordCountFromHBase.class);
152    // TableMap.initJob will build HBase code \
153    //  "org.apache.hadoop.hbase.mapred.TableMap".initJob \
154    //  (Table_name,column_string,Which_class_will_use,job_configure);
155    TableMap.initJob(WordCountIntoHBase.Table_Name,
156        WordCountIntoHBase.colstr, MapClass.class, conf);
157    conf.setJobName(WordCountIntoHBase.Table_Name + "store");
158    conf.setNumMapTasks(mapTasks);
159    conf.setNumReduceTasks(reduceTasks);
160   
161    //Set the key class for the job output data.
162    conf.setOutputKeyClass(Text.class);
163    //Set the value class for job outputs.
164    conf.setOutputValueClass(IntWritable.class);
165    // MapperClass,CombinerClass,ReducerClass are essential
166    conf.setMapperClass(MapClass.class);
167    conf.setCombinerClass(ReduceClass.class);
168    conf.setReducerClass(ReduceClass.class);
169    // input is Hbase format => TableInputFormat
170    conf.setInputFormat(TableInputFormat.class);
171    conf.setOutputPath(new Path(outputPath));
172//     delete the old path with the same name
173    FileSystem.get(conf).delete(new Path(outputPath));
174    JobClient.runJob(conf);
175  }
176}
Note: See TracBrowser for help on using the repository browser.