/**
 * Program : WordCountFromHBase.java
 * Editor  : Waue Chen
 * From    : NCHC, Taiwan
 * Last Update Date: 06/10/2008
 */

/**
 * Purpose :
 *   Read the table written by WordCountIntoHBase.java from HBase, count the
 *   words, and store the result in the Hadoop file system.
 *
 * HowToUse :
 *   Make sure the Hadoop file system and HBase are running correctly.
 *   Then run the program with BuildHTable.java after modifying these
 *   setup parameters.
 *
 * Check Result:
 *   Inspect http://localhost:60070 with a web browser.
 */
package tw.org.nchc.demo;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import java.io.FileOutputStream;
import java.io.File;
import java.io.RandomAccessFile;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

@SuppressWarnings("unused")
public class WordCountFromHBase {
	/* setup parameters */
	// set the output path
	static String outputPath = "counts2";

	// org.apache.hadoop.hbase.mapred.TableMap:
	// scans an HBase table, sorted by a specified column.
	// If the column does not exist, the record is not passed to reduce.
	private static class MapClass extends TableMap {

		// reuse a single (IntWritable) 1
		private final static IntWritable one = new IntWritable(1);

		// the column to read
		private final static Text textcol = new Text(WordCountIntoHBase.colstr);

		private Text word = new Text();

		// TableMap is an abstract class and map() is its abstract method, so we
		// implement map() here. The signature is:
		//   map(HStoreKey key, MapWritable value,
		//       OutputCollector output, Reporter reporter)
		// It is called once per HBase record, represented by a row key and its
		// associated record value.
		public void map(HStoreKey key, MapWritable cols, OutputCollector output,
				Reporter reporter) throws IOException {
			// The first get() is Map.get(Object key): it returns the column value
			// as a Writable, which we downcast to ImmutableBytesWritable.
			// The second get() is ImmutableBytesWritable.get(): it returns the
			// underlying byte[] data.
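			// Spelled out step by step, the same extraction chain looks roughly
			// like this (an illustrative sketch only; the local names w, raw and
			// s are placeholders, and the single expression below does the same
			// work in one line):
			//   Writable w = cols.get(textcol);                  // look up the column value
			//   byte[] raw = ((ImmutableBytesWritable) w).get(); // unwrap the raw cell bytes
			//   String s = Text.decode(raw);                     // decode the UTF-8 bytes into a String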
			// Text.decode() parses the UTF-8 bytes into a String;
			// each "line" is the value of one row in the HTable.
			String line = Text.decode(((ImmutableBytesWritable) cols.get(textcol))
					.get());

			// debug: uncomment to dump each "line" to a local file
			/*
			RandomAccessFile raf =
					new RandomAccessFile("/home/waue/mr-result.txt", "rw");
			raf.seek(raf.length()); // move the pointer to the end
			raf.write(("\n" + line).getBytes());
			raf.close();
			*/ // end

			// the row value holds the contents of the merged input files
			StringTokenizer itr = new StringTokenizer(line);
			// emit (word, 1) for every token
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				output.collect(word, one);
			}
		}
	}

	// reducer: sums up all the counts
	private static class ReduceClass extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {
		// reuse objects
		private final static IntWritable SumValue = new IntWritable();

		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			// sum up the values
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			SumValue.set(sum);
			output.collect(key, SumValue);
		}
	}

	private WordCountFromHBase() {
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {
		int mapTasks = 1;
		int reduceTasks = 1;

		// initialize the job
		JobConf conf = new JobConf(WordCountFromHBase.class);

		// TableMap.initJob configures the job to scan an HBase table:
		//   initJob(table_name, column_string, mapper_class, job_conf)
		TableMap.initJob(WordCountIntoHBase.Table_Name,
				WordCountIntoHBase.colstr, MapClass.class, conf);
		conf.setJobName(WordCountIntoHBase.Table_Name + "store");
		conf.setNumMapTasks(mapTasks);
		conf.setNumReduceTasks(reduceTasks);

		// set the key class for the job output data
		conf.setOutputKeyClass(Text.class);
		// set the value class for job outputs
		conf.setOutputValueClass(IntWritable.class);

		// mapper, combiner and reducer classes are essential
		conf.setMapperClass(MapClass.class);
		conf.setCombinerClass(ReduceClass.class);
		conf.setReducerClass(ReduceClass.class);

		// the input comes from HBase, so use TableInputFormat
		conf.setInputFormat(TableInputFormat.class);
		conf.setOutputPath(new Path(outputPath));

		// delete any old output with the same name
		FileSystem.get(conf).delete(new Path(outputPath));

		JobClient.runJob(conf);
	}
}
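
/*
 * A rough sketch of how this demo is typically launched, assuming the compiled
 * classes are packed into a jar (the jar name "nchc-demo.jar" is only a
 * placeholder) and the Hadoop file system and HBase are already running:
 *
 *   bin/hadoop jar nchc-demo.jar tw.org.nchc.demo.WordCountFromHBase
 *
 * The word counts are then written to the "counts2" directory on HDFS.
 */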