說明:
This sample code loads the specified data from a text file in HDFS into an HBase table.
執行方式 :
1. Create a file test.txt in a directory named t1 with the following content:
---------------
name:locate:years
waue:taiwan:1981
shellon:taiwan:1981
---------------
2. hadoop_root/$ bin/hadoop dfs -put t1 t1
3. hbase_root/$ bin/hbase shell
4. hql > create table t1_table("person");
5. Switch to Eclipse and run this code. Afterwards the table will contain the following data:
t1_table -> person
----------------
| name | locate | years |
----------------
| waue | taiwan | 1981 |
----------------
| shellon | taiwan | 1981 |
----------------
6. Go to the hbase console and type: hql > select * from t1_table;
結果:
08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key +-------------------------+-------------------------+-------------------------+ | Row | Column | Cell | +-------------------------+-------------------------+-------------------------+ | 0 | person:locate | locate | +-------------------------+-------------------------+-------------------------+ | 0 | person:name | name | +-------------------------+-------------------------+-------------------------+ | 0 | person:years | years | +-------------------------+-------------------------+-------------------------+ | 19 | person:locate | taiwan | +-------------------------+-------------------------+-------------------------+ | 19 | person:name | waue | +-------------------------+-------------------------+-------------------------+ | 19 | person:years | 1981 | +-------------------------+-------------------------+-------------------------+ | 36 | person:locate | taiwan | +-------------------------+-------------------------+-------------------------+ | 36 | person:name | shellon | +-------------------------+-------------------------+-------------------------+ | 36 | person:years | 1981 | +-------------------------+-------------------------+-------------------------+ 3 row(s) in set. (0.04 sec)
程式碼 :
/*
 * NCHC HBase with MapReduce sample code
 * DemoHBaseSink.java
 */
package tw.org.nchc.demo;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

/**
 * Sample job that reads colon-separated text lines ("name:locate:years")
 * and stores each line as one row of an HBase table, using the columns
 * person:name, person:locate and person:years.
 */
public class DemoHBaseSink {

  /** Input path used when no path is given on the command line. */
  private static final String DEFAULT_INPUT_PATH = "/user/waue/t1";

  /** HBase table used when no table name is given on the command line. */
  private static final String DEFAULT_TABLE_NAME = "t1_table";

  /**
   * Reducer that splits every incoming line into three fields and emits
   * them as one table row keyed by the string form of the input key.
   */
  private static class ReduceClass extends TableReduce<LongWritable, Text> {

    // Column ids; the qualifier after "person:" is created dynamically by HBase.
    private static final Text COL_NAME = new Text("person:name");
    private static final Text COL_LOCATE = new Text("person:locate");
    private static final Text COL_YEARS = new Text("person:years");

    /** Separator between the fields of an input line. */
    private static final String FIELD_SEPARATOR = ":";

    /** Number of fields a line must provide (name, locate, years). */
    private static final int FIELD_COUNT = 3;

    /** Charset used to encode cell contents. */
    private static final String CELL_CHARSET = "UTF-8";

    // This map holds the columns of the row currently being written; it is
    // cleared and refilled for every row.
    private final MapWritable row = new MapWritable();

    /**
     * Converts every line received for {@code key} into a table row.
     * The mapper is the identity, so all parsing happens here.
     *
     * @param key      input key; its string form becomes the row id
     * @param values   the lines received for this key
     * @param output   collector receiving (row id, column map) pairs
     * @param reporter progress reporter (unused)
     * @throws IOException if a line has fewer than three fields or the
     *                     row cannot be collected
     */
    @Override
    public void reduce(LongWritable key, Iterator<Text> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
        throws IOException {
      Text rowId = new Text(key.toString());

      while (values.hasNext()) {
        // Text.getBytes() exposes the raw backing array, of which only the
        // first getLength() bytes are valid; toString() decodes exactly the
        // valid bytes, so no stale buffer content leaks into the last field.
        String line = values.next().toString();
        String[] fields = line.split(FIELD_SEPARATOR);
        if (fields.length < FIELD_COUNT) {
          throw new IOException("Malformed input at row " + key
              + ": expected name:locate:years but got \"" + line + "\"");
        }

        // Populate the current row; cell contents must be
        // ImmutableBytesWritable.
        row.clear();
        row.put(COL_NAME, toCell(fields[0]));
        row.put(COL_LOCATE, toCell(fields[1]));
        row.put(COL_YEARS, toCell(fields[2]));

        // Add the row with the key as the row id.
        output.collect(rowId, row);
      }
    }

    /** Wraps the UTF-8 bytes of {@code value} as a table cell. */
    private static ImmutableBytesWritable toCell(String value)
        throws IOException {
      return new ImmutableBytesWritable(value.getBytes(CELL_CHARSET));
    }
  }

  private DemoHBaseSink() {
  }

  /**
   * Runs the demo.
   *
   * @param args optional: {@code args[0]} is the input path in the Hadoop
   *             file system (default {@code /user/waue/t1}) and
   *             {@code args[1]} is the HBase table name (default
   *             {@code t1_table})
   * @throws IOException if the job fails
   */
  public static void main(String[] args) throws IOException {
    // Path of the input files in the Hadoop file system.
    String filePath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
    // The HBase table must already exist with a "person" column family.
    String tableName = args.length > 1 ? args[1] : DEFAULT_TABLE_NAME;
    int mapTasks = 1;
    int reduceTasks = 1;

    JobConf conf = new JobConf(DemoHBaseSink.class);
    // Job name; you can change it to anything you like.
    conf.setJobName("DemoPersonBase");
    // The HBase table name must be correct.
    TableReduce.initJob(tableName, ReduceClass.class, conf);

    // Below is the map-reduce configuration.
    conf.setNumMapTasks(mapTasks);
    conf.setNumReduceTasks(reduceTasks);
    conf.setInputPath(new Path(filePath));
    conf.setMapperClass(IdentityMapper.class);
    conf.setCombinerClass(IdentityReducer.class);
    conf.setReducerClass(ReduceClass.class);

    JobClient.runJob(conf);
  }
}
Last modified 17 years ago
Last modified on Jun 6, 2008, 2:08:03 PM