/** * Program: BuildHTable.java * Editor: Waue Chen * From : NCHC. Taiwan * Last Update Date: 07/02/2008 * Upgrade to 0.17 * Re-code from : Cloud9: A MapReduce Library for Hadoop */ package tw.org.nchc.util; import java.io.IOException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; /** * Upgrade from hadoop 0.16 to 0.17 *

* Harness for processing one or more {@link SequenceFile}s within a single * process. This class is useful when you want to iterate through all key-value * pairs in a SequenceFile outside the context of a MapReduce task (or where * writing the computation as a MapReduce would be overkill). One example use * case is to sum up all the values in a SequenceFile — this may be useful * if you want to make sure that probabilities sum to one. Here is a code fragment * that would accomplish this: *

* *
 * KeyValueProcess<Tuple, FloatWritable> process = SequenceFileProcessor
 * 		.<Tuple, FloatWritable> process("foo",
 * 				new KeyValueProcess<Tuple, FloatWritable>() {
 * 					public float sum = 0.0f;
 * 
 * 					public void process(Tuple tuple, FloatWritable f) {
 * 						sum += f.get();
 * 					}
 * 
 * 					public void report() {
 * 						setProperty("sum", sum);
 * 					}
 * 				});
 * 
 * float sum = (Float) process.getProperty("sum");
 * 
* *

* The static method takes a path and a {@link KeyValueProcess}. This * example uses an anonymous inner class to make the code more concise; the * static method returns the KeyValueProcess so that you can * retrieve results from it. The path can either be a file or a directory; if it * is a directory, all files in that directory are processed. *

* * @param * type of key * @param * type of value */ public class SequenceFileProcessor { private Path mPath; private JobConf conf; private KeyValueProcess mProcessor; private SequenceFile.Reader mReader; private K mKey; private V mValue; /** * Processes one or more SequenceFiles. The * {@link KeyValueProcess} is applied to every key-value pair in the file if * path denotes a file, or all files in the directory if * path denotes a directory. * * @param * type of key * @param * type of value * @param path * either a file or a directory * @param p * the KeyValueProcess to apply * @return the KeyValueProcess applied */ public static KeyValueProcess process( String path, KeyValueProcess p) { try { SequenceFileProcessor processor = new SequenceFileProcessor( path, p); processor.run(); } catch (Exception e) { e.printStackTrace(); } return p; } private SequenceFileProcessor(String location, KeyValueProcess p) throws IOException { mPath = new Path(location); conf = new JobConf(); mProcessor = p; } private void run() throws IOException { if (!FileSystem.get(conf).isFile(mPath)) { Path[] pa = new Path[] { mPath }; Path p; // hadoop 0.17 -> listStatus(); FileStatus[] fi = FileSystem.get(conf).listStatus(pa); for (int i =0 ; i