source: sample/hadoop-0.16/tw/org/nchc/util/SequenceFileProcessor.java @ 28

Last change on this file since 28 was 28, checked in by waue, 16 years ago

resolve 0.17 ->0.16 problem . all? not sure ! XD

File size: 4.6 KB
Line 
1/*
2 * Cloud9: A MapReduce Library for Hadoop
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you
5 * may not use this file except in compliance with the License. You may
6 * obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 * implied. See the License for the specific language governing
14 * permissions and limitations under the License.
15 */
16
17package tw.org.nchc.util;
18
19import java.io.IOException;
20
21import org.apache.hadoop.fs.FileStatus;
22import org.apache.hadoop.fs.FileSystem;
23import org.apache.hadoop.fs.Path;
24import org.apache.hadoop.io.SequenceFile;
25import org.apache.hadoop.io.Writable;
26import org.apache.hadoop.io.WritableComparable;
27import org.apache.hadoop.mapred.JobConf;
28
29/**
30 * <p>
31 * Harness for processing one or more {@link SequenceFile}s within a single
32 * process. This class is useful when you want to iterate through all key-value
33 * pairs in a SequenceFile outside the context of a MapReduce task (or where
34 * writing the computation as a MapReduce would be overkill). One example usage
35 * case is to sum up all the values in a SequenceFile &mdash; this may be useful
36 * if you want to make sure probabilities sum to one. Here's the code fragment
37 * that would accomplish this:
38 * </p>
39 *
40 * <pre>
41 * KeyValueProcess&lt;Tuple, FloatWritable&gt; process = SequenceFileProcessor
42 *    .&lt;Tuple, FloatWritable&gt; process(&quot;foo&quot;,
43 *        new KeyValueProcess&lt;Tuple, FloatWritable&gt;() {
44 *          public float sum = 0.0f;
45 *
46 *          public void process(Tuple tuple, FloatWritable f) {
47 *            sum += f.get();
48 *          }
49 *
50 *          public void report() {
51 *            setProperty(&quot;sum&quot;, sum);
52 *          }
53 *        });
54 *
55 * float sum = (Float) process.getProperty(&quot;sum&quot;);
56 * </pre>
57 *
58 * <p>
59 * The static method takes a path and a {@link KeyValueProcess}. This
60 * example uses an anonymous inner class to make the code more concise; the
61 * static method returns the <code>KeyValueProcess</code> so that you can
62 * retrieve results from it. The path can either be a file or a directory; if it
63 * is a directory, all files in that directory are processed.
64 * </p>
65 *
66 * @param <K>
67 *            type of key
68 * @param <V>
69 *            type of value
70 */
71public class SequenceFileProcessor<K extends WritableComparable, V extends Writable> {
72
73  private Path mPath;
74  private JobConf conf;
75  private KeyValueProcess<K, V> mProcessor;
76  private SequenceFile.Reader mReader;
77  private K mKey;
78  private V mValue;
79
80  /**
81   * Processes one or more <code>SequenceFile</code>s. The
82   * {@link KeyValueProcess} is applied to every key-value pair in the file if
83   * <code>path</code> denotes a file, or all files in the directory if
84   * <code>path</code> denotes a directory.
85   *
86   * @param <K1>
87   *            type of key
88   * @param <V1>
89   *            type of value
90   * @param path
91   *            either a file or a directory
92   * @param p
93   *            the KeyValueProcess to apply
94   * @return the KeyValueProcess applied
95   */
96  public static <K1 extends WritableComparable, V1 extends Writable> KeyValueProcess<K1, V1> process(
97      String path, KeyValueProcess<K1, V1> p) {
98
99    try {
100      SequenceFileProcessor<K1, V1> processor = new SequenceFileProcessor<K1, V1>(
101          path, p);
102      processor.run();
103    } catch (Exception e) {
104      e.printStackTrace();
105    }
106
107    return p;
108  }
109
110  private SequenceFileProcessor(String location, KeyValueProcess<K, V> p)
111      throws IOException {
112
113    mPath = new Path(location);
114    conf = new JobConf();
115
116    mProcessor = p;
117
118  }
119
120  private void run() throws IOException {
121    FileSystem fs = FileSystem.get(conf);
122    if (!fs.isFile(mPath)) {
123            for (Path p : listPaths(fs,mPath)) {
124        // System.out.println("Applying to " + p);
125        applyToFile(p);
126      }
127    } else {
128      applyToFile(mPath);
129    }
130
131  }
132  static public Path[] listPaths(FileSystem fsm,Path path) throws IOException
133  {
134    FileStatus[] fss = fsm.listStatus(path);
135    int length = fss.length;
136    Path[] pi = new Path[length];
137    for (int i=0 ; i< length; i++)
138    {
139      pi[i] = fss[i].getPath();
140    }
141    return pi;
142  }
143  @SuppressWarnings("unchecked")
144  private void applyToFile(Path path) throws IOException {
145    mReader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
146
147    try {
148      mKey = (K) mReader.getKeyClass().newInstance();
149      mValue = (V) mReader.getValueClass().newInstance();
150    } catch (Exception e) {
151      e.printStackTrace();
152    }
153
154    while (mReader.next(mKey, mValue) == true) {
155      mProcessor.process(mKey, mValue);
156    }
157
158    mReader.close();
159    mProcessor.report();
160  }
161}
Note: See TracBrowser for help on using the repository browser.