/**
 * Program: DemoHBaseSink.java
 * Editor: Waue Chen
 * From : NCHC. Taiwan
 * Last Update Date: 07/02/2008
 * Upgrade to 0.17
 * Re-code from : Cloud9: A MapReduce Library for Hadoop
 */

/*
 * Cloud9: A MapReduce Library for Hadoop
 */

package tw.org.nchc.demo;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

import tw.org.nchc.code.Convert;

/**
 * Writes each value of a plain-text input into an HBase table, using the
 * line offset produced by the default TextInputFormat as the row id.
 */
public class DemoHBaseSink {
    private static class ReduceClass extends TableReduce<LongWritable, Text> {

        // this is the column we're going to be writing
        private static final Text col = new Text("default:text");

        // this map holds the columns per row
        private MapWritable map = new MapWritable();

        public void reduce(LongWritable key, Iterator<Text> values,
                OutputCollector<Text, MapWritable> output, Reporter reporter)
                throws IOException {
            // contents must be ImmutableBytesWritable; note that
            // Text.getBytes() returns the backing array, which may be
            // longer than the logical length of the Text value
            ImmutableBytesWritable bytes = new ImmutableBytesWritable(values
                    .next().getBytes());

            // populate the current row
            map.clear();
            map.put(col, bytes);

            // add the row with the key as the row id
            output.collect(new Text(key.toString()), map);
        }
    }

    private DemoHBaseSink() {
    }

    /**
     * Runs the demo.
     */
    public static void main(String[] args) throws IOException {
        String filename = "/shared/sample";
        int mapTasks = 1;
        int reduceTasks = 1;

        JobConf conf = new JobConf(DemoHBaseSink.class);
        conf.setJobName("wordcount");

        // must initialize the TableReduce before running job
        TableReduce.initJob("test", ReduceClass.class, conf);

        conf.setNumMapTasks(mapTasks);
        conf.setNumReduceTasks(reduceTasks);

        // 0.16
        // conf.setInputPath(new Path(filename));
        Convert.setInputPath(conf, new Path(filename));

        conf.setMapperClass(IdentityMapper.class);
        conf.setCombinerClass(IdentityReducer.class);
        conf.setReducerClass(ReduceClass.class);

        JobClient.runJob(conf);
    }
}
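
/*
 * Usage sketch (assumptions, not part of the original program): the job
 * writes to the column "default:text" of the HBase table "test", so an
 * existing table with a column family "default" is assumed, along with a
 * plain-text input file at /shared/sample on HDFS. With the Ruby-based
 * HBase shell (0.2+ syntax; earlier releases used an HQL-style shell),
 * the table could be created with something like:
 *
 *   hbase> create 'test', 'default'
 *
 * and the job launched with the usual hadoop jar invocation, e.g.
 * (jar name is hypothetical):
 *
 *   hadoop jar nchc-demo.jar tw.org.nchc.demo.DemoHBaseSink
 */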