source: sample/WordCountFromHBase.java @ 7

Last change on this file since 7 was 7, checked in by waue, 16 years ago

6/12 modify

File size: 5.4 KB
Line 
1/**
2 * Program: WordCountFromHBase.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 06/10/2008
6 */
7
8/**
9 * Purpose :
10 *  Store the result of WordCountIntoHbase.java from Hbase to Hadoop file system
11 *
12 * HowToUse :
13 *  Make sure Hadoop file system and HBase are running correctly.
14 *  Then run the program with BuildHTable.java after \
15 *  modifying these setup parameters.
16 *
17 * Check Result:
18 *  inspect http://localhost:60070 with a web browser
19 */
20
21package tw.org.nchc.demo;
22
23import java.io.IOException;
24import java.util.Iterator;
25import java.util.StringTokenizer;
26import java.io.FileOutputStream;
27import java.io.File;
28import java.io.RandomAccessFile;
29import org.apache.hadoop.fs.FileSystem;
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.hbase.HStoreKey;
32import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33import org.apache.hadoop.hbase.mapred.TableInputFormat;
34import org.apache.hadoop.hbase.mapred.TableMap;
35import org.apache.hadoop.io.IntWritable;
36import org.apache.hadoop.io.MapWritable;
37import org.apache.hadoop.io.Text;
38import org.apache.hadoop.mapred.JobClient;
39import org.apache.hadoop.mapred.JobConf;
40import org.apache.hadoop.mapred.MapReduceBase;
41import org.apache.hadoop.mapred.OutputCollector;
42import org.apache.hadoop.mapred.Reducer;
43import org.apache.hadoop.mapred.Reporter;
44@SuppressWarnings("unused")
45
46public class WordCountFromHBase {
47  /* setup parameters */
48  // set the output path
49  static String outputPath = "counts2";
50
51  // org.apache.hadoop.hbase.mapred.TableMap<K,V>  \
52  // TableMap<K extends org.apache.hadoop.io.WritableComparable, \
53  //  V extends org.apache.hadoop.io.Writable> \
54  // Scan an HBase table to sort by a specified sort column. \
55  // If the column does not exist, the record is not passed to Reduce.;
56  private static class MapClass extends TableMap<Text, IntWritable> {
57
58    // set one as (IntWritable)1
59    private final static IntWritable one = new IntWritable(1);
60    // set column
61    private final static Text textcol = new Text(WordCountIntoHBase.colstr);
62    private Text word = new Text();   
63    // TableMap is a interface, map is a abstract method. now, we should \
64    //  inprement map() at here, format is : \
65    // map(HStoreKey key, MapWritable value,  \
66    //  OutputCollector<K,V> output, Reporter reporter) ;
67        // Call a user defined function on a single HBase record, \ 
68    //  represented by a key and its associated record value. ;
69    public void map(HStoreKey key, MapWritable cols,
70        OutputCollector<Text, IntWritable> output, Reporter reporter)
71        throws IOException {
72      //
73      // The first get() is : Writable <- get(Object key) \
74      //  get in interface Map<Writable,Writable>  ;
75      // Use ImmutableBytesWritable to downcast Writable \
76      // The second get() is : byte[] <- get() \
77      //  Get the data from the BytesWritable. ;
78      // Text.decode is parse UTF-8 code to a String ;
79      // per "line" is per row data in HTable
80      String line = Text.decode( ((ImmutableBytesWritable) cols.get(textcol) )
81          .get() );
82      //let us know what is "line"
83      /*
84      RandomAccessFile raf =
85        new RandomAccessFile("/home/waue/mr-result.txt","rw");
86      raf.seek(raf.length()); // move pointer to end
87      raf.write(("\n"+line).getBytes());
88      raf.close();
89      *///end
90      // the result is the contents of merged files "
91     
92      StringTokenizer itr = new StringTokenizer(line);
93      // set every word as one
94      while (itr.hasMoreTokens()) {
95        word.set(itr.nextToken());       
96        output.collect(word, one);
97      }
98    }
99  }
100
101  // reducer: sums up all the counts
102  private static class ReduceClass extends MapReduceBase implements
103      Reducer<Text, IntWritable, Text, IntWritable> {
104
105    // reuse objects
106    private final static IntWritable SumValue = new IntWritable();
107
108    public void reduce(Text key, Iterator<IntWritable> values,
109        OutputCollector<Text, IntWritable> output, Reporter reporter)
110        throws IOException {
111      // sum up values
112      int sum = 0;
113      while (values.hasNext()) {
114        sum += values.next().get();
115      }
116      SumValue.set(sum);
117      output.collect(key, SumValue);
118    }
119  }
120
121  private WordCountFromHBase() {
122  }
123
124  /**
125   * Runs the demo.
126   */
127  public static void main(String[] args) throws IOException {
128   
129
130    int mapTasks = 1;
131    int reduceTasks = 1;
132    // initialize job;
133    JobConf conf = new JobConf(WordCountFromHBase.class);
134    // TableMap.initJob will build HBase code \
135    //  "org.apache.hadoop.hbase.mapred.TableMap".initJob \
136    //  (Table_name,column_string,Which_class_will_use,job_configure);
137    TableMap.initJob(WordCountIntoHBase.Table_Name,
138        WordCountIntoHBase.colstr, MapClass.class, conf);
139    conf.setJobName(WordCountIntoHBase.Table_Name + "store");
140    conf.setNumMapTasks(mapTasks);
141    conf.setNumReduceTasks(reduceTasks);
142   
143    //Set the key class for the job output data.
144    conf.setOutputKeyClass(Text.class);
145    //Set the value class for job outputs.
146    conf.setOutputValueClass(IntWritable.class);
147    // MapperClass,CombinerClass,ReducerClass are essential
148    conf.setMapperClass(MapClass.class);
149    conf.setCombinerClass(ReduceClass.class);
150    conf.setReducerClass(ReduceClass.class);
151    // input is Hbase format => TableInputFormat
152    conf.setInputFormat(TableInputFormat.class);
153    conf.setOutputPath(new Path(outputPath));
154//     delete the old path with the same name
155    FileSystem.get(conf).delete(new Path(outputPath));
156    JobClient.runJob(conf);
157  }
158}
Note: See TracBrowser for help on using the repository browser.