/* * Cloud9: A MapReduce Library for Hadoop * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You may * obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. */ package tw.org.nchc.util; import java.io.IOException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; /** *

* Harness for processing one or more {@link SequenceFile}s within a single
* process. This class is useful when you want to iterate through all key-value
* pairs in a SequenceFile outside the context of a MapReduce task (or where
* writing the computation as a MapReduce would be overkill). One example use
* case is to sum up all the values in a SequenceFile &mdash; this may be useful
* if you want to make sure probabilities sum to one. Here's the code fragment
* that would accomplish this:
*

* *
 * KeyValueProcess<Tuple, FloatWritable> process = SequenceFileProcessor
 * 		.<Tuple, FloatWritable> process("foo",
 * 				new KeyValueProcess<Tuple, FloatWritable>() {
 * 					public float sum = 0.0f;
 * 
 * 					public void process(Tuple tuple, FloatWritable f) {
 * 						sum += f.get();
 * 					}
 * 
 * 					public void report() {
 * 						setProperty("sum", sum);
 * 					}
 * 				});
 * 
 * float sum = (Float) process.getProperty("sum");
 * 
* *

* The static method takes a path and a {@link KeyValueProcess}. This
* example uses an anonymous inner class to make the code more concise; the
* static method returns the KeyValueProcess so that you can
* retrieve results from it. The path can either be a file or a directory; if it
* is a directory, all files in that directory are processed.
*

*
 * @param <K>
 *            type of key
 * @param <V>
 *            type of value
 */
public class SequenceFileProcessor<K extends WritableComparable, V extends Writable> {

	/** File or directory whose SequenceFiles are read. */
	private final Path mPath;

	/** Configuration used to resolve the file system and to open readers. */
	private final JobConf conf;

	/** Callback applied to every key-value pair that is read. */
	private final KeyValueProcess<K, V> mProcessor;

	/**
	 * Processes one or more <code>SequenceFile</code>s. The
	 * {@link KeyValueProcess} is applied to every key-value pair in the file if
	 * <code>path</code> denotes a file, or all files in the directory if
	 * <code>path</code> denotes a directory.
	 * <p>
	 * This method never throws: any failure while reading is printed to
	 * standard error and the (possibly partially updated)
	 * <code>KeyValueProcess</code> is still returned.
	 *
	 * @param <K>
	 *            type of key
	 * @param <V>
	 *            type of value
	 * @param path
	 *            either a file or a directory
	 * @param p
	 *            the KeyValueProcess to apply
	 * @return the KeyValueProcess applied (the same instance as <code>p</code>)
	 */
	public static <K extends WritableComparable, V extends Writable> KeyValueProcess<K, V> process(
			String path, KeyValueProcess<K, V> p) {
		try {
			new SequenceFileProcessor<K, V>(path, p).run();
		} catch (Exception e) {
			// Deliberately best-effort: the signature declares no checked
			// exceptions, so failures are reported here rather than propagated.
			e.printStackTrace();
		}
		return p;
	}

	/**
	 * Creates a processor for the given location with a default
	 * {@link JobConf}.
	 *
	 * @param location
	 *            file or directory to read
	 * @param p
	 *            callback to apply to every key-value pair
	 */
	private SequenceFileProcessor(String location, KeyValueProcess<K, V> p) {
		mPath = new Path(location);
		conf = new JobConf();
		mProcessor = p;
	}

	/**
	 * Applies the callback to the configured path: directly if it is a file,
	 * otherwise to every file directly inside it.
	 *
	 * @throws IOException
	 *             if the path cannot be listed or a file cannot be read
	 */
	private void run() throws IOException {
		FileSystem fs = FileSystem.get(conf);

		if (fs.isFile(mPath)) {
			applyToFile(fs, mPath);
			return;
		}

		for (Path child : listPaths(fs, mPath)) {
			// Only plain files can be opened as SequenceFiles; a nested
			// directory would otherwise abort the whole run.
			if (fs.isFile(child)) {
				applyToFile(fs, child);
			}
		}
	}

	/**
	 * Lists the entries directly under <code>path</code>.
	 *
	 * @param fsm
	 *            file system to query
	 * @param path
	 *            path to list
	 * @return the paths of all entries reported by
	 *         {@link FileSystem#listStatus(Path)}, in the order returned
	 * @throws IOException
	 *             if the listing fails or no listing is available for
	 *             <code>path</code>
	 */
	public static Path[] listPaths(FileSystem fsm, Path path) throws IOException {
		FileStatus[] statuses = fsm.listStatus(path);
		if (statuses == null) {
			// Guard against implementations that signal a missing path with
			// null instead of an exception; fail with a clear message rather
			// than a NullPointerException.
			throw new IOException("Cannot list " + path + ": no listing available");
		}

		Path[] paths = new Path[statuses.length];
		for (int i = 0; i < statuses.length; i++) {
			paths[i] = statuses[i].getPath();
		}
		return paths;
	}

	/**
	 * Reads every key-value pair of one SequenceFile, feeds each pair to the
	 * callback, and then calls the callback's <code>report()</code>. The same
	 * key and value instances are reused for every record of the file.
	 *
	 * @param fs
	 *            file system holding the file
	 * @param path
	 *            SequenceFile to read
	 * @throws IOException
	 *             if the file cannot be read or its key/value classes cannot
	 *             be instantiated
	 */
	@SuppressWarnings("unchecked")
	private void applyToFile(FileSystem fs, Path path) throws IOException {
		SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
		try {
			K key;
			V value;
			try {
				// Unchecked: the file header, not the compiler, decides the
				// runtime classes; the caller vouches that they match K and V.
				key = (K) reader.getKeyClass().newInstance();
				value = (V) reader.getValueClass().newInstance();
			} catch (Exception e) {
				// Fail here with the real cause instead of continuing with
				// null holders and hitting an unrelated error in next().
				IOException wrapped = new IOException(
						"Cannot instantiate key/value classes declared by " + path);
				wrapped.initCause(e);
				throw wrapped;
			}

			while (reader.next(key, value)) {
				mProcessor.process(key, value);
			}
		} finally {
			// Release the underlying stream even if reading or the callback fails.
			reader.close();
		}

		// Reported once per file, after that file has been read completely.
		mProcessor.report();
	}
}