Index: /sample/HBaseRecord2.java
===================================================================
--- /sample/HBaseRecord2.java	(revision 10)
+++ /sample/HBaseRecord2.java	(revision 10)
@@ -0,0 +1,184 @@
+/**
+ * Program: HBaseRecord.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwn
+ * Last Update Date: 06/13/2008
+ */
+
+/**
+ * Purpose : 
+ * 	1.Auto generate HTable 
+ *  2.Parse your record and then store in HBase.
+ * 
+ * HowToUse : 
+ * 	Make sure Hadoop file system and Hbase are running correctly.
+ * 	1. put test.txt in t1 directory which content is 
+	---------------
+	name:locate:years 
+	waue:taiwan:1981
+	shellon:taiwan:1981
+	---------------
+ * 	2. hadoop_root/$ bin/hadoop dfs -put t1 t1
+ * 	3. hbase_root/$ bin/hbase shell
+ * 	4. hql > create table t1_table("person");
+ * 	5. Come to Eclipse and run this code, and we will let database as that 
+ 	t1_table -> person
+	  ----------------
+	  |  name | locate | years |
+	  | waue  | taiwan | 1981 |
+	  | shellon | taiwan | 1981 |
+	  ----------------
+ * Check Result:
+ * 	Go to hbase console, type : 
+ * 		hql > select * from t1_table; 
+08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key 
++-------------------------+-------------------------+-------------------------+
+| Row                     | Column                  | Cell                    |
++-------------------------+-------------------------+-------------------------+
+| 0                       | person:locate           | locate                  |
++-------------------------+-------------------------+-------------------------+
+| 0                       | person:name             | name                    |
++-------------------------+-------------------------+-------------------------+
+| 0                       | person:years            | years                   |
++-------------------------+-------------------------+-------------------------+
+| 19                      | person:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 19                      | person:name             | waue                    |
++-------------------------+-------------------------+-------------------------+
+| 19                      | person:years            | 1981                    |
++-------------------------+-------------------------+-------------------------+
+| 36                      | person:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 36                      | person:name             | shellon                 |
++-------------------------+-------------------------+-------------------------+
+| 36                      | person:years            | 1981                    |
++-------------------------+-------------------------+-------------------------+
+3 row(s) in set. (0.04 sec)
+ */
+
+
+
+
+package tw.org.nchc.code;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+
+public class HBaseRecord2 {
+
+	/* Denify parameter */
+	// one column family: person; three column qualifier: name,locate,years
+	final String colstr;
+	// Hbase table name
+	static String[] col;
+	String Table_Name = "Record1";
+	//split character
+	static String sp = ":";
+	// file path in hadoop file system (not phisical file system)
+	String file_path = "/user/waue/t1";
+
+
+
+	public HBaseRecord2(){
+		colstr ="person:name,locate,years";
+	}
+	public HBaseRecord2(String str){
+		colstr = str; 
+	}
+
+	
+	private static class ReduceClass extends TableReduce<LongWritable, Text> {
+
+		// Column id is created dymanically, 
+		private static final Text col_name = new Text(baseId1);
+		private static final Text col_local = new Text(baseId2);
+		private static final Text col_year = new Text(baseId3);
+		
+		// this map holds the columns per row
+		private MapWritable map = new MapWritable();	
+		
+		// on this sample, map is nonuse, we use reduce to handle
+		public void reduce(LongWritable key, Iterator<Text> values,
+				OutputCollector<Text, MapWritable> output, Reporter reporter)
+				throws IOException {
+
+			// values.next().getByte() can get value and transfer to byte form, there is an other way that let decode()
+			// to substitude getByte() 
+			String stro = new String(values.next().getBytes());
+			String str[] = stro.split(sp);
+			byte b_local[] = str[0].getBytes();
+			byte b_name[] = str[1].getBytes();
+			byte b_year[] = str[2].getBytes();
+			
+			// contents must be ImmutableBytesWritable
+			ImmutableBytesWritable w_local = new ImmutableBytesWritable( b_local);
+			ImmutableBytesWritable w_name = new ImmutableBytesWritable( b_name );
+			ImmutableBytesWritable w_year = new ImmutableBytesWritable( b_year );
+
+			// populate the current row
+			map.clear();
+			map.put(col_name, w_local);
+			map.put(col_local, w_name);
+			map.put(col_year, w_year);
+
+			// add the row with the key as the row id
+			output.collect(new Text(key.toString()), map);
+		}
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		// parse colstr to split column family and column qualify
+		HBaseRecord2 work = new HBaseRecord2();
+		
+		String tmp[] = work.colstr.split(":");
+		String Column_Family = tmp[0]+":";
+		String CF[] = {Column_Family};
+		String CQ[] = tmp[2].split(",");
+		// check whether create table or not , we don't admit \ 
+		// the same name but different structure
+		
+		BuildHTable build_table = new BuildHTable(work.Table_Name,CF);
+		if (!build_table.checkTableExist(work.Table_Name)) {
+			if (!build_table.createTable()) {
+				System.out.println("create table error !");
+			}
+		}else{
+			System.out.println("Table \"" + work.Table_Name +"\" has already existed !");
+		}		
+
+		JobConf conf = new JobConf(HBaseRecord2.class);
+		int mapTasks = 1;
+		int reduceTasks = 1;
+		//Job name; you can modify to any you like  
+		conf.setJobName("NCHC_PersonDataBase");
+
+		// Hbase table name must be correct , in our profile is t1_table
+		TableReduce.initJob(work.Table_Name, ReduceClass.class, conf);
+		
+		// below are map-reduce profile
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+		conf.setInputPath(new Path(work.file_path));
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setCombinerClass(IdentityReducer.class);
+		conf.setReducerClass(ReduceClass.class);
+		JobClient.runJob(conf);
+	}
+}
