/** * Program: WordCountFromHBase.java * Editor: Waue Chen * From : NCHC. Taiwn * Last Update Date: 07/02/2008 * Upgrade to 0.17 */ /** * Purpose : * Word counting from Hbase then store result in Hadoop file system * * HowToUse : * Make sure Hadoop file system are running and HBase has correct data. * Suggest to run WordCountIntoHBase first. * finally, modify these setup parameters and run. * * Check Result: * * inspect http://localhost:50070 by web explorer */ package tw.org.nchc.code; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HStoreKey; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapred.TableInputFormat; import org.apache.hadoop.hbase.mapred.TableMap; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; @SuppressWarnings("unused") public class WordCountFromHBase { /* setup parameters */ // set the output path static String outputPath = "counts2"; // org.apache.hadoop.hbase.mapred.TableMap \ // TableMap \ // Scan an HBase table to sort by a specified sort column. \ // If the column does not exist, the record is not passed to Reduce.; private static class MapClass extends TableMap { // set one as (IntWritable)1 private final static IntWritable one = new IntWritable(1); // set column private final static Text textcol = new Text(WordCountIntoHBase.colstr); private Text word = new Text(); // TableMap is a interface, map is a abstract method. now, we should \ // inprement map() at here, format is : \ // map(HStoreKey key, MapWritable value, \ // OutputCollector output, Reporter reporter) ; // Call a user defined function on a single HBase record, \ // represented by a key and its associated record value. ; public void map(HStoreKey key, MapWritable cols, OutputCollector output, Reporter reporter) throws IOException { // // The first get() is : Writable <- get(Object key) \ // get in interface Map ; // Use ImmutableBytesWritable to downcast Writable \ // The second get() is : byte[] <- get() \ // Get the data from the BytesWritable. ; // Text.decode is parse UTF-8 code to a String ; // per "line" is per row data in HTable String line = Text.decode( ((ImmutableBytesWritable) cols.get(textcol) ) .get() ); //let us know what is "line" /* RandomAccessFile raf = new RandomAccessFile("/home/waue/mr-result.txt","rw"); raf.seek(raf.length()); // move pointer to end raf.write(("\n"+line).getBytes()); raf.close(); *///end // the result is the contents of merged files " //StringTokenizer will divide a line into a word StringTokenizer itr = new StringTokenizer(line); // set every word as one while (itr.hasMoreTokens()) { // nextToken will return this value in String and point to next \ // Text.set() = Set to contain the contents of a string. word.set(itr.nextToken()); // OutputCollector.collect = collect(K key, V value) \ // Adds a key/value pair to the output. output.collect(word, one); } } } // reducer: sums up all the counts private static class ReduceClass extends MapReduceBase implements Reducer { // reuse objects private final static IntWritable SumValue = new IntWritable(); // this sample's reduce() format is the same as map() \ // reduce is a method waiting for implement \ // four type in this sample is (Text , Iterator, \ // OutputCollector , Reporter ) ; public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { // sum up value int sum = 0; // "key" is word , "value" is sum // why values.hasNext(), not key.hasNext() while (values.hasNext()) { // next() will return this value and pointer to next event \ // IntWritable.get() will transfer IntWritable to Int sum += values.next().get(); } // IntWritable.set(int) will transfer Int to IntWritable SumValue.set(sum); // hense we set outputPath in main, the output.collect will put // data in Hadoop output.collect(key, SumValue); } } private WordCountFromHBase() { } /** * Runs the demo. */ public static void main(String[] args) throws IOException { int mapTasks = 1; int reduceTasks = 1; // initialize job; JobConf conf = new JobConf(WordCountFromHBase.class); // TableMap.initJob will build HBase code \ // "org.apache.hadoop.hbase.mapred.TableMap".initJob \ // (Table_name,column_string,Which_class_will_use,job_configure); TableMap.initJob(WordCountIntoHBase.Table_Name, WordCountIntoHBase.colstr, MapClass.class, conf); conf.setJobName(WordCountIntoHBase.Table_Name + "store"); conf.setNumMapTasks(mapTasks); conf.setNumReduceTasks(reduceTasks); //Set the key class for the job output data. conf.setOutputKeyClass(Text.class); //Set the value class for job outputs. conf.setOutputValueClass(IntWritable.class); // MapperClass,CombinerClass,ReducerClass are essential conf.setMapperClass(MapClass.class); conf.setCombinerClass(ReduceClass.class); conf.setReducerClass(ReduceClass.class); // input is Hbase format => TableInputFormat conf.setInputFormat(TableInputFormat.class); conf.setOutputPath(new Path(outputPath)); // delete the old path with the same name FileSystem.get(conf).delete(new Path(outputPath)); JobClient.runJob(conf); } }