Index: /WordCount.java
===================================================================
--- /WordCount.java	(revision 6)
+++ 	(revision )
@@ -1,96 +1,0 @@
-/*
- * map reduce sample code
- */
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-
-public class WordCount {
-
-	// mapper: emits (token, 1) for every word occurrence
-	private static class MapClass extends MapReduceBase implements
-			Mapper<LongWritable, Text, Text, IntWritable> {
-
-		// reuse objects to save overhead of object creation
-		private final static IntWritable one = new IntWritable(1);
-		private Text word = new Text();
-
-		public void map(LongWritable key, Text value,
-				OutputCollector<Text, IntWritable> output, Reporter reporter)
-				throws IOException {
-			String line = ((Text) value).toString();
-			StringTokenizer itr = new StringTokenizer(line);
-			while (itr.hasMoreTokens()) {
-				word.set(itr.nextToken());
-				output.collect(word, one);
-			}
-		}
-	}
-
-	// reducer: sums up all the counts
-	private static class ReduceClass extends MapReduceBase implements
-			Reducer<Text, IntWritable, Text, IntWritable> {
-
-		// reuse objects
-		private final static IntWritable SumValue = new IntWritable();
-
-		public void reduce(Text key, Iterator<IntWritable> values,
-				OutputCollector<Text, IntWritable> output, Reporter reporter)
-				throws IOException {
-			// sum up values
-			int sum = 0;
-			while (values.hasNext()) {
-				sum += values.next().get();
-			}
-			SumValue.set(sum);
-			output.collect(key, SumValue);
-		}
-	}
-
-	
-	/**
-	 * Runs the demo.
-	 */
-	public static void main(String[] args) throws IOException {
-		String filename = "/user/waue/test/132.txt";
-		String outputPath = "sample-counts";
-		int mapTasks = 20;
-		int reduceTasks = 1;
-
-		JobConf conf = new JobConf(WordCount.class);
-		conf.setJobName("wordcount");
-
-		conf.setNumMapTasks(mapTasks);
-		conf.setNumReduceTasks(reduceTasks);
-
-		conf.setInputPath(new Path(filename));
-		conf.setOutputKeyClass(Text.class);
-		conf.setOutputValueClass(IntWritable.class);
-		conf.setOutputPath(new Path(outputPath));
-
-		conf.setMapperClass(MapClass.class);
-		conf.setCombinerClass(ReduceClass.class);
-		conf.setReducerClass(ReduceClass.class);
-		
-		// Delete the output directory if it exists already
-		Path outputDir = new Path(outputPath);
-		FileSystem.get(conf).delete(outputDir);
-
-		JobClient.runJob(conf);
-	}
-}
Index: /sample/BuildHTable.java
===================================================================
--- /sample/BuildHTable.java	(revision 7)
+++ /sample/BuildHTable.java	(revision 7)
@@ -0,0 +1,70 @@
+/**
+ * Program: BuildHTable.java
+ * Editor: Waue Chen 
+ * From :  NCHC, Taiwan
+ * Last Update Date: 06/10/2008
+ */
+
+package tw.org.nchc.demo;
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseAdmin;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.io.Text;
+
+public class BuildHTable {
+  private String table_name ;
+  private String[] Column_Family ;
+  HBaseConfiguration conf = new HBaseConfiguration();
+  HBaseAdmin admin = new HBaseAdmin(conf);
+  public BuildHTable(String table,String[] CF) throws IOException{
+	  table_name = table;
+	  Column_Family = CF;
+  }
+  
+
+  public boolean checkTableExist(String tname)throws IOException{
+	  if(! admin.tableExists(new Text(tname))){
+		  return false;
+	  }
+	  return true;
+  }
+  // create HBase table; returns true on success, false if the table already exists
+  public boolean createTable() throws IOException{
+	  // check whether the table name already exists
+	    if(! checkTableExist(table_name)){
+	      System.out.println("HTable : " + table_name + "  creating ... please wait");
+	      HTableDescriptor tableDesc = new HTableDescriptor(table_name);
+	      for(int i =0; i<Column_Family.length; i++){
+	    	  String st = Column_Family[i];
+	    	  //check name format "string:"
+	    	  if(! st.endsWith(":")){
+	    		  Column_Family[i] = st+":";
+	    		  System.out.println("normize :" + st +"->" +Column_Family[i]);
+	    	  }
+	    	  //add column family
+	    	  tableDesc.addFamily(new HColumnDescriptor(Column_Family[i]));
+	      }
+	      admin.createTable(tableDesc);
+	    } else {
+	      	return false;
+	    }
+	    return true;
+  }
+  public static void main(String[] args) throws IOException {
+	  
+	  // setup Table name 
+	  String Table_Name = "test_create_table2";
+	  // setup Column Family
+	  String[] Column_Family = {"http:","url:","referrer:"};
+	  
+	  BuildHTable bt = new BuildHTable( Table_Name , Column_Family);
+	  boolean ret = bt.createTable();
+	  if(ret == true){
+		  System.out.println("Create Table \"" +Table_Name +" \" Compelte !!!");
+	  }else {
+		  System.out.println("Table Name \"" +Table_Name +" \"  exit!!!");
+	  }
+    }
+}
Index: /sample/HBaseClient.java
===================================================================
--- /sample/HBaseClient.java	(revision 7)
+++ /sample/HBaseClient.java	(revision 7)
@@ -0,0 +1,152 @@
+/**
+ * Program: HBaseClient.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwn
+ * Last Update Date: 06/10/2008
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Demo that illustrates the HBase client API.
+ * The demo program will insert values to " Column Family: Column qualifier" and then print these.
+ *  pre-do : create a hbase table "test_table" with (CF:CI) which (" column family ", " column qualifier ")
+ *  1. $ bin/hbase shell
+ *  2. > create table test_table("CF");
+ *  ok ! we can test it  
+ *  3. > insert into test_table("CF:CI") values=("Hellow World") where row = "1";
+ *  4. > select * from test_table; 
+
+08/06/03 16:16:36 INFO hbase.HTable: Creating scanner over test_table starting at key 
++---------+-----------+-----------+
+| Row                   | Column                  | Cell                     |
++---------+-----------+-----------+
+| 1                        | CF:CI                     | Hellow World      |
++---------+-----------+-----------+
+1 row(s) in set. (0.24 sec)
+
+ *  on the  structure , "Row" means Row ID which is a key to describe a column;
+ *  Column means the database structure in test_table, 
+ *  Column Family , "CF",  should be defined while creating table.
+ *  Column qualifier , "CI" , can be added dynamically.
+ *  Cell is the value of CF:CI
+ * 
+ *  that's the structure; then the demo program will show you in console as below :
+ *  
+Illustration of adding data...
+Writing row = 0, col 'CF:CI' = Hellow0
+Writing row = 1, col 'CF:CI' = Hellow1
+Writing row = 2, col 'CF:CI' = Hellow2
+Writing row = 3, col 'CF:CI' = Hellow3
+Writing row = 4, col 'CF:CI' = Hellow4
+Writing row = 5, col 'CF:CI' = Hellow5
+Writing row = 6, col 'CF:CI' = Hellow6
+Writing row = 7, col 'CF:CI' = Hellow7
+Writing row = 8, col 'CF:CI' = Hellow8
+Writing row = 9, col 'CF:CI' = Hellow9
+
+Illustration of querying...
+row = 1, 'CF : CI ' = Hellow1
+
+Illustration of scanning...
+08/06/03 16:47:51 INFO hbase.HTable: Creating scanner over test_table starting at key 
+row = 0//9223372036854775807, col 'CF:CI' = Hellow0
+row = 1//9223372036854775807, col 'CF:CI' = Hellow1
+row = 2//9223372036854775807, col 'CF:CI' = Hellow2
+row = 3//9223372036854775807, col 'CF:CI' = Hellow3
+row = 4//9223372036854775807, col 'CF:CI' = Hellow4
+row = 5//9223372036854775807, col 'CF:CI' = Hellow5
+row = 6//9223372036854775807, col 'CF:CI' = Hellow6
+row = 7//9223372036854775807, col 'CF:CI' = Hellow7
+row = 8//9223372036854775807, col 'CF:CI' = Hellow8
+row = 9//9223372036854775807, col 'CF:CI' = Hellow9
+
+
+ *  
+ */
+public class HBaseClient {
+
+	public static void main(String[] args) throws IOException {
+
+		// Open the "test_table" table. If it does not yet exist in HBase, create it first.
+		HBaseConfiguration conf = new HBaseConfiguration();
+		HTable table = new HTable(conf, new Text("test_table"));
+	
+		System.out.println("Illustration of adding data...");
+
+		// create column formed  (Column Family:Column qualifier)
+		Text column = new Text("CF:CI");
+
+		// create row_id 
+		Text row_id = new Text();
+
+		// demo 1  : Insert ten demo values
+		for (int i = 0; i < 10; i++) {
+			
+			// give row_id  value
+			row_id.set(new Integer(i).toString());
+			
+			// let "indicate_id" indicate the column which row = row_id
+			long indicate_id= table.startUpdate(row_id);
+			
+			//val =  value of CF:CI where row_id = i
+			Text val = new Text("Hellow" + i);
+
+			// put "val" to "column" from "table" where "row_id"
+			// the same as :  
+			// hql> INSERT INTO table( column ) VALUES=( val) WHERE ROW = row_id ;
+			table.put(indicate_id, column, val.getBytes());
+			table.commit(indicate_id);
+
+			System.out.println("Writing row = " + row_id + ", col '" + column
+					+ "' = " + val);
+		}
+
+		// demo 2 : print column value only row = 1 ;
+		System.out.println("\n Querying row = 1");
+		
+		// Get a single value for the specified row and column
+		// byte[] = HTable.get(Text row, Text column)
+		
+		String s = Text.decode(table.get(new Text("1"),new Text("CF:CI")));
+		// if changed to:
+		// String s = (table.get(new Text("1"),new Text("CF:CI"))).toString();
+		// the result would be the byte array's default toString, e.g. "[B@1f14ceb"
+		System.out.println("row = 1, 'CF : CI ' = " + s);
+
+		// demo 3 :  Print the all contents of this table
+		System.out.println("\nIllustration of scanning...");
+
+		// we only want one column, but you can specify multiple columns to
+		// fetch at once
+		Text[] cols = { column };
+
+		// Use HScannerInterface to crawl table
+		HScannerInterface scanner = table.obtainScanner(cols, new Text());
+
+		// column values are stored in a Map
+		SortedMap<Text, byte[]> values = new TreeMap<Text, byte[]>();
+		HStoreKey currentKey = new HStoreKey();
+		while (scanner.next(currentKey, values)) {
+			// decode the stored byte[] back into a String
+			String val = Text.decode(values.get(column));
+			System.out.println("row = " + currentKey + ", col '" + column + "' = "
+					+ val);
+		}
+
+		// remember to close scanner when done
+		scanner.close();
+
+	}
+
+}
Index: /sample/HBaseRecord.java
===================================================================
--- /sample/HBaseRecord.java	(revision 7)
+++ /sample/HBaseRecord.java	(revision 7)
@@ -0,0 +1,153 @@
+/*
+ *  NCHC Hbase with map reduce sample code 
+ *  HBaseRecord.java
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * This sample code will put the indicate data to Hbase.
+ * 1. put test.txt in t1 directory which content is 
+---------------
+name:locate:years 
+waue:taiwan:1981
+shellon:taiwan:1981
+---------------
+ * 2. hadoop_root/$ bin/hadoop dfs -put t1 t1
+ * 3. hbase_root/$ bin/hbase shell
+ * 4. hql > create table t1_table("person");
+ * 5. Come to Eclipse and run this code, and we will let database as that 
+ t1_table -> person
+  ----------------
+  |  name | locate | years |
+  ----------------
+  | waue  | taiwan | 1981 |
+  ----------------
+  | shellon | taiwan | 1981 |
+  * 6. Go to hbase console, type : hql > select * from t1_table;
+     
+08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key 
++-------------------------+-------------------------+-------------------------+
+| Row                     | Column                  | Cell                    |
++-------------------------+-------------------------+-------------------------+
+| 0                       | person:locate           | locate                  |
++-------------------------+-------------------------+-------------------------+
+| 0                       | person:name             | name                    |
++-------------------------+-------------------------+-------------------------+
+| 0                       | person:years            | years                   |
++-------------------------+-------------------------+-------------------------+
+| 19                      | person:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 19                      | person:name             | waue                    |
++-------------------------+-------------------------+-------------------------+
+| 19                      | person:years            | 1981                    |
++-------------------------+-------------------------+-------------------------+
+| 36                      | person:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 36                      | person:name             | shellon                 |
++-------------------------+-------------------------+-------------------------+
+| 36                      | person:years            | 1981                    |
++-------------------------+-------------------------+-------------------------+
+3 row(s) in set. (0.04 sec)
+ **/
+public class HBaseRecord {
+
+	/* Define parameters */
+	// one column family: person; three column qualifier: name,locate,years
+	static private String  baseId1 = "person:name";
+	static private String  baseId2 ="person:locate";
+	static private String  baseId3 ="person:years";
+	//split character
+	static private String sp = ":";
+	// file path in the Hadoop file system (not the physical/local file system)
+	String file_path = "/user/waue/t1";
+	// Hbase table name
+	String table_name = "t1_table";
+	// setup MapTask and Reduce Task
+	int mapTasks = 1;
+	int reduceTasks = 1;
+	
+	private static class ReduceClass extends TableReduce<LongWritable, Text> {
+
+		// Column ids are created dynamically,
+		private static final Text col_name = new Text(baseId1);
+		private static final Text col_local = new Text(baseId2);
+		private static final Text col_year = new Text(baseId3);
+		
+		// this map holds the columns per row
+		private MapWritable map = new MapWritable();	
+		
+		// in this sample the map phase is a pass-through (IdentityMapper); reduce does the work
+		public void reduce(LongWritable key, Iterator<Text> values,
+				OutputCollector<Text, MapWritable> output, Reporter reporter)
+				throws IOException {
+
+			// values.next().getBytes() returns the value in byte form; alternatively
+			// Text.decode() could be used instead of getBytes()
+			String stro = new String(values.next().getBytes());
+			String str[] = stro.split(sp);
+			byte b_local[] = str[0].getBytes();
+			byte b_name[] = str[1].getBytes();
+			byte b_year[] = str[2].getBytes();
+			
+			// contents must be ImmutableBytesWritable
+			ImmutableBytesWritable w_local = new ImmutableBytesWritable( b_local);
+			ImmutableBytesWritable w_name = new ImmutableBytesWritable( b_name );
+			ImmutableBytesWritable w_year = new ImmutableBytesWritable( b_year );
+
+			// populate the current row — NOTE(review): b_local/b_name names look swapped vs. the split order (str[0] is name); output mapping appears correct, verify
+			map.clear();
+			map.put(col_name, w_local);
+			map.put(col_local, w_name);
+			map.put(col_year, w_year);
+
+			// add the row with the key as the row id
+			output.collect(new Text(key.toString()), map);
+		}
+	}
+
+	private HBaseRecord() {
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		// which path of input files in Hadoop file system 	
+
+		
+		HBaseRecord setup = new HBaseRecord();
+		JobConf conf = new JobConf(HBaseRecord.class);
+
+		//Job name; you can modify to any you like  
+		conf.setJobName("NCHC_PersonDataBase");
+
+		// Hbase table name must be correct , in our profile is t1_table
+		TableReduce.initJob(setup.table_name, ReduceClass.class, conf);
+		
+		// below are map-reduce profile
+		conf.setNumMapTasks(setup.mapTasks);
+		conf.setNumReduceTasks(setup.reduceTasks);
+		conf.setInputPath(new Path(setup.file_path));
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setCombinerClass(IdentityReducer.class);
+		conf.setReducerClass(ReduceClass.class);
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/WordCount.java
===================================================================
--- /sample/WordCount.java	(revision 7)
+++ /sample/WordCount.java	(revision 7)
@@ -0,0 +1,97 @@
+/*
+ * map reduce sample code
+ */
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+
+public class WordCount {
+
+	// mapper: emits (token, 1) for every word occurrence
+	private static class MapClass extends MapReduceBase implements
+			Mapper<LongWritable, Text, Text, IntWritable> {
+
+		// reuse objects to save overhead of object creation
+		private final static IntWritable one = new IntWritable(1);
+		private Text word = new Text();
+
+		public void map(LongWritable key, Text value,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			String line = ((Text) value).toString();
+			StringTokenizer itr = new StringTokenizer(line);
+			while (itr.hasMoreTokens()) {
+				word.set(itr.nextToken());
+				output.collect(word, one);
+			}
+		}
+	}
+
+	// reducer: sums up all the counts
+	private static class ReduceClass extends MapReduceBase implements
+			Reducer<Text, IntWritable, Text, IntWritable> {
+
+		// reuse objects
+		private final static IntWritable SumValue = new IntWritable();
+
+		public void reduce(Text key, Iterator<IntWritable> values,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// sum up values
+			int sum = 0;
+			while (values.hasNext()) {
+				sum += values.next().get();
+			}
+			SumValue.set(sum);
+			output.collect(key, SumValue);
+		}
+	}
+
+	
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		String filename = "/user/waue/input/";
+		String outputPath = "sample-counts";
+		int mapTasks = 20;
+		int reduceTasks = 1;
+
+		JobConf conf = new JobConf(WordCount.class);
+		conf.setJobName("wordcount");
+
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+
+		conf.setInputPath(new Path(filename));
+		conf.setOutputKeyClass(Text.class);
+		conf.setOutputValueClass(IntWritable.class);
+		conf.setOutputPath(new Path(outputPath));
+
+		conf.setMapperClass(MapClass.class);
+		conf.setCombinerClass(ReduceClass.class);
+		conf.setReducerClass(ReduceClass.class);
+		
+		// Delete the output directory if it exists already
+		Path outputDir = new Path(outputPath);
+		FileSystem.get(conf).delete(outputDir);
+
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/WordCountFromHBase.java
===================================================================
--- /sample/WordCountFromHBase.java	(revision 7)
+++ /sample/WordCountFromHBase.java	(revision 7)
@@ -0,0 +1,158 @@
+/**
+ * Program: WordCountFromHBase.java
+ * Editor: Waue Chen 
+ * From :  NCHC, Taiwan
+ * Last Update Date: 06/10/2008
+ */
+
+/**
+ * Purpose : 
+ * 	Store the result of WordCountIntoHbase.java from Hbase to Hadoop file system 
+ * 
+ * HowToUse : 
+ * 	Make sure Hadoop file system and HBase are running correctly.
+ * 	Then run the program with BuildHTable.java after \
+ * 	modifying these setup parameters.
+ * 
+ * Check Result:
+ * 	inspect http://localhost:60070 by web explorer
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+import java.io.FileOutputStream;
+import java.io.File;
+import java.io.RandomAccessFile;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+@SuppressWarnings("unused")
+
+public class WordCountFromHBase {
+	/* setup parameters */
+	// set the output path 
+	static String outputPath = "counts2";
+
+	// org.apache.hadoop.hbase.mapred.TableMap<K,V>  \ 
+	// TableMap<K extends org.apache.hadoop.io.WritableComparable, \ 
+	// 	V extends org.apache.hadoop.io.Writable> \
+	// Scan an HBase table to sort by a specified sort column. \
+	// If the column does not exist, the record is not passed to Reduce.;
+	private static class MapClass extends TableMap<Text, IntWritable> {
+
+		// set one as (IntWritable)1
+		private final static IntWritable one = new IntWritable(1);
+		// set column 
+		private final static Text textcol = new Text(WordCountIntoHBase.colstr);
+		private Text word = new Text();		
+		// TableMap declares map() as an abstract method, so we must \
+		// 	implement map() here; the expected signature is: \
+		// map(HStoreKey key, MapWritable value,  \
+		// 	OutputCollector<K,V> output, Reporter reporter) ;
+        // Call a user defined function on a single HBase record, \  
+		// 	represented by a key and its associated record value. ; 
+		public void map(HStoreKey key, MapWritable cols,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// 
+			// The first get() is : Writable <- get(Object key) \
+			// 	get in interface Map<Writable,Writable>  ;
+			// Use ImmutableBytesWritable to downcast Writable \
+			// The second get() is : byte[] <- get() \
+			// 	Get the data from the BytesWritable. ;
+			// Text.decode is parse UTF-8 code to a String ;
+			// per "line" is per row data in HTable 
+			String line = Text.decode( ((ImmutableBytesWritable) cols.get(textcol) )
+					.get() );
+			//let us know what is "line"
+			/*
+			RandomAccessFile raf = 
+				new RandomAccessFile("/home/waue/mr-result.txt","rw");
+			raf.seek(raf.length()); // move pointer to end
+			raf.write(("\n"+line).getBytes());
+			raf.close();
+			*///end
+			// the result is the contents of merged files "
+			
+			StringTokenizer itr = new StringTokenizer(line);
+			// set every word as one
+			while (itr.hasMoreTokens()) {
+				word.set(itr.nextToken());				
+				output.collect(word, one);
+			}
+		}
+	}
+
+	// reducer: sums up all the counts
+	private static class ReduceClass extends MapReduceBase implements
+			Reducer<Text, IntWritable, Text, IntWritable> {
+
+		// reuse objects
+		private final static IntWritable SumValue = new IntWritable();
+
+		public void reduce(Text key, Iterator<IntWritable> values,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// sum up values
+			int sum = 0;
+			while (values.hasNext()) {
+				sum += values.next().get();
+			}
+			SumValue.set(sum);
+			output.collect(key, SumValue);
+		}
+	}
+
+	private WordCountFromHBase() {
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		
+
+		int mapTasks = 1;
+		int reduceTasks = 1;
+		// initialize job;
+		JobConf conf = new JobConf(WordCountFromHBase.class);
+		// TableMap.initJob will build HBase code \
+		// 	"org.apache.hadoop.hbase.mapred.TableMap".initJob \
+		// 	(Table_name,column_string,Which_class_will_use,job_configure);
+		TableMap.initJob(WordCountIntoHBase.Table_Name,
+				WordCountIntoHBase.colstr, MapClass.class, conf);
+		conf.setJobName(WordCountIntoHBase.Table_Name + "store");
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+		
+		//Set the key class for the job output data.
+		conf.setOutputKeyClass(Text.class);
+		//Set the value class for job outputs.
+		conf.setOutputValueClass(IntWritable.class);
+		// MapperClass,CombinerClass,ReducerClass are essential
+		conf.setMapperClass(MapClass.class);
+		conf.setCombinerClass(ReduceClass.class);
+		conf.setReducerClass(ReduceClass.class);
+		// input is Hbase format => TableInputFormat
+		conf.setInputFormat(TableInputFormat.class);
+		conf.setOutputPath(new Path(outputPath));
+//		 delete the old path with the same name 
+		FileSystem.get(conf).delete(new Path(outputPath));
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/WordCountIntoHBase.java
===================================================================
--- /sample/WordCountIntoHBase.java	(revision 7)
+++ /sample/WordCountIntoHBase.java	(revision 7)
@@ -0,0 +1,109 @@
+/**
+ * Program: WordCountIntoHBase.java
+ * Editor: Waue Chen 
+ * From :  NCHC, Taiwan
+ * Last Update Date: 06/10/2008
+ */
+
+/**
+ * Purpose : 
+ * 	Store every line from $Input_Path to HBase
+ * 
+ * HowToUse : 
+ * 	Make sure Hadoop file system and HBase are running correctly.
+ * 	Use Hadoop instruction to add input-text-files to $Input_Path.
+ *  ($ bin/hadoop dfs -put local_dir hdfs_dir)
+ * 	Then run the program with BuildHTable.java after \
+ * 	modifying these setup parameters.
+ * 
+ * Check Result : 
+ * 	View the result by hbase instruction (hql> select * from $Table_Name). 
+ * 	Or run WordCountFromHBase.java then inspect http://localhost:60070 by web explorer;
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class WordCountIntoHBase {
+
+	/* setup parameters */
+	// $Input_Path. Please make sure the path is correct and contains input files
+	static final String Input_Path = "/user/waue/simple";
+	// Hbase table name, the program will create it
+	static final String Table_Name = "word_count5";
+	// column name, the program will create it
+	static final String colstr = "word:text" ;
+	
+	// constructor
+	private WordCountIntoHBase() {
+	}
+
+	private static class ReduceClass extends TableReduce<LongWritable, Text> {
+		// set (column_family:column_qualify)
+		private static final Text col = new Text(WordCountIntoHBase.colstr);
+		// this map holds the columns per row
+		private MapWritable map = new MapWritable();
+		public void reduce(LongWritable key, Iterator<Text> values,
+				OutputCollector<Text, MapWritable> output, Reporter reporter)
+				throws IOException {
+			// contents must be ImmutableBytesWritable
+			ImmutableBytesWritable bytes = 
+				new ImmutableBytesWritable(values.next().getBytes());			
+			map.clear();
+			// write data 
+			map.put(col, bytes);
+			// add the row with the key as the row id
+			output.collect(new Text(key.toString()), map);
+		}
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {	
+		// parse colstr to split column family and column qualify
+		String tmp[] = colstr.split(":");
+		String Column_Family = tmp[0]+":";
+		String CF[] = {Column_Family};
+		// create the table only if it does not already exist; we do not allow \ 
+		// reusing a table name with a different structure
+		BuildHTable build_table = new BuildHTable(Table_Name,CF);
+		if (!build_table.checkTableExist(Table_Name)) {
+			if (!build_table.createTable()) {
+				System.out.println("create table error !");
+			}
+		}else{
+			System.out.println("Table \"" + Table_Name +"\" has already existed !");
+		}
+		int mapTasks = 1;
+		int reduceTasks = 1;
+		JobConf conf = new JobConf(WordCountIntoHBase.class);
+		conf.setJobName(Table_Name);
+
+		// must initialize the TableReduce before running job
+		TableReduce.initJob(Table_Name, ReduceClass.class, conf);
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+		conf.setInputPath(new Path(Input_Path));
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setCombinerClass(IdentityReducer.class);
+		conf.setReducerClass(ReduceClass.class);
+
+		JobClient.runJob(conf);
+	}
+}
