source: sample/hadoop-0.16/tw/org/nchc/demo/DemoWordCondProb.java @ 229

Last change on this file since 229 was 27, checked in by waue, 16 years ago

test!

File size: 6.6 KB
Line 
1/**
2 * Program: DemoWordCondProb.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwn
5 * Last Update Date: 07/02/2008
6 * Re-code from : Cloud9: A MapReduce Library for Hadoop
7 */
8/*
9 * Cloud9: A MapReduce Library for Hadoop
10 */
11
12package tw.org.nchc.demo;
13
14import java.io.IOException;
15import java.rmi.UnexpectedException;
16import java.util.HashMap;
17import java.util.Iterator;
18import java.util.StringTokenizer;
19
20import org.apache.hadoop.fs.Path;
21import org.apache.hadoop.io.FloatWritable;
22import org.apache.hadoop.io.LongWritable;
23import org.apache.hadoop.mapred.JobClient;
24import org.apache.hadoop.mapred.JobConf;
25import org.apache.hadoop.mapred.MapReduceBase;
26import org.apache.hadoop.mapred.Mapper;
27import org.apache.hadoop.mapred.OutputCollector;
28import org.apache.hadoop.mapred.Partitioner;
29import org.apache.hadoop.mapred.Reducer;
30import org.apache.hadoop.mapred.Reporter;
31import org.apache.hadoop.mapred.SequenceFileInputFormat;
32import org.apache.hadoop.mapred.TextOutputFormat;
33import org.apache.hadoop.mapred.lib.IdentityReducer;
34
35import tw.org.nchc.tuple.Schema;
36import tw.org.nchc.tuple.Tuple;
37
38/**
39 * <p>
40 * Demo that illustrates the use of a Partitioner and special symbols in Tuple
41 * to compute conditional probabilities. Demo builds on
42 * {@link DemoWordCountTuple}, and has similar structure. Input comes from
43 * Bible+Shakespeare sample collection, encoded as single-field tuples; see
44 * {@link DemoPackRecords}. Sample of final output:
45 *
46 * <pre>
47 * ...
48 * (admirable, *)   15.0
49 * (admirable, 0)   0.6
50 * (admirable, 1)   0.4
51 * (admiral, *)     6.0
52 * (admiral, 0)     0.33333334
53 * (admiral, 1)     0.6666667
54 * (admiration, *)  16.0
55 * (admiration, 0)  0.625
56 * (admiration, 1)  0.375
57 * (admire, *)      8.0
58 * (admire, 0)      0.625
59 * (admire, 1)      0.375
60 * (admired, *)     19.0
61 * (admired, 0)     0.6315789
62 * (admired, 1)     0.36842105
63 * ...
64 * </pre>
65 *
66 * <p>
67 * The first field of the key tuple contains a token. If the second field
68 * contains the special symbol '*', then the value indicates the count of the
69 * token in the collection. Otherwise, the value indicates p(EvenOrOdd|Token),
70 * the probability that a line is odd-length or even-length, given the
71 * occurrence of a token.
72 * </p>
73 */
74public class DemoWordCondProb {
75
76  // create the schema for the tuple that will serve as the key
77  private static final Schema KEY_SCHEMA = new Schema();
78
79  // define the schema statically
80  static {
81    KEY_SCHEMA.addField("Token", String.class, "");
82    KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
83  }
84
85  // mapper that emits tuple as the key, and value '1' for each occurrence
86  private static class MapClass extends MapReduceBase implements
87      Mapper<LongWritable, Tuple, Tuple, FloatWritable> {
88    private final static FloatWritable one = new FloatWritable(1);
89    private Tuple tupleOut = KEY_SCHEMA.instantiate();
90
91    public void map(LongWritable key, Tuple tupleIn,
92        OutputCollector<Tuple, FloatWritable> output, Reporter reporter)
93        throws IOException {
94
95      // the input value is a tuple; get field 0
96      // see DemoPackRecords of how input SequenceFile is generated
97      String line = (String) ((Tuple) tupleIn).get(0);
98      StringTokenizer itr = new StringTokenizer(line);
99      while (itr.hasMoreTokens()) {
100        String token = itr.nextToken();
101
102        // emit key-value pair for either even-length or odd-length line
103        tupleOut.set("Token", token);
104        tupleOut.set("EvenOrOdd", line.length() % 2);
105        output.collect(tupleOut, one);
106
107        // emit key-value pair for the total count
108        tupleOut.set("Token", token);
109        // use special symbol in field 2
110        tupleOut.setSymbol("EvenOrOdd", "*");
111        output.collect(tupleOut, one);
112      }
113    }
114  }
115
116  // reducer computes conditional probabilities
117  private static class ReduceClass extends MapReduceBase implements
118      Reducer<Tuple, FloatWritable, Tuple, FloatWritable> {
119    // HashMap keeps track of total counts
120    private final static HashMap<String, Integer> TotalCounts = new HashMap<String, Integer>();
121
122    public synchronized void reduce(Tuple tupleKey,
123        Iterator<FloatWritable> values,
124        OutputCollector<Tuple, FloatWritable> output, Reporter reporter)
125        throws IOException {
126      // sum values
127      int sum = 0;
128      while (values.hasNext()) {
129        sum += values.next().get();
130      }
131
132      String tok = (String) tupleKey.get("Token");
133
134      // check if the second field is a special symbol
135      if (tupleKey.containsSymbol("EvenOrOdd")) {
136        // emit total count
137        output.collect(tupleKey, new FloatWritable(sum));
138        // record total count
139        TotalCounts.put(tok, sum);
140      } else {
141        if (!TotalCounts.containsKey(tok))
142          throw new UnexpectedException("Don't have total counts!");
143
144        // divide sum by total count to obtain conditional probability
145        float p = (float) sum / TotalCounts.get(tok);
146
147        // emit P(EvenOrOdd|Token)
148        output.collect(tupleKey, new FloatWritable(p));
149      }
150    }
151  }
152
153  // partition by first field of the tuple, so that tuples corresponding
154  // to the same token will be sent to the same reducer
155  private static class MyPartitioner implements
156      Partitioner<Tuple, FloatWritable> {
157    public void configure(JobConf job) {
158    }
159
160    public int getPartition(Tuple key, FloatWritable value,
161        int numReduceTasks) {
162      return (key.get("Token").hashCode() & Integer.MAX_VALUE)
163          % numReduceTasks;
164    }
165  }
166
167  // dummy constructor
168  private DemoWordCondProb() {
169  }
170
171  /**
172   * Runs the demo.
173   */
174  public static void main(String[] args) throws IOException {
175    String inPath = "/shared/sample-input/bible+shakes.nopunc.packed";
176    String output1Path = "condprob";
177    int numMapTasks = 20;
178    int numReduceTasks = 10;
179
180    // first MapReduce cycle is to do the tuple counting
181    JobConf conf1 = new JobConf(DemoWordCondProb.class);
182    conf1.setJobName("DemoWordCondProb.MR1");
183
184    conf1.setNumMapTasks(numMapTasks);
185    conf1.setNumReduceTasks(numReduceTasks);
186    conf1.setInputPath(new Path(inPath));
187   
188    conf1.setInputFormat(SequenceFileInputFormat.class);
189   
190    conf1.setOutputPath(new Path(output1Path));
191    conf1.setOutputKeyClass(Tuple.class);
192    conf1.setOutputValueClass(FloatWritable.class);
193    conf1.setOutputFormat(TextOutputFormat.class);
194
195    conf1.setMapperClass(MapClass.class);
196    // this is a potential gotcha! can't use ReduceClass for combine because
197    // we have not collected all the counts yet, so we can't divide through
198    // to compute the conditional probabilities
199    conf1.setCombinerClass(IdentityReducer.class);
200    conf1.setReducerClass(ReduceClass.class);
201    conf1.setPartitionerClass(MyPartitioner.class);
202
203    JobClient.runJob(conf1);
204  }
205}
Note: See TracBrowser for help on using the repository browser.