Index: /sample/hadoop-0.17/tw/org/nchc/code/BuildHTable.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/code/BuildHTable.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/code/BuildHTable.java	(revision 20)
@@ -0,0 +1,86 @@
+/**
+ * Program: BuildHTable.java
+ * Editor: Waue Chen
+ * From :  NCHC. Taiwan
+ * Last Update Date: 06/10/2008
+ */
+
+package tw.org.nchc.code;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseAdmin;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Creates an HBase table with a given name and set of column families.
+ * Column-family names are normalized to end with ":" as HBase requires.
+ */
+public class BuildHTable {
+	// name of the table to create
+	private String table_name;
+
+	// column families for the new table
+	private String[] Column_Family;
+
+	HBaseConfiguration conf = new HBaseConfiguration();
+
+	HBaseAdmin admin = new HBaseAdmin(conf);
+
+	/**
+	 * @param table name of the table to create
+	 * @param CF column-family names (with or without the trailing ":")
+	 * @throws IOException if the HBase master cannot be reached
+	 */
+	public BuildHTable(String table, String[] CF) throws IOException {
+		table_name = table;
+		Column_Family = CF;
+	}
+
+	/** @return true if the named table already exists in HBase */
+	public boolean checkTableExist(String tname) throws IOException {
+		return admin.tableExists(new Text(tname));
+	}
+
+	/**
+	 * Creates the table with the configured column families.
+	 *
+	 * @return true on success; false if the table already exists
+	 */
+	public boolean createTable() throws IOException {
+		if (checkTableExist(table_name)) {
+			return false;
+		}
+		System.out.println("HTable : " + table_name + "  creating ... please wait");
+		HTableDescriptor tableDesc = new HTableDescriptor(table_name);
+		for (int i = 0; i < Column_Family.length; i++) {
+			String st = Column_Family[i];
+			// normalize the family name to the "name:" form HBase expects
+			if (!st.endsWith(":")) {
+				Column_Family[i] = st + ":";
+				System.out.println("normize :" + st + "->" + Column_Family[i]);
+			}
+			// add column family
+			tableDesc.addFamily(new HColumnDescriptor(Column_Family[i]));
+		}
+		admin.createTable(tableDesc);
+		return true;
+	}
+
+	public static void main(String[] args) throws IOException {
+		// table name to create
+		String Table_Name = "test_create_table2";
+		// column families for the new table
+		String[] Column_Family = { "http:", "url:", "referrer:" };
+
+		BuildHTable bt = new BuildHTable(Table_Name, Column_Family);
+		if (bt.createTable()) {
+			System.out.println("Create Table \"" + Table_Name + " \" Complete !!!");
+		} else {
+			System.out.println("Table Name \"" + Table_Name + " \" already exists !!!");
+		}
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/code/Convert.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/code/Convert.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/code/Convert.java	(revision 20)
@@ -0,0 +1,49 @@
+package tw.org.nchc.code;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Compatibility helpers for Hadoop 0.17: wraps the FileInputFormat /
+ * FileOutputFormat path setters that replaced the old JobConf methods.
+ */
+public class Convert {
+	/**
+	 * Lists the paths directly under the given path.
+	 *
+	 * @return the child paths; an empty array if the path has no listing
+	 */
+	static public Path[] listPaths(FileSystem fsm, Path path) throws IOException {
+		FileStatus[] fss = fsm.listStatus(path);
+		if (fss == null) {
+			// listStatus may return null for a nonexistent path in this API version
+			return new Path[0];
+		}
+		Path[] pi = new Path[fss.length];
+		for (int i = 0; i < fss.length; i++) {
+			pi[i] = fss[i].getPath();
+		}
+		return pi;
+	}
+
+	/** Replaces the job's whole input-path list with the given path. */
+	static public void setInputPath(JobConf conf, Path path) {
+		FileInputFormat.setInputPaths(conf, path);
+	}
+
+	/** Adds one more input path to the job. */
+	static public void addInputPath(JobConf conf, Path path) {
+		FileInputFormat.addInputPath(conf, path);
+	}
+
+	/** Sets the single output path of the job. */
+	static public void setOutputPath(JobConf conf, Path path) {
+		FileOutputFormat.setOutputPath(conf, path);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/code/HBaseClient.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/code/HBaseClient.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/code/HBaseClient.java	(revision 20)
@@ -0,0 +1,152 @@
+/**
+ * Program: HBaseClient.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 06/10/2008
+ */
+
+package tw.org.nchc.code;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Demo that illustrates the HBase client API.
+ * The demo program will insert values to " Column Family: Column qualifier" and then print these.
+ *  pre-do : create a hbase table "test_table" with (CF:CI) which (" column family ", " column qualifier ")
+ *  1. $ bin/hbase shell
+ *  2. > create table test_table("CF");
+ *  ok ! we can test it  
+ *  3. > insert into test_table("CF:CI") values=("Hellow World") where row = "1";
+ *  4. > select * from test_table; 
+
+08/06/03 16:16:36 INFO hbase.HTable: Creating scanner over test_table starting at key 
++---------+-----------+-----------+
+| Row                   | Column                  | Cell                     |
++---------+-----------+-----------+
+| 1                        | CF:CI                     | Hellow World      |
++---------+-----------+-----------+
+1 row(s) in set. (0.24 sec)
+
+ *  on the  structure , "Row" means Row ID which is a key to describe a column;
+ *  Column means the database structure in test_table, 
+ *  Column Family , "CF",  should be defined while creating table.
+ *  Column qualifier , "CI" , can be added dynamically.
+ *  Cell is the value of CF:CI
+ * 
+ *  that's the structure; then the demo program will show you in console as below :
+ *  
+Illustration of adding data...
+Writing row = 0, col 'CF:CI' = Hellow0
+Writing row = 1, col 'CF:CI' = Hellow1
+Writing row = 2, col 'CF:CI' = Hellow2
+Writing row = 3, col 'CF:CI' = Hellow3
+Writing row = 4, col 'CF:CI' = Hellow4
+Writing row = 5, col 'CF:CI' = Hellow5
+Writing row = 6, col 'CF:CI' = Hellow6
+Writing row = 7, col 'CF:CI' = Hellow7
+Writing row = 8, col 'CF:CI' = Hellow8
+Writing row = 9, col 'CF:CI' = Hellow9
+
+Illustration of querying...
+row = 1, 'CF : CI ' = Hellow1
+
+Illustration of scanning...
+08/06/03 16:47:51 INFO hbase.HTable: Creating scanner over test_table starting at key 
+row = 0//9223372036854775807, col 'CF:CI' = Hellow0
+row = 1//9223372036854775807, col 'CF:CI' = Hellow1
+row = 2//9223372036854775807, col 'CF:CI' = Hellow2
+row = 3//9223372036854775807, col 'CF:CI' = Hellow3
+row = 4//9223372036854775807, col 'CF:CI' = Hellow4
+row = 5//9223372036854775807, col 'CF:CI' = Hellow5
+row = 6//9223372036854775807, col 'CF:CI' = Hellow6
+row = 7//9223372036854775807, col 'CF:CI' = Hellow7
+row = 8//9223372036854775807, col 'CF:CI' = Hellow8
+row = 9//9223372036854775807, col 'CF:CI' = Hellow9
+
+
+ *  
+ */
+public class HBaseClient {
+
+	public static void main(String[] args) throws IOException {
+
+		// Open the "test_table" table; it must already exist in HBase (see class comment)
+		HBaseConfiguration conf = new HBaseConfiguration();
+		HTable table = new HTable(conf, new Text("test_table"));
+	
+		System.out.println("Illustration of adding data...");
+
+		// column name of the form "Column Family:Column qualifier"
+		Text column = new Text("CF:CI");
+
+		// reused row-key buffer
+		Text row_id = new Text();
+
+		// demo 1 : insert ten demo values, one row each
+		for (int i = 0; i < 10; i++) {
+			
+			// row key is the string form of i
+			row_id.set(new Integer(i).toString());
+			
+			// startUpdate opens a batch update for this row; "indicate_id" identifies it
+			long indicate_id= table.startUpdate(row_id);
+			
+			// val = value to store under CF:CI for this row
+			Text val = new Text("Hellow" + i);
+
+			// put "val" into "column" of the row opened above, then commit;
+			// the same as :  
+			// hql> INSERT INTO table( column ) VALUES=( val) WHERE ROW = row_id ;
+			table.put(indicate_id, column, val.getBytes());
+			table.commit(indicate_id);
+
+			System.out.println("Writing row = " + row_id + ", col '" + column
+					+ "' = " + val);
+		}
+
+		// demo 2 : read back the column value for row "1" only
+		System.out.println("\n Querying row = 1");
+		
+		// Get a single value for the specified row and column:
+		// byte[] = HTable.get(Text row, Text column)
+		
+		String s = Text.decode(table.get(new Text("1"),new Text("CF:CI")));
+		// Text.decode turns the stored byte[] back into a String; calling
+		// (byte[]).toString() instead would print garbage like
+		// "[B@1f14ceb" (the default Object identity string)
+		System.out.println("row = 1, 'CF : CI ' = " + s);
+
+		// demo 3 : print the whole contents of this table
+		System.out.println("\nIllustration of scanning...");
+
+		// we only want one column, but you can specify multiple columns to
+		// fetch at once
+		Text[] cols = { column };
+
+		// Use HScannerInterface to crawl the table from the empty (first) key
+		HScannerInterface scanner = table.obtainScanner(cols, new Text());
+
+		// each row's column values are filled into this map by next()
+		SortedMap<Text, byte[]> values = new TreeMap<Text, byte[]>();
+		HStoreKey currentKey = new HStoreKey();
+		while (scanner.next(currentKey, values)) {
+			// decode the stored byte[] back into a String
+			String val = Text.decode(values.get(column));
+			System.out.println("row = " + currentKey + ", col '" + column + "' = "
+					+ val);
+		}
+
+		// remember to close scanner when done
+		scanner.close();
+
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/code/HBaseRecord.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/code/HBaseRecord.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/code/HBaseRecord.java	(revision 20)
@@ -0,0 +1,174 @@
+/**
+ * Program: HBaseRecord.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ */
+
+/**
+ * Purpose : 
+ * 	Parse your record and then store in HBase.
+ * 
+ * HowToUse : 
+ * 	Make sure Hadoop file system and Hbase are running correctly.
+ * 	1. put test.txt in t1 directory which content is 
+ ---------------
+ name:locate:years 
+ waue:taiwan:1981
+ shellon:taiwan:1981
+ ---------------
+ * 	2. hadoop_root/$ bin/hadoop dfs -put t1 t1
+ * 	3. hbase_root/$ bin/hbase shell
+ * 	4. hql > create table t1_table("person");
+ * 	5. Come to Eclipse and run this code, and we will let database as that 
+ t1_table -> person
+ ----------------
+ |  name | locate | years |
+ | waue  | taiwan | 1981 |
+ | shellon | taiwan | 1981 |
+ ----------------
+ * Check Result:
+ * 	Go to hbase console, type : 
+ * 		hql > select * from t1_table; 
+ 08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key 
+ +-------------------------+-------------------------+-------------------------+
+ | Row                     | Column                  | Cell                    |
+ +-------------------------+-------------------------+-------------------------+
+ | 0                       | person:locate           | locate                  |
+ +-------------------------+-------------------------+-------------------------+
+ | 0                       | person:name             | name                    |
+ +-------------------------+-------------------------+-------------------------+
+ | 0                       | person:years            | years                   |
+ +-------------------------+-------------------------+-------------------------+
+ | 19                      | person:locate           | taiwan                  |
+ +-------------------------+-------------------------+-------------------------+
+ | 19                      | person:name             | waue                    |
+ +-------------------------+-------------------------+-------------------------+
+ | 19                      | person:years            | 1981                    |
+ +-------------------------+-------------------------+-------------------------+
+ | 36                      | person:locate           | taiwan                  |
+ +-------------------------+-------------------------+-------------------------+
+ | 36                      | person:name             | shellon                 |
+ +-------------------------+-------------------------+-------------------------+
+ | 36                      | person:years            | 1981                    |
+ +-------------------------+-------------------------+-------------------------+
+ 3 row(s) in set. (0.04 sec)
+ */
+
+package tw.org.nchc.code;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class HBaseRecord {
+
+	/* Define parameters */
+	// one column family: person; three column qualifiers: name,locate,years
+	static private String baseId1 = "person:name";
+
+	static private String baseId2 = "person:locate";
+
+	static private String baseId3 = "person:years";
+
+	// separator character of the input records
+	static private String sp = ":";
+
+	// file path in hadoop file system (not physical file system)
+	String file_path = "/user/waue/t1";
+
+	// Hbase table name
+	String table_name = "t1_table";
+
+	// setup MapTask and Reduce Task
+	int mapTasks = 1;
+
+	int reduceTasks = 1;
+
+	private static class ReduceClass extends TableReduce<LongWritable, Text> {
+
+		// Column ids, built once from the qualifier names above
+		private static final Text col_name = new Text(baseId1);
+
+		private static final Text col_local = new Text(baseId2);
+
+		private static final Text col_year = new Text(baseId3);
+
+		// this map holds the columns per row (cleared and refilled each call)
+		private MapWritable map = new MapWritable();
+
+		// map is the identity here; all real work happens in this reduce
+		public void reduce(LongWritable key, Iterator<Text> values,
+				OutputCollector<Text, MapWritable> output, Reporter reporter)
+				throws IOException {
+
+			// values.next().getBytes() gets the record's raw bytes;
+			// Text.decode() would be an alternative way
+			// to substitute getBytes()
+			String stro = new String(values.next().getBytes());
+			String str[] = stro.split(sp);
+			byte b_local[] = str[0].getBytes(); // NOTE(review): str[0] is the "name" field despite the variable name
+			byte b_name[] = str[1].getBytes(); // NOTE(review): str[1] is the "locate" field
+			byte b_year[] = str[2].getBytes();
+
+			// contents must be ImmutableBytesWritable
+			ImmutableBytesWritable w_local = new ImmutableBytesWritable(b_local);
+			ImmutableBytesWritable w_name = new ImmutableBytesWritable(b_name);
+			ImmutableBytesWritable w_year = new ImmutableBytesWritable(b_year);
+
+			// populate the current row (the swapped names above cancel out here)
+			map.clear();
+			map.put(col_name, w_local);
+			map.put(col_local, w_name);
+			map.put(col_year, w_year);
+
+			// add the row with the key as the row id
+			output.collect(new Text(key.toString()), map);
+		}
+	}
+
+	private HBaseRecord() {
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		// which path of input files in Hadoop file system
+
+		HBaseRecord setup = new HBaseRecord();
+		JobConf conf = new JobConf(HBaseRecord.class);
+
+		// Job name; you can modify to any you like
+		conf.setJobName("NCHC_PersonDataBase");
+
+		// Hbase table name must be correct , in our profile is t1_table
+		TableReduce.initJob(setup.table_name, ReduceClass.class, conf);
+
+		// below are map-reduce profile
+		conf.setNumMapTasks(setup.mapTasks);
+		conf.setNumReduceTasks(setup.reduceTasks);
+
+		// 0.16 API was:
+		// conf.setInputPath(new Path(setup.file_path));
+		Convert.setInputPath(conf, new Path(setup.file_path));
+
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setCombinerClass(IdentityReducer.class);
+		conf.setReducerClass(ReduceClass.class);
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/code/HBaseRecord2.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/code/HBaseRecord2.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/code/HBaseRecord2.java	(revision 20)
@@ -0,0 +1,162 @@
+/**
+ * Program: HBaseRecord2.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/01/2008
+ * Upgrade to 0.17
+ */
+
+/**
+ * Purpose : 
+ * 	Parse your record and then store in HBase.
+ * 
+ * HowToUse : 
+ * 	Make sure Hadoop file system and Hbase are running correctly.
+ * 	1. put test.txt in t1 directory which content is 
+	---------------
+	name:locate:years 
+	waue:taiwan:1981
+	shellon:taiwan:1981
+	---------------
+ * 	2. hadoop_root/$ bin/hadoop dfs -put t1 t1
+	  ----------------
+ * Check Result:
+ * 	Go to hbase console, type : 
+ * 		hql > select * from t1_table; 
+08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key 
++-------------------------+-------------------------+-------------------------+
+| Row                     | Column                  | Cell                    |
++-------------------------+-------------------------+-------------------------+
+| 0                       | person:locate           | locate                  |
++-------------------------+-------------------------+-------------------------+
+| 0                       | person:name             | name                    |
++-------------------------+-------------------------+-------------------------+
+| 0                       | person:years            | years                   |
++-------------------------+-------------------------+-------------------------+
+| 19                      | person:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 19                      | person:name             | waue                    |
++-------------------------+-------------------------+-------------------------+
+| 19                      | person:years            | 1981                    |
++-------------------------+-------------------------+-------------------------+
+| 36                      | person:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 36                      | person:name             | shellon                 |
++-------------------------+-------------------------+-------------------------+
+| 36                      | person:years            | 1981                    |
++-------------------------+-------------------------+-------------------------+
+3 row(s) in set. (0.04 sec)
+ */
+
+
+
+
+package tw.org.nchc.code;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+
+public class HBaseRecord2 {
+
+	/* Define parameters */
+	static String[] bf = {"person:name","person:local","person:birthyear"};
+	// file path in hadoop file system (not physical file system)
+	String file_path = "/user/waue/t1/test.txt";
+	// Hbase table name
+	String table_name = "testtable";
+	
+	
+	// setup MapTask and Reduce Task
+	int mapTasks = 1;
+	int reduceTasks = 1;
+	
+	private static class ReduceClass extends TableReduce<LongWritable, Text> {
+
+
+		
+		// map is the identity here; all real work happens in this reduce
+		public void reduce(LongWritable key, Iterator<Text> values,
+				OutputCollector<Text, MapWritable> output, Reporter reporter)
+				throws IOException {
+			// this map holds the columns per row
+			MapWritable map = new MapWritable();	
+			// values.next().getBytes() gets the record's raw bytes 
+			String stro = new String(values.next().getBytes());
+			String str[] = stro.split(":");
+			
+			int length = bf.length;
+			
+			// Column ids are created dynamically from bf 
+			Text[] col_n = new Text[length];
+			byte[][] b_l = new byte[length][];
+			// contents must be ImmutableBytesWritable
+			ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
+			map.clear();
+			for(int i = 0; i < length; i++){
+				col_n[i] = new Text(bf[i]);
+				b_l[i] = str[i].getBytes();
+				w_l[i] = new ImmutableBytesWritable(b_l[i]);
+				// populate the current row
+				map.put(col_n[i], w_l[i]);
+			}
+			// add the row with the key as the row id
+			output.collect(new Text(key.toString()), map);
+		}
+	}
+
+	private HBaseRecord2() {
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+
+		
+		HBaseRecord2 setup = new HBaseRecord2();
+		String[] tmp = bf[0].split(":");
+		String[] CF = {tmp[0]};
+		BuildHTable build_table = new BuildHTable(setup.table_name, CF);
+		if (!build_table.checkTableExist(setup.table_name)) {
+			if (!build_table.createTable()) { // NOTE(review): createTable() seems to return false only when the table exists, so this message looks misleading
+				System.out.println("create table error !");
+			}
+		} else {
+			System.out.println("Table \"" + setup.table_name
+					+ "\" has already existed !");
+		}
+		
+		JobConf conf = new JobConf(HBaseRecord2.class);
+
+		//Job name; you can modify to any you like  
+		conf.setJobName("PersonDataBase");
+
+		// Hbase table name must be correct ; here it is "testtable"
+		TableReduce.initJob(setup.table_name, ReduceClass.class, conf);
+		
+		// below are map-reduce profile
+		conf.setNumMapTasks(setup.mapTasks);
+		conf.setNumReduceTasks(setup.reduceTasks);
+		// 0.16 API was:
+//		conf.setInputPath(new Path(setup.file_path));
+		Convert.setInputPath(conf, new Path(setup.file_path));
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setCombinerClass(IdentityReducer.class);
+		conf.setReducerClass(ReduceClass.class);
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/code/HBaseRecordPro.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/code/HBaseRecordPro.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/code/HBaseRecordPro.java	(revision 20)
@@ -0,0 +1,247 @@
+/**
+ * Program: HBaseRecordPro.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ */
+
+/**
+ * Purpose : 
+ * 	First, the program parses your record file and creates the HBase table.
+ * 	Then it uses the first line of the file as the column qualifiers.
+ * 	Finally it stores the remaining records in HBase automatically.
+ * 
+ * HowToUse : 
+ * 	Make sure two thing :
+ * 	1. source_file must be regular as follow:
+ * 		first line: qualify1:qualify2:...:qualifyN
+ * 		other line: records1:records2:...:recordsN
+ * 	   (the number N of qualify must be the same as records ) 
+-----------------
+name:locate:years
+waue:taiwan:1981
+rock:taiwan:1981
+aso:taiwan:1981
+jazz:taiwan:1982
+-----------------
+ *  2. source_file path must be correct.
+
+ * Check Result:
+ * 	Go to hbase console, type : 
+ * 		hql > select * from t1_table; 
+ 08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key 
+
++-------------------------+-------------------------+-------------------------+
+| Row                     | Column                  | Cell                    |
++-------------------------+-------------------------+-------------------------+
+| 0                       | member:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 0                       | member:name             | waue                    |
++-------------------------+-------------------------+-------------------------+
+| 0                       | member:years            | 1981                    |
++-------------------------+-------------------------+-------------------------+
+| 17                      | member:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 17                      | member:name             | rock                    |
++-------------------------+-------------------------+-------------------------+
+| 17                      | member:years            | 1981                    |
++-------------------------+-------------------------+-------------------------+
+| 34                      | member:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 34                      | member:name             | aso                     |
++-------------------------+-------------------------+-------------------------+
+| 34                      | member:years            | 1981                    |
++-------------------------+-------------------------+-------------------------+
+| 50                      | member:locate           | taiwan                  |
++-------------------------+-------------------------+-------------------------+
+| 50                      | member:name             | jazz                    |
++-------------------------+-------------------------+-------------------------+
+| 50                      | member:years            | 1982                    |
++-------------------------+-------------------------+-------------------------+
+4 row(s) in set. (0.31 sec)
+
+ */
+
+package tw.org.nchc.code;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class HBaseRecordPro {
+	/* Major parameter */
+	// local (not HDFS) path of the source text file
+	final static String source_file = "/home/waue/test.txt";
+
+	/* Minor parameter */
+	// column family name (with the trailing ":")
+	final static String column_family = "member:";
+
+	// table name
+	final static String table_name = "HBaseRecord";
+
+	// separator character of the source records
+	final static String sp = ":";
+	
+	// tmp file holding the first line (the column qualifiers)
+	final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp";
+	
+	// tmp file holding the data records (everything after the first line)
+	final static String text_tmp = "/tmp/HBaseRecord.text.tmp";
+
+	// map is the identity; this reduce parses each record and emits it as a row
+	private static class ReduceClass extends TableReduce<LongWritable, Text> {
+		public void reduce(LongWritable key, Iterator<Text> values,
+				OutputCollector<Text, MapWritable> output, Reporter reporter)
+				throws IOException {
+
+			// read the qualifier file written by parseFirstLine()
+			BufferedReader fconf = new BufferedReader(new FileReader(new File(
+					conf_tmp)));
+			String first_line = fconf.readLine();
+			fconf.close();
+			// extract the column-qualifier names
+			String[] cf = first_line.split(sp);
+			int length = cf.length;
+			 
+			// values.next().getBytes() gets the record's raw bytes
+			String stro = new String(values.next().getBytes());
+			String str[] = stro.split(sp);
+
+			// Column ids are created dynamically from the qualifier names
+			Text[] col_n = new Text[length];
+			byte[][] b_l = new byte[length][];
+			// contents must be ImmutableBytesWritable
+			ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
+
+			// This map connects to the hbase table and holds the columns per row
+			MapWritable map = new MapWritable();
+			map.clear();
+
+			// prepare to write data into map
+			for (int i = 0; i < length; i++) {
+				col_n[i] = new Text(column_family + cf[i]);
+				b_l[i] = str[i].getBytes();
+				w_l[i] = new ImmutableBytesWritable(b_l[i]);
+				// populate the current row
+				map.put(col_n[i], w_l[i]);
+			}
+			// add the row with the key as the row id
+			output.collect(new Text(key.toString()), map);
+		}
+	}
+
+	public HBaseRecordPro() {
+	}
+	
+	// This function splits the source text into two files:
+	// 	conf_tmp receives the first line (used to set the column qualifiers);
+	//	the "ou" file receives the data records to be stored into the table.
+	/**
+	 * Writes the first line of "in" to conf_tmp and every later line to "ou".
+	 * @return the first line (the column qualifiers); all streams are closed
+	 */
+	public String parseFirstLine(String in, String ou)
+			throws IOException {
+		BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
+		BufferedWriter ff = new BufferedWriter(new FileWriter(new File(conf_tmp)));
+		BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
+		try {
+			String first_line = fi.readLine();
+			ff.write(first_line);
+			String data;
+			while ((data = fi.readLine()) != null) {
+				fw.write(data + "\n");
+			}
+			return first_line;
+		} finally {
+			fi.close();
+			ff.close();
+			fw.close();
+		}
+	}
+	// Deletes the given tmp file; returns false if it exists but cannot be deleted
+	boolean deleteFile(String str)throws IOException{
+		File df = new File(str);
+		if(df.exists()){
+			if(!df.delete()){
+				System.err.print("delete file error !");
+				return false;
+			}
+		}else{
+			System.out.println("file not exit!");
+		}
+		return true;
+	}
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+
+		HBaseRecordPro setup = new HBaseRecordPro();
+		String[] col_family = {column_family};
+		Path text_path = new Path(text_tmp);
+		
+		setup.parseFirstLine(source_file, text_tmp);
+//		System.out.println(first_line);
+
+		BuildHTable build_table = new BuildHTable(table_name,
+				col_family);
+		if (!build_table.checkTableExist(table_name)) {
+			if (!build_table.createTable()) { // NOTE(review): createTable() seems to return false only when the table exists, so this message looks misleading
+				System.out.println("create table error !");
+			}
+		} else {
+			System.out.println("Table \"" + table_name
+					+ "\" has already existed !");
+		}
+		JobConf conf = new JobConf(HBaseRecordPro.class);
+		FileSystem fileconf = FileSystem.get(conf);
+		fileconf.copyFromLocalFile(true, text_path, text_path); // NOTE(review): first arg true appears to delete the local source after the copy — confirm against the FileSystem API
+		// Job name; you can modify to any you like
+		conf.setJobName("PersonDataBase");
+		final int mapTasks = 1;
+		final int reduceTasks = 1;
+		// Hbase table name must be correct ; here it is "HBaseRecord"
+		TableReduce.initJob(table_name, ReduceClass.class, conf);
+
+		// below are map-reduce profile
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+		// 0.16 API was:
+//		conf.setInputPath(text_path);
+		Convert.setInputPath(conf, text_path);
+		
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setCombinerClass(IdentityReducer.class);
+		conf.setReducerClass(ReduceClass.class);
+
+		JobClient.runJob(conf);
+		
+		// delete tmp file from HDFS
+		// 0.16 API was:
+//		FileSystem.get(conf).delete(text_path);
+		FileSystem.get(conf).delete(text_path,true);
+		
+		setup.deleteFile(conf_tmp);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/code/WordCount.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/code/WordCount.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/code/WordCount.java	(revision 20)
@@ -0,0 +1,132 @@
+/**
+ * Program: WordCount.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ */
+
+/**
+ * Purpose : 
+ * 	Store the result of WordCount.java from Hbase to Hadoop file system 
+ * 
+ * HowToUse : 
+ * 	Make sure Hadoop file system is running correctly.
+ * 	Put text file on the directory "/local_src/input" 
+ * 	You can use the instruction to upload "/local_src/input" to HDFS input dir
+ * 		$ bin/hadoop dfs -put /local_src/input input
+ * 	Then modify the $filepath parameter in the constructor to be correct and run this code.
+ * 	
+ * 
+ * Check Result:
+ * 	inspect http://localhost:50070 by web explorer
+ */
+package tw.org.nchc.code;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+public class WordCount {
+	private String filepath; // HDFS input directory
+
+	private String outputPath; // HDFS output directory
+
+	public WordCount() {
+		filepath = "/user/waue/input/";
+		outputPath = "counts1";
+	}
+
+	public WordCount(String path, String output) {
+		filepath = path;
+		outputPath = output;
+	}
+
+	// mapper: emits (token, 1) for every word occurrence
+	private static class MapClass extends MapReduceBase implements
+			Mapper<LongWritable, Text, Text, IntWritable> {
+
+		// reuse objects to save overhead of object creation
+		private final static IntWritable one = new IntWritable(1);
+
+		private Text word = new Text();
+
+		public void map(LongWritable key, Text value,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			String line = value.toString(); // value is already a Text; redundant (Text) cast removed
+			StringTokenizer itr = new StringTokenizer(line);
+			while (itr.hasMoreTokens()) {
+				word.set(itr.nextToken());
+				output.collect(word, one);
+			}
+		}
+	}
+
+	// reducer: sums up all the counts
+	private static class ReduceClass extends MapReduceBase implements
+			Reducer<Text, IntWritable, Text, IntWritable> {
+
+		// reuse objects
+		private final static IntWritable SumValue = new IntWritable();
+
+		public void reduce(Text key, Iterator<IntWritable> values,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// sum up values
+			int sum = 0;
+			while (values.hasNext()) {
+				sum += values.next().get();
+			}
+			SumValue.set(sum);
+			output.collect(key, SumValue);
+		}
+	}
+
+	/**
+	 * Runs the demo: word count over {@code filepath}, results to {@code outputPath}.
+	 */
+	public static void main(String[] args) throws IOException {
+		WordCount wc = new WordCount(); // default paths; use the 2-arg ctor for custom ones
+
+		int mapTasks = 1;
+		int reduceTasks = 1;
+		JobConf conf = new JobConf(WordCount.class);
+		conf.setJobName("wordcount");
+
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+		// 0.16
+		// conf.setInputPath(new Path(wc.filepath));
+		Convert.setInputPath(conf, new Path(wc.filepath)); // 0.17 API via project helper
+		conf.setOutputKeyClass(Text.class);
+		conf.setOutputValueClass(IntWritable.class);
+		// 0.16
+		// conf.setOutputPath(new Path(wc.outputPath));
+		Convert.setOutputPath(conf, new Path(wc.outputPath));
+
+		conf.setMapperClass(MapClass.class);
+		conf.setCombinerClass(ReduceClass.class); // summing is associative, so reducer doubles as combiner
+		conf.setReducerClass(ReduceClass.class);
+
+		// Delete the output directory if it exists already
+		Path outputDir = new Path(wc.outputPath);
+		// 0.16
+		FileSystem.get(conf).delete(outputDir,true); // recursive delete
+
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/code/WordCountFromHBase.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/code/WordCountFromHBase.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/code/WordCountFromHBase.java	(revision 20)
@@ -0,0 +1,177 @@
+/**
+ * Program: WordCountFromHBase.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ */
+
+/**
+ * Purpose : 
+ * 	Word counting from Hbase then store result in Hadoop file system 
+ * 
+ * HowToUse : 
+ * 	Make sure the Hadoop file system is running and HBase has correct data.
+ * 	Suggest to run WordCountIntoHBase first.
+ * 	finally, modify these setup parameters and run.
+ * 
+ * Check Result:
+ *  
+ * 	inspect http://localhost:50070 by web explorer
+ */
+
+package tw.org.nchc.code;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+@SuppressWarnings("unused")
+
+public class WordCountFromHBase {
+	/* setup parameters */
+	// set the output path 
+	static String outputPath = "counts2";
+
+	// org.apache.hadoop.hbase.mapred.TableMap<K,V>  \ 
+	// TableMap<K extends org.apache.hadoop.io.WritableComparable, \ 
+	// 	V extends org.apache.hadoop.io.Writable> \
+	// Scan an HBase table to sort by a specified sort column. \
+	// If the column does not exist, the record is not passed to Reduce.;
+	private static class MapClass extends TableMap<Text, IntWritable> {
+
+		// set one as (IntWritable)1
+		private final static IntWritable one = new IntWritable(1);
+		// set column 
+		private final static Text textcol = new Text(WordCountIntoHBase.colstr);
+		private Text word = new Text();		
+		// TableMap is a interface, map is a abstract method. now, we should \
+		// 	inprement map() at here, format is : \
+		// map(HStoreKey key, MapWritable value,  \
+		// 	OutputCollector<K,V> output, Reporter reporter) ;
+        // Call a user defined function on a single HBase record, \  
+		// 	represented by a key and its associated record value. ; 
+		public void map(HStoreKey key, MapWritable cols,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// 
+			// The first get() is : Writable <- get(Object key) \
+			// 	get in interface Map<Writable,Writable>  ;
+			// Use ImmutableBytesWritable to downcast Writable \
+			// The second get() is : byte[] <- get() \
+			// 	Get the data from the BytesWritable. ;
+			// Text.decode is parse UTF-8 code to a String ;
+			// per "line" is per row data in HTable 
+			String line = Text.decode( ((ImmutableBytesWritable) cols.get(textcol) )
+					.get() ); // NOTE(review): cols.get() returns null if the row lacks this column -> NPE; confirm the input table always has it
+			
+			//let us know what is "line"
+			/*
+			RandomAccessFile raf = 
+				new RandomAccessFile("/home/waue/mr-result.txt","rw");
+			raf.seek(raf.length()); // move pointer to end
+			raf.write(("\n"+line).getBytes());
+			raf.close();
+			*///end
+			// the result is the contents of merged files "
+			
+			//StringTokenizer will divide a line into a word  
+			StringTokenizer itr = new StringTokenizer(line);
+			// set every word as one
+			while (itr.hasMoreTokens()) {
+				// nextToken will return this value in String and point to next \
+				// Text.set() = Set to contain the contents of a string.
+				word.set(itr.nextToken());	
+				// OutputCollector.collect = collect(K key, V value) \
+				//  Adds a key/value pair to the output.
+				output.collect(word, one);
+			}
+		}
+	}
+
+	// reducer: sums up all the counts
+	private static class ReduceClass extends MapReduceBase implements
+			Reducer<Text, IntWritable, Text, IntWritable> {
+
+		// reuse objects
+		private final static IntWritable SumValue = new IntWritable();
+		
+		// this sample's reduce() format is the same as map() \
+		// 	reduce is a method waiting for implement \
+		// 	four type in this sample is (Text , Iterator<IntWritable>, \
+		// 		OutputCollector<Text, IntWritable> , Reporter ) ;
+		public void reduce(Text key, Iterator<IntWritable> values,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// sum up value
+			int sum = 0;
+			// "key" is word , "value" is sum 
+			// why values.hasNext(), not key.hasNext()
+			while (values.hasNext()) { 
+				// next() will return this value and pointer to next event \
+				// 	IntWritable.get() will transfer IntWritable to Int
+				sum += values.next().get(); 
+			}
+			// IntWritable.set(int) will transfer Int to IntWritable
+			SumValue.set(sum);
+			// hense we set outputPath in main, the output.collect will put 
+			// 	data in Hadoop
+			output.collect(key, SumValue);
+		}
+	}
+
+	private WordCountFromHBase() { // utility class: prevent instantiation
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		
+
+		int mapTasks = 1;
+		int reduceTasks = 1;
+		// initialize job;
+		JobConf conf = new JobConf(WordCountFromHBase.class);
+		// TableMap.initJob will build HBase code \
+		// 	"org.apache.hadoop.hbase.mapred.TableMap".initJob \
+		// 	(Table_name,column_string,Which_class_will_use,job_configure);
+		TableMap.initJob(WordCountIntoHBase.Table_Name,
+				WordCountIntoHBase.colstr, MapClass.class, conf);
+		conf.setJobName(WordCountIntoHBase.Table_Name + "store"); // concatenated without a separator, e.g. "word_count5store"
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+		
+		//Set the key class for the job output data.
+		conf.setOutputKeyClass(Text.class);
+		//Set the value class for job outputs.
+		conf.setOutputValueClass(IntWritable.class);
+		// MapperClass,CombinerClass,ReducerClass are essential
+		conf.setMapperClass(MapClass.class);
+		conf.setCombinerClass(ReduceClass.class);
+		conf.setReducerClass(ReduceClass.class);
+		// input is Hbase format => TableInputFormat
+		conf.setInputFormat(TableInputFormat.class);
+		// 0.16
+//		conf.setOutputPath(new Path(outputPath));
+		Convert.setOutputPath(conf, new Path(outputPath));
+//		 delete the old path with the same name 
+		FileSystem.get(conf).delete(new Path(outputPath),true); // recursive delete of the old output dir
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/code/WordCountIntoHBase.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/code/WordCountIntoHBase.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/code/WordCountIntoHBase.java	(revision 20)
@@ -0,0 +1,118 @@
+/**
+ * Program: WordCountIntoHBase.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ */
+
+/**
+ * Purpose : 
+ * 	Store every line from $Input_Path to HBase
+ * 
+ * HowToUse : 
+ * 	Make sure Hadoop file system and HBase are running correctly.
+ * 	Use Hadoop instruction to add input-text-files to $Input_Path.
+ *  ($ bin/hadoop dfs -put local_dir hdfs_dir)
+ * 	Then run the program with BuildHTable.java after \
+ * 	modifying these setup parameters.
+ * 
+ * Check Result : 
+ * 	View the result by hbase instruction (hql> select * from $Table_Name). 
+ * 	Or run WordCountFromHBase.java then inspect http://localhost:60070 by web explorer;
+ */
+
+package tw.org.nchc.code;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class WordCountIntoHBase {
+
+	/* setup parameters */
+	// $Input_Path. Please make sure the path is correct and contains input
+	// files
+	static final String Input_Path = "/user/waue/simple";
+
+	// Hbase table name, the program will create it
+	static final String Table_Name = "word_count5";
+
+	// column name, the program will create it
+	static final String colstr = "word:text";
+
+	// constructor
+	private WordCountIntoHBase() { // utility class: prevent instantiation
+	}
+
+	private static class ReduceClass extends TableReduce<LongWritable, Text> {
+		// set (column_family:column_qualify)
+		private static final Text col = new Text(WordCountIntoHBase.colstr);
+
+		// this map holds the columns per row
+		private MapWritable map = new MapWritable();
+
+		public void reduce(LongWritable key, Iterator<Text> values,
+				OutputCollector<Text, MapWritable> output, Reporter reporter)
+				throws IOException {
+			// contents must be ImmutableBytesWritable
+			ImmutableBytesWritable bytes = new ImmutableBytesWritable(values // only the first value per key is stored; NOTE(review): confirm extra values may be dropped
+					.next().getBytes()); // NOTE(review): Text.getBytes() returns the backing buffer, which can be longer than getLength() -- confirm trailing bytes are acceptable
+			map.clear();
+			// write data
+			map.put(col, bytes);
+			// add the row with the key as the row id
+			output.collect(new Text(key.toString()), map);
+		}
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		// parse colstr to split column family and column qualify
+		String tmp[] = colstr.split(":");
+		String Column_Family = tmp[0] + ":"; // family part with trailing ':', e.g. "word:"
+		String CF[] = { Column_Family };
+		// check whether create table or not , we don't admit \
+		// the same name but different structure
+		BuildHTable build_table = new BuildHTable(Table_Name, CF);
+		if (!build_table.checkTableExist(Table_Name)) {
+			if (!build_table.createTable()) {
+				System.out.println("create table error !");
+			}
+		} else {
+			System.out.println("Table \"" + Table_Name
+					+ "\" has already existed !");
+		}
+		int mapTasks = 1;
+		int reduceTasks = 1;
+		JobConf conf = new JobConf(WordCountIntoHBase.class);
+		conf.setJobName(Table_Name);
+
+		// must initialize the TableReduce before running job
+		TableReduce.initJob(Table_Name, ReduceClass.class, conf);
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+		// 0.16
+		// conf.setInputPath(new Path(Input_Path));
+		Convert.setInputPath(conf, new Path(Input_Path)); // 0.17 API via project helper
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setCombinerClass(IdentityReducer.class); // lines pass through unchanged; ReduceClass writes them to HBase
+		conf.setReducerClass(ReduceClass.class);
+
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/AccessLogParser.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/AccessLogParser.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/AccessLogParser.java	(revision 20)
@@ -0,0 +1,110 @@
+package tw.org.nchc.demo;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Locale;
+import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+
+public class AccessLogParser { // parses one Apache combined-format access log line into typed fields
+  private String ip;
+  private String protocol;
+  private String method;
+  private String url;
+  private String code;
+  private String byteSize;
+  private String referrer;
+  private String agent;
+  private long timestamp;
+
+  private static final Pattern p = Pattern // compiled once; Pattern is immutable and thread-safe
+  .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
+                  " ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\".*");
+  
+  
+  
+  public AccessLogParser(String line) throws ParseException { // narrowed: raw Exception was declared but never thrown
+	 
+	 Matcher matcher = p.matcher(line);
+	 if(matcher.matches()){ // fields stay null/0 when the line does not match the pattern
+		 this.ip = matcher.group(1);
+		 // IP address of the client requesting the web page.
+		 if(isIpAddress(ip)){ // likewise left unset when the first token is not a dotted-quad IP
+			 SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z",Locale.US);
+			 this.timestamp = sdf.parse(matcher.group(4)).getTime(); // epoch millis; throws ParseException on a bad date
+			 String[] http = matcher.group(5).split(" "); // request line: METHOD URL PROTOCOL
+			 this.method = http[0];
+			 this.url = http[1];
+			 this.protocol = http[2];
+			 this.code = matcher.group(6);
+			 this.byteSize = matcher.group(7);
+			 this.referrer = matcher.group(8);
+			 this.agent = matcher.group(9);
+		 }
+	 }
+
+
+  }
+
+  public static boolean isIpAddress(String inputString) { // true iff inputString is a dotted-quad IPv4 address
+    StringTokenizer tokenizer = new StringTokenizer(inputString, ".");
+    if (tokenizer.countTokens() != 4) {
+      return false;
+    }
+    try {
+      for (int i = 0; i < 4; i++) {
+        String t = tokenizer.nextToken();
+        int chunk = Integer.parseInt(t);
+        if ((chunk & 255) != chunk) { // each octet must fit in 0..255
+          return false;
+        }
+      }
+    } catch (NumberFormatException e) {
+      return false;
+    }
+    if (inputString.indexOf("..") >= 0) { // StringTokenizer skips empty tokens, so catch "1..2.3.4" explicitly
+      return false;
+    }
+    return true;
+  }
+
+  public String getIp() {
+    return ip;
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public String getMethod() {
+    return method;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  public String getCode() {
+    return code;
+  }
+
+  public String getByteSize() {
+    return byteSize;
+  }
+
+  public String getReferrer() {
+    return referrer;
+  }
+
+  public String getAgent() {
+    return agent;
+  }
+  
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseClient.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseClient.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseClient.java	(revision 20)
@@ -0,0 +1,149 @@
+/*
+ *  nchc hadoop hbase test
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Demo that illustrates the HBase client API.
+ * The demo program will insert values to " Column Family: Column qualifier" and then print these.
+ *  pre-do : create a hbase table "test_table" with (CF:CI) which (" column family ", " column qualifier ")
+ *  1. $ bin/hbase shell
+ *  2. > create table test_table("CF");
+ *  ok ! we can test it  
+ *  3. > insert into test_table("CF:CI") values=("Hellow World") where row = "1";
+ *  4. > select * from test_table; 
+
+08/06/03 16:16:36 INFO hbase.HTable: Creating scanner over test_table starting at key 
++---------+-----------+-----------+
+| Row                   | Column                  | Cell                     |
++---------+-----------+-----------+
+| 1                        | CF:CI                     | Hellow World      |
++---------+-----------+-----------+
+1 row(s) in set. (0.24 sec)
+
+ *  on the  structure , "Row" means Row ID which is a key to describe a column;
+ *  Column means the database structure in test_table, 
+ *  Column Family , "CF",  should be defined while creating table.
+ *  Column qualifier , "CI" , can be added dynamically.
+ *  Cell is the value of CF:CI
+ * 
+ *  that's the structure; then the demo program will show you in console as below :
+ *  
+Illustration of adding data...
+Writing row = 0, col 'CF:CI' = Hellow0
+Writing row = 1, col 'CF:CI' = Hellow1
+Writing row = 2, col 'CF:CI' = Hellow2
+Writing row = 3, col 'CF:CI' = Hellow3
+Writing row = 4, col 'CF:CI' = Hellow4
+Writing row = 5, col 'CF:CI' = Hellow5
+Writing row = 6, col 'CF:CI' = Hellow6
+Writing row = 7, col 'CF:CI' = Hellow7
+Writing row = 8, col 'CF:CI' = Hellow8
+Writing row = 9, col 'CF:CI' = Hellow9
+
+Illustration of querying...
+row = 1, 'CF : CI ' = Hellow1
+
+Illustration of scanning...
+08/06/03 16:47:51 INFO hbase.HTable: Creating scanner over test_table starting at key 
+row = 0//9223372036854775807, col 'CF:CI' = Hellow0
+row = 1//9223372036854775807, col 'CF:CI' = Hellow1
+row = 2//9223372036854775807, col 'CF:CI' = Hellow2
+row = 3//9223372036854775807, col 'CF:CI' = Hellow3
+row = 4//9223372036854775807, col 'CF:CI' = Hellow4
+row = 5//9223372036854775807, col 'CF:CI' = Hellow5
+row = 6//9223372036854775807, col 'CF:CI' = Hellow6
+row = 7//9223372036854775807, col 'CF:CI' = Hellow7
+row = 8//9223372036854775807, col 'CF:CI' = Hellow8
+row = 9//9223372036854775807, col 'CF:CI' = Hellow9
+
+
+ *  
+ */
+public class DemoHBaseClient {
+
+	public static void main(String[] args) throws IOException {
+
+		// Open the "test_table" table. If it hasn't been in Hbase, you should create.
+		HBaseConfiguration conf = new HBaseConfiguration();
+		HTable table = new HTable(conf, new Text("test_table"));
+	
+		System.out.println("Illustration of adding data...");
+
+		// create column formed  (Column Family:Column qualifier)
+		Text column = new Text("CF:CI");
+
+		// create row_id 
+		Text row_id = new Text();
+
+		// demo 1  : Insert ten demo values
+		for (int i = 0; i < 10; i++) {
+			
+			// give row_id  value
+			row_id.set(new Integer(i).toString());
+			
+			// let "indicate_id" indicate the column which row = row_id
+			long indicate_id= table.startUpdate(row_id); // update id handed back by startUpdate; used by put/commit below
+			
+			//val =  value of CF:CI where row_id = i
+			Text val = new Text("Hellow" + i);
+
+			// put "val" to "column" from "table" where "row_id"
+			// the same as :  
+			// hql> INSERT INTO table( column ) VALUES=( val) WHERE ROW = row_id ;
+			table.put(indicate_id, column, val.getBytes());
+			table.commit(indicate_id);
+
+			System.out.println("Writing row = " + row_id + ", col '" + column
+					+ "' = " + val);
+		}
+
+		// demo 2 : print column value only row = 1 ;
+		System.out.println("\n Querying row = 1");
+		
+		// Get a single value for the specified row and column
+		// byte[] = HTable.get(Text row, Text column)
+		
+		String s = Text.decode(table.get(new Text("1"),new Text("CF:CI"))); // Text.decode: UTF-8 bytes -> String
+		// if change as  
+		// String s = (table.get(new Text("1"),new Text("CF:CI"))).toString();
+		// will get chaos code "  [B@1f14ceb"
+		System.out.println("row = 1, 'CF : CI ' = " + s);
+
+		// demo 3 :  Print the all contents of this table
+		System.out.println("\nIllustration of scanning...");
+
+		// we only want one column, but you can specify multiple columns to
+		// fetch at once
+		Text[] cols = { column };
+
+		// Use HScannerInterface to crawl table
+		HScannerInterface scanner = table.obtainScanner(cols, new Text()); // NOTE(review): not closed if an exception occurs mid-scan; try/finally would be safer
+
+		// column values are stored in a Map
+		SortedMap<Text, byte[]> values = new TreeMap<Text, byte[]>();
+		HStoreKey currentKey = new HStoreKey();
+		while (scanner.next(currentKey, values)) {
+			// decode the stored byte[] back into a String
+			String val = Text.decode(values.get(column));
+			System.out.println("row = " + currentKey + ", col '" + column + "' = "
+					+ val);
+		}
+
+		// remember to close scanner when done
+		scanner.close();
+
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseSink.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseSink.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseSink.java	(revision 20)
@@ -0,0 +1,93 @@
+/**
+ * Program: DemoHBaseSink.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ * Re-code from : Cloud9: A MapReduce Library for Hadoop
+ */
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+import tw.org.nchc.code.Convert;
+
+/**
+ * 
+ */
+public class DemoHBaseSink {
+
+	private static class ReduceClass extends TableReduce<LongWritable, Text> {
+
+		// this is the column we're going to be writing
+		private static final Text col = new Text("default:text");
+
+		// this map holds the columns per row
+		private MapWritable map = new MapWritable();
+
+		public void reduce(LongWritable key, Iterator<Text> values,
+				OutputCollector<Text, MapWritable> output, Reporter reporter)
+				throws IOException {
+
+			// contents must be ImmutableBytesWritable
+			ImmutableBytesWritable bytes = new ImmutableBytesWritable(values // only the first value per key is stored
+					.next().getBytes()); // NOTE(review): Text.getBytes() returns the backing buffer, possibly longer than getLength() -- confirm trailing bytes are acceptable
+
+			// populate the current row
+			map.clear();
+			map.put(col, bytes);
+
+			// add the row with the key as the row id
+			output.collect(new Text(key.toString()), map);
+		}
+	}
+
+	private DemoHBaseSink() { // utility class: prevent instantiation
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		String filename = "/shared/sample";
+
+		int mapTasks = 1;
+		int reduceTasks = 1;
+
+		JobConf conf = new JobConf(DemoHBaseSink.class);
+		conf.setJobName("wordcount");
+
+		// must initialize the TableReduce before running job
+		TableReduce.initJob("test", ReduceClass.class, conf); // writes into the HBase table named "test"
+
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+		// 0.16
+		// conf.setInputPath(new Path(filename));
+		Convert.setInputPath(conf, new Path(filename)); // 0.17 API via project helper
+
+		conf.setMapperClass(IdentityMapper.class);
+		conf.setCombinerClass(IdentityReducer.class); // lines pass through unchanged; ReduceClass writes them to HBase
+		conf.setReducerClass(ReduceClass.class);
+
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseSource.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseSource.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoHBaseSource.java	(revision 20)
@@ -0,0 +1,116 @@
+/**
+ * Program: DemoHBaseSource.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ * Re-code from : Cloud9: A MapReduce Library for Hadoop
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import tw.org.nchc.code.Convert;
+
+/**
+ * 
+ */
+public class DemoHBaseSource {
+
+	// mapper: emits (token, 1) for every word occurrence
+	private static class MapClass extends TableMap<Text, IntWritable> {
+
+		// reuse objects to save overhead of object creation
+		private final static IntWritable one = new IntWritable(1);
+		private final static Text textcol = new Text("default:text"); // the HBase column scanned for text
+		private Text word = new Text();
+
+		public void map(HStoreKey key, MapWritable cols,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+
+			String line = Text.decode(((ImmutableBytesWritable) cols // decode the row's cell as UTF-8 text
+					.get(textcol)).get()); // NOTE(review): cols.get() returns null if the row lacks this column -> NPE; confirm input rows always have it
+
+			StringTokenizer itr = new StringTokenizer(line);
+			while (itr.hasMoreTokens()) {
+				word.set(itr.nextToken());
+				output.collect(word, one);
+			}
+		}
+	}
+
+	// reducer: sums up all the counts
+	private static class ReduceClass extends MapReduceBase implements
+			Reducer<Text, IntWritable, Text, IntWritable> {
+
+		// reuse objects
+		private final static IntWritable SumValue = new IntWritable();
+
+		public void reduce(Text key, Iterator<IntWritable> values,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// sum up values
+			int sum = 0;
+			while (values.hasNext()) {
+				sum += values.next().get();
+			}
+			SumValue.set(sum);
+			output.collect(key, SumValue);
+		}
+	}
+
+	private DemoHBaseSource() { // utility class: prevent instantiation
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		String outputPath = "sample-counts2"; // NOTE(review): unlike WordCount, the old output dir is not deleted first -- confirm reruns are handled
+
+		int mapTasks = 1;
+		int reduceTasks = 1;
+
+		JobConf conf = new JobConf(DemoHBaseSource.class);
+
+		TableMap.initJob("test", "default:text", MapClass.class, conf); // scan table "test", column "default:text"
+
+		conf.setJobName("wordcount");
+
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+
+		conf.setInputFormat(TableInputFormat.class);
+
+		conf.setOutputKeyClass(Text.class);
+		conf.setOutputValueClass(IntWritable.class);
+		//0.16
+//		conf.setOutputPath(new Path(outputPath));
+		Convert.setOutputPath(conf,new Path(outputPath)); // 0.17 API via project helper
+		
+		conf.setMapperClass(MapClass.class);
+		conf.setCombinerClass(ReduceClass.class); // summing is associative, so reducer doubles as combiner
+		conf.setReducerClass(ReduceClass.class);
+
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoPackRecords.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoPackRecords.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoPackRecords.java	(revision 20)
@@ -0,0 +1,83 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import tw.org.nchc.tuple.Schema;
+import tw.org.nchc.tuple.Tuple;
+import tw.org.nchc.util.LocalTupleRecordWriter;
+
+/**
+ * <p>
+ * Demo that packs the sample collection into records using the tuple library,
+ * illustrating the use of the {@link tw.org.nchc.tuple.Tuple} class. The
+ * records are stored in a local SequenceFile; this file can then be transferred
+ * over to HDFS to serve as the starting point for a MapReduce operation.
+ * </p>
+ * 
+ * <p>
+ * Each record is a tuple; the first field of the tuple is a String with the
+ * field name "text", which consists of the raw text of the record.
+ * </p>
+ * 
+ * @see DemoPackRecords2
+ * @see DemoReadPackedRecords
+ */
+public class DemoPackRecords {
+	private DemoPackRecords() {
+	}
+
+	// define the tuple schema for the input record
+	private static final Schema RECORD_SCHEMA = new Schema();
+	static {
+		RECORD_SCHEMA.addField("text", String.class, "");
+	}
+
+	// instantiate a single tuple
+	private static Tuple tuple = RECORD_SCHEMA.instantiate();
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		String infile = "../umd-hadoop-dist/sample-input/bible+shakes.nopunc";
+		String outfile = "../umd-hadoop-dist/sample-input/bible+shakes.nopunc.packed";
+
+		// create LocalTupleRecordWriter to write tuples to a local SequenceFile
+		LocalTupleRecordWriter writer = new LocalTupleRecordWriter(outfile);
+
+		// read in raw text records, line separated
+		BufferedReader data = new BufferedReader(new InputStreamReader(
+				new FileInputStream(infile)));
+
+		String line;
+		while ((line = data.readLine()) != null) {
+			// write the record
+			tuple.set(0, line);
+			writer.add(tuple);
+		}
+
+		data.close();
+		writer.close();
+
+		System.out.println("Wrote " + writer.getRecordCount() + " records.");
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoPackRecords2.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoPackRecords2.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoPackRecords2.java	(revision 20)
@@ -0,0 +1,106 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.io.Text;
+
+import tw.org.nchc.tuple.ListWritable;
+import tw.org.nchc.tuple.Schema;
+import tw.org.nchc.tuple.Tuple;
+import tw.org.nchc.util.LocalTupleRecordWriter;
+
+/**
+ * <p>
+ * Demo that packs the sample collection into records using the tuple library,
+ * illustrating the use of the {@link tw.org.nchc.tuple.Tuple} and
+ * {@link tw.org.nchc.tuple.ListWritable} classes. The records are stored in
+ * a local SequenceFile; this file can then be transferred over to HDFS to serve
+ * as the starting point for a MapReduce operation.
+ * </p>
+ * 
+ * <p>
+ * Each record is a tuple with two fields:
+ * </p>
+ * 
+ * <ul>
+ * 
+ * <li>the first field of the tuple is an Integer with the field name "length";
+ * its value is the length of the record in number of characters.</li>
+ * 
+ * <li>the second field of the tuple is a ListWritable<Text> with the field
+ * name "tokens"; its value is a list of tokens that comprise the text of the
+ * record.</li>
+ * 
+ * </ul>
+ * 
+ * @see DemoPackRecords
+ * @see DemoReadPackedRecords2
+ */
+public class DemoPackRecords2 {
+	private DemoPackRecords2() {
+	}
+
+	// define the tuple schema for the input record
+	private static final Schema RECORD_SCHEMA = new Schema();
+	static {
+		RECORD_SCHEMA.addField("length", Integer.class);
+		RECORD_SCHEMA.addField("tokens", ListWritable.class, "");
+	}
+
+	// instantiate a single tuple
+	private static Tuple tuple = RECORD_SCHEMA.instantiate();
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		String infile = "../umd-hadoop-dist/sample-input/bible+shakes.nopunc";
+		String outfile = "../umd-hadoop-dist/sample-input/bible+shakes.nopunc.packed2";
+
+		// create LocalTupleRecordWriter to write tuples to a local SequenceFile
+		LocalTupleRecordWriter writer = new LocalTupleRecordWriter(outfile);
+
+		// read in raw text records, line separated
+		BufferedReader data = new BufferedReader(new InputStreamReader(
+				new FileInputStream(infile)));
+
+		String line;
+		while ((line = data.readLine()) != null) {
+			ListWritable<Text> tokens = new ListWritable<Text>();
+			StringTokenizer itr = new StringTokenizer(line);
+			while (itr.hasMoreTokens()) {
+				tokens.add(new Text(itr.nextToken()));
+			}
+
+			// write the record
+			tuple.set("length", line.length());
+			tuple.set("tokens", tokens);
+			writer.add(tuple);
+		}
+
+		data.close();
+		writer.close();
+
+		System.out.println("Wrote " + writer.getRecordCount() + " records.");
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoReadPackedRecords.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoReadPackedRecords.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoReadPackedRecords.java	(revision 20)
@@ -0,0 +1,52 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+
+import tw.org.nchc.tuple.Tuple;
+import tw.org.nchc.util.LocalTupleRecordReader;
+
+/**
+ * Demo that illustrates how to read records from a local SequenceFile. Dumps
+ * the contents of the SequenceFile generated by {@link DemoPackRecords}.
+ */
+public class DemoReadPackedRecords {
+	// driver class; never instantiated
+	private DemoReadPackedRecords() {
+	}
+
+	// single tuple instance, reused for every record read
+	private static final Tuple tuple = new Tuple();
+
+	/**
+	 * Runs the demo: prints every tuple in the packed SequenceFile, then the
+	 * total record count.
+	 */
+	public static void main(String[] args) throws IOException {
+		String file = "../umd-hadoop-dist/sample-input/bible+shakes.nopunc.packed";
+
+		// open the local records file
+		LocalTupleRecordReader reader = new LocalTupleRecordReader(file);
+		// read and print tuples until the file is exhausted
+		for (;;) {
+			if (!reader.read(tuple)) {
+				break;
+			}
+			System.out.println(tuple);
+		}
+		reader.close();
+
+		System.out.println("Read " + reader.getRecordCount() + " records.");
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoReadPackedRecords2.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoReadPackedRecords2.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoReadPackedRecords2.java	(revision 20)
@@ -0,0 +1,52 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+
+import tw.org.nchc.tuple.Tuple;
+import tw.org.nchc.util.LocalTupleRecordReader;
+
+/**
+ * Demo that illustrates how to read records from a local SequenceFile. Dumps
+ * the contents of the SequenceFile generated by {@link DemoPackRecords2}.
+ */
+public class DemoReadPackedRecords2 {
+	// driver class; never instantiated
+	private DemoReadPackedRecords2() {
+	}
+
+	// single tuple instance, reused for every record read
+	private static final Tuple tuple = new Tuple();
+
+	/**
+	 * Runs the demo: prints every tuple in the packed SequenceFile, then the
+	 * total record count.
+	 */
+	public static void main(String[] args) throws IOException {
+		String file = "../umd-hadoop-dist/sample-input/bible+shakes.nopunc.packed2";
+
+		// open the local records file
+		LocalTupleRecordReader reader = new LocalTupleRecordReader(file);
+		// read and print tuples until the file is exhausted
+		for (;;) {
+			if (!reader.read(tuple)) {
+				break;
+			}
+			System.out.println(tuple);
+		}
+		reader.close();
+
+		System.out.println("Read " + reader.getRecordCount() + " records.");
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCondProb.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCondProb.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCondProb.java	(revision 20)
@@ -0,0 +1,210 @@
+/**
+ * Program: DemoWordCondProb.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Re-code from : Cloud9: A MapReduce Library for Hadoop
+ */
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.rmi.UnexpectedException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Partitioner;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+import tw.org.nchc.code.Convert;
+import tw.org.nchc.tuple.Schema;
+import tw.org.nchc.tuple.Tuple;
+
+/**
+ * <p>
+ * Demo that illustrates the use of a Partitioner and special symbols in Tuple
+ * to compute conditional probabilities. Demo builds on
+ * {@link DemoWordCountTuple}, and has similar structure. Input comes from
+ * Bible+Shakespeare sample collection, encoded as single-field tuples; see
+ * {@link DemoPackRecords}. Sample of final output:
+ * 
+ * <pre>
+ * ...
+ * (admirable, *)   15.0
+ * (admirable, 0)   0.6
+ * (admirable, 1)   0.4
+ * (admiral, *)     6.0
+ * (admiral, 0)     0.33333334
+ * (admiral, 1)     0.6666667
+ * (admiration, *)  16.0
+ * (admiration, 0)  0.625
+ * (admiration, 1)  0.375
+ * (admire, *)      8.0
+ * (admire, 0)      0.625
+ * (admire, 1)      0.375
+ * (admired, *)     19.0
+ * (admired, 0)     0.6315789
+ * (admired, 1)     0.36842105
+ * ...
+ * </pre>
+ * 
+ * <p>
+ * The first field of the key tuple contains a token. If the second field
+ * contains the special symbol '*', then the value indicates the count of the
+ * token in the collection. Otherwise, the value indicates p(EvenOrOdd|Token),
+ * the probability that a line is odd-length or even-length, given the
+ * occurrence of a token.
+ * </p>
+ */
+public class DemoWordCondProb {
+
+	// create the schema for the tuple that will serve as the key
+	private static final Schema KEY_SCHEMA = new Schema();
+
+	// define the schema statically
+	static {
+		KEY_SCHEMA.addField("Token", String.class, "");
+		KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
+	}
+
+	// mapper that emits tuple as the key, and value '1' for each occurrence
+	private static class MapClass extends MapReduceBase implements
+			Mapper<LongWritable, Tuple, Tuple, FloatWritable> {
+		// reusable '1' value; never mutated after construction
+		private final static FloatWritable one = new FloatWritable(1);
+		// reusable output key tuple, refilled for every token
+		private Tuple tupleOut = KEY_SCHEMA.instantiate();
+
+		public void map(LongWritable key, Tuple tupleIn,
+				OutputCollector<Tuple, FloatWritable> output, Reporter reporter)
+				throws IOException {
+
+			// the input value is a tuple; get field 0
+			// see DemoPackRecords of how input SequenceFile is generated
+			String line = (String) ((Tuple) tupleIn).get(0);
+			StringTokenizer itr = new StringTokenizer(line);
+			while (itr.hasMoreTokens()) {
+				String token = itr.nextToken();
+
+				// emit key-value pair for either even-length or odd-length line
+				tupleOut.set("Token", token);
+				tupleOut.set("EvenOrOdd", line.length() % 2);
+				output.collect(tupleOut, one);
+
+				// emit key-value pair for the total count
+				tupleOut.set("Token", token);
+				// use special symbol in field 2
+				tupleOut.setSymbol("EvenOrOdd", "*");
+				output.collect(tupleOut, one);
+			}
+		}
+	}
+
+	// reducer computes conditional probabilities
+	private static class ReduceClass extends MapReduceBase implements
+			Reducer<Tuple, FloatWritable, Tuple, FloatWritable> {
+		// HashMap keeps track of total counts
+		// NOTE(review): state persists across all keys in a reduce task; this
+		// assumes the (token, "*") total key sorts before (token, 0/1) so the
+		// total is recorded first — confirm Tuple's sort order guarantees this
+		private final static HashMap<String, Integer> TotalCounts = new HashMap<String, Integer>();
+
+		public synchronized void reduce(Tuple tupleKey,
+				Iterator<FloatWritable> values,
+				OutputCollector<Tuple, FloatWritable> output, Reporter reporter)
+				throws IOException {
+			// sum values
+			// int accumulation is exact here: the combiner is IdentityReducer,
+			// so every incoming value is the literal 1.0f emitted by the mapper
+			int sum = 0;
+			while (values.hasNext()) {
+				sum += values.next().get();
+			}
+
+			String tok = (String) tupleKey.get("Token");
+
+			// check if the second field is a special symbol
+			if (tupleKey.containsSymbol("EvenOrOdd")) {
+				// emit total count
+				output.collect(tupleKey, new FloatWritable(sum));
+				// record total count
+				TotalCounts.put(tok, sum);
+			} else {
+				// a total must already have been recorded for this token
+				if (!TotalCounts.containsKey(tok))
+					throw new UnexpectedException("Don't have total counts!");
+
+				// divide sum by total count to obtain conditional probability
+				float p = (float) sum / TotalCounts.get(tok);
+
+				// emit P(EvenOrOdd|Token)
+				output.collect(tupleKey, new FloatWritable(p));
+			}
+		}
+	}
+
+	// partition by first field of the tuple, so that tuples corresponding
+	// to the same token will be sent to the same reducer
+	private static class MyPartitioner implements
+			Partitioner<Tuple, FloatWritable> {
+		public void configure(JobConf job) {
+		}
+
+		// mask with Integer.MAX_VALUE strips the sign bit so the modulus is
+		// never negative, even for negative hash codes
+		public int getPartition(Tuple key, FloatWritable value,
+				int numReduceTasks) {
+			return (key.get("Token").hashCode() & Integer.MAX_VALUE)
+					% numReduceTasks;
+		}
+	}
+
+	// dummy constructor
+	private DemoWordCondProb() {
+	}
+
+	/**
+	 * Runs the demo.
+	 */
+	public static void main(String[] args) throws IOException {
+		String inPath = "/shared/sample-input/bible+shakes.nopunc.packed";
+		String output1Path = "condprob";
+		int numMapTasks = 20;
+		int numReduceTasks = 10;
+
+		// first MapReduce cycle is to do the tuple counting
+		JobConf conf1 = new JobConf(DemoWordCondProb.class);
+		conf1.setJobName("DemoWordCondProb.MR1");
+
+		conf1.setNumMapTasks(numMapTasks);
+		conf1.setNumReduceTasks(numReduceTasks);
+		//0.16
+//		conf1.setInputPath(new Path(inPath));
+		Convert.setInputPath(conf1, new Path(inPath));
+		
+		conf1.setInputFormat(SequenceFileInputFormat.class);
+		
+		// 0.16
+//		conf1.setOutputPath(new Path(output1Path));
+		Convert.setOutputPath(conf1,new Path(output1Path));
+		conf1.setOutputKeyClass(Tuple.class);
+		conf1.setOutputValueClass(FloatWritable.class);
+		conf1.setOutputFormat(TextOutputFormat.class);
+
+		conf1.setMapperClass(MapClass.class);
+		// this is a potential gotcha! can't use ReduceClass for combine because
+		// we have not collected all the counts yet, so we can't divide through
+		// to compute the conditional probabilities
+		conf1.setCombinerClass(IdentityReducer.class);
+		conf1.setReducerClass(ReduceClass.class);
+		conf1.setPartitionerClass(MyPartitioner.class);
+
+		JobClient.runJob(conf1);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCount.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCount.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCount.java	(revision 20)
@@ -0,0 +1,128 @@
+/**
+ * Program: DemoWordCount.java
+ * Editor: Waue Chen
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ */
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import tw.org.nchc.code.Convert;
+
+/**
+ * <p>Simple word count demo. Counts words in the Bible+Shakespeare sample
+ * collection. Expected trace of MapReduce operation:</p>
+ * 
+ * <pre>
+ * Map input records=156215
+ * Map output records=1734298
+ * Map input bytes=9068074
+ * Map output bytes=15919397
+ * Combine input records=1734298
+ * Combine output records=135372
+ * Reduce input groups=41788
+ * Reduce input records=135372
+ * Reduce output records=41788
+ * </pre>
+ *
+ */
+public class DemoWordCount {
+
+	// mapper: emits (token, 1) for every word occurrence
+	private static class MapClass extends MapReduceBase implements
+			Mapper<LongWritable, Text, Text, IntWritable> {
+
+		// reuse objects to save overhead of object creation
+		private final static IntWritable one = new IntWritable(1);
+		private Text word = new Text();
+
+		public void map(LongWritable key, Text value,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// split the line on whitespace and emit each token with count 1
+			String line = value.toString();
+			StringTokenizer itr = new StringTokenizer(line);
+			while (itr.hasMoreTokens()) {
+				word.set(itr.nextToken());
+				output.collect(word, one);
+			}
+		}
+	}
+
+	// reducer: sums up all the counts for a token
+	private static class ReduceClass extends MapReduceBase implements
+			Reducer<Text, IntWritable, Text, IntWritable> {
+
+		// reuse objects
+		private final static IntWritable SumValue = new IntWritable();
+
+		public void reduce(Text key, Iterator<IntWritable> values,
+				OutputCollector<Text, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// sum up values
+			int sum = 0;
+			while (values.hasNext()) {
+				sum += values.next().get();
+			}
+			SumValue.set(sum);
+			output.collect(key, SumValue);
+		}
+	}
+
+	// driver class; never instantiated
+	private DemoWordCount() {
+	}
+
+	/**
+	 * Runs the demo: counts words in the input file and writes the counts to
+	 * the output directory, deleting any previous output first.
+	 */
+	public static void main(String[] args) throws IOException {
+		String filename = "/user/waue/test/132.txt";
+		String outputPath = "sample-counts";
+		int mapTasks = 20;
+		int reduceTasks = 1;
+
+		JobConf conf = new JobConf(DemoWordCount.class);
+		conf.setJobName("wordcount");
+
+		conf.setNumMapTasks(mapTasks);
+		conf.setNumReduceTasks(reduceTasks);
+		// Hadoop 0.17 replacement for the 0.16-era conf.setInputPath(...)
+		Convert.setInputPath(conf, new Path(filename));
+		conf.setOutputKeyClass(Text.class);
+		conf.setOutputValueClass(IntWritable.class);
+		// BUG FIX: the original called Convert.setInputPath here, which would
+		// have registered the output directory as a second job input instead
+		// of setting the job's output path.
+		Convert.setOutputPath(conf, new Path(outputPath));
+		conf.setMapperClass(MapClass.class);
+		conf.setCombinerClass(ReduceClass.class);
+		conf.setReducerClass(ReduceClass.class);
+
+		// Delete the output directory if it exists already
+		// (recursive delete; the two-argument form is required since 0.17)
+		Path outputDir = new Path(outputPath);
+		FileSystem.get(conf).delete(outputDir, true);
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCountTuple.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCountTuple.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCountTuple.java	(revision 20)
@@ -0,0 +1,183 @@
+/**
+ * Program: DemoWordCountTuple.java
+ * Editor: Waue Chen
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ */
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+import tw.org.nchc.code.Convert;
+import tw.org.nchc.tuple.Schema;
+import tw.org.nchc.tuple.Tuple;
+
+/**
+ * <p>
+ * Demo that illustrates the use of the tuple library ({@link Tuple} class).
+ * Input comes from Bible+Shakespeare sample collection, encoded as single-field
+ * tuples; see {@link DemoPackRecords}. Sample of final output:
+ * </p>
+ * 
+ * <pre>
+ * ...
+ * (admirable, 0)    9
+ * (admirable, 1)    6
+ * (admiral, 0)      2
+ * (admiral, 1)      4
+ * (admiration, 0)  10
+ * (admiration, 1)   6
+ * (admire, 0)       5
+ * (admire, 1)       3
+ * (admired, 0)     12
+ * (admired, 1)      7
+ * ...
+ * </pre>
+ * 
+ * <p>
+ * The first field of the key tuple contains a token, the second field indicates
+ * whether it was found on a even-length or odd-length line. The value is the
+ * count of the tuple occurrences in the collection. In the MapReduce cycle,
+ * output keys consist of tuples (Token, EvenOrOdd). The second field of the
+ * tuple indicates whether the token was found on a line with an even or an odd
+ * number of characters. Values consist of counts of tuple occurrences. Expected
+ * trace of the demo:
+ * </p>
+ * 
+ * <pre>
+ * Map input records=156215
+ * Map output records=1734298
+ * Map input bytes=13118917
+ * Map output bytes=66214039
+ * Combine input records=1734298
+ * Combine output records=192045
+ * Reduce input groups=59225
+ * Reduce input records=192045
+ * Reduce output records=59225
+ * </pre>
+ * 
+ * <p>
+ * Obviously, this isn't a particularly meaningful program, but does illustrate
+ * the use of the {@link Tuple} class.
+ * </p>
+ */
+public class DemoWordCountTuple {
+
+	// create the schema for the tuple that will serve as the key
+	private static final Schema KEY_SCHEMA = new Schema();
+
+	// define the schema statically: a token plus an even/odd line-length flag
+	static {
+		KEY_SCHEMA.addField("Token", String.class, "");
+		KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
+	}
+
+	// mapper that emits tuple as the key, and value '1' for each occurrence
+	private static class MapClass extends MapReduceBase implements
+			Mapper<LongWritable, Tuple, Tuple, IntWritable> {
+
+		// define value '1' statically so we can reuse the object, i.e., avoid
+		// unnecessary object creation
+		private final static IntWritable one = new IntWritable(1);
+
+		// once again, reuse tuples if possible
+		private Tuple tupleOut = KEY_SCHEMA.instantiate();
+
+		public void map(LongWritable key, Tuple tupleIn,
+				OutputCollector<Tuple, IntWritable> output, Reporter reporter)
+				throws IOException {
+
+			// the input value is a tuple; get field 0
+			// see DemoPackRecords of how input SequenceFile is generated
+			String line = (String) tupleIn.get(0);
+			StringTokenizer itr = new StringTokenizer(line);
+			while (itr.hasMoreTokens()) {
+				String token = itr.nextToken();
+
+				// put new values into the tuple
+				tupleOut.set("Token", token);
+				tupleOut.set("EvenOrOdd", line.length() % 2);
+
+				// emit key-value pair
+				output.collect(tupleOut, one);
+			}
+		}
+	}
+
+	// reducer counts up tuple occurrences
+	private static class ReduceClass extends MapReduceBase implements
+			Reducer<Tuple, IntWritable, Tuple, IntWritable> {
+		private final static IntWritable SumValue = new IntWritable();
+
+		public synchronized void reduce(Tuple tupleKey,
+				Iterator<IntWritable> values,
+				OutputCollector<Tuple, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// sum values
+			int sum = 0;
+			while (values.hasNext()) {
+				sum += values.next().get();
+			}
+
+			// keep original tuple key, emit sum of counts as value
+			SumValue.set(sum);
+			output.collect(tupleKey, SumValue);
+		}
+	}
+
+	// dummy constructor
+	private DemoWordCountTuple() {
+	}
+
+	/**
+	 * Runs the demo: counts (token, even/odd) tuple occurrences over the
+	 * packed input SequenceFile.
+	 */
+	public static void main(String[] args) throws IOException {
+		String inPath = "/shared/sample-input/bible+shakes.nopunc.packed";
+		String outputPath = "word-counts-tuple";
+		int numMapTasks = 20;
+		int numReduceTasks = 20;
+
+		JobConf conf = new JobConf(DemoWordCountTuple.class);
+		conf.setJobName("wordcount");
+
+		conf.setNumMapTasks(numMapTasks);
+		conf.setNumReduceTasks(numReduceTasks);
+		// Hadoop 0.17 replacement for the 0.16-era conf.setInputPath(...)
+		Convert.setInputPath(conf, new Path(inPath));
+		conf.setInputFormat(SequenceFileInputFormat.class);
+
+		// BUG FIX: the original called Convert.setInputPath here, which would
+		// have added the output directory as a second job input instead of
+		// setting the job's output path.
+		Convert.setOutputPath(conf, new Path(outputPath));
+		conf.setOutputKeyClass(Tuple.class);
+		conf.setOutputValueClass(IntWritable.class);
+		conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+		conf.setMapperClass(MapClass.class);
+		conf.setCombinerClass(ReduceClass.class);
+		conf.setReducerClass(ReduceClass.class);
+
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCountTuple2.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCountTuple2.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCountTuple2.java	(revision 20)
@@ -0,0 +1,146 @@
+/**
+ * Program: DemoWordCountTuple2.java
+ * Editor: Waue Chen
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ */
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+import tw.org.nchc.code.Convert;
+import tw.org.nchc.tuple.ListWritable;
+import tw.org.nchc.tuple.Schema;
+import tw.org.nchc.tuple.Tuple;
+
+/**
+ * <p>
+ * Demo that illustrates the use of the tuple library ({@link Tuple} and
+ * {@link ListWritable} class). Input comes from Bible+Shakespeare sample
+ * collection, encoded with {@link DemoPackRecords2}. Otherwise, this demo is
+ * exactly the same as {@link DemoWordCountTuple}.
+ * </p>
+ */
+public class DemoWordCountTuple2 {
+
+	// create the schema for the tuple that will serve as the key
+	private static final Schema KEY_SCHEMA = new Schema();
+
+	// define the schema statically: a token plus an even/odd line-length flag
+	static {
+		KEY_SCHEMA.addField("Token", String.class, "");
+		KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
+	}
+
+	// mapper that emits tuple as the key, and value '1' for each occurrence
+	private static class MapClass extends MapReduceBase implements
+			Mapper<LongWritable, Tuple, Tuple, IntWritable> {
+
+		// define value '1' statically so we can reuse the object, i.e., avoid
+		// unnecessary object creation
+		private final static IntWritable one = new IntWritable(1);
+
+		// once again, reuse tuples if possible
+		private Tuple tupleOut = KEY_SCHEMA.instantiate();
+
+		public void map(LongWritable key, Tuple tupleIn,
+				OutputCollector<Tuple, IntWritable> output, Reporter reporter)
+				throws IOException {
+
+			// field 1 holds the token list; see DemoPackRecords2 for how the
+			// input SequenceFile is generated
+			@SuppressWarnings("unchecked")
+			ListWritable<Text> list = (ListWritable<Text>) tupleIn.get(1);
+
+			for (int i = 0; i < list.size(); i++) {
+				Text t = (Text) list.get(i);
+
+				String token = t.toString();
+
+				// put new values into the tuple; field 0 is the line length
+				tupleOut.set("Token", token);
+				tupleOut.set("EvenOrOdd", ((Integer) tupleIn.get(0)) % 2);
+
+				// emit key-value pair
+				output.collect(tupleOut, one);
+			}
+		}
+	}
+
+	// reducer counts up tuple occurrences
+	private static class ReduceClass extends MapReduceBase implements
+			Reducer<Tuple, IntWritable, Tuple, IntWritable> {
+		private final static IntWritable SumValue = new IntWritable();
+
+		public synchronized void reduce(Tuple tupleKey,
+				Iterator<IntWritable> values,
+				OutputCollector<Tuple, IntWritable> output, Reporter reporter)
+				throws IOException {
+			// sum values
+			int sum = 0;
+			while (values.hasNext()) {
+				sum += values.next().get();
+			}
+
+			// keep original tuple key, emit sum of counts as value
+			SumValue.set(sum);
+			output.collect(tupleKey, SumValue);
+		}
+	}
+
+	// dummy constructor
+	private DemoWordCountTuple2() {
+	}
+
+	/**
+	 * Runs the demo: counts (token, even/odd) tuple occurrences over the
+	 * packed input SequenceFile produced by DemoPackRecords2.
+	 */
+	public static void main(String[] args) throws IOException {
+		String inPath = "/shared/sample-input/bible+shakes.nopunc.packed2";
+		String outputPath = "word-counts2-tuple";
+		int numMapTasks = 20;
+		int numReduceTasks = 20;
+
+		JobConf conf = new JobConf(DemoWordCountTuple2.class);
+		conf.setJobName("wordcount");
+
+		conf.setNumMapTasks(numMapTasks);
+		conf.setNumReduceTasks(numReduceTasks);
+
+		// Hadoop 0.17 replacement for the 0.16-era conf.setInputPath(...)
+		Convert.setInputPath(conf,new Path(inPath));
+		conf.setInputFormat(SequenceFileInputFormat.class);
+		// BUG FIX: the original called Convert.setInputPath here, which would
+		// have added the output directory as a second job input instead of
+		// setting the job's output path.
+		Convert.setOutputPath(conf, new Path(outputPath));
+
+		conf.setOutputKeyClass(Tuple.class);
+		conf.setOutputValueClass(IntWritable.class);
+		conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+		conf.setMapperClass(MapClass.class);
+		conf.setCombinerClass(ReduceClass.class);
+		conf.setReducerClass(ReduceClass.class);
+
+		JobClient.runJob(conf);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/ExampleDriver.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/ExampleDriver.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/ExampleDriver.java	(revision 20)
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package tw.org.nchc.demo;
+
+import org.apache.hadoop.util.ProgramDriver;
+
+/**
+ * A description of an example program based on its class and a human-readable
+ * description.
+ */
+public class ExampleDriver {
+
+  /**
+   * Entry point: registers each demo program with a ProgramDriver and
+   * dispatches to whichever one is named on the command line.
+   */
+  public static void main(String[] argv) {
+    ProgramDriver driver = new ProgramDriver();
+    try {
+      driver.addClass("logfetcher", LogFetcher.class, "Log File Fetcher.");
+      driver.driver(argv);
+    } catch (Throwable t) {
+      // ProgramDriver.driver declares Throwable; report it and return
+      t.printStackTrace();
+    }
+  }
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/LogFetcher.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/LogFetcher.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/LogFetcher.java	(revision 20)
@@ -0,0 +1,188 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package tw.org.nchc.demo;
+
+import java.io.IOException;
+import java.text.ParseException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseAdmin;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTable;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import tw.org.nchc.code.Convert;
+
+/**
+ * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
+ * W3CExtended, IISw3cExtended)
+ */
+public class LogFetcher {
+	// shared HBase configuration, used by both the driver and the map tasks
+	static HBaseConfiguration conf = new HBaseConfiguration();
+
+	// JobConf property name that carries the target HBase table name
+	public static final String TABLE = "table.name";
+
+	static String tableName;
+
+	// lazily created inside map(); one HTable handle per task JVM
+	static HTable table = null;
+
+	// set to true to use the hard-coded table/dir instead of CLI arguments
+	static boolean eclipseRun = false;
+
+	// Mapper: parses one access-log line and writes it straight into HBase.
+	// This is a map-only job; nothing is emitted to the output collector.
+	public static class MapClass extends MapReduceBase implements
+			Mapper<WritableComparable, Text, Text, Writable> {
+
+		@Override
+		public void configure(JobConf job) {
+			// pick up the table name set by runMapReduce via the JobConf
+			tableName = job.get(TABLE, "");
+		}
+
+		public void map(WritableComparable key, Text value,
+				OutputCollector<Text, Writable> output, Reporter reporter)
+				throws IOException {
+			try {
+				AccessLogParser log = new AccessLogParser(value.toString());
+				if (table == null)
+					table = new HTable(conf, new Text(tableName));
+				// row key is the client IP; one column per parsed field
+				long lockId = table.startUpdate(new Text(log.getIp()));
+				table.put(lockId, new Text("http:protocol"), log.getProtocol()
+						.getBytes());
+				table.put(lockId, new Text("http:method"), log.getMethod()
+						.getBytes());
+				table.put(lockId, new Text("http:code"), log.getCode()
+						.getBytes());
+				table.put(lockId, new Text("http:bytesize"), log.getByteSize()
+						.getBytes());
+				table.put(lockId, new Text("http:agent"), log.getAgent()
+						.getBytes());
+				table.put(lockId, new Text("url:" + log.getUrl()), log
+						.getReferrer().getBytes());
+				table.put(lockId, new Text("referrer:" + log.getReferrer()),
+						log.getUrl().getBytes());
+				// commit using the log entry's own timestamp as the version
+				table.commit(lockId, log.getTimestamp());
+			} catch (ParseException e) {
+				// NOTE(review): malformed lines are skipped with only a stack
+				// trace; a reporter counter would make the loss visible
+				e.printStackTrace();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
+	// Configures and runs the map-only fetch job over the given file or
+	// directory (one level of subdirectories is also scanned for files).
+	public static void runMapReduce(String table, String dir)
+			throws IOException {
+		Path tempDir = new Path("log/temp");
+		Path InputDir = new Path(dir);
+		FileSystem fs = FileSystem.get(conf);
+		JobConf jobConf = new JobConf(conf, LogFetcher.class);
+		jobConf.setJobName("apache log fetcher");
+		jobConf.set(TABLE, table);
+		// my convert function from 0.16 to 0.17
+		Path[] in = Convert.listPaths(fs, InputDir);
+		if (fs.isFile(InputDir)) {
+			// 0.16
+//			jobConf.setInputPath(InputDir);
+			Convert.setInputPath(jobConf, InputDir);
+		} else {
+			for (int i = 0; i < in.length; i++) {
+				if (fs.isFile(in[i])) {
+					// 0.16
+//					jobConf.addInputPath(in[i]);
+					Convert.addInputPath(jobConf,in[i]);
+				} else {
+					// my convert function from 0.16 to 0.17
+					Path[] sub = Convert.listPaths(fs, in[i]);
+					for (int j = 0; j < sub.length; j++) {
+						if (fs.isFile(sub[j])) {
+							// 0.16
+//							jobConf.addInputPath(sub[j]);
+							Convert.addInputPath(jobConf, sub[j]);
+						}
+					}
+				}
+			}
+		}
+		// 0.16
+//		jobConf.setOutputPath(tempDir);
+		// job output goes to a scratch dir; real results are written to HBase
+		Convert.setOutputPath(jobConf, tempDir);
+		
+		jobConf.setMapperClass(MapClass.class);
+
+		JobClient client = new JobClient(jobConf);
+		ClusterStatus cluster = client.getClusterStatus();
+		jobConf.setNumMapTasks(cluster.getMapTasks());
+		// map-only: all HBase writes happen in the mapper
+		jobConf.setNumReduceTasks(0);
+
+		JobClient.runJob(jobConf);
+		// 0.16
+//		fs.delete(tempDir);
+		fs.delete(tempDir,true);
+		
+		// NOTE(review): FileSystem.get typically returns a cached, shared
+		// instance; closing it here may break other users of the same FS
+		fs.close();
+	}
+
+	// Creates the HBase table (families http:, url:, referrer:) if absent.
+	// NOTE(review): "creatTable" is a typo, kept because the method is
+	// public and may have external callers.
+	public static void creatTable(String table) throws IOException {
+		HBaseAdmin admin = new HBaseAdmin(conf);
+		if (!admin.tableExists(new Text(table))) {
+			System.out.println("1. " + table
+					+ " table creating ... please wait");
+			HTableDescriptor tableDesc = new HTableDescriptor(table);
+			tableDesc.addFamily(new HColumnDescriptor("http:"));
+			tableDesc.addFamily(new HColumnDescriptor("url:"));
+			tableDesc.addFamily(new HColumnDescriptor("referrer:"));
+			admin.createTable(tableDesc);
+		} else {
+			System.out.println("1. " + table + " table already exists.");
+		}
+		System.out.println("2. access_log files fetching using map/reduce");
+	}
+
+	@SuppressWarnings("deprecation")
+	public static void main(String[] args) throws IOException {
+		String table_name = "log";
+		String dir = "apache-log";
+
+		if (eclipseRun) {
+			// fixed settings for running inside Eclipse
+			table_name = "log";
+			dir = "apache-log";
+		} else if (args.length < 2) {
+			System.out
+					.println("Usage: logfetcher <access_log file or directory> <table_name>");
+			System.exit(1);
+		} else {
+			// args[0] = input file/directory, args[1] = table name
+			table_name = args[1];
+			dir = args[0];
+		}
+		creatTable(table_name);
+		runMapReduce(table_name, dir);
+
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/demo/SequentialPageRank.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/demo/SequentialPageRank.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/demo/SequentialPageRank.java	(revision 20)
@@ -0,0 +1,107 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.demo;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collection;
+import java.util.List;
+
+import edu.uci.ics.jung.algorithms.cluster.WeakComponentGraphClusterer;
+import edu.uci.ics.jung.algorithms.importance.PageRank;
+import edu.uci.ics.jung.algorithms.importance.Ranking;
+import edu.uci.ics.jung.graph.DirectedSparseGraph;
+import edu.uci.ics.jung.graph.Graph;
+
+/**
+ * <p>
+ * Program that computes PageRank for a graph using the <a
+ * href="http://jung.sourceforge.net/">JUNG</a> package (2.0 alpha1). Program
+ * takes two command-line arguments: the first is a file containing the graph
+ * data, and the second is the random jump factor (a typical setting is 0.15).
+ * </p>
+ * 
+ * <p>
+ * The graph should be represented as an adjacency list. Each line should have
+ * at least one token; tokens should be tab delimited. The first token
+ * represents the unique id of the source node; subsequent tokens represent its
+ * link targets (i.e., outlinks from the source node). For completeness, there
+ * should be a line representing all nodes, even nodes without outlinks (those
+ * lines will simply contain one token, the source node id).
+ * </p>
+ * 
+ */
+public class SequentialPageRank {
+
+	// utility/driver class: never instantiated
+	private SequentialPageRank() {
+	}
+
+	/**
+	 * Runs the program. Expects two arguments: the adjacency-list file and
+	 * the random jump factor (a typical setting is 0.15).
+	 */
+	public static void main(String[] args) throws IOException {
+		if (args.length != 2) {
+			// BUG FIX: usage message previously misspelled the class name
+			// ("SequentialPageRage")
+			System.err
+					.println("usage: SequentialPageRank [graph-adjacency-list] [random-jump-factor]");
+			System.exit(-1);
+		}
+		String infile = args[0];
+		float alpha = Float.parseFloat(args[1]);
+
+		int edgeCnt = 0;
+		DirectedSparseGraph<String, Integer> graph = new DirectedSparseGraph<String, Integer>();
+
+		BufferedReader data = new BufferedReader(new InputStreamReader(
+				new FileInputStream(infile)));
+
+		String line;
+		while ((line = data.readLine()) != null) {
+			// BUG FIX: String.trim() returns a new String; the original
+			// code discarded the result, so surrounding whitespace survived
+			line = line.trim();
+			if (line.length() == 0)
+				continue;
+			String[] arr = line.split("\\t");
+
+			// BUG FIX: register the source node explicitly so that lines
+			// with a lone node id (no outlinks) are represented in the
+			// graph, as promised by the class documentation above
+			graph.addVertex(arr[0]);
+
+			// remaining tokens are the outlink targets of arr[0]
+			for (int i = 1; i < arr.length; i++) {
+				graph.addEdge(new Integer(edgeCnt++), arr[0], arr[i]);
+			}
+		}
+
+		data.close();
+
+		// report weakly-connected components and basic graph statistics
+		WeakComponentGraphClusterer<String, Integer> clusterer = new WeakComponentGraphClusterer<String, Integer>();
+
+		Collection<Graph<String, Integer>> components = clusterer
+				.transform(graph);
+		int numComponents = components.size();
+		System.out.println("Number of components: " + numComponents);
+		System.out.println("Number of edges: " + graph.getEdgeCount());
+		System.out.println("Number of nodes: " + graph.getVertexCount());
+		System.out.println("Random jump factor: " + alpha);
+
+		// run PageRank with the supplied random jump factor
+		PageRank<String, Integer> ranker = new PageRank<String, Integer>(graph,
+				alpha);
+		ranker.evaluate();
+
+		System.out.println("\nPageRank of nodes, in descending order:");
+		for (Ranking<?> s : (List<Ranking<?>>) ranker.getRankings()) {
+			String pmid = s.getRanked().toString();
+
+			System.out.println(pmid + " " + s.rankScore);
+		}
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritable.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritable.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritable.java	(revision 20)
@@ -0,0 +1,129 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>
+ * Class that represents an array list in Hadoop's data type system. It extends ArrayList class, 
+ * hence supports all services provided by ArrayList.
+ * Elements in the list must be homogeneous and must implement Hadoop's Writable interface. 
+ * This class, combined with {@link Tuple}, allows the user to
+ * define arbitrarily complex data structures.
+ * </p>
+ * 
+ * @see Tuple
+ * @param <E>
+ *            type of list element
+ */
+
+public class ArrayListWritable<E extends Writable> extends ArrayList<E> implements Writable{
+
+    private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates an empty ArrayListWritable object.
+	 */
+	public ArrayListWritable() {
+		super();
+	}
+	
+	/**
+	 * Creates an ArrayListWritable object from a regular ArrayList.
+	 *
+	 * @param array
+	 *            list whose elements are copied into this list
+	 */
+	public ArrayListWritable(ArrayList<E> array) {
+		super(array);
+	}
+
+	/**
+	 * Deserializes the array.
+	 * 
+	 * @param in
+	 *            source for raw byte representation
+	 * @throws IOException
+	 *             if the element class cannot be found or instantiated
+	 */
+	@SuppressWarnings("unchecked")
+	public void readFields(DataInput in) throws IOException {
+
+		this.clear();
+
+		int numFields = in.readInt();
+		// an empty list serializes no class name (see write())
+		if(numFields==0) return;
+		String className = in.readUTF();
+		E obj;
+		try {
+			Class c = Class.forName(className);
+			for (int i = 0; i < numFields; i++) {
+				obj = (E) c.newInstance();
+				obj.readFields(in);
+				this.add(obj);
+			}
+		} catch (ClassNotFoundException e) {
+			// BUG FIX: previously only printed the stack trace and returned
+			// a partially-read list, leaving the input stream desynchronized;
+			// rethrow as the IOException this method already declares
+			IOException ioe = new IOException(
+					"Cannot deserialize list elements of class " + className);
+			ioe.initCause(e);
+			throw ioe;
+		} catch (IllegalAccessException e) {
+			IOException ioe = new IOException("Cannot instantiate " + className);
+			ioe.initCause(e);
+			throw ioe;
+		} catch (InstantiationException e) {
+			IOException ioe = new IOException("Cannot instantiate " + className);
+			ioe.initCause(e);
+			throw ioe;
+		}
+	}
+	
+	/**
+	 * Serializes this array.
+	 * 
+	 * @param out
+	 *            where to write the raw byte representation
+	 * @throws IOException
+	 *             if any element is null or fails to serialize
+	 */
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(this.size());
+		if(size()==0) return;
+		E obj=get(0);
+		
+		// elements are assumed homogeneous, so the type is recorded once;
+		// NOTE(review): canonical names of nested classes ("Outer.Inner")
+		// are not loadable by Class.forName — TODO confirm only top-level
+		// element types are used
+		out.writeUTF(obj.getClass().getCanonicalName());
+
+		for (int i = 0; i < size(); i++) {
+			obj = get(i);
+			if (obj == null) {
+				throw new IOException("Cannot serialize null fields!");
+			}
+			obj.write(out);
+		}
+	}
+
+	/**
+	 * Generates human-readable String representation of this ArrayList.
+	 * 
+	 * @return human-readable String representation of this ArrayList
+	 */
+	public String toString() {
+		StringBuffer sb = new StringBuffer();
+		sb.append("[");
+		for (int i = 0; i < this.size(); i++) {
+			if (i != 0)
+				sb.append(", ");
+			sb.append(this.get(i));
+		}
+		sb.append("]");
+
+		return sb.toString();
+	}
+	
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritableComparable.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritableComparable.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritableComparable.java	(revision 20)
@@ -0,0 +1,191 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * <p>
+ * Class that represents an array list in Hadoop's data type system. It extends ArrayList class, 
+ * hence supports all services provided by ArrayList.
+ * Elements in the list must be homogeneous and must implement Hadoop's Writable interface. 
+ * This class, combined with {@link Tuple}, allows the user to
+ * define arbitrarily complex data structures.
+ * </p>
+ * 
+ * @see Tuple
+ * @param <E>
+ *            type of list element
+ */
+
+public class ArrayListWritableComparable<E extends WritableComparable> extends ArrayList<E> implements WritableComparable{
+
+    private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates an empty ArrayListWritableComparable object.
+	 */
+	public ArrayListWritableComparable() {
+		super();
+	}
+	
+	/**
+	 * Creates an ArrayListWritableComparable object from a regular ArrayList.
+	 *
+	 * @param array
+	 *            list whose elements are copied into this list
+	 */
+	public ArrayListWritableComparable(ArrayList<E> array) {
+		super(array);
+	}
+
+	/**
+	 * Deserializes the array.
+	 * 
+	 * @param in
+	 *            source for raw byte representation
+	 * @throws IOException
+	 *             if the element class cannot be found or instantiated
+	 */
+	@SuppressWarnings("unchecked")
+	public void readFields(DataInput in) throws IOException {
+
+		this.clear();
+
+		int numFields = in.readInt();
+		// an empty list serializes no class name (see write())
+		if(numFields==0) return;
+		String className = in.readUTF();
+		E obj;
+		try {
+			Class c = Class.forName(className);
+			for (int i = 0; i < numFields; i++) {
+				obj = (E) c.newInstance();
+				obj.readFields(in);
+				this.add(obj);
+			}
+		} catch (ClassNotFoundException e) {
+			// BUG FIX: previously only printed the stack trace and returned
+			// a partially-read list, leaving the input stream desynchronized;
+			// rethrow as the IOException this method already declares
+			IOException ioe = new IOException(
+					"Cannot deserialize list elements of class " + className);
+			ioe.initCause(e);
+			throw ioe;
+		} catch (IllegalAccessException e) {
+			IOException ioe = new IOException("Cannot instantiate " + className);
+			ioe.initCause(e);
+			throw ioe;
+		} catch (InstantiationException e) {
+			IOException ioe = new IOException("Cannot instantiate " + className);
+			ioe.initCause(e);
+			throw ioe;
+		}
+	}
+
+	/**
+	 * Serializes this list.
+	 * 
+	 * @param out
+	 *            where to write the raw byte representation
+	 * @throws IOException
+	 *             if any element is null or fails to serialize
+	 */
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(this.size());
+		if(size()==0) return;
+		E obj=get(0);
+		
+		// elements are assumed homogeneous, so the type is recorded once
+		out.writeUTF(obj.getClass().getCanonicalName());
+
+		for (int i = 0; i < size(); i++) {
+			obj = get(i);
+			if (obj == null) {
+				throw new IOException("Cannot serialize null fields!");
+			}
+			obj.write(out);
+		}
+	}
+	
+	/**
+	 * <p>
+	 * Defines a natural sort order for the ListWritable class. Following
+	 * standard convention, this method returns a value less than zero, a value
+	 * greater than zero, or zero if this ListWritable should be sorted before,
+	 * sorted after, or is equal to <code>obj</code>. The sort order is
+	 * defined as follows:
+	 * </p>
+	 * 
+	 * <ul>
+	 * <li>Each element in the list is compared sequentially from first to
+	 * last.</li>
+	 * <li>Lists are sorted with respect to the natural order of the current
+	 * list element under consideration, by calling its <code>compareTo</code>
+	 * method.</li>
+	 * <li>If the current list elements are equal, the next set of elements are
+	 * considered.</li>
+	 * <li>If all compared elements are equal, but lists are different lengths,
+	 * the shorter list is sorted first.</li>
+	 * <li>If all list elements are equal and the lists are equal in length,
+	 * then the lists are considered equal</li>
+	 * </ul>
+	 * 
+	 * @return a value less than zero, a value greater than zero, or zero if
+	 *         this list should be sorted before, sorted after, or is equal to
+	 *         <code>obj</code>.
+	 */
+	public int compareTo(Object obj) {
+		ArrayListWritableComparable<?> that = (ArrayListWritableComparable<?>) obj;
+
+		// iterate through the fields
+		for (int i = 0; i < this.size(); i++) {
+			// sort shorter list first
+			if (i >= that.size())
+				return 1;
+
+			@SuppressWarnings("unchecked")
+			Comparable<Object> thisField = this.get(i);
+			@SuppressWarnings("unchecked")
+			Comparable<Object> thatField = that.get(i);
+
+			if (thisField.equals(thatField)) {
+				// if we're down to the last field, sort shorter list first
+				if (i == this.size() - 1) {
+					if (this.size() > that.size())
+						return 1;
+
+					if (this.size() < that.size())
+						return -1;
+				}
+				// otherwise, move to next field
+			} else {
+				return thisField.compareTo(thatField);
+			}
+		}
+
+		return 0;
+	}
+
+
+	/**
+	 * Generates human-readable String representation of this ArrayList.
+	 * 
+	 * @return human-readable String representation of this ArrayList
+	 */
+	public String toString() {
+		StringBuffer sb = new StringBuffer();
+		sb.append("[");
+		for (int i = 0; i < this.size(); i++){
+			if (i != 0)
+				sb.append(", ");
+			sb.append(this.get(i));
+		}
+		sb.append("]");
+
+		return sb.toString();
+	}
+	
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritableComparableTest.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritableComparableTest.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritableComparableTest.java	(revision 20)
@@ -0,0 +1,280 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import junit.framework.*;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+public class ArrayListWritableComparableTest {
+
+	// basic add/get round-trip
+	@Test
+	public void testBasic() throws IOException {
+		ArrayListWritableComparable<Text> list = new ArrayListWritableComparable<Text>();
+
+		list.add(new Text("hi"));
+		list.add(new Text("there"));
+
+		assertEquals(list.get(0).toString(), "hi");
+		assertEquals(list.get(1).toString(), "there");
+	}
+
+	// write() then readFields() must reproduce Text elements
+	@Test
+	public void testSerialize1() throws IOException {
+		//ArrayListWritableComparable<Text> list = new ArrayListWritableComparable<Text>();
+		ArrayListWritableComparable<WritableComparable> list = new ArrayListWritableComparable<WritableComparable>();
+		list.add(new Text("hi"));
+		list.add(new Text("there"));
+
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		list.write(dataOut);
+
+		ArrayListWritableComparable<Text> newList = new ArrayListWritableComparable<Text>();
+		newList.readFields(new DataInputStream(new ByteArrayInputStream(
+				bytesOut.toByteArray())));
+
+		assertEquals(newList.get(0).toString(), "hi");
+		assertEquals(newList.get(1).toString(), "there");
+	}
+
+	// serialization round-trip with a different element type (FloatWritable)
+	@Test
+	public void testSerialize2() throws IOException {
+		ArrayListWritableComparable<FloatWritable> list = new ArrayListWritableComparable<FloatWritable>();
+
+		list.add(new FloatWritable(0.3f));
+		list.add(new FloatWritable(3244.2f));
+
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		list.write(dataOut);
+
+		ArrayListWritableComparable<FloatWritable> newList = new ArrayListWritableComparable<FloatWritable>();
+		newList.readFields(new DataInputStream(new ByteArrayInputStream(
+				bytesOut.toByteArray())));
+
+		assertTrue(newList.get(0).get() == 0.3f);
+		assertTrue(newList.get(1).get() == 3244.2f);
+	}
+
+	@Test
+	public void testToString() {
+		ArrayListWritableComparable<Text> list = new ArrayListWritableComparable<Text>();
+
+		list.add(new Text("hi"));
+		list.add(new Text("there"));
+
+		assertEquals(list.toString(), "[hi, there]");
+	}
+
+	@Test
+	public void testClear() {
+		ArrayListWritableComparable<Text> list = new ArrayListWritableComparable<Text>();
+
+		list.add(new Text("hi"));
+		list.add(new Text("there"));
+		list.clear();
+		
+		assertEquals(list.size(), 0);
+	}
+
+	// an empty list must survive serialization and remain usable afterwards
+	@Test
+	public void testEmpty() throws IOException {
+		ArrayListWritableComparable<Text> list = new ArrayListWritableComparable<Text>();
+		
+		assertTrue(list.size() == 0);
+		
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		list.write(dataOut);
+
+		ArrayListWritableComparable<Text> newList = new ArrayListWritableComparable<Text>();
+		newList.readFields(new DataInputStream(new ByteArrayInputStream(
+				bytesOut.toByteArray())));
+		assertTrue(newList.size() == 0);
+		
+		newList.add(new Text("Hey"));
+		assertEquals(newList.get(0),new Text("Hey"));
+
+	}
+	
+	// disabled: element-type enforcement is not implemented by the class
+	/*@Test
+	public void testTypeSafety() {
+		ArrayListWritableComparable<WritableComparable> list = new ArrayListWritableComparable<WritableComparable> ();
+		list.add(new Text("Hello"));
+		list.add(new Text("Are you there"));
+		
+		try {
+			list.add(new IntWritable(5));
+			assertTrue(false); // should throw an exception before reaching this line.
+		} catch (IllegalArgumentException e) {
+			assertTrue(true);
+		}
+		
+		ArrayList<WritableComparable> otherList = new ArrayList<WritableComparable>();
+		otherList.add(new Text("Test"));
+		otherList.add(new Text("Test 2"));
+		
+		assertTrue(list.addAll(otherList));
+		
+		otherList.add(new IntWritable(6));
+		try {
+			list.addAll(otherList);
+			assertTrue(false);
+		} catch (IllegalArgumentException e) {
+			assertTrue(true);
+		}
+	}*/
+	
+	// exercises the inherited ArrayList API (iteration, indexOf, contains...)
+	@Test 
+	public void testListMethods() {
+		IntWritable a = new IntWritable(1);
+		IntWritable b = new IntWritable(2);
+		IntWritable c = new IntWritable(3);
+		IntWritable d = new IntWritable(4);
+		IntWritable e = new IntWritable(5);
+		
+		ArrayListWritableComparable<IntWritable> list = new ArrayListWritableComparable<IntWritable>();
+		assertTrue(list.isEmpty());
+		list.add(a);
+		list.add(b);
+		list.add(c);
+		list.add(d);
+		list.add(e);
+		
+		int pos = 0;
+		for (IntWritable i : list) {
+			assertEquals(i, list.get(pos));
+			++pos;
+		}
+		
+		assertTrue(list.indexOf(d) == 3);
+		list.add(2, a);
+		assertTrue(list.lastIndexOf(a) == 2);
+		assertEquals(list.get(2), list.get(0));
+		assertTrue(list.size() == 6);
+		
+		assertTrue(list.contains(c));
+		assertTrue(!list.contains(new IntWritable(123)));
+		
+		ArrayList<IntWritable> otherList = new ArrayList<IntWritable>();
+		otherList.add(a);
+		otherList.add(b);
+		otherList.add(c);
+		
+		assertTrue(list.containsAll(otherList));
+		
+		otherList.add(new IntWritable(200));
+		assertTrue(!list.containsAll(otherList));
+		
+		assertEquals(a, otherList.remove(0));
+		assertTrue(list.remove(d));
+		
+	}
+	
+	// compareTo: non-empty sorts after empty
+	@Test
+	public void testSorting1() {
+		ArrayListWritableComparable<Text> list1 = new ArrayListWritableComparable<Text>();
+		ArrayListWritableComparable<Text> list2 = new ArrayListWritableComparable<Text>();
+
+		list1.add(new Text("a"));
+
+		assertTrue(list1.compareTo(list2) > 0);
+	}
+	
+	// compareTo: element-wise order, then shorter-list tie-break
+	@Test
+	public void testSorting2() {
+		ArrayListWritableComparable<Text> list1 = new ArrayListWritableComparable<Text>();
+		ArrayListWritableComparable<Text> list2 = new ArrayListWritableComparable<Text>();
+
+		list1.add(new Text("a"));
+		list2.add(new Text("b"));
+
+		assertTrue(list1.compareTo(list2) < 0);
+		assertTrue(list2.compareTo(list1) > 0);
+		
+		list2.clear();
+		list2.add(new Text("a"));
+		
+		assertTrue(list1.compareTo(list2) == 0);
+		
+		list1.add(new Text("a"));
+		list2.add(new Text("b"));
+		
+		// list 1 is now [a, a]
+		// list 2 is now [a, b]
+		assertTrue(list1.compareTo(list2) < 0);
+		assertTrue(list2.compareTo(list1) > 0);
+
+		// list 1 is now [a, a, a]
+		list1.add(new Text("a"));
+		
+		assertTrue(list1.compareTo(list2) < 0);
+	}
+
+	// compareTo: equal prefixes, ordering decided purely by length
+	@Test
+	public void testSorting3() {
+		ArrayListWritableComparable<Text> list1 = new ArrayListWritableComparable<Text>();
+		ArrayListWritableComparable<Text> list2 = new ArrayListWritableComparable<Text>();
+		ArrayListWritableComparable<Text> list3 = new ArrayListWritableComparable<Text>();
+
+		list1.add(new Text("a"));
+		
+		list2.add(new Text("a"));
+		list2.add(new Text("a"));
+		
+		list3.add(new Text("a"));
+		list3.add(new Text("a"));
+		
+		assertTrue(list2.compareTo(list3) == 0);
+
+		list3.add(new Text("a"));
+		
+		// list 1 is [a]
+		// list 2 is [a, a]
+		// list 3 is [a, a, a]
+		
+		assertTrue(list1.compareTo(list2) < 0);
+		assertTrue(list1.compareTo(list3) < 0);
+		assertTrue(list2.compareTo(list1) > 0);
+		assertTrue(list2.compareTo(list3) < 0);
+		assertTrue(list3.compareTo(list1) > 0);
+		assertTrue(list3.compareTo(list2) > 0);
+	}
+
+	// allows running these JUnit 4 tests under a JUnit 3 runner
+	public static junit.framework.Test suite() {
+		return new JUnit4TestAdapter(ArrayListWritableComparableTest.class);
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritableTest.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritableTest.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/ArrayListWritableTest.java	(revision 20)
@@ -0,0 +1,235 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
/**
 * Unit tests for {@code ArrayListWritable}: element access, Writable
 * serialization round trips, and the standard java.util.List operations.
 */
public class ArrayListWritableTest {

	// Elements are expected to come back in insertion order.
	@Test
	public void testBasic() throws IOException {
		ArrayListWritable<Text> list = new ArrayListWritable<Text>();

		list.add(new Text("hi"));
		list.add(new Text("there"));

		assertEquals(list.get(0).toString(), "hi");
		assertEquals(list.get(1).toString(), "there");
	}

	// Serialization round trip for Text elements.
	@Test
	public void testSerialize1() throws IOException {
		//ArrayListWritable<Text> list = new ArrayListWritable<Text>();
		ArrayListWritable<Writable> list = new ArrayListWritable<Writable>();
		list.add(new Text("hi"));
		list.add(new Text("there"));

		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
		DataOutputStream dataOut = new DataOutputStream(bytesOut);

		list.write(dataOut);

		// Deserialize into a fresh list and verify the contents survived.
		ArrayListWritable<Text> newList = new ArrayListWritable<Text>();
		newList.readFields(new DataInputStream(new ByteArrayInputStream(
				bytesOut.toByteArray())));

		assertEquals(newList.get(0).toString(), "hi");
		assertEquals(newList.get(1).toString(), "there");
	}

	// Serialization round trip for FloatWritable elements; exact float
	// comparison is intentional, since serialization preserves the bits.
	@Test
	public void testSerialize2() throws IOException {
		ArrayListWritable<FloatWritable> list = new ArrayListWritable<FloatWritable>();

		list.add(new FloatWritable(0.3f));
		list.add(new FloatWritable(3244.2f));

		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
		DataOutputStream dataOut = new DataOutputStream(bytesOut);

		list.write(dataOut);

		ArrayListWritable<FloatWritable> newList = new ArrayListWritable<FloatWritable>();
		newList.readFields(new DataInputStream(new ByteArrayInputStream(
				bytesOut.toByteArray())));

		assertTrue(newList.get(0).get() == 0.3f);
		assertTrue(newList.get(1).get() == 3244.2f);
	}

	// Round trip of a heterogeneous list (Text + IntWritable).
	// NOTE(review): the catch block swallows any Exception, but
	// assertTrue(false) throws an AssertionError (an Error, not an
	// Exception) that escapes the catch — so this test only passes if an
	// Exception is raised before that line, presumably while reading the
	// mixed-type list back. Confirm against ArrayListWritable's wire format.
	@Test
	public void testSerialize3() throws IOException {
		//ArrayListWritable<Text> list = new ArrayListWritable<Text>();
		ArrayListWritable<Writable> list = new ArrayListWritable<Writable>();
		list.add(new Text("hi"));
		list.add(new IntWritable(1));

		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
		DataOutputStream dataOut = new DataOutputStream(bytesOut);

		list.write(dataOut);

		ArrayListWritable<Writable> newList = new ArrayListWritable<Writable>();
		newList.readFields(new DataInputStream(new ByteArrayInputStream(
				bytesOut.toByteArray())));

		try {
	        assertEquals(newList.get(0).toString(), "hi");
			IntWritable i=(IntWritable)(newList.get(1));
			assertEquals(i.get(), 1);
	        assertTrue(false);
        } catch (Exception e) {
        }
	}

	// toString should render elements comma-separated in brackets.
	@Test
	public void testToString() {
		ArrayListWritable<Text> list = new ArrayListWritable<Text>();

		list.add(new Text("hi"));
		list.add(new Text("there"));

		assertEquals(list.toString(), "[hi, there]");
	}

	// clear() must leave an empty list.
	@Test
	public void testClear() {
		ArrayListWritable<Text> list = new ArrayListWritable<Text>();

		list.add(new Text("hi"));
		list.add(new Text("there"));
		list.clear();
		
		assertEquals(list.size(), 0);
	}

	// An empty list must survive a serialization round trip and remain
	// usable (appendable) afterwards.
	@Test
	public void testEmpty() throws IOException {
		ArrayListWritable<Text> list = new ArrayListWritable<Text>();
		
		assertTrue(list.size() == 0);
		
		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
		DataOutputStream dataOut = new DataOutputStream(bytesOut);

		list.write(dataOut);

		ArrayListWritable<Text> newList = new ArrayListWritable<Text>();
		newList.readFields(new DataInputStream(new ByteArrayInputStream(
				bytesOut.toByteArray())));
		assertTrue(newList.size() == 0);
		
		newList.add(new Text("Hey"));
		assertEquals(newList.get(0),new Text("Hey"));

	}
	
	/*@Test
	public void testTypeSafety() {
		ArrayListWritable<WritableComparable> list = new ArrayListWritable<WritableComparable> ();
		list.add(new Text("Hello"));
		list.add(new Text("Are you there"));
		
		try {
			list.add(new IntWritable(5));
			assertTrue(false); // should throw an exception before reaching this line.
		} catch (IllegalArgumentException e) {
			assertTrue(true);
		}
		
		ArrayList<WritableComparable> otherList = new ArrayList<WritableComparable>();
		otherList.add(new Text("Test"));
		otherList.add(new Text("Test 2"));
		
		assertTrue(list.addAll(otherList));
		
		otherList.add(new IntWritable(6));
		try {
			list.addAll(otherList);
			assertTrue(false);
		} catch (IllegalArgumentException e) {
			assertTrue(true);
		}
	}*/
	
	// Exercises the java.util.List surface: iteration, positional insert,
	// index lookups, containment, and removal.
	@Test 
	public void testListMethods() {
		IntWritable a = new IntWritable(1);
		IntWritable b = new IntWritable(2);
		IntWritable c = new IntWritable(3);
		IntWritable d = new IntWritable(4);
		IntWritable e = new IntWritable(5);
		
		ArrayListWritable<IntWritable> list = new ArrayListWritable<IntWritable>();
		assertTrue(list.isEmpty());
		list.add(a);
		list.add(b);
		list.add(c);
		list.add(d);
		list.add(e);
		
		// The iterator must visit elements in positional order.
		int pos = 0;
		for (IntWritable i : list) {
			assertEquals(i, list.get(pos));
			++pos;
		}
		
		// Insert a duplicate of 'a' at index 2 and check index lookups.
		assertTrue(list.indexOf(d) == 3);
		list.add(2, a);
		assertTrue(list.lastIndexOf(a) == 2);
		assertEquals(list.get(2), list.get(0));
		assertTrue(list.size() == 6);
		
		assertTrue(list.contains(c));
		assertTrue(!list.contains(new IntWritable(123)));
		
		ArrayList<IntWritable> otherList = new ArrayList<IntWritable>();
		otherList.add(a);
		otherList.add(b);
		otherList.add(c);
		
		assertTrue(list.containsAll(otherList));
		
		otherList.add(new IntWritable(200));
		assertTrue(!list.containsAll(otherList));
		
		assertEquals(a, otherList.remove(0));
		assertTrue(list.remove(d));
		
	}
	
	/**
	 * Adapts this JUnit 4 test class for JUnit 3 style runners.
	 */
	public static junit.framework.Test suite() {
		return new JUnit4TestAdapter(ArrayListWritableTest.class);
	}

}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/HashMapWritable.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/HashMapWritable.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/HashMapWritable.java	(revision 20)
@@ -0,0 +1,102 @@
+
+package tw.org.nchc.tuple;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashMap;
+
+
+import org.apache.hadoop.io.Writable;
+
+public class HashMapWritable<K extends Writable, V extends Writable> extends HashMap<K, V> implements
+        Writable {
+
+	/**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+
+    /**
+	 * Creates a HashMapWritable object.
+	 */
+	public HashMapWritable() {
+		super();
+	}
+	
+	/**
+	 * Creates a HashMapWritable object from a regular HashMap.
+	 */
+	public HashMapWritable(HashMap<K, V> map) {
+		super(map);
+	}
+
+	/**
+	 * Deserializes the array.
+	 * 
+	 * @param in
+	 *            source for raw byte representation
+	 */
+	public void readFields(DataInput in) throws IOException {
+		
+		this.clear();
+
+		int numEntries = in.readInt();
+		if(numEntries==0) return;
+		
+		String keyClassName = in.readUTF();
+		String valueClassName = in.readUTF();
+		
+		K objK;
+		V objV;
+		try {
+			Class keyClass = Class.forName(keyClassName);
+			Class valueClass = Class.forName(valueClassName);
+			for (int i = 0; i < numEntries; i++) {
+				objK = (K) keyClass.newInstance();
+				objK.readFields(in);
+				objV = (V) valueClass.newInstance();
+				objV.readFields(in);
+				put(objK, objV);
+			}
+
+		} catch (ClassNotFoundException e) {
+			e.printStackTrace();
+		} catch (IllegalAccessException e) {
+			e.printStackTrace();
+		} catch (InstantiationException e) {
+			e.printStackTrace();
+		}
+		
+	}
+
+	/**
+	 * Serializes this array.
+	 * 
+	 * @param out
+	 *            where to write the raw byte representation
+	 */
+	public void write(DataOutput out) throws IOException {
+		// Write out the number of entries in the map
+	    out.writeInt(size());
+	    if(size()==0) return;
+	    
+	    // Write out the class names for keys and values
+	    // assuming that data is homogeneuos (i.e., all entries have same types)
+	    Set<Map.Entry<K, V>> entries = entrySet();
+	    Map.Entry<K, V> first = entries.iterator().next();
+	    K objK = first.getKey();
+	    V objV = first.getValue();
+	    out.writeUTF(objK.getClass().getCanonicalName());
+	    out.writeUTF(objV.getClass().getCanonicalName());
+
+	    // Then write out each key/value pair
+	    for (Map.Entry<K, V> e: entrySet()) {
+	      e.getKey().write(out);
+	      e.getValue().write(out);
+	    }
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/HashMapWritableTest.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/HashMapWritableTest.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/HashMapWritableTest.java	(revision 20)
@@ -0,0 +1,184 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+
+public class HashMapWritableTest {
+
+	@Test
+	public void testBasic() throws IOException {
+		HashMapWritable<Text, IntWritable> map = new HashMapWritable<Text, IntWritable>();
+
+		map.put(new Text("hi"), new IntWritable(5));
+		map.put(new Text("there"), new IntWritable(22));
+
+		Text key;
+		IntWritable value;
+
+		assertEquals(map.size(), 2);
+		
+		key=new Text("hi");
+		value=map.get(key);
+		assertTrue(value!=null);
+		assertEquals(value.get(), 5);
+		
+		value=map.remove(key);
+		assertEquals(map.size(), 1);
+		
+		key=new Text("there");
+		value=map.get(key);
+		assertTrue(value!=null);
+		assertEquals(value.get(), 22);
+	}
+
+	@Test
+	public void testSerialize1() throws IOException {
+		HashMapWritable<Text, IntWritable> origMap = new HashMapWritable<Text, IntWritable>();
+
+		origMap.put(new Text("hi"), new IntWritable(5));
+		origMap.put(new Text("there"), new IntWritable(22));
+	
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		origMap.write(dataOut);
+
+		HashMapWritable<Text, IntWritable> map = new HashMapWritable<Text, IntWritable>();
+
+		map.readFields(new DataInputStream(new ByteArrayInputStream(
+				bytesOut.toByteArray())));
+
+		Text key;
+		IntWritable value;
+
+		assertEquals(map.size(), 2);
+		
+		key=new Text("hi");
+		value=map.get(key);
+		assertTrue(value!=null);
+		assertEquals(value.get(), 5);
+		
+		value=map.remove(key);
+		assertEquals(map.size(), 1);
+		
+		key=new Text("there");
+		value=map.get(key);
+		assertTrue(value!=null);
+		assertEquals(value.get(), 22);
+	}
+
+	@Test
+	public void testSerialize2() throws IOException {
+		HashMapWritable<Text, LongWritable> origMap = new HashMapWritable<Text, LongWritable>();
+
+		origMap.put(new Text("hi"), new LongWritable(52));
+		origMap.put(new Text("there"), new LongWritable(77));
+	
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		origMap.write(dataOut);
+
+		HashMapWritable<Text, LongWritable> map = new HashMapWritable<Text, LongWritable>();
+
+		map.readFields(new DataInputStream(new ByteArrayInputStream(
+				bytesOut.toByteArray())));
+
+		Text key;
+		LongWritable value;
+
+		assertEquals(map.size(), 2);
+		
+		key=new Text("hi");
+		value=map.get(key);
+		assertTrue(value!=null);
+		assertEquals(value.get(), 52);
+		
+		value=map.remove(key);
+		assertEquals(map.size(), 1);
+		
+		key=new Text("there");
+		value=map.get(key);
+		assertTrue(value!=null);
+		assertEquals(value.get(), 77);
+	}
+
+
+	@Test
+	public void testTypeSafety() throws IOException {
+		HashMapWritable<Writable, Writable> origMap = new HashMapWritable<Writable, Writable>();
+
+		origMap.put(new Text("hi"), new FloatWritable(5.3f));
+		origMap.put(new Text("there"), new Text("bbb"));
+	
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		origMap.write(dataOut);
+
+		HashMapWritable<Writable, Writable> map = new HashMapWritable<Writable, Writable>();
+
+		try {
+	        map.readFields(new DataInputStream(new ByteArrayInputStream(
+	        		bytesOut.toByteArray())));
+	        assertTrue(false);
+        } catch (Exception e) {
+        }
+	}
+
+
+
+	@Test
+	public void testSerializeEmpty() throws IOException {
+		HashMapWritable<IntWritable, Text> map = new HashMapWritable<IntWritable, Text>();
+		
+		assertTrue(map.size() == 0);
+		
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		map.write(dataOut);
+
+		HashMapWritable<IntWritable, Text> newList = new HashMapWritable<IntWritable, Text>();
+		newList.readFields(new DataInputStream(new ByteArrayInputStream(
+				bytesOut.toByteArray())));
+		assertTrue(newList.size() == 0);
+	}
+	
+	public static junit.framework.Test suite() {
+		return new JUnit4TestAdapter(HashMapWritableTest.class);
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/ListWritable.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/ListWritable.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/ListWritable.java	(revision 20)
@@ -0,0 +1,421 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * <p>
+ * Class that represents a list in Hadoop's data type system. Elements in the
+ * list must be homogeneous and must implement Hadoop's WritableComparable
+ * interface. This class, combined with {@link Tuple}, allows the user to
+ * define arbitrarily complex data structures.
+ * </p>
+ * 
+ * @see Tuple
+ * @param <E>
+ *            type of list element
+ */
+public class ListWritable<E extends WritableComparable> implements WritableComparable, Iterable<E>, List<E> {
+
+	private List<E> mList;
+
+	private Class<?> listElementClass;
+
+	/**
+	 * Creates a ListWritable object.
+	 */
+	public ListWritable() {
+		mList = new ArrayList<E>();
+	}
+
+	/**
+	 * Appends the specified element to the end of this list.
+	 * 
+	 * @param e
+	 *            element to be appended to this list
+	 */
+	public boolean add(E e) {
+		if (mList.size() == 0) 
+			listElementClass = e.getClass();
+		else if (!e.getClass().equals(listElementClass))
+			throw new IllegalArgumentException("Cannot add element of type " + e.getClass().getCanonicalName() + " to list of type " + listElementClass.getCanonicalName());
+		return mList.add(e);
+	}
+
+	/**
+	 * Returns the element at the specified position in this list
+	 * 
+	 * @param index
+	 *            index of the element to return
+	 * @return the element at the specified position in this list
+	 */
+	public E get(int index) {
+		if (index < 0 || index >= mList.size()) {
+			throw new IndexOutOfBoundsException();
+		}
+
+		return mList.get(index);
+	}
+
+	/**
+	 * Removes all elements from this list.
+	 */
+	public void clear() {
+		mList.clear();
+	}
+
+	/**
+	 * Replaces the element at the specified position in this list with the
+	 * specified element.
+	 * 
+	 * @param index
+	 *            index of the element to replace
+	 * @param element
+	 *            element to be stored at the specified position
+	 */
+	public E set(int index, E element) {
+        if(mList.size() > 0 && !element.getClass().equals(listElementClass)) {
+			throw new IllegalArgumentException("Cannot add element of type " + element.getClass().getCanonicalName() + " to list of type " + listElementClass.getCanonicalName());
+        }
+		return mList.set(index, element);
+	}
+
+	/**
+	 * Returns the number of elements in this list.
+	 * 
+	 * @return the number of elements in this list
+	 */
+	public int size() {
+		return mList.size();
+	}
+
+	/**
+	 * Deserializes the Tuple.
+	 * 
+	 * @param in
+	 *            source for raw byte representation
+	 */
+	@SuppressWarnings("unchecked")
+	public void readFields(DataInput in) throws IOException {
+
+		mList.clear();
+
+		int numFields = in.readInt();
+		String className = in.readUTF();
+		E obj;
+		try {
+			Class c = Class.forName(className);
+			listElementClass = c;
+
+			for (int i = 0; i < numFields; i++) {
+				obj = (E) c.newInstance();
+				int sz = in.readInt();
+				byte[] bytes = new byte[sz];
+				in.readFully(bytes);
+
+				obj.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
+				this.add(obj);
+			}
+
+		} catch (ClassNotFoundException e) {
+			e.printStackTrace();
+		} catch (IllegalAccessException e) {
+			e.printStackTrace();
+		} catch (InstantiationException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * Serializes this Tuple.
+	 * 
+	 * @param out
+	 *            where to write the raw byte representation
+	 */
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(mList.size());
+		if (mList.size() > 0)
+			out.writeUTF(listElementClass.getCanonicalName());
+		else
+			out.writeUTF(WritableComparable.class.getCanonicalName());
+
+		for (int i = 0; i < mList.size(); i++) {
+			if (mList.get(i) == null) {
+				throw new IOException("Cannot serialize null fields!");
+			}
+
+			ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+			DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+			mList.get(i).write(dataOut);
+
+			out.writeInt(bytesOut.size());
+			out.write(bytesOut.toByteArray());
+		}
+	}
+
+	/**
+	 * Generates human-readable String representation of this Tuple.
+	 * 
+	 * @return human-readable String representation of this Tuple
+	 */
+	public String toString() {
+		StringBuffer sb = new StringBuffer();
+		sb.append("[");
+		for (int i = 0; i < this.size(); i++) {
+			if (i != 0)
+				sb.append(", ");
+			sb.append(this.get(i));
+		}
+		sb.append("]");
+
+		return sb.toString();
+	}
+
+	/**
+	 * <p>
+	 * Defines a natural sort order for the ListWritable class. Following
+	 * standard convention, this method returns a value less than zero, a value
+	 * greater than zero, or zero if this ListWritable should be sorted before,
+	 * sorted after, or is equal to <code>obj</code>. The sort order is
+	 * defined as follows:
+	 * </p>
+	 * 
+	 * <ul>
+	 * <li>Each element in the list is compared sequentially from first to
+	 * last.</li>
+	 * <li>Lists are sorted with respect to the natural order of the current
+	 * list element under consideration, by calling its <code>compareTo</code>
+	 * method.</li>
+	 * <li>If the current list elements are equal, the next set of elements are
+	 * considered.</li>
+	 * <li>If all compared elements are equal, but lists are different lengths,
+	 * the shorter list is sorted first.</li>
+	 * <li>If all list elements are equal and the lists are equal in length,
+	 * then the lists are considered equal</li>
+	 * </ul>
+	 * 
+	 * @return a value less than zero, a value greater than zero, or zero if
+	 *         this Tuple should be sorted before, sorted after, or is equal to
+	 *         <code>obj</code>.
+	 */
+	public int compareTo(Object obj) {
+		ListWritable<?> that = (ListWritable<?>) obj;
+
+		// iterate through the fields
+		for (int i = 0; i < this.size(); i++) {
+			// sort shorter list first
+			if (i >= that.size())
+				return 1;
+
+			@SuppressWarnings("unchecked")
+			Comparable<Object> thisField = this.get(i);
+			@SuppressWarnings("unchecked")
+			Comparable<Object> thatField = that.get(i);
+
+			if (thisField.equals(thatField)) {
+				// if we're down to the last field, sort shorter list first
+				if (i == this.size() - 1) {
+					if (this.size() > that.size())
+						return 1;
+
+					if (this.size() < that.size())
+						return -1;
+				}
+				// otherwise, move to next field
+			} else {
+				return thisField.compareTo(thatField);
+			}
+		}
+
+		return 0;
+	}
+
+	/**
+	 * @return an iterator over the elements in this list in proper sequence.
+	 */
+	public Iterator<E> iterator() {
+		return this.mList.iterator();
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#add(int, java.lang.Object)
+	 */
+	public void add(int pos, E element) {
+		
+        if(mList.size() > 0 && !element.getClass().equals(listElementClass)) {
+			throw new IllegalArgumentException("Cannot add element of type " + element.getClass().getCanonicalName() + " to list of type " + listElementClass.getCanonicalName());
+        }
+		mList.add(pos, element);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#addAll(java.util.Collection)
+	 */
+	public boolean addAll(Collection<? extends E> elements) {
+		boolean failure = false;
+		Iterator<? extends E> it = elements.iterator();
+		while (it.hasNext()) {
+			E obj = it.next();
+			if (mList.size() == 0) 
+				listElementClass = obj.getClass();
+			else if (!obj.getClass().equals(listElementClass))
+				throw new IllegalArgumentException("Cannot add element of type " + obj.getClass().getCanonicalName() + " to list of type " + listElementClass.getCanonicalName());
+			
+			if (!mList.add(obj)) failure = true;
+		}
+		
+		
+		return !failure;
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#addAll(int, java.util.Collection)
+	 */
+	public boolean addAll(int pos, Collection<? extends E> elements) {
+		// TODO: Check the return type of this method.
+		Iterator<? extends E> it = elements.iterator();
+		int curPos = pos;
+		while (it.hasNext()) {
+			E obj = it.next();
+			if (mList.size() == 0) 
+				listElementClass = obj.getClass();
+			else if (!obj.getClass().equals(listElementClass))
+				throw new IllegalArgumentException("Cannot add element of type " + obj.getClass().getCanonicalName() + " to list of type " + listElementClass.getCanonicalName());
+			
+			mList.add(curPos, obj);
+			++curPos;
+		}
+		
+		
+		return true;
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#contains(java.lang.Object)
+	 */
+	public boolean contains(Object element) {
+		return mList.contains(element);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#containsAll(java.util.Collection)
+	 */
+	public boolean containsAll(Collection<?> elements) {
+		return mList.containsAll(elements);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#indexOf(java.lang.Object)
+	 */
+	public int indexOf(Object element) {
+		return mList.indexOf(element);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#isEmpty()
+	 */
+	public boolean isEmpty() {
+		return mList.isEmpty();
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#lastIndexOf(java.lang.Object)
+	 */
+	public int lastIndexOf(Object element) {
+		return mList.lastIndexOf(element);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#listIterator()
+	 */
+	public ListIterator<E> listIterator() {
+		return mList.listIterator();
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#listIterator(int)
+	 */
+	public ListIterator<E> listIterator(int arg0) {
+		return mList.listIterator(arg0);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#remove(java.lang.Object)
+	 */
+	public boolean remove(Object element) {
+		return mList.remove(element);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#remove(int)
+	 */
+	public E remove(int pos) {
+		return mList.remove(pos);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#removeAll(java.util.Collection)
+	 */
+	public boolean removeAll(Collection<?> elements) {
+		return mList.removeAll(elements);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#retainAll(java.util.Collection)
+	 */
+	public boolean retainAll(Collection<?> elements) {
+		return mList.retainAll(elements);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#subList(int, int)
+	 */
+	public List<E> subList(int arg0, int arg1) {
+		// TODO Consider making this return a type of ListWritable rather than of ArrayList.
+		return mList.subList(arg0, arg1);
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#toArray()
+	 */
+	public Object[] toArray() {
+		return mList.toArray();
+	}
+
+	/* (non-Javadoc)
+	 * @see java.util.List#toArray(T[])
+	 */
+	public <T> T[] toArray(T[] arg0) {
+		return mList.toArray(arg0);
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/ListWritableTest.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/ListWritableTest.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/ListWritableTest.java	(revision 20)
@@ -0,0 +1,280 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+public class ListWritableTest { // unit tests for the ListWritable wrapper
+
+	@Test
+	public void testBasic() throws IOException { // element access after add()
+		ListWritable<Text> list = new ListWritable<Text>();
+
+		list.add(new Text("hi"));
+		list.add(new Text("there"));
+
+		assertEquals(list.get(0).toString(), "hi");
+		assertEquals(list.get(1).toString(), "there");
+	}
+
+	@Test
+	public void testSerialize1() throws IOException { // round-trips Text elements through write()/readFields()
+		ListWritable<Text> list = new ListWritable<Text>();
+
+		list.add(new Text("hi"));
+		list.add(new Text("there"));
+
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		list.write(dataOut);
+
+		ListWritable<Text> newList = new ListWritable<Text>();
+		newList.readFields(new DataInputStream(new ByteArrayInputStream(
+				bytesOut.toByteArray())));
+
+		assertEquals(newList.get(0).toString(), "hi");
+		assertEquals(newList.get(1).toString(), "there");
+	}
+
+	@Test
+	public void testSerialize2() throws IOException { // round-trips FloatWritable elements
+		ListWritable<FloatWritable> list = new ListWritable<FloatWritable>();
+
+		list.add(new FloatWritable(0.3f));
+		list.add(new FloatWritable(3244.2f));
+
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		list.write(dataOut);
+
+		ListWritable<FloatWritable> newList = new ListWritable<FloatWritable>();
+		newList.readFields(new DataInputStream(new ByteArrayInputStream(
+				bytesOut.toByteArray())));
+
+		assertTrue(newList.get(0).get() == 0.3f); // exact compare is safe: same float round-trips bit-for-bit
+		assertTrue(newList.get(1).get() == 3244.2f);
+	}
+
+	@Test
+	public void testToString() {
+		ListWritable<Text> list = new ListWritable<Text>();
+
+		list.add(new Text("hi"));
+		list.add(new Text("there"));
+
+		assertEquals(list.toString(), "[hi, there]");
+	}
+
+	@Test
+	public void testClear() {
+		ListWritable<Text> list = new ListWritable<Text>();
+
+		list.add(new Text("hi"));
+		list.add(new Text("there"));
+		list.clear();
+		
+		assertEquals(list.size(), 0);
+	}
+
+	@Test
+	public void testSorting1() { // a non-empty list sorts after an empty one
+		ListWritable<Text> list1 = new ListWritable<Text>();
+		ListWritable<Text> list2 = new ListWritable<Text>();
+
+		list1.add(new Text("a"));
+
+		assertTrue(list1.compareTo(list2) > 0);
+	}
+	
+	@Test
+	public void testSorting2() { // element-wise comparison, then length as tie-breaker
+		ListWritable<Text> list1 = new ListWritable<Text>();
+		ListWritable<Text> list2 = new ListWritable<Text>();
+
+		list1.add(new Text("a"));
+		list2.add(new Text("b"));
+
+		assertTrue(list1.compareTo(list2) < 0);
+		assertTrue(list2.compareTo(list1) > 0);
+		
+		list2.clear();
+		list2.add(new Text("a"));
+		
+		assertTrue(list1.compareTo(list2) == 0);
+		
+		list1.add(new Text("a"));
+		list2.add(new Text("b"));
+		
+		// list 1 is now [a, a]
+		// list 2 is now [a, b]
+		assertTrue(list1.compareTo(list2) < 0);
+		assertTrue(list2.compareTo(list1) > 0);
+
+		// list 1 is now [a, a, a]
+		list1.add(new Text("a"));
+		
+		assertTrue(list1.compareTo(list2) < 0); // second element still decides: a < b
+	}
+
+	@Test
+	public void testSorting3() { // equal prefixes: shorter list sorts first
+		ListWritable<Text> list1 = new ListWritable<Text>();
+		ListWritable<Text> list2 = new ListWritable<Text>();
+		ListWritable<Text> list3 = new ListWritable<Text>();
+
+		list1.add(new Text("a"));
+		
+		list2.add(new Text("a"));
+		list2.add(new Text("a"));
+		
+		list3.add(new Text("a"));
+		list3.add(new Text("a"));
+		
+		assertTrue(list2.compareTo(list3) == 0);
+
+		list3.add(new Text("a"));
+		
+		// list 1 is [a]
+		// list 2 is [a, a]
+		// list 3 is [a, a, a]
+		
+		assertTrue(list1.compareTo(list2) < 0);
+		assertTrue(list1.compareTo(list3) < 0);
+		assertTrue(list2.compareTo(list1) > 0);
+		assertTrue(list2.compareTo(list3) < 0);
+		assertTrue(list3.compareTo(list1) > 0);
+		assertTrue(list3.compareTo(list2) > 0);
+	}
+	
+	@Test
+	public void testEmpty() throws IOException { // an empty list must serialize/deserialize cleanly and stay usable
+		ListWritable<Text> list = new ListWritable<Text>();
+		
+		assertTrue(list.size() == 0);
+		
+		ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+		DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+		list.write(dataOut);
+
+		ListWritable<Text> newList = new ListWritable<Text>();
+		newList.readFields(new DataInputStream(new ByteArrayInputStream(
+				bytesOut.toByteArray())));
+		assertTrue(newList.size() == 0);
+		
+		newList.add(new Text("Hey")); // deserialized list must still accept new elements
+		assertEquals(newList.get(0),new Text("Hey"));
+
+	}
+	
+	@Test
+	public void testTypeSafety() { // all elements must share the type of the first element added
+		ListWritable<WritableComparable> list = new ListWritable<WritableComparable> ();
+		list.add(new Text("Hello"));
+		list.add(new Text("Are you there"));
+		
+		try {
+			list.add(new IntWritable(5));
+			assertTrue(false); // should throw an exception before reaching this line.
+		} catch (IllegalArgumentException e) {
+			assertTrue(true);
+		}
+		
+		ArrayList<WritableComparable> otherList = new ArrayList<WritableComparable>();
+		otherList.add(new Text("Test"));
+		otherList.add(new Text("Test 2"));
+		
+		assertTrue(list.addAll(otherList));
+		
+		otherList.add(new IntWritable(6)); // one mismatched element should make addAll reject the whole batch
+		try {
+			list.addAll(otherList);
+			assertTrue(false);
+		} catch (IllegalArgumentException e) {
+			assertTrue(true);
+		}
+	}
+	
+	@Test 
+	public void testListMethods() { // spot-checks List-interface delegation (iterator, indexOf, contains, remove)
+		IntWritable a = new IntWritable(1);
+		IntWritable b = new IntWritable(2);
+		IntWritable c = new IntWritable(3);
+		IntWritable d = new IntWritable(4);
+		IntWritable e = new IntWritable(5);
+		
+		ListWritable<IntWritable> list = new ListWritable<IntWritable>();
+		assertTrue(list.isEmpty());
+		list.add(a);
+		list.add(b);
+		list.add(c);
+		list.add(d);
+		list.add(e);
+		
+		int pos = 0;
+		for (IntWritable i : list) { // iteration order must match positional access
+			assertEquals(i, list.get(pos));
+			++pos;
+		}
+		
+		assertTrue(list.indexOf(d) == 3);
+		list.add(2, a);
+		assertTrue(list.lastIndexOf(a) == 2);
+		assertEquals(list.get(2), list.get(0));
+		assertTrue(list.size() == 6);
+		
+		assertTrue(list.contains(c));
+		assertTrue(!list.contains(new IntWritable(123)));
+		
+		ArrayList<IntWritable> otherList = new ArrayList<IntWritable>();
+		otherList.add(a);
+		otherList.add(b);
+		otherList.add(c);
+		
+		assertTrue(list.containsAll(otherList));
+		
+		otherList.add(new IntWritable(200));
+		assertTrue(!list.containsAll(otherList));
+		
+		assertEquals(a, otherList.remove(0));
+		assertTrue(list.remove(d));
+		
+	}
+	
+	public static junit.framework.Test suite() { // JUnit 3 adapter so legacy runners can execute these JUnit 4 tests
+		return new JUnit4TestAdapter(ListWritableTest.class);
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/Schema.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/Schema.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/Schema.java	(revision 20)
@@ -0,0 +1,556 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <p>
+ * Description of a Tuple's structure. The Schema class keeps track of column
+ * names, data types, and default values. The following code fragment
+ * illustrates the use of this class:
+ * </p>
+ * 
+ * <pre>
+ * public static final Schema MYSCHEMA = new Schema();
+ * static {
+ * 	MYSCHEMA.addField(&quot;token&quot;, String.class, &quot;&quot;);
+ * 	MYSCHEMA.addField(&quot;int&quot;, Integer.class, new Integer(1));
+ * }
+ * </pre>
+ * 
+ * <p>
+ * The following field types are allowed:
+ * </p>
+ * 
+ * <ul>
+ * <li>Basic Java primitives: Boolean, Integer, Long, Float, Double, String</li>
+ * <li>Classes that implement Writable</li>
+ * </ul>
+ * 
+ * <p>
+ * Schema instances can be locked to prevent further changes. Any attempt to
+ * alter a locked Schema will result in a runtime exception being thrown. If a
+ * Schema is not locked, callers are free to add new fields and edit default
+ * values.
+ * </p>
+ * 
+ * <p>
+ * New Tuple instances can be created directly from Schema objects through the
+ * use of the {@link #instantiate()} method. A call to that method implicitly
+ * locks the Schema.
+ * </p>
+ * 
+ * <p>
+ * <b>Acknowledgments:</b> much of this code was adapted from the <a
+ * href="http://prefuse.org/">Prefuse Visualization Toolkit</a>.
+ * </p>
+ * 
+ */
+public class Schema implements Cloneable {
+
+	private String[] mFieldNames;
+	private Class<?>[] mFieldTypes;
+	private Object[] mDefaultValues;
+	private Map<String, Integer> mFieldLookup;
+	private int mFieldCount;
+	private boolean mLocked;
+
+	// ------------------------------------------------------------------------
+	// Constructors
+
+	/**
+	 * Creates a new empty Schema.
+	 */
+	public Schema() {
+		this(10);
+	}
+
+	/**
+	 * Creates a new empty Schema with a starting capacity for a given number of
+	 * fields.
+	 * 
+	 * @param n
+	 *            the number of columns in this schema
+	 */
+	public Schema(int n) {
+		mFieldNames = new String[n];
+		mFieldTypes = new Class<?>[n];
+		mDefaultValues = new Object[n];
+		mFieldCount = 0;
+		mLocked = false;
+	}
+
+	/**
+	 * Create a new Schema consisting of the given field names and types.
+	 * 
+	 * @param names
+	 *            the field names
+	 * @param types
+	 *            the field types (as Class instances)
+	 */
+	public Schema(String[] names, Class<?>[] types) {
+		this(names.length);
+
+		// check the schema validity
+		if (names.length != types.length) {
+			throw new IllegalArgumentException(
+					"Input arrays should be the same length");
+		}
+		for (int i = 0; i < names.length; ++i) {
+			addField(names[i], types[i], null);
+		}
+	}
+
+	/**
+	 * Create a new Schema consisting of the given field names, types, and
+	 * default field values.
+	 * 
+	 * @param names
+	 *            the field names
+	 * @param types
+	 *            the field types (as Class instances)
+	 * @param defaults
+	 *            the default values for each field
+	 */
+	public Schema(String[] names, Class<?>[] types, Object[] defaults) {
+		this(names.length);
+
+		// check the schema validity
+		if (names.length != types.length || types.length != defaults.length) {
+			throw new IllegalArgumentException(
+					"Input arrays should be the same length");
+		}
+		for (int i = 0; i < names.length; ++i) {
+			addField(names[i], types[i], defaults[i]);
+		}
+	}
+
+	/**
+	 * Creates a copy of this Schema. Cloned copies of a locked Schema will not
+	 * inherit the locked status.
+	 * 
+	 * @see java.lang.Object#clone()
+	 */
+	public Object clone() {
+		Schema s = new Schema(mFieldCount); // fresh instance, deliberately unlocked
+		for (int i = 0; i < mFieldCount; ++i) {
+			s.addField(mFieldNames[i], mFieldTypes[i], mDefaultValues[i]);
+		}
+		return s;
+	}
+
+	/**
+	 * Lazily construct the lookup table for this schema. Used to accelerate
+	 * name-based lookups of schema information.
+	 */
+	protected void initLookup() {
+		mFieldLookup = new HashMap<String, Integer>();
+		for (int i = 0; i < mFieldCount; ++i) { // only populated slots; the arrays may have spare capacity
+			mFieldLookup.put(mFieldNames[i], Integer.valueOf(i));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Accessors / Mutators
+
+	/**
+	 * Locks the Schema, preventing any additional changes. Locked Schemas
+	 * cannot be unlocked! Cloned copies of a locked schema will not inherit
+	 * this locked status.
+	 * 
+	 * @return a reference to this schema
+	 */
+	public Schema lockSchema() {
+		mLocked = true;
+		return this;
+	}
+
+	/**
+	 * Checks if this schema is locked. Locked Schemas can not be edited.
+	 * 
+	 * @return true if this Schema is locked, false otherwise
+	 */
+	public boolean isLocked() {
+		return mLocked;
+	}
+
+	/**
+	 * Adds a field to this Schema with a null default value.
+	 * 
+	 * @param name
+	 *            the field name
+	 * @param type
+	 *            the field type (as a Class instance)
+	 * @throws IllegalArgumentException
+	 *             if either name or type are null or the name already exists in
+	 *             this schema.
+	 */
+	public void addField(String name, Class<?> type) {
+		addField(name, type, null);
+	}
+
+	/**
+	 * Adds a field to this schema.
+	 * 
+	 * @param name
+	 *            the field name
+	 * @param type
+	 *            the field type (as a Class instance)
+	 * @throws IllegalArgumentException
+	 *             if either name or type are null or the name already exists in
+	 *             this schema.
+	 */
+	public void addField(String name, Class<?> type, Object defaultValue) {
+		if (!(type == Integer.class || type == Boolean.class
+				|| type == Long.class || type == Float.class
+				|| type == Double.class || type == String.class || (!type
+				.isInterface() && Writable.class.isAssignableFrom(type)))) {
+			throw new SchemaException("Illegal field type: "
+					+ type.getCanonicalName());
+		}
+
+		// check lock status
+		if (mLocked) {
+			throw new IllegalStateException(
+					"Can not add column to a locked Schema.");
+		}
+		// check for validity
+		if (name == null) {
+			throw new IllegalArgumentException(
+					"Null column names are not allowed.");
+		}
+		if (type == null) {
+			throw new IllegalArgumentException(
+					"Null column types are not allowed.");
+		}
+		for (int i = 0; i < mFieldCount; ++i) {
+			if (mFieldNames[i].equals(name)) {
+				throw new IllegalArgumentException(
+						"Duplicate column names are not allowed: "
+								+ mFieldNames[i]);
+			}
+		}
+
+		// resize if necessary (grow by ~1.5x)
+		if (mFieldNames.length == mFieldCount) {
+			int capacity = (3 * mFieldNames.length) / 2 + 1;
+			String[] names = new String[capacity];
+			Class<?>[] types = new Class[capacity];
+			Object[] dflts = new Object[capacity];
+			System.arraycopy(mFieldNames, 0, names, 0, mFieldCount);
+			System.arraycopy(mFieldTypes, 0, types, 0, mFieldCount);
+			System.arraycopy(mDefaultValues, 0, dflts, 0, mFieldCount);
+			mFieldNames = names;
+			mFieldTypes = types;
+			mDefaultValues = dflts;
+		}
+
+		mFieldNames[mFieldCount] = name;
+		mFieldTypes[mFieldCount] = type;
+		mDefaultValues[mFieldCount] = defaultValue;
+
+		if (mFieldLookup != null) // keep lazily-built lookup table in sync
+			mFieldLookup.put(name, Integer.valueOf(mFieldCount));
+
+		mFieldCount++;
+	}
+
+	/**
+	 * Returns the number of fields in this Schema.
+	 * 
+	 * @return the number of fields in this Schema
+	 */
+	public int getFieldCount() {
+		return mFieldCount;
+	}
+
+	/**
+	 * Returns the name of the field at the given position.
+	 * 
+	 * @param index
+	 *            the field index
+	 * @return the field name
+	 */
+	public String getFieldName(int index) {
+		return mFieldNames[index];
+	}
+
+	/**
+	 * Returns the position of a field given its name.
+	 * 
+	 * @param field
+	 *            the field name
+	 * @return the field position index, or -1 if the name is unknown
+	 */
+	public int getFieldIndex(String field) {
+		if (mFieldLookup == null)
+			initLookup();
+
+		Integer idx = (Integer) mFieldLookup.get(field);
+		return (idx == null ? -1 : idx.intValue());
+	}
+
+	/**
+	 * Returns the type of the field at the given position.
+	 * 
+	 * @param index
+	 *            the column index
+	 * @return the column type
+	 */
+	public Class<?> getFieldType(int index) {
+		return mFieldTypes[index];
+	}
+
+	/**
+	 * Returns the type of the field given its name.
+	 * 
+	 * @param field
+	 *            the field name
+	 * @return the field type, or null if the name is unknown
+	 */
+	public Class<?> getFieldType(String field) {
+		int idx = getFieldIndex(field);
+		return (idx < 0 ? null : mFieldTypes[idx]);
+	}
+
+	/**
+	 * Returns the default value of the field at the given position.
+	 * 
+	 * @param index
+	 *            the field index
+	 * @return the field's default value
+	 */
+	public Object getDefault(int index) {
+		return mDefaultValues[index];
+	}
+
+	/**
+	 * Returns the default value of the field with the given name.
+	 * 
+	 * @param field
+	 *            the field name
+	 * @return the field's default value, or null if the name is unknown
+	 */
+	public Object getDefault(String field) {
+		int idx = getFieldIndex(field);
+		return (idx < 0 ? null : mDefaultValues[idx]);
+	}
+
+	/**
+	 * Sets the default value for the given field.
+	 * 
+	 * @param index
+	 *            the index position of the field to set the default for
+	 * @param val
+	 *            the new default value
+	 */
+	public void setDefault(int index, Object val) {
+		// check lock status
+		if (mLocked) {
+			throw new IllegalStateException(
+					"Can not update default values of a locked Schema.");
+		}
+		mDefaultValues[index] = val;
+	}
+
+	/**
+	 * Sets the default value for the given field.
+	 * 
+	 * @param field
+	 *            the name of field to set the default for
+	 * @param val
+	 *            the new default value
+	 */
+	public void setDefault(String field, Object val) {
+		// check lock status
+		if (mLocked) {
+			throw new IllegalStateException(
+					"Can not update default values of a locked Schema.");
+		}
+		int idx = getFieldIndex(field); // NOTE(review): -1 for an unknown name would throw AIOOBE below — confirm callers pass valid names
+		mDefaultValues[idx] = val;
+	}
+
+	/**
+	 * Sets the default value for the given field as an <code>int</code>.
+	 * 
+	 * @param field
+	 *            the name of field to set the default for
+	 * @param val
+	 *            the new default value
+	 */
+	public void setDefault(String field, int val) {
+		setDefault(field, Integer.valueOf(val));
+	}
+
+	/**
+	 * Set the default value for the given field as a <code>long</code>.
+	 * 
+	 * @param field
+	 *            the name of field to set the default for
+	 * @param val
+	 *            the new default value
+	 */
+	public void setDefault(String field, long val) {
+		setDefault(field, Long.valueOf(val));
+	}
+
+	/**
+	 * Set the default value for the given field as a <code>float</code>.
+	 * 
+	 * @param field
+	 *            the name of field to set the default for
+	 * @param val
+	 *            the new default value
+	 */
+	public void setDefault(String field, float val) {
+		setDefault(field, Float.valueOf(val));
+	}
+
+	/**
+	 * Set the default value for the given field as a <code>double</code>.
+	 * 
+	 * @param field
+	 *            the name of field to set the default for
+	 * @param val
+	 *            the new default value
+	 */
+	public void setDefault(String field, double val) {
+		setDefault(field, Double.valueOf(val));
+	}
+
+	/**
+	 * Set the default value for the given field as a <code>boolean</code>.
+	 * 
+	 * @param field
+	 *            the name of field to set the default for
+	 * @param val
+	 *            the new default value
+	 */
+	public void setDefault(String field, boolean val) {
+		setDefault(field, val ? Boolean.TRUE : Boolean.FALSE);
+	}
+
+	// ------------------------------------------------------------------------
+	// Comparison and Hashing
+
+	/**
+	 * Compares this Schema with another one for equality, field by field.
+	 */
+	public boolean equals(Object o) {
+		if (!(o instanceof Schema))
+			return false;
+
+		Schema s = (Schema) o;
+		if (mFieldCount != s.getFieldCount())
+			return false;
+
+		for (int i = 0; i < mFieldCount; ++i) { // null-safe on defaults, which may legitimately be null
+			if (!(mFieldNames[i].equals(s.getFieldName(i))
+					&& mFieldTypes[i].equals(s.getFieldType(i)) && (mDefaultValues[i] == null
+					? s.getDefault(i) == null : mDefaultValues[i].equals(s.getDefault(i))))) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Computes a hashcode for this schema, consistent with {@link #equals(Object)}.
+	 */
+	public int hashCode() {
+		int hashcode = 0;
+		for (int i = 0; i < mFieldCount; ++i) {
+			int idx = i + 1;
+			int code = idx * mFieldNames[i].hashCode();
+			code ^= idx * mFieldTypes[i].hashCode();
+			if (mDefaultValues[i] != null)
+				code ^= mDefaultValues[i].hashCode();
+			hashcode ^= code;
+		}
+		return hashcode;
+	}
+
+	/**
+	 * Returns a descriptive String for this schema.
+	 */
+	public String toString() {
+		StringBuffer sbuf = new StringBuffer();
+		sbuf.append("Schema[");
+		for (int i = 0; i < mFieldCount; ++i) {
+			if (i > 0)
+				sbuf.append(' ');
+			sbuf.append('(').append(mFieldNames[i]).append(", ");
+			sbuf.append(mFieldTypes[i].getName()).append(", ");
+			sbuf.append(mDefaultValues[i]).append(')');
+		}
+		sbuf.append(']');
+		return sbuf.toString();
+	}
+
+	// ------------------------------------------------------------------------
+	// Tuple Operations
+
+	/**
+	 * Instantiate a new Tuple instance with this Schema. Fields of the newly
+	 * instantiated Tuple are set to default value. Implicitly locks this Schema.
+	 * 
+	 * @return a new Tuple with this Schema
+	 */
+	public Tuple instantiate() {
+		lockSchema();
+
+		Object[] objects = new Object[mFieldCount];
+		System.arraycopy(mDefaultValues, 0, objects, 0, mFieldCount);
+
+		String[] symbols = new String[mFieldCount];
+
+		String[] fields = new String[mFieldCount];
+		System.arraycopy(mFieldNames, 0, fields, 0, mFieldCount);
+
+		Class<?>[] types = new Class<?>[mFieldCount];
+		System.arraycopy(mFieldTypes, 0, types, 0, mFieldCount);
+
+		return new Tuple(objects, symbols, fields, types);
+	}
+
+	/**
+	 * Instantiate a new Tuple instance with this Schema. Implicitly locks this Schema.
+	 * 
+	 * @param objects
+	 *            values of each field
+	 * @return a new Tuple with this Schema
+	 */
+	public Tuple instantiate(Object... objects) {
+		lockSchema();
+
+		String[] symbols = new String[mFieldCount];
+
+		String[] fields = new String[mFieldCount];
+		System.arraycopy(mFieldNames, 0, fields, 0, mFieldCount);
+
+		Class<?>[] types = new Class[mFieldCount];
+		System.arraycopy(mFieldTypes, 0, types, 0, mFieldCount);
+
+		return new Tuple(objects, symbols, fields, types);
+	}
+
+} // end of class Schema
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/SchemaException.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/SchemaException.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/SchemaException.java	(revision 20)
@@ -0,0 +1,25 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+public class SchemaException extends RuntimeException { // thrown when an illegal field type is added to a Schema
+	private static final long serialVersionUID = 2673497691L; // private per the serialization spec; uppercase L avoids 1/l misread
+	
+	public SchemaException(String message) {
+		super(message);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/SchemaTest.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/SchemaTest.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/SchemaTest.java	(revision 20)
@@ -0,0 +1,88 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashMap;
+
+import junit.framework.JUnit4TestAdapter;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+public class SchemaTest { // unit tests for Schema field definition and Tuple instantiation
+
+	public static final Schema SCHEMA1 = new Schema();
+	static { // shared fixture: one String field and one Integer field, both with defaults
+		SCHEMA1.addField("field1", String.class, "default");
+		SCHEMA1.addField("field2", Integer.class, new Integer(1));
+	}
+
+	@Test
+	public void test1() { // defaults are applied by instantiate()
+		Tuple tuple = SCHEMA1.instantiate();
+
+		assertEquals(tuple.get(0), "default");
+		assertEquals(tuple.get(1), new Integer(1));
+
+		assertEquals(tuple.get("field1"), "default"); // fields reachable by name as well as index
+		assertEquals(tuple.get("field2"), new Integer(1));
+	}
+
+	@Test
+	public void test2() { // explicit values override the defaults
+		Tuple tuple = SCHEMA1.instantiate("Hello world!", new Integer(5));
+		assertEquals(tuple.get(0), "Hello world!");
+		assertEquals(tuple.get(1), new Integer(5));
+	}
+
+	@Test(expected = SchemaException.class)
+	public void testIllegalFieldsException1() { // HashMap is neither a supported primitive wrapper nor a Writable
+		Schema schema = new Schema();
+		schema.addField("field0", Integer.class, 0);
+		schema.addField("field1", HashMap.class, null);
+	}
+
+	@Test(expected = SchemaException.class)
+	public void testIllegalFieldsException2() {
+		Schema schema = new Schema();
+		schema.addField("field0", Integer.class, 0);
+		// throws exception because Writable isn't a concrete class
+		schema.addField("field1", Writable.class, null);
+	}
+
+	@Test
+	public void testWritableFields() { // concrete Writable implementations are valid field types
+		Schema schema = new Schema();
+		schema.addField("field0", Integer.class, 0);
+		schema.addField("field1", IntWritable.class, new IntWritable(0));
+		schema.addField("field2", Text.class, new Text("default"));
+
+		Tuple t = schema.instantiate();
+		assertEquals(t.get(0), 0);
+		assertEquals(t.get(1), new IntWritable(0));
+		assertEquals(t.get(2), new Text("default"));
+	}
+
+	public static junit.framework.Test suite() { // JUnit 3 adapter so legacy runners can execute these JUnit 4 tests
+		return new JUnit4TestAdapter(SchemaTest.class);
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/Tuple.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/Tuple.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/Tuple.java	(revision 20)
@@ -0,0 +1,566 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * <p>
+ * Class that represents a tuple in Hadoop's data type system. Tuples are
+ * instantiated from a {@link Schema}. The Tuple class implements
+ * WritableComparable, so it can be directly used as MapReduce keys and values.
+ * The natural sort order of tuples is defined by an internally-generated byte
+ * representation and is not based on field values. This class, combined with
+ * {@link ListWritable}, allows the user to define arbitrarily complex data
+ * structures.
+ * </p>
+ * 
+ * <p>
+ * Each field can be indexed either by its integer position or by its field name.
+ * Each field is typed, which can be determined via {@link #getFieldType(int)}.
+ * Fields can either contain an object of the specified type or a special symbol
+ * String. The method {@link #containsSymbol(int)} can be used to check if a
+ * field contains a special symbol. If the field contains a special symbol,
+ * {@link #get(int)} will return <code>null</code>. If the field does not
+ * contain a special symbol, {@link #getSymbol(int)} will return
+ * <code>null</code>.
+ * </p>
+ * 
+ * <p>
+ * Here is a typical usage scenario for special symbols: say you had tuples that
+ * represented <code>count(a, b)</code>, where <code>a</code> and
+ * <code>b</code> are tokens you observe. There is often a need to compute
+ * <code>count(a, *)</code>, for example, to derive conditional
+ * probabilities. In this case, you can use a special symbol to represent the
+ * <code>*</code>, and distinguish it from the lexical token '<code>*</code>'.
+ * </p>
+ * 
+ * <p>
+ * The natural sort order of the Tuple is defined by {@link #compareTo(Object)}.
+ * Tuples are sorted by field, with special symbols always appearing first
+ * within each field.
+ * </p>
+ * 
+ * @see ListWritable
+ * @see Schema
+ * 
+ */
+public class Tuple implements WritableComparable {
+
+	// Type tags used by write()/readFields() to mark how each field's
+	// content is serialized.
+	protected static final byte SYMBOL = 0;
+	protected static final byte INT = 1;
+	protected static final byte BOOLEAN = 2;
+	protected static final byte LONG = 3;
+	protected static final byte FLOAT = 4;
+	protected static final byte DOUBLE = 5;
+	protected static final byte STRING = 6;
+	protected static final byte WRITABLE = 7;
+
+	// Parallel arrays, one slot per field. A field holds either a value
+	// (mObjects[i] != null) or a special symbol (mObjects[i] == null and
+	// mSymbols[i] set); see containsSymbol(int).
+	private Object[] mObjects;
+	private String[] mSymbols;
+	// Field names, parallel to the arrays above.
+	private String[] mFields;
+	// Declared field types, parallel to mFields.
+	private Class<?>[] mTypes;
+
+	// Lazily built name -> position index; see initLookup().
+	private Map<String, Integer> mFieldLookup = null;
+
+	// Package-internal constructor: used by Schema to hand a new tuple its
+	// field storage and metadata arrays.
+	protected Tuple(Object[] objects, String[] symbols, String[] fields,
+			Class<?>[] types) {
+		mObjects = objects;
+		mSymbols = symbols;
+		mFields = fields;
+		mTypes = types;
+	}
+
+	/**
+	 * Creates an empty Tuple. This constructor is needed by Hadoop's framework
+	 * for deserializing Writable objects. The preferred way to instantiate
+	 * tuples is through {@link Schema#instantiate(Object...)}.
+	 */
+	public Tuple() {
+	}
+
+	/**
+	 * Factory method for deserializing a Tuple object.
+	 * 
+	 * @param in
+	 *            raw byte source of the Tuple
+	 * @return a new Tuple populated from <code>in</code>
+	 * @throws IOException
+	 *             if the underlying stream cannot be read
+	 */
+	public static Tuple createFrom(DataInput in) throws IOException {
+		// Build an empty shell, then let the Writable machinery fill it in.
+		Tuple result = new Tuple();
+		result.readFields(in);
+		return result;
+	}
+
+	/**
+	 * Sets the object at a particular field (by position) in this Tuple.
+	 * 
+	 * @param i
+	 *            field position
+	 * @param o
+	 *            object to set at the specified field
+	 */
+	public void set(int i, Object o) {
+		if (o == null) {
+			throw new TupleException(
+					"Null values are not allowed for tuple fields!");
+		}
+
+		if (!o.getClass().equals(mTypes[i])) {
+			throw new TupleException("Field value of wrong type, expected "
+					+ mTypes[i] + "!");
+		}
+
+		mObjects[i] = o;
+	}
+
+	/**
+	 * Sets the object at a particular field (by name) in this Tuple.
+	 * 
+	 * @param field
+	 *            field name
+	 * @param o
+	 *            object to set at the specified field
+	 */
+	public void set(String field, Object o) {
+		if (mFieldLookup == null)
+			initLookup();
+
+		if (!mFieldLookup.containsKey(field)) {
+			throw new TupleException("Field '" + field + "' does not exist!");
+		}
+
+		set(mFieldLookup.get(field), o);
+	}
+
+	/**
+	 * Sets a special symbol at a particular field (by position) in this Tuple.
+	 * 
+	 * @param i
+	 *            field position
+	 * @param s
+	 *            special symbol to set at specified field
+	 */
+	public void setSymbol(int i, String s) {
+		if (s == null)
+			throw new TupleException("Null is not a valid symbol!");
+
+		// Record the symbol; nulling the object slot is what marks this
+		// field as symbol-bearing.
+		mSymbols[i] = s;
+		mObjects[i] = null;
+	}
+
+	/**
+	 * Sets a special symbol at a particular field (by name) in this Tuple.
+	 * 
+	 * @param field
+	 *            field name
+	 * @param s
+	 *            special symbol to set at specified field
+	 */
+	public void setSymbol(String field, String s) {
+		if (mFieldLookup == null)
+			initLookup();
+
+		if (!mFieldLookup.containsKey(field)) {
+			throw new TupleException("Field '" + field + "' does not exist!");
+		}
+
+		setSymbol(mFieldLookup.get(field), s);
+	}
+
+	/**
+	 * Returns object at a particular field (by position) in this Tuple. Returns
+	 * <code>null</code> if the field contains a special symbol.
+	 * 
+	 * @param i
+	 *            field position
+	 * @return object at field, or <code>null</code> if the field contains a
+	 *         special symbol
+	 */
+	public Object get(int i) {
+		// A null slot means the field carries a symbol; see getSymbol(int).
+		return mObjects[i];
+	}
+
+	/**
+	 * Returns object at a particular field (by name) in this Tuple. Returns
+	 * <code>null</code> if the field contains a special symbol.
+	 * 
+	 * @param field
+	 *            field name
+	 * @return object at field, or <code>null</code> if the field contains a
+	 *         special symbol
+	 */
+	public Object get(String field) {
+		if (mFieldLookup == null)
+			initLookup();
+
+		if (!mFieldLookup.containsKey(field)) {
+			throw new TupleException("Field '" + field + "' does not exist!");
+		}
+
+		return get(mFieldLookup.get(field));
+	}
+
+	/**
+	 * Returns special symbol at a particular field (by position). Returns
+	 * <code>null</code> if the field does not contain a special symbol.
+	 * 
+	 * @param i
+	 *            field position
+	 * @return special symbol at field, or <code>null</code> if the field does
+	 *         not contain a special symbol.
+	 */
+	public String getSymbol(int i) {
+		// A field carries a symbol only while its object slot is null.
+		return (mObjects[i] == null) ? mSymbols[i] : null;
+	}
+
+	/**
+	 * Returns special symbol at a particular field (by name). Returns
+	 * <code>null</code> if the field does not contain a special symbol.
+	 * 
+	 * @param field
+	 *            field name
+	 * @return special symbol at field, or <code>null</code> if the field does
+	 *         not contain a special symbol.
+	 */
+	public String getSymbol(String field) {
+		if (mFieldLookup == null)
+			initLookup();
+
+		if (!mFieldLookup.containsKey(field)) {
+			throw new TupleException("Field '" + field + "' does not exist!");
+		}
+
+		return getSymbol(mFieldLookup.get(field));
+	}
+
+	/**
+	 * Determines if a particular field (by position) contains a special symbol.
+	 * 
+	 * @param i
+	 *            field position
+	 * @return <code>true</code> if the field contains a special symbol, or
+	 *         <code>false</code> otherwise
+	 */
+	public boolean containsSymbol(int i) {
+		// The object slot is nulled whenever a symbol is stored (setSymbol).
+		return mObjects[i] == null;
+	}
+
+	/**
+	 * Determines if a particular field (by name) contains a special symbol.
+	 * 
+	 * @param field
+	 *            field name
+	 * @return <code>true</code> if the field contains a special symbol, or
+	 *         <code>false</code> otherwise
+	 */
+	public boolean containsSymbol(String field) {
+		if (mFieldLookup == null)
+			initLookup();
+
+		if (!mFieldLookup.containsKey(field)) {
+			throw new TupleException("Field '" + field + "' does not exist!");
+		}
+
+		return containsSymbol(mFieldLookup.get(field));
+	}
+
+	/**
+	 * Returns the type of a particular field (by position), as declared by
+	 * the originating {@link Schema}.
+	 * 
+	 * @param i
+	 *            field position
+	 * @return type of the field
+	 */
+	public Class<?> getFieldType(int i) {
+		return mTypes[i];
+	}
+
+	/**
+	 * Returns the type of a particular field (by name).
+	 * 
+	 * @param field
+	 *            field name
+	 * @return type of the field
+	 */
+	public Class<?> getFieldType(String field) {
+		if (mFieldLookup == null)
+			initLookup();
+
+		if (!mFieldLookup.containsKey(field)) {
+			throw new TupleException("Field '" + field + "' does not exist!");
+		}
+
+		return getFieldType(mFieldLookup.get(field));
+	}
+
+	/**
+	 * Returns the number of fields in this Tuple.
+	 * 
+	 * @return the number of fields
+	 */
+	public int getFieldCount() {
+		return mFields.length;
+	}
+
+	/**
+	 * Lazily construct the lookup table for this schema. Used to accelerate
+	 * name-based lookups of schema information.
+	 */
+	private void initLookup() {
+		mFieldLookup = new HashMap<String, Integer>();
+		for (int i = 0; i < mFields.length; ++i) {
+			// Integer.valueOf caches small values, unlike the deprecated
+			// `new Integer(i)` which allocated a fresh object per field.
+			mFieldLookup.put(mFields[i], Integer.valueOf(i));
+		}
+	}
+
+	/**
+	 * Deserializes the Tuple. Layout mirrors {@link #write(DataOutput)}:
+	 * field count, then all field names, then one type-tagged entry per
+	 * field.
+	 * 
+	 * @param in
+	 *            source for raw byte representation
+	 * @throws IOException
+	 *             if the stream is malformed, or a serialized class cannot be
+	 *             loaded or instantiated
+	 */
+	public void readFields(DataInput in) throws IOException {
+		int numFields = in.readInt();
+
+		mObjects = new Object[numFields];
+		mSymbols = new String[numFields];
+		mFields = new String[numFields];
+		mTypes = new Class<?>[numFields];
+
+		for (int i = 0; i < numFields; i++) {
+			mFields[i] = in.readUTF();
+		}
+
+		for (int i = 0; i < numFields; i++) {
+			byte type = in.readByte();
+
+			if (type == SYMBOL) {
+				// Symbol fields carry the declared type's class name plus the
+				// symbol string; the object slot stays null.
+				String className = in.readUTF();
+				try {
+					mTypes[i] = Class.forName(className);
+				} catch (ClassNotFoundException e) {
+					// Previously swallowed (printStackTrace), which left
+					// mTypes[i] null and deferred an NPE to a later access;
+					// fail fast instead, preserving the cause.
+					IOException ioe = new IOException(
+							"Unable to load field type " + className);
+					ioe.initCause(e);
+					throw ioe;
+				}
+				mObjects[i] = null;
+				mSymbols[i] = in.readUTF();
+			} else if (type == INT) {
+				mTypes[i] = Integer.class;
+				mObjects[i] = in.readInt();
+			} else if (type == BOOLEAN) {
+				mTypes[i] = Boolean.class;
+				mObjects[i] = in.readBoolean();
+			} else if (type == LONG) {
+				mTypes[i] = Long.class;
+				mObjects[i] = in.readLong();
+			} else if (type == FLOAT) {
+				mTypes[i] = Float.class;
+				mObjects[i] = in.readFloat();
+			} else if (type == DOUBLE) {
+				mTypes[i] = Double.class;
+				mObjects[i] = in.readDouble();
+			} else if (type == STRING) {
+				mTypes[i] = String.class;
+				mObjects[i] = in.readUTF();
+			} else {
+				// WRITABLE: class name, byte length, then the raw bytes of
+				// the Writable's own serialization.
+				String className = in.readUTF();
+				int sz = in.readInt();
+				byte[] bytes = new byte[sz];
+				in.readFully(bytes);
+
+				try {
+					mTypes[i] = Class.forName(className);
+					Writable obj = (Writable) mTypes[i].newInstance();
+					obj.readFields(new DataInputStream(
+							new ByteArrayInputStream(bytes)));
+					mObjects[i] = obj;
+				} catch (Exception e) {
+					// Same rationale as above: don't swallow and leave a
+					// half-initialized field.
+					IOException ioe = new IOException(
+							"Unable to deserialize field of type " + className);
+					ioe.initCause(e);
+					throw ioe;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Serializes this Tuple. Layout: field count, all field names, then one
+	 * type-tagged entry per field (mirrors {@link #readFields(DataInput)}).
+	 * 
+	 * @param out
+	 *            where to write the raw byte representation
+	 * @throws IOException
+	 *             if the underlying stream cannot be written
+	 */
+	public void write(DataOutput out) throws IOException {
+		out.writeInt(mFields.length);
+		for (int i = 0; i < mFields.length; i++) {
+			out.writeUTF(mFields[i]);
+		}
+
+		for (int i = 0; i < mFields.length; i++) {
+			if (mObjects[i] == null && mSymbols[i] == null) {
+				throw new TupleException("Cannot serialize null fields!");
+			}
+
+			if (containsSymbol(i)) {
+				out.writeByte(SYMBOL);
+				// Use getName(), not getCanonicalName(): readFields() feeds
+				// this string to Class.forName(), which requires the binary
+				// name (e.g. Outer$Nested) for nested classes.
+				out.writeUTF(mTypes[i].getName());
+				out.writeUTF(mSymbols[i]);
+			} else if (mTypes[i] == Integer.class) {
+				out.writeByte(INT);
+				out.writeInt((Integer) mObjects[i]);
+			} else if (mTypes[i] == Boolean.class) {
+				out.writeByte(BOOLEAN);
+				out.writeBoolean((Boolean) mObjects[i]);
+			} else if (mTypes[i] == Long.class) {
+				out.writeByte(LONG);
+				out.writeLong((Long) mObjects[i]);
+			} else if (mTypes[i] == Float.class) {
+				out.writeByte(FLOAT);
+				out.writeFloat((Float) mObjects[i]);
+			} else if (mTypes[i] == Double.class) {
+				out.writeByte(DOUBLE);
+				out.writeDouble((Double) mObjects[i]);
+			} else if (mTypes[i] == String.class) {
+				out.writeByte(STRING);
+				out.writeUTF(mObjects[i].toString());
+			} else {
+				out.writeByte(WRITABLE);
+
+				// Serialize the Writable into a buffer first so its byte
+				// length can be written ahead of the bytes themselves.
+				ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+				DataOutputStream dataOut = new DataOutputStream(bytesOut);
+
+				// Binary name here too, for the same Class.forName reason.
+				out.writeUTF(mTypes[i].getName());
+				((Writable) mObjects[i]).write(dataOut);
+				out.writeInt(bytesOut.size());
+				out.write(bytesOut.toByteArray());
+			}
+		}
+	}
+
+	/**
+	 * Generates human-readable String representation of this Tuple, e.g.
+	 * "(v1, v2)". Symbol-bearing fields print their symbol.
+	 * 
+	 * @return human-readable String representation of this Tuple
+	 */
+	public String toString() {
+		// StringBuilder: no synchronization needed for a method-local buffer
+		// (StringBuffer's locking was pure overhead here).
+		StringBuilder sb = new StringBuilder();
+
+		for (int i = 0; i < mFields.length; i++) {
+			if (i != 0)
+				sb.append(", ");
+			// Use containsSymbol(i) rather than mSymbols[i] != null: setSymbol
+			// followed by set() leaves a stale entry in mSymbols, and the old
+			// check would print the stale symbol instead of the current value.
+			if (containsSymbol(i)) {
+				sb.append(mSymbols[i]);
+			} else {
+				sb.append(mObjects[i]);
+			}
+		}
+
+		return "(" + sb.toString() + ")";
+	}
+
+	/**
+	 * <p>
+	 * Defines a natural sort order for the Tuple class. Following standard
+	 * convention, this method returns a value less than zero, a value greater
+	 * than zero, or zero if this Tuple should be sorted before, sorted after,
+	 * or is equal to <code>obj</code>. The sort order is defined as follows:
+	 * </p>
+	 * 
+	 * <ul>
+	 * <li>Each field in the Tuple is compared sequentially from first to last.</li>
+	 * <li>Within each field, all special symbols are sorted before actual
+	 * field tokens (i.e., the actual String, Integer, or whatever the field may
+	 * contain).</li>
+	 * <li>The special symbols are sorted lexicographically (being Strings).</li>
+	 * <li>The field tokens are sorted by their natural order.</li>
+	 * <li>If the field contents are identical (both contain same special
+	 * symbol or field token), the next field in the tuple is considered.</li>
+	 * <li>Two tuples are considered equal if all their fields are identical.</li>
+	 * </ul>
+	 * 
+	 * @return a value less than zero, a value greater than zero, or zero if
+	 *         this Tuple should be sorted before, sorted after, or is equal to
+	 *         <code>obj</code>.
+	 */
+	public int compareTo(Object obj) {
+		// NOTE(review): assumes `that` has at least as many fields as `this`
+		// (tuples from the same Schema) -- a shorter tuple would raise an
+		// ArrayIndexOutOfBoundsException.
+		Tuple that = (Tuple) obj;
+
+		// iterate through the fields
+		for (int i = 0; i < this.getFieldCount(); i++) {
+			// if both contain special symbol, then sort special symbols
+			if (this.containsSymbol(i) && that.containsSymbol(i)) {
+				String thisSymbol = this.getSymbol(i);
+				String thatSymbol = that.getSymbol(i);
+
+				// differing symbols decide the order; identical symbols fall
+				// through to the next field
+				if (!thisSymbol.equals(thatSymbol)) {
+					return thisSymbol.compareTo(thatSymbol);
+				}
+			} else {
+				// special symbols always come first
+				if (this.containsSymbol(i))
+					return -1;
+
+				if (that.containsSymbol(i))
+					return 1;
+
+				@SuppressWarnings("unchecked")
+				Comparable<Object> thisField = (Comparable<Object>) this.get(i);
+
+				@SuppressWarnings("unchecked")
+				Comparable<Object> thatField = (Comparable<Object>) that.get(i);
+
+				// if the field tokens are identical, move to next field
+				if (!thisField.equals(thatField)) {
+					return thisField.compareTo(thatField);
+				}
+			}
+		}
+
+		return 0;
+	}
+
+	/**
+	 * Returns a hash code for this Tuple.
+	 * 
+	 * @return hash code for this Tuple
+	 */
+	public int hashCode() {
+		int hash = 0;
+
+		for (int i = 0; i < mObjects.length; i++) {
+			if (mObjects[i] != null) {
+				hash += mObjects[i].hashCode();
+			} else {
+				hash += mSymbols[i].hashCode();
+			}
+		}
+
+		return hash;
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/tuple/TupleException.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/tuple/TupleException.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/tuple/TupleException.java	(revision 20)
@@ -0,0 +1,28 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.tuple;
+
+/**
+ * Exception for all Tuple-related errors. Unchecked, since these indicate
+ * programming errors (unknown field names, wrong field types) rather than
+ * recoverable conditions.
+ */
+public class TupleException extends RuntimeException {
+	public static final long serialVersionUID = 640927654842l;
+
+	/**
+	 * Creates a TupleException.
+	 * 
+	 * @param message
+	 *            description of the error
+	 */
+	public TupleException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a TupleException that wraps an underlying cause; previously
+	 * there was no way to preserve the original exception when rethrowing.
+	 * 
+	 * @param message
+	 *            description of the error
+	 * @param cause
+	 *            the underlying exception
+	 */
+	public TupleException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/util/InstanceCounter.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/util/InstanceCounter.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/util/InstanceCounter.java	(revision 20)
@@ -0,0 +1,169 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A class for keeping track of the number of times an object has been
+ * encountered. This is useful for counting things in a stream, e.g., POS tags,
+ * terms, etc.
+ */
+public class InstanceCounter<T extends Comparable<T>> {
+
+	// internal representation---although the scores are doubles, counts are
+	// obviously integers
+	private ScoreSortedMap<T> mMap;
+
+	// Running sum of all observations, maintained by count(); denominator
+	// for the frequencies reported via getCounts().
+	private int mTotalCount = 0;
+
+	/**
+	 * Constructs an <code>InstanceCounter</code>.
+	 */
+	public InstanceCounter() {
+		mMap = new ScoreSortedMap<T>();
+	}
+
+	/**
+	 * Adds an instance to the set of observations.
+	 * 
+	 * @param instance
+	 *            the instance observed
+	 */
+	public void count(T instance) {
+		if (mMap.containsKey(instance)) {
+			mMap.put(instance, mMap.get(instance) + 1);
+		} else {
+			mMap.put(instance, 1.0);
+		}
+		mTotalCount++;
+	}
+
+	/**
+	 * Prints each instance and how many times it has been observed, one per
+	 * line ("count<TAB>instance"), sorted by the counts.
+	 */
+	public void printCounts() {
+		for (Map.Entry<T, Double> entry : mMap.getSortedEntries()) {
+			System.out.println(entry.getValue().intValue() + "\t"
+					+ entry.getKey());
+		}
+	}
+
+	/**
+	 * Returns a list of <code>InstanceCount</code> objects, sorted by count.
+	 */
+	public List<InstanceCount> getCounts() {
+		List<InstanceCount> l = new ArrayList<InstanceCount>();
+
+		for (Map.Entry<T, Double> map : mMap.getSortedEntries()) {
+			l.add(new InstanceCount(map.getKey(), map.getValue().intValue(),
+					map.getValue() / (double) mTotalCount));
+		}
+
+		return Collections.unmodifiableList(l);
+	}
+
+	/**
+	 * Returns the total number of observations (i.e., the number of times
+	 * {@link #count(Comparable)} has been called).
+	 * 
+	 * @return the total number of observations
+	 */
+	public int getTotalCount() {
+		return mTotalCount;
+	}
+
+	/**
+	 * Returns the number of times a particular instance has been observed.
+	 * 
+	 * @param inst
+	 *            the instance
+	 * @return the count of the instance
+	 */
+	public int getCount(T inst) {
+		if (mMap.containsKey(inst)) {
+			return mMap.get(inst).intValue();
+		}
+
+		return 0;
+	}
+
+	/**
+	 * Returns a collection of all objects observed, sorted by their natural
+	 * order.
+	 * 
+	 * @return a collection of all objects observed, sorted by their natural
+	 *         order.
+	 */
+	public SortedSet<T> getObservedObjects() {
+		// TreeSet's copy constructor sorts the keys by their natural order.
+		return new TreeSet<T>(mMap.keySet());
+	}
+
+	/**
+	 * A class that holds an instance, its count, and its frequency.
+	 *
+	 * NOTE(review): this is a non-static inner class and therefore keeps a
+	 * reference to its enclosing counter; making it static would change the
+	 * public generic type, so it is left as is.
+	 */
+	public class InstanceCount {
+		// the observed instance
+		private T mInstance;
+
+		// number of times the instance was observed
+		private int mCount;
+
+		// count divided by the total number of observations
+		private double mFreq;
+
+		private InstanceCount(T instance, int cnt, double freq) {
+			mInstance = instance;
+			mCount = cnt;
+			mFreq = freq;
+		}
+
+		/**
+		 * Returns the instance.
+		 */
+		public T getInstance() {
+			return mInstance;
+		}
+
+		/**
+		 * Returns the number of times the instance has been observed.
+		 */
+		public int getCount() {
+			return mCount;
+		}
+
+		/**
+		 * Returns the frequency that this instance has been observed. Frequency
+		 * is the count divided by the total number of observed instances.
+		 */
+		public double getFrequency() {
+			return mFreq;
+		}
+	}
+
+	/**
+	 * Removes all observations and resets the total count.
+	 */
+	public void clear() {
+		mMap.clear();
+		// Reset the running total as well; previously it retained its old
+		// value, skewing getTotalCount() and every frequency computed after
+		// a clear().
+		mTotalCount = 0;
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/util/KeyValueProcess.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/util/KeyValueProcess.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/util/KeyValueProcess.java	(revision 20)
@@ -0,0 +1,94 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * <p>
+ * Interface that defines the callback associated with
+ * {@link SequenceFileProcessor}. For each key-value pair, the
+ * <code>SequenceFileProcessor</code> calls {@link #process}; this needs to
+ * be instantiated by the user. After all the key-value pairs are processed,
+ * <code>SequenceFileProcessor</code> calls {@link #report}; this also needs
+ * to be instantiated by the user. Results of computations are retrieved using
+ * {@link #getProperty(String)}.
+ * </p>
+ * 
+ * @param <K>
+ *            type of key
+ * @param <V>
+ *            type of value
+ */
+public abstract class KeyValueProcess<K extends WritableComparable, V extends Writable> {
+
+	// Property bag used to hand computation results back to the caller.
+	private Map<String, Object> mProperties = new HashMap<String, Object>();
+
+	/**
+	 * Creates a new <code>KeyValueProcess</code>
+	 */
+	public KeyValueProcess() {
+	}
+
+	/**
+	 * Called by {@link SequenceFileProcessor} for every key-value pair. This
+	 * method needs to be defined by the user.
+	 * 
+	 * @param key
+	 *            the key
+	 * @param value
+	 *            the value
+	 */
+	public abstract void process(K key, V value);
+
+	/**
+	 * Called by {@link SequenceFileProcessor} after all key-value pairs have
+	 * been processed. This method needs to be defined by the user; typical
+	 * implementations record their results using
+	 * {@link #setProperty(String, Object)}.
+	 */
+	public abstract void report();
+
+	/**
+	 * Records a named result of the computation performed by this class.
+	 * 
+	 * @param property
+	 *            property name
+	 * @param value
+	 *            value of the property
+	 */
+	public void setProperty(String property, Object value) {
+		mProperties.put(property, value);
+	}
+
+	/**
+	 * Retrieves a previously recorded result.
+	 * 
+	 * @param property
+	 *            property name
+	 * @return value of the property, or <code>null</code> if never set
+	 */
+	public Object getProperty(String property) {
+		return mProperties.get(property);
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/util/LocalSequenceFile.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/util/LocalSequenceFile.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/util/LocalSequenceFile.java	(revision 20)
@@ -0,0 +1,43 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.util;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+
+public class LocalSequenceFile {
+
+	// Utility class; not instantiable.
+	private LocalSequenceFile() {
+	}
+
+	/**
+	 * Opens a SequenceFile.Reader over the given file using a default job
+	 * configuration.
+	 *
+	 * NOTE(review): any failure is printed to stderr and <code>null</code>
+	 * is returned -- callers must null-check the result; consider
+	 * propagating the exception instead.
+	 *
+	 * @param file
+	 *            path to the sequence file
+	 * @return an open reader, or <code>null</code> if opening failed
+	 */
+	public static SequenceFile.Reader createReader(String file) {
+		JobConf config = new JobConf();
+		SequenceFile.Reader reader = null;
+
+		try {
+			reader = new SequenceFile.Reader(FileSystem.get(config), new Path(
+					file), config);
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+
+		return reader;
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/util/LocalTupleRecordReader.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/util/LocalTupleRecordReader.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/util/LocalTupleRecordReader.java	(revision 20)
@@ -0,0 +1,44 @@
+package tw.org.nchc.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+
+import tw.org.nchc.tuple.Tuple;
+
+public class LocalTupleRecordReader {
+	// Scratch key reused across read() calls; the file is keyed by
+	// LongWritable record numbers (see LocalTupleRecordWriter).
+	private LongWritable mKey = new LongWritable();
+	private SequenceFile.Reader mReader;
+
+	// Number of records successfully read so far.
+	private long cnt = 0;
+
+	/**
+	 * Opens a reader over the given sequence file of Tuples.
+	 * 
+	 * @param file
+	 *            path to the sequence file
+	 * @throws IOException
+	 *             if the file cannot be opened
+	 */
+	public LocalTupleRecordReader(String file) throws IOException {
+		JobConf config = new JobConf();
+
+		mReader = new SequenceFile.Reader(FileSystem.get(config),
+				new Path(file), config);
+	}
+
+	/**
+	 * Reads the next record into <code>tuple</code>.
+	 * 
+	 * @param tuple
+	 *            tuple to populate
+	 * @return <code>true</code> if a record was read, <code>false</code> at
+	 *         end of file
+	 * @throws IOException
+	 *             if reading fails
+	 */
+	public boolean read(Tuple tuple) throws IOException {
+		// Guard-clause form; the redundant `== true` comparison is gone and
+		// only successful reads are counted, as before.
+		if (!mReader.next(mKey, tuple)) {
+			return false;
+		}
+
+		cnt++;
+		return true;
+	}
+
+	/**
+	 * Returns the number of records read so far.
+	 */
+	public long getRecordCount() {
+		return cnt;
+	}
+
+	/**
+	 * Closes the underlying reader.
+	 */
+	public void close() throws IOException {
+		mReader.close();
+	}
+
+}
Index: /sample/hadoop-0.17/tw/org/nchc/util/LocalTupleRecordWriter.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/util/LocalTupleRecordWriter.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/util/LocalTupleRecordWriter.java	(revision 20)
@@ -0,0 +1,41 @@
+package tw.org.nchc.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.JobConf;
+
+import tw.org.nchc.tuple.Tuple;
+
+public class LocalTupleRecordWriter {
+
+	// Reusable key; each appended tuple is keyed by its record number.
+	private LongWritable mLong = new LongWritable();
+
+	// Number of records appended so far; doubles as the next record's key.
+	private long mCnt = 0;
+
+	private SequenceFile.Writer writer;
+
+	// Creates a sequence file of (LongWritable record number, Tuple) pairs
+	// at the given path, using a default job configuration.
+	public LocalTupleRecordWriter(String file) throws IOException {
+		JobConf config = new JobConf();
+
+		writer = SequenceFile.createWriter(FileSystem.get(config), config,
+				new Path(file), LongWritable.class, Tuple.class);
+	}
+
+	// Appends one tuple. Note the order matters: the key is set to the
+	// current count BEFORE appending, and the count is incremented only
+	// after a successful append.
+	public void add(Tuple tuple) throws IOException {
+		mLong.set(mCnt);
+		writer.append(mLong, tuple);
+		mCnt++;
+	}
+
+	// Returns the number of records appended so far.
+	public long getRecordCount() {
+		return mCnt;
+	}
+
+	// Closes the underlying writer, flushing any buffered records.
+	public void close() throws IOException {
+		writer.close();
+	}
+}
Index: /sample/hadoop-0.17/tw/org/nchc/util/MapReduceTask.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/util/MapReduceTask.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/util/MapReduceTask.java	(revision 20)
@@ -0,0 +1,12 @@
+package tw.org.nchc.util;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface MapReduceTask {
+	
+	// Prepares the task with the given configuration before it is run.
+	public void initialize(Configuration config);
+	
+	// Runs the task using the given configuration.
+	public void run(Configuration config) throws Exception;
+	
+	// Runs the task with a default configuration.
+	public void run() throws Exception;
+}
Index: /sample/hadoop-0.17/tw/org/nchc/util/ScoreSortedMap.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/util/ScoreSortedMap.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/util/ScoreSortedMap.java	(revision 20)
@@ -0,0 +1,227 @@
+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package tw.org.nchc.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * A Map that holds scores (doubles) associated with each object (key) and
+ * supports iteration by score. Many applications call for this type of
+ * functionality: the ability to associate scores with objects coupled with the
+ * ability to sort entries by their scores.
+ * 
+ * @param <K>
+ *            type of key
+ */
public class ScoreSortedMap<K extends Comparable<K>> extends HashMap<K, Double> {

	private static final long serialVersionUID = 2983410765L;

	/**
	 * Constructs a <code>ScoreSortedMap</code>.
	 */
	public ScoreSortedMap() {
		super();
	}

	/**
	 * Builds the comparator shared by all sorted views: descending by score,
	 * ties broken by the natural order of the keys. A factory method (rather
	 * than a field) so the anonymous, non-serializable comparator does not
	 * become part of this Serializable map's state.
	 */
	private Comparator<Map.Entry<K, Double>> scoreComparator() {
		return new Comparator<Map.Entry<K, Double>>() {
			public int compare(Map.Entry<K, Double> e1, Map.Entry<K, Double> e2) {
				// Double.compareTo avoids unboxing comparisons and gives a
				// total order; negate for descending scores.
				int c = e2.getValue().compareTo(e1.getValue());
				if (c != 0) {
					return c;
				}
				return e1.getKey().compareTo(e2.getKey());
			}
		};
	}

	/**
	 * Returns all entries sorted by descending score.
	 *
	 * @return an unmodifiable sorted set view of the entries sorted by scores
	 */
	public SortedSet<Map.Entry<K, Double>> getSortedEntries() {
		SortedSet<Map.Entry<K, Double>> entries = new TreeSet<Map.Entry<K, Double>>(
				scoreComparator());
		entries.addAll(this.entrySet());
		return Collections.unmodifiableSortedSet(entries);
	}

	/**
	 * Returns the <i>n</i> top entries sorted by descending score. If the map
	 * holds fewer than <i>n</i> entries, all of them are returned.
	 *
	 * @param n
	 *            number of entries to retrieve
	 * @return an unmodifiable sorted set view of the top <i>n</i> entries
	 */
	public SortedSet<Map.Entry<K, Double>> getSortedEntries(int n) {
		SortedSet<Map.Entry<K, Double>> entries = new TreeSet<Map.Entry<K, Double>>(
				scoreComparator());

		int cnt = 0;
		for (Map.Entry<K, Double> entry : getSortedEntries()) {
			entries.add(entry);
			cnt++;
			if (cnt >= n) {
				break;
			}
		}

		return Collections.unmodifiableSortedSet(entries);
	}

	/**
	 * Returns the top-scoring entry.
	 *
	 * @return the top-scoring entry
	 * @throws NoSuchElementException
	 *             if the map is empty
	 */
	public Map.Entry<K, Double> getTopEntry() {
		return getSortedEntries().first();
	}

	/**
	 * Returns the <i>i</i>th scoring entry (1-based rank).
	 *
	 * @param i
	 *            the rank, from 1 (top) to <code>size()</code>
	 * @return the <i>i</i>th scoring entry
	 * @throws NoSuchElementException
	 *             if <code>i</code> is out of [1, size()]
	 */
	public Map.Entry<K, Double> getEntryByRank(int i) {
		// Reject i <= 0 too: the original silently returned the top entry
		// for i == 0, which hid caller bugs.
		if (i < 1 || i > this.size())
			throw new NoSuchElementException("Error: rank " + i
					+ " out of bounds [1, " + this.size() + "]");

		Iterator<Map.Entry<K, Double>> iter = getSortedEntries().iterator();

		int n = 0;
		while (n++ < i - 1)
			iter.next();

		return iter.next();
	}

	/**
	 * Returns a list of the keys, sorted by descending score.
	 *
	 * @return a list of the keys, sorted by descending score
	 */
	public List<K> getSortedKeys() {
		List<K> list = new ArrayList<K>(this.size());

		for (Map.Entry<K, Double> entry : getSortedEntries()) {
			list.add(entry.getKey());
		}

		return list;
	}

	/**
	 * Normalizes all scores to a value between zero and one (min-max). Note
	 * that if all keys share a single score, no action is performed.
	 */
	public void normalizeScores() {
		double max = Double.NEGATIVE_INFINITY;
		double min = Double.POSITIVE_INFINITY;

		for (Map.Entry<K, Double> entry : this.entrySet()) {
			double score = entry.getValue();

			if (score > max)
				max = score;

			if (score < min)
				min = score;
		}

		// if there's only one value, then meaningless to normalize
		if (max == min)
			return;

		// Entry.setValue is the safe way to update values mid-iteration;
		// put() during entrySet iteration only worked by accident.
		for (Map.Entry<K, Double> entry : this.entrySet()) {
			double score = entry.getValue();
			entry.setValue((score - min) / (max - min));
		}
	}

	/**
	 * Returns a new <code>ScoreSortedMap</code> where the score of each key
	 * in this object has been linearly interpolated with scores drawn from
	 * another <code>ScoreSortedMap</code>. A weight of <code>lambda</code>
	 * is given to the score from this object, and a weight of (1-<code>lambda</code>)
	 * is given to the score from the other <code>ScoreSortedMap</code>. Both
	 * <code>ScoreSortedMap</code>s are first normalized (in place). Note that
	 * if a key is not contained in this object, but present in the other
	 * <code>ScoreSortedMap</code>, it will <b>not</b> be present in the new
	 * <code>ScoreSortedMap</code>.
	 *
	 * @param s
	 *            the other <code>ScoreSortedMap</code>
	 * @param lambda
	 *            weight assigned to scores from this object
	 * @return a new <code>ScoreSortedMap</code> with linearly-interpolated
	 *         scores
	 */
	public ScoreSortedMap<K> linearInterpolationWith(ScoreSortedMap<K> s,
			double lambda) {
		this.normalizeScores();
		s.normalizeScores();

		ScoreSortedMap<K> entries = new ScoreSortedMap<K>();

		for (Map.Entry<K, Double> entry : getSortedEntries()) {
			double score1 = entry.getValue();
			double score2 = 0.0d;

			// Missing keys in the other map contribute a score of zero.
			if (s.containsKey(entry.getKey())) {
				score2 = s.get(entry.getKey());
			}

			double newscore = lambda * score1 + (1 - lambda) * score2;
			entries.put(entry.getKey(), newscore);
		}

		return entries;
	}

}
Index: /sample/hadoop-0.17/tw/org/nchc/util/SequenceFileProcessor.java
===================================================================
--- /sample/hadoop-0.17/tw/org/nchc/util/SequenceFileProcessor.java	(revision 20)
+++ /sample/hadoop-0.17/tw/org/nchc/util/SequenceFileProcessor.java	(revision 20)
@@ -0,0 +1,150 @@
+/**
+ * Program: SequenceFileProcessor.java
+ * Editor: Waue Chen 
+ * From :  NCHC. Taiwan
+ * Last Update Date: 07/02/2008
+ * Upgrade to 0.17
+ * Re-code from : Cloud9: A MapReduce Library for Hadoop
+ */
+
+
+package tw.org.nchc.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Upgrade from hadoop 0.16 to 0.17 
+ * <p>
+ * Harness for processing one or more {@link SequenceFile}s within a single
+ * process. This class is useful when you want to iterate through all key-value
+ * pairs in a SequenceFile outside the context of a MapReduce task (or where
+ * writing the computation as a MapReduce would be overkill). One example usage
+ * case is to sum up all the values in a SequenceFile &mdash; this may be useful
+ * if you want to make sure probabilities sum to one. Here's the code fragment
+ * that would accomplish this:
+ * </p>
+ * 
+ * <pre>
+ * KeyValueProcess&lt;Tuple, FloatWritable&gt; process = SequenceFileProcessor
+ * 		.&lt;Tuple, FloatWritable&gt; process(&quot;foo&quot;,
+ * 				new KeyValueProcess&lt;Tuple, FloatWritable&gt;() {
+ * 					public float sum = 0.0f;
+ * 
+ * 					public void process(Tuple tuple, FloatWritable f) {
+ * 						sum += f.get();
+ * 					}
+ * 
+ * 					public void report() {
+ * 						setProperty(&quot;sum&quot;, sum);
+ * 					}
+ * 				});
+ * 
+ * float sum = (Float) process.getProperty(&quot;sum&quot;);
+ * </pre>
+ * 
+ * <p>
+ * The static method takes a path and and a {@link KeyValueProcess}. This
+ * example uses an anonymous inner class to make the code more concise; the
+ * static method returns the <code>KeyValueProcess</code> so that you can
+ * retrieve results from it. The path can either be a file or a directory; if it
+ * is a directory, all files in that directory are processed.
+ * </p>
+ * 
+ * @param <K>
+ *            type of key
+ * @param <V>
+ *            type of value
+ */
+public class SequenceFileProcessor<K extends WritableComparable, V extends Writable> {
+
+	private Path mPath;
+	private JobConf conf;
+	private KeyValueProcess<K, V> mProcessor;
+	private SequenceFile.Reader mReader;
+	private K mKey;
+	private V mValue;
+
+	/**
+	 * Processes one or more <code>SequenceFile</code>s. The
+	 * {@link KeyValueProcess} is applied to every key-value pair in the file if
+	 * <code>path</code> denotes a file, or all files in the directory if
+	 * <code>path</code> denotes a directory.
+	 * 
+	 * @param <K1>
+	 *            type of key
+	 * @param <V1>
+	 *            type of value
+	 * @param path
+	 *            either a file or a directory
+	 * @param p
+	 *            the KeyValueProcess to apply
+	 * @return the KeyValueProcess applied
+	 */
+	public static <K1 extends WritableComparable, V1 extends Writable> KeyValueProcess<K1, V1> process(
+			String path, KeyValueProcess<K1, V1> p) {
+
+		try {
+			SequenceFileProcessor<K1, V1> processor = new SequenceFileProcessor<K1, V1>(
+					path, p);
+			processor.run();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+
+		return p;
+	}
+
+	private SequenceFileProcessor(String location, KeyValueProcess<K, V> p)
+			throws IOException {
+
+		mPath = new Path(location);
+		conf = new JobConf();
+
+		mProcessor = p;
+
+	}
+
+	private void run() throws IOException {
+		if (!FileSystem.get(conf).isFile(mPath)) {
+			Path[] pa = new Path[] { mPath };
+			Path p;
+			// hadoop 0.17 -> listStatus();
+			FileStatus[] fi = FileSystem.get(conf).listStatus(pa);
+			for (int i =0 ; i<fi.length ; i++) {
+				p = fi[i].getPath();
+				// System.out.println("Applying to " + p);
+				applyToFile(p);
+			}
+		} else {
+			applyToFile(mPath);
+		}
+
+	}
+
+	@SuppressWarnings("unchecked")
+	private void applyToFile(Path path) throws IOException {
+		mReader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
+
+		try {
+			mKey = (K) mReader.getKeyClass().newInstance();
+			mValue = (V) mReader.getValueClass().newInstance();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+
+		while (mReader.next(mKey, mValue) == true) {
+			mProcessor.process(mKey, mValue);
+		}
+
+		mReader.close();
+		mProcessor.report();
+	}
+}
