/**
 * Program: HBaseRecordPro.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 06/01/2008
 */

/**
 * Purpose:
 *   Parse your record and then store it in HBase.
 *
 * How to use:
 *   Make sure the Hadoop file system and HBase are running correctly.
 *   1. Put test.txt in the t1 directory; its content is
 *      ---------------
 *      name:locate:years
 *      waue:taiwan:1981
 *      shellon:taiwan:1981
 *      ---------------
 *   2. hadoop_root/$ bin/hadoop dfs -put t1 t1
 *   3. hbase_root/$ bin/hbase shell
 *   4. hql > create table t1_table("person");
 *   5. Come back to Eclipse and run this code; the table will end up as
 *      t1_table -> person
 *      ----------------------------
 *      | name    | locate | years |
 *      | waue    | taiwan | 1981  |
 *      | shellon | taiwan | 1981  |
 *      ----------------------------
 *
 * Check the result:
 *   Go to the hbase console and type:
 *   hql > select * from t1_table;
 *
 *   08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
 *   +-------------------------+-------------------------+-------------------------+
 *   | Row                     | Column                  | Cell                    |
 *   +-------------------------+-------------------------+-------------------------+
 *   | 0                       | person:locate           | locate                  |
 *   | 0                       | person:name             | name                    |
 *   | 0                       | person:years            | years                   |
 *   | 19                      | person:locate           | taiwan                  |
 *   | 19                      | person:name             | waue                    |
 *   | 19                      | person:years            | 1981                    |
 *   | 36                      | person:locate           | taiwan                  |
 *   | 36                      | person:name             | shellon                 |
 *   | 36                      | person:years            | 1981                    |
 *   +-------------------------+-------------------------+-------------------------+
 *   3 row(s) in set.
 *   (0.04 sec)
 */
package tw.org.nchc.code;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class HBaseRecordPro {

	/* Define parameters */
	// path of the input file on the local file system (not HDFS)
	private String file_path = "/home/waue/test.txt";

	// column family prefix for every column written to HBase
	private final static String bf = "person:";

	// name of the HBase table to create and populate
	private final String table_name = "testpro";

	// field separator used in the input records
	private final static String sp = ":";

	// column names, parsed from the first line of the input file
	private static String[] cf;

	private static class ReduceClass extends TableReduce<LongWritable, Text> {

		// In this sample the map phase is not used (IdentityMapper);
		// all of the work is done in the reduce phase.
		public void reduce(LongWritable key, Iterator<Text> values,
				OutputCollector<Text, MapWritable> output, Reporter reporter)
				throws IOException {
			// this map holds the columns for one row
			MapWritable map = new MapWritable();

			// each value is one input line, e.g. "waue:taiwan:1981"
			String stro = values.next().toString();
			String str[] = stro.split(sp);
			int length = cf.length;

			// column names are created dynamically from the parsed header line
			Text[] col_n = new Text[length];
			byte[][] b_l = new byte[length][];
			// cell contents must be ImmutableBytesWritable
			ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];

			map.clear();
			for (int i = 0; i < length; i++) {
				col_n[i] = new Text(bf + cf[i]);
				b_l[i] = str[i].getBytes();
				w_l[i] = new ImmutableBytesWritable(b_l[i]);
				// populate the current row
				map.put(col_n[i], w_l[i]);
			}
			// emit the row, using the input key (the line's byte offset) as the row id
			output.collect(new Text(key.toString()), map);
		}
	}

	private HBaseRecordPro() {
	}

	/**
	 * Reads the first line of the input file (the column names) and copies
	 * the remaining lines to the output file. Returns the first line.
	 */
	String parseFirstLine(String in, String ou) throws IOException {
		BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
		BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
		String first_line, data;
		first_line = fi.readLine();
		do {
			data = fi.readLine();
			if (data == null) {
				break;
			} else {
				fw.write(data + "\n");
			}
		} while (true);
		fw.close();
		fi.close();
		return first_line;
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {
		final String file_tmp = "/tmp/HBaseRecord.text.tmp";
		final int mapTasks = 1;
		final int reduceTasks = 1;
		String[] column_family = { bf };

		HBaseRecordPro setup = new HBaseRecordPro();
		// strip the header line (the column names) from the input file
		String first_line = setup.parseFirstLine(setup.file_path, file_tmp);
		System.out.println(first_line);

		// the column names are taken from the header line
		HBaseRecordPro.cf = first_line.split(sp);
		// test: print the full column names
		for (int i = 0; i < cf.length; i++) {
			System.out.println("column[" + i + "]" + bf + cf[i]);
		}

		// create the HBase table if it does not exist yet
		BuildHTable build_table = new BuildHTable(setup.table_name, column_family);
		if (!build_table.checkTableExist(setup.table_name)) {
			if (!build_table.createTable()) {
				System.out.println("create table error!");
			}
		} else {
			System.out.println("Table \"" + setup.table_name + "\" already exists!");
		}

		JobConf conf = new JobConf(HBaseRecordPro.class);
		// copy the header-less temporary file into HDFS (and delete the local copy)
		FileSystem fs = FileSystem.get(conf);
		fs.copyFromLocalFile(true, new Path(file_tmp), new Path(file_tmp));

		// job name; you can change it to anything you like
		conf.setJobName("PersonDataBase");

		// the HBase table name must be correct; in this program it is "testpro"
		TableReduce.initJob(setup.table_name, ReduceClass.class, conf);

		// map-reduce settings
		conf.setNumMapTasks(mapTasks);
		conf.setNumReduceTasks(reduceTasks);
		conf.setInputPath(new Path(file_tmp));
		conf.setMapperClass(IdentityMapper.class);
		conf.setCombinerClass(IdentityReducer.class);
		conf.setReducerClass(ReduceClass.class);

		JobClient.runJob(conf);
	}
}
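
/*
 * Note: main() above depends on a BuildHTable helper class from the
 * tw.org.nchc.code package that is not part of this file. The class below is
 * only a rough sketch of what such a helper might look like, written against
 * the HBase 0.1-era admin API (HBaseAdmin / HTableDescriptor /
 * HColumnDescriptor); the constructors and overloads may differ in your HBase
 * version, and the real BuildHTable may be implemented quite differently. It
 * is named BuildHTableSketch here so it does not clash with the real class.
 */
class BuildHTableSketch {

	private final String tableName;
	private final String[] columnFamilies;
	private final org.apache.hadoop.hbase.HBaseConfiguration conf =
			new org.apache.hadoop.hbase.HBaseConfiguration();

	BuildHTableSketch(String tableName, String[] columnFamilies) {
		this.tableName = tableName;
		this.columnFamilies = columnFamilies;
	}

	// returns true if a table with the given name already exists;
	// note: tableExists may take a Text or a String depending on the HBase version
	boolean checkTableExist(String name) throws IOException {
		org.apache.hadoop.hbase.HBaseAdmin admin =
				new org.apache.hadoop.hbase.HBaseAdmin(conf);
		return admin.tableExists(new Text(name));
	}

	// creates the table with one column descriptor per family (e.g. "person:");
	// returns false if creation fails
	boolean createTable() throws IOException {
		try {
			org.apache.hadoop.hbase.HBaseAdmin admin =
					new org.apache.hadoop.hbase.HBaseAdmin(conf);
			org.apache.hadoop.hbase.HTableDescriptor desc =
					new org.apache.hadoop.hbase.HTableDescriptor(tableName);
			for (String family : columnFamilies) {
				desc.addFamily(new org.apache.hadoop.hbase.HColumnDescriptor(family));
			}
			admin.createTable(desc);
			return true;
		} catch (IOException e) {
			e.printStackTrace();
			return false;
		}
	}
}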