source: sample/hadoop-0.16/tw/org/nchc/util/SequenceFileProcessor.java @ 26

Last change on this file since 26 was 25, checked in by waue, 16 years ago

downgrade from 0.17 to 0.16
test for work -> not yet

File size: 4.3 KB
Line 
1/*
2 * Cloud9: A MapReduce Library for Hadoop
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you
5 * may not use this file except in compliance with the License. You may
6 * obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 * implied. See the License for the specific language governing
14 * permissions and limitations under the License.
15 */
16
17package tw.org.nchc.util;
18
19import java.io.IOException;
20
21import org.apache.hadoop.fs.FileSystem;
22import org.apache.hadoop.fs.Path;
23import org.apache.hadoop.io.SequenceFile;
24import org.apache.hadoop.io.Writable;
25import org.apache.hadoop.io.WritableComparable;
26import org.apache.hadoop.mapred.JobConf;
27
28/**
29 * <p>
30 * Harness for processing one or more {@link SequenceFile}s within a single
31 * process. This class is useful when you want to iterate through all key-value
32 * pairs in a SequenceFile outside the context of a MapReduce task (or where
33 * writing the computation as a MapReduce would be overkill). One example usage
34 * case is to sum up all the values in a SequenceFile &mdash; this may be useful
35 * if you want to make sure probabilities sum to one. Here's the code fragment
36 * that would accomplish this:
37 * </p>
38 *
39 * <pre>
40 * KeyValueProcess&lt;Tuple, FloatWritable&gt; process = SequenceFileProcessor
41 *    .&lt;Tuple, FloatWritable&gt; process(&quot;foo&quot;,
42 *        new KeyValueProcess&lt;Tuple, FloatWritable&gt;() {
43 *          public float sum = 0.0f;
44 *
45 *          public void process(Tuple tuple, FloatWritable f) {
46 *            sum += f.get();
47 *          }
48 *
49 *          public void report() {
50 *            setProperty(&quot;sum&quot;, sum);
51 *          }
52 *        });
53 *
54 * float sum = (Float) process.getProperty(&quot;sum&quot;);
55 * </pre>
56 *
57 * <p>
58 * The static method takes a path and and a {@link KeyValueProcess}. This
59 * example uses an anonymous inner class to make the code more concise; the
60 * static method returns the <code>KeyValueProcess</code> so that you can
61 * retrieve results from it. The path can either be a file or a directory; if it
62 * is a directory, all files in that directory are processed.
63 * </p>
64 *
65 * @param <K>
66 *            type of key
67 * @param <V>
68 *            type of value
69 */
70public class SequenceFileProcessor<K extends WritableComparable, V extends Writable> {
71
72  private Path mPath;
73  private JobConf conf;
74  private KeyValueProcess<K, V> mProcessor;
75  private SequenceFile.Reader mReader;
76  private K mKey;
77  private V mValue;
78
79  /**
80   * Processes one or more <code>SequenceFile</code>s. The
81   * {@link KeyValueProcess} is applied to every key-value pair in the file if
82   * <code>path</code> denotes a file, or all files in the directory if
83   * <code>path</code> denotes a directory.
84   *
85   * @param <K1>
86   *            type of key
87   * @param <V1>
88   *            type of value
89   * @param path
90   *            either a file or a directory
91   * @param p
92   *            the KeyValueProcess to apply
93   * @return the KeyValueProcess applied
94   */
95  public static <K1 extends WritableComparable, V1 extends Writable> KeyValueProcess<K1, V1> process(
96      String path, KeyValueProcess<K1, V1> p) {
97
98    try {
99      SequenceFileProcessor<K1, V1> processor = new SequenceFileProcessor<K1, V1>(
100          path, p);
101      processor.run();
102    } catch (Exception e) {
103      e.printStackTrace();
104    }
105
106    return p;
107  }
108
109  private SequenceFileProcessor(String location, KeyValueProcess<K, V> p)
110      throws IOException {
111
112    mPath = new Path(location);
113    conf = new JobConf();
114
115    mProcessor = p;
116
117  }
118
119  private void run() throws IOException {
120    if (!FileSystem.get(conf).isFile(mPath)) {
121      for (Path p : FileSystem.get(conf).listPaths(new Path[] { mPath })) {
122        // System.out.println("Applying to " + p);
123        applyToFile(p);
124      }
125    } else {
126      applyToFile(mPath);
127    }
128
129  }
130
131  @SuppressWarnings("unchecked")
132  private void applyToFile(Path path) throws IOException {
133    mReader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
134
135    try {
136      mKey = (K) mReader.getKeyClass().newInstance();
137      mValue = (V) mReader.getValueClass().newInstance();
138    } catch (Exception e) {
139      e.printStackTrace();
140    }
141
142    while (mReader.next(mKey, mValue) == true) {
143      mProcessor.process(mKey, mValue);
144    }
145
146    mReader.close();
147    mProcessor.report();
148  }
149}
Note: See TracBrowser for help on using the repository browser.