source: sample/hadoop-0.17/tw/org/nchc/util/SequenceFileProcessor.java @ 20

Last change on this file since 20 was 20, checked in by waue, 16 years ago

將改完的 hadoop 0.17版package 放來備份
目前繼續開發 hadoop 0.16 + hbase 1.3

File size: 4.3 KB
Line 
1/**
2 * Program: SequenceFileProcessor.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 07/02/2008
6 * Upgrade to 0.17
7 * Re-code from : Cloud9: A MapReduce Library for Hadoop
8 */
9
10
11package tw.org.nchc.util;
12
13import java.io.IOException;
14
15import org.apache.hadoop.fs.FileStatus;
16import org.apache.hadoop.fs.FileSystem;
17import org.apache.hadoop.fs.Path;
18import org.apache.hadoop.io.SequenceFile;
19import org.apache.hadoop.io.Writable;
20import org.apache.hadoop.io.WritableComparable;
21import org.apache.hadoop.mapred.JobConf;
22
23/**
24 * Upgrade from hadoop 0.16 to 0.17
25 * <p>
26 * Harness for processing one or more {@link SequenceFile}s within a single
27 * process. This class is useful when you want to iterate through all key-value
28 * pairs in a SequenceFile outside the context of a MapReduce task (or where
29 * writing the computation as a MapReduce would be overkill). One example usage
30 * case is to sum up all the values in a SequenceFile &mdash; this may be useful
31 * if you want to make sure probabilities sum to one. Here's the code fragment
32 * that would accomplish this:
33 * </p>
34 *
35 * <pre>
36 * KeyValueProcess&lt;Tuple, FloatWritable&gt; process = SequenceFileProcessor
37 *    .&lt;Tuple, FloatWritable&gt; process(&quot;foo&quot;,
38 *        new KeyValueProcess&lt;Tuple, FloatWritable&gt;() {
39 *          public float sum = 0.0f;
40 *
41 *          public void process(Tuple tuple, FloatWritable f) {
42 *            sum += f.get();
43 *          }
44 *
45 *          public void report() {
46 *            setProperty(&quot;sum&quot;, sum);
47 *          }
48 *        });
49 *
50 * float sum = (Float) process.getProperty(&quot;sum&quot;);
51 * </pre>
52 *
53 * <p>
54 * The static method takes a path and and a {@link KeyValueProcess}. This
55 * example uses an anonymous inner class to make the code more concise; the
56 * static method returns the <code>KeyValueProcess</code> so that you can
57 * retrieve results from it. The path can either be a file or a directory; if it
58 * is a directory, all files in that directory are processed.
59 * </p>
60 *
61 * @param <K>
62 *            type of key
63 * @param <V>
64 *            type of value
65 */
66public class SequenceFileProcessor<K extends WritableComparable, V extends Writable> {
67
68  private Path mPath;
69  private JobConf conf;
70  private KeyValueProcess<K, V> mProcessor;
71  private SequenceFile.Reader mReader;
72  private K mKey;
73  private V mValue;
74
75  /**
76   * Processes one or more <code>SequenceFile</code>s. The
77   * {@link KeyValueProcess} is applied to every key-value pair in the file if
78   * <code>path</code> denotes a file, or all files in the directory if
79   * <code>path</code> denotes a directory.
80   *
81   * @param <K1>
82   *            type of key
83   * @param <V1>
84   *            type of value
85   * @param path
86   *            either a file or a directory
87   * @param p
88   *            the KeyValueProcess to apply
89   * @return the KeyValueProcess applied
90   */
91  public static <K1 extends WritableComparable, V1 extends Writable> KeyValueProcess<K1, V1> process(
92      String path, KeyValueProcess<K1, V1> p) {
93
94    try {
95      SequenceFileProcessor<K1, V1> processor = new SequenceFileProcessor<K1, V1>(
96          path, p);
97      processor.run();
98    } catch (Exception e) {
99      e.printStackTrace();
100    }
101
102    return p;
103  }
104
105  private SequenceFileProcessor(String location, KeyValueProcess<K, V> p)
106      throws IOException {
107
108    mPath = new Path(location);
109    conf = new JobConf();
110
111    mProcessor = p;
112
113  }
114
115  private void run() throws IOException {
116    if (!FileSystem.get(conf).isFile(mPath)) {
117      Path[] pa = new Path[] { mPath };
118      Path p;
119      // hadoop 0.17 -> listStatus();
120      FileStatus[] fi = FileSystem.get(conf).listStatus(pa);
121      for (int i =0 ; i<fi.length ; i++) {
122        p = fi[i].getPath();
123        // System.out.println("Applying to " + p);
124        applyToFile(p);
125      }
126    } else {
127      applyToFile(mPath);
128    }
129
130  }
131
132  @SuppressWarnings("unchecked")
133  private void applyToFile(Path path) throws IOException {
134    mReader = new SequenceFile.Reader(FileSystem.get(conf), path, conf);
135
136    try {
137      mKey = (K) mReader.getKeyClass().newInstance();
138      mValue = (V) mReader.getValueClass().newInstance();
139    } catch (Exception e) {
140      e.printStackTrace();
141    }
142
143    while (mReader.next(mKey, mValue) == true) {
144      mProcessor.process(mKey, mValue);
145    }
146
147    mReader.close();
148    mProcessor.report();
149  }
150}
Note: See TracBrowser for help on using the repository browser.