/*
* NCHC HBase with MapReduce sample code
* DemoHBaseSink.java
*/
package tw.org.nchc.demo;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
/**
* This sample code puts the data shown below into HBase.
* 1. Put a file test.txt into a directory named t1; its content is:
---------------
name:locate:years
waue:taiwan:1981
shellon:taiwan:1981
---------------
* 2. hadoop_root/$ bin/hadoop dfs -put t1 t1
* 3. hbase_root/$ bin/hbase shell
* 4. hql > create table t1_table("person");
* 5. Run this code (e.g. from Eclipse); the table will then look like this:
t1_table -> person
----------------------------
| name    | locate | years |
----------------------------
| waue    | taiwan | 1981  |
----------------------------
| shellon | taiwan | 1981  |
----------------------------
* 6. In the HBase console, run: hql > select * from t1_table;
08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
+-------------------------+-------------------------+-------------------------+
| Row | Column | Cell |
+-------------------------+-------------------------+-------------------------+
| 0 | person:locate | locate |
+-------------------------+-------------------------+-------------------------+
| 0 | person:name | name |
+-------------------------+-------------------------+-------------------------+
| 0 | person:years | years |
+-------------------------+-------------------------+-------------------------+
| 19 | person:locate | taiwan |
+-------------------------+-------------------------+-------------------------+
| 19 | person:name | waue |
+-------------------------+-------------------------+-------------------------+
| 19 | person:years | 1981 |
+-------------------------+-------------------------+-------------------------+
| 36 | person:locate | taiwan |
+-------------------------+-------------------------+-------------------------+
| 36 | person:name | shellon |
+-------------------------+-------------------------+-------------------------+
| 36 | person:years | 1981 |
+-------------------------+-------------------------+-------------------------+
3 row(s) in set. (0.04 sec)
**/
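// Instead of running from Eclipse (step 5 above), the job can also be
// launched from the command line once compiled. A minimal sketch, assuming
// the class has been packaged into demo.jar (the jar name is illustrative):
//
//   hadoop_root/$ bin/hadoop jar demo.jar tw.org.nchc.demo.DemoHBaseSink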
public class DemoHBaseSink {
private static class ReduceClass extends TableReduce<LongWritable, Text> {
// Column names; HBase creates column qualifiers dynamically, so only the
// family ("person") had to be declared when the table was created.
private static final Text col_name = new Text("person:name");
private static final Text col_local = new Text("person:locate");
private static final Text col_year = new Text("person:years");
// this map holds the columns per row
private MapWritable map = new MapWritable();
// In this sample the map phase is a pass-through (IdentityMapper); all the work happens here in reduce.
public void reduce(LongWritable key, Iterator<Text> values,
OutputCollector<Text, MapWritable> output, Reporter reporter)
throws IOException {
// Decode the Text value into a String; toString() handles the UTF-8
// decoding and respects getLength(). (Text.decode() on the raw bytes is an
// alternative; new String(values.next().getBytes()) can pick up stale bytes
// from the backing array.)
String line = values.next().toString();
// the input line format is name:locate:years
String[] fields = line.split(":");
byte[] b_name = fields[0].getBytes();
byte[] b_local = fields[1].getBytes();
byte[] b_year = fields[2].getBytes();
// cell contents must be wrapped as ImmutableBytesWritable
ImmutableBytesWritable w_name = new ImmutableBytesWritable(b_name);
ImmutableBytesWritable w_local = new ImmutableBytesWritable(b_local);
ImmutableBytesWritable w_year = new ImmutableBytesWritable(b_year);
// populate the current row
map.clear();
map.put(col_name, w_name);
map.put(col_local, w_local);
map.put(col_year, w_year);
// add the row with the key as the row id
output.collect(new Text(key.toString()), map);
}
}
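// Each reduce call emits one HBase row: the LongWritable key (the line's
// byte offset in the input file, assigned by the default TextInputFormat)
// becomes the row id, which is why the scan above shows rows 0, 19 and 36.
// Note that the header line name:locate:years is also stored (as row 0),
// since nothing in this sample skips it.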
private DemoHBaseSink() {
}
/**
* Runs the demo.
*/
public static void main(String[] args) throws IOException {
// path of the input files in the Hadoop file system (HDFS)
String file_path = "/user/waue/t1";
int mapTasks = 1;
int reduceTasks = 1;
JobConf conf = new JobConf(DemoHBaseSink.class);
// Job name; change it to anything you like
conf.setJobName("DemoPersonBase");
// The HBase table name must match the table created in step 4; here it is t1_table
TableReduce.initJob("t1_table", ReduceClass.class, conf);
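// initJob points the job's output at the given HBase table; in this HBase
// version it configures TableOutputFormat and the Text/MapWritable output
// key/value types under the hood.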
// remaining MapReduce job settings
conf.setNumMapTasks(mapTasks);
conf.setNumReduceTasks(reduceTasks);
conf.setInputPath(new Path(file_path));
conf.setMapperClass(IdentityMapper.class);
conf.setCombinerClass(IdentityReducer.class);
conf.setReducerClass(ReduceClass.class);
JobClient.runJob(conf);
}
}