| [20] | 1 | /** | 
|---|
|  | 2 | * Program: DemoHBaseSource.java | 
|---|
|  | 3 | * Editor: Waue Chen | 
|---|
|  | 4 | * From :  NCHC. Taiwn | 
|---|
|  | 5 | * Last Update Date: 07/02/2008 | 
|---|
|  | 6 | * Upgrade to 0.17 | 
|---|
|  | 7 | * Re-code from : Cloud9: A MapReduce Library for Hadoop | 
|---|
|  | 8 | */ | 
|---|
|  | 9 |  | 
|---|
|  | 10 | package tw.org.nchc.demo; | 
|---|
|  | 11 |  | 
|---|
|  | 12 | import java.io.IOException; | 
|---|
|  | 13 | import java.util.Iterator; | 
|---|
|  | 14 | import java.util.StringTokenizer; | 
|---|
|  | 15 |  | 
|---|
|  | 16 | import org.apache.hadoop.fs.Path; | 
|---|
|  | 17 | import org.apache.hadoop.hbase.HStoreKey; | 
|---|
|  | 18 | import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | 
|---|
|  | 19 | import org.apache.hadoop.hbase.mapred.TableInputFormat; | 
|---|
|  | 20 | import org.apache.hadoop.hbase.mapred.TableMap; | 
|---|
|  | 21 | import org.apache.hadoop.io.IntWritable; | 
|---|
|  | 22 | import org.apache.hadoop.io.MapWritable; | 
|---|
|  | 23 | import org.apache.hadoop.io.Text; | 
|---|
|  | 24 | import org.apache.hadoop.mapred.JobClient; | 
|---|
|  | 25 | import org.apache.hadoop.mapred.JobConf; | 
|---|
|  | 26 | import org.apache.hadoop.mapred.MapReduceBase; | 
|---|
|  | 27 | import org.apache.hadoop.mapred.OutputCollector; | 
|---|
|  | 28 | import org.apache.hadoop.mapred.Reducer; | 
|---|
|  | 29 | import org.apache.hadoop.mapred.Reporter; | 
|---|
|  | 30 |  | 
|---|
|  | 31 | import tw.org.nchc.code.Convert; | 
|---|
|  | 32 |  | 
|---|
|  | 33 | /** | 
|---|
|  | 34 | * | 
|---|
|  | 35 | */ | 
|---|
|  | 36 | public class DemoHBaseSource { | 
|---|
|  | 37 |  | 
|---|
|  | 38 | // mapper: emits (token, 1) for every word occurrence | 
|---|
|  | 39 | private static class MapClass extends TableMap<Text, IntWritable> { | 
|---|
|  | 40 |  | 
|---|
|  | 41 | // reuse objects to save overhead of object creation | 
|---|
|  | 42 | private final static IntWritable one = new IntWritable(1); | 
|---|
|  | 43 | private final static Text textcol = new Text("default:text"); | 
|---|
|  | 44 | private Text word = new Text(); | 
|---|
|  | 45 |  | 
|---|
|  | 46 | public void map(HStoreKey key, MapWritable cols, | 
|---|
|  | 47 | OutputCollector<Text, IntWritable> output, Reporter reporter) | 
|---|
|  | 48 | throws IOException { | 
|---|
|  | 49 |  | 
|---|
|  | 50 | String line = Text.decode(((ImmutableBytesWritable) cols | 
|---|
|  | 51 | .get(textcol)).get()); | 
|---|
|  | 52 |  | 
|---|
|  | 53 | StringTokenizer itr = new StringTokenizer(line); | 
|---|
|  | 54 | while (itr.hasMoreTokens()) { | 
|---|
|  | 55 | word.set(itr.nextToken()); | 
|---|
|  | 56 | output.collect(word, one); | 
|---|
|  | 57 | } | 
|---|
|  | 58 | } | 
|---|
|  | 59 | } | 
|---|
|  | 60 |  | 
|---|
|  | 61 | // reducer: sums up all the counts | 
|---|
|  | 62 | private static class ReduceClass extends MapReduceBase implements | 
|---|
|  | 63 | Reducer<Text, IntWritable, Text, IntWritable> { | 
|---|
|  | 64 |  | 
|---|
|  | 65 | // reuse objects | 
|---|
|  | 66 | private final static IntWritable SumValue = new IntWritable(); | 
|---|
|  | 67 |  | 
|---|
|  | 68 | public void reduce(Text key, Iterator<IntWritable> values, | 
|---|
|  | 69 | OutputCollector<Text, IntWritable> output, Reporter reporter) | 
|---|
|  | 70 | throws IOException { | 
|---|
|  | 71 | // sum up values | 
|---|
|  | 72 | int sum = 0; | 
|---|
|  | 73 | while (values.hasNext()) { | 
|---|
|  | 74 | sum += values.next().get(); | 
|---|
|  | 75 | } | 
|---|
|  | 76 | SumValue.set(sum); | 
|---|
|  | 77 | output.collect(key, SumValue); | 
|---|
|  | 78 | } | 
|---|
|  | 79 | } | 
|---|
|  | 80 |  | 
|---|
|  | 81 | private DemoHBaseSource() { | 
|---|
|  | 82 | } | 
|---|
|  | 83 |  | 
|---|
|  | 84 | /** | 
|---|
|  | 85 | * Runs the demo. | 
|---|
|  | 86 | */ | 
|---|
|  | 87 | public static void main(String[] args) throws IOException { | 
|---|
|  | 88 | String outputPath = "sample-counts2"; | 
|---|
|  | 89 |  | 
|---|
|  | 90 | int mapTasks = 1; | 
|---|
|  | 91 | int reduceTasks = 1; | 
|---|
|  | 92 |  | 
|---|
|  | 93 | JobConf conf = new JobConf(DemoHBaseSource.class); | 
|---|
|  | 94 |  | 
|---|
|  | 95 | TableMap.initJob("test", "default:text", MapClass.class, conf); | 
|---|
|  | 96 |  | 
|---|
|  | 97 | conf.setJobName("wordcount"); | 
|---|
|  | 98 |  | 
|---|
|  | 99 | conf.setNumMapTasks(mapTasks); | 
|---|
|  | 100 | conf.setNumReduceTasks(reduceTasks); | 
|---|
|  | 101 |  | 
|---|
|  | 102 | conf.setInputFormat(TableInputFormat.class); | 
|---|
|  | 103 |  | 
|---|
|  | 104 | conf.setOutputKeyClass(Text.class); | 
|---|
|  | 105 | conf.setOutputValueClass(IntWritable.class); | 
|---|
|  | 106 | //0.16 | 
|---|
|  | 107 | //    conf.setOutputPath(new Path(outputPath)); | 
|---|
|  | 108 | Convert.setOutputPath(conf,new Path(outputPath)); | 
|---|
|  | 109 |  | 
|---|
|  | 110 | conf.setMapperClass(MapClass.class); | 
|---|
|  | 111 | conf.setCombinerClass(ReduceClass.class); | 
|---|
|  | 112 | conf.setReducerClass(ReduceClass.class); | 
|---|
|  | 113 |  | 
|---|
|  | 114 | JobClient.runJob(conf); | 
|---|
|  | 115 | } | 
|---|
|  | 116 | } | 
|---|