source: sample/hadoop-0.17/tw/org/nchc/demo/DemoWordCondProb.java @ 20

Last change on this file since 20 was 20, checked in by waue, 16 years ago

將改完的 hadoop 0.17版package 放來備份
目前繼續開發 hadoop 0.16 + hbase 1.3

File size: 6.8 KB
Line 
1/**
2 * Program: DemoWordCondProb.java
3 * Editor: Waue Chen
4 * From :  NCHC. Taiwan
5 * Last Update Date: 07/02/2008
6 * Re-code from : Cloud9: A MapReduce Library for Hadoop
7 */
8/*
9 * Cloud9: A MapReduce Library for Hadoop
10 */
11
12package tw.org.nchc.demo;
13
14import java.io.IOException;
15import java.rmi.UnexpectedException;
16import java.util.HashMap;
17import java.util.Iterator;
18import java.util.StringTokenizer;
19
20import org.apache.hadoop.fs.Path;
21import org.apache.hadoop.io.FloatWritable;
22import org.apache.hadoop.io.LongWritable;
23import org.apache.hadoop.mapred.JobClient;
24import org.apache.hadoop.mapred.JobConf;
25import org.apache.hadoop.mapred.MapReduceBase;
26import org.apache.hadoop.mapred.Mapper;
27import org.apache.hadoop.mapred.OutputCollector;
28import org.apache.hadoop.mapred.Partitioner;
29import org.apache.hadoop.mapred.Reducer;
30import org.apache.hadoop.mapred.Reporter;
31import org.apache.hadoop.mapred.SequenceFileInputFormat;
32import org.apache.hadoop.mapred.TextOutputFormat;
33import org.apache.hadoop.mapred.lib.IdentityReducer;
34
35import tw.org.nchc.code.Convert;
36import tw.org.nchc.tuple.Schema;
37import tw.org.nchc.tuple.Tuple;
38
39/**
40 * <p>
41 * Demo that illustrates the use of a Partitioner and special symbols in Tuple
42 * to compute conditional probabilities. Demo builds on
43 * {@link DemoWordCountTuple}, and has similar structure. Input comes from
44 * Bible+Shakespeare sample collection, encoded as single-field tuples; see
45 * {@link DemoPackRecords}. Sample of final output:
46 *
47 * <pre>
48 * ...
49 * (admirable, *)   15.0
50 * (admirable, 0)   0.6
51 * (admirable, 1)   0.4
52 * (admiral, *)     6.0
53 * (admiral, 0)     0.33333334
54 * (admiral, 1)     0.6666667
55 * (admiration, *)  16.0
56 * (admiration, 0)  0.625
57 * (admiration, 1)  0.375
58 * (admire, *)      8.0
59 * (admire, 0)      0.625
60 * (admire, 1)      0.375
61 * (admired, *)     19.0
62 * (admired, 0)     0.6315789
63 * (admired, 1)     0.36842105
64 * ...
65 * </pre>
66 *
67 * <p>
68 * The first field of the key tuple contains a token. If the second field
69 * contains the special symbol '*', then the value indicates the count of the
70 * token in the collection. Otherwise, the value indicates p(EvenOrOdd|Token),
71 * the probability that a line is odd-length or even-length, given the
72 * occurrence of a token.
73 * </p>
74 */
75public class DemoWordCondProb {
76
77  // create the schema for the tuple that will serve as the key
78  private static final Schema KEY_SCHEMA = new Schema();
79
80  // define the schema statically
81  static {
82    KEY_SCHEMA.addField("Token", String.class, "");
83    KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1));
84  }
85
86  // mapper that emits tuple as the key, and value '1' for each occurrence
87  private static class MapClass extends MapReduceBase implements
88      Mapper<LongWritable, Tuple, Tuple, FloatWritable> {
89    private final static FloatWritable one = new FloatWritable(1);
90    private Tuple tupleOut = KEY_SCHEMA.instantiate();
91
92    public void map(LongWritable key, Tuple tupleIn,
93        OutputCollector<Tuple, FloatWritable> output, Reporter reporter)
94        throws IOException {
95
96      // the input value is a tuple; get field 0
97      // see DemoPackRecords of how input SequenceFile is generated
98      String line = (String) ((Tuple) tupleIn).get(0);
99      StringTokenizer itr = new StringTokenizer(line);
100      while (itr.hasMoreTokens()) {
101        String token = itr.nextToken();
102
103        // emit key-value pair for either even-length or odd-length line
104        tupleOut.set("Token", token);
105        tupleOut.set("EvenOrOdd", line.length() % 2);
106        output.collect(tupleOut, one);
107
108        // emit key-value pair for the total count
109        tupleOut.set("Token", token);
110        // use special symbol in field 2
111        tupleOut.setSymbol("EvenOrOdd", "*");
112        output.collect(tupleOut, one);
113      }
114    }
115  }
116
117  // reducer computes conditional probabilities
118  private static class ReduceClass extends MapReduceBase implements
119      Reducer<Tuple, FloatWritable, Tuple, FloatWritable> {
120    // HashMap keeps track of total counts
121    private final static HashMap<String, Integer> TotalCounts = new HashMap<String, Integer>();
122
123    public synchronized void reduce(Tuple tupleKey,
124        Iterator<FloatWritable> values,
125        OutputCollector<Tuple, FloatWritable> output, Reporter reporter)
126        throws IOException {
127      // sum values
128      int sum = 0;
129      while (values.hasNext()) {
130        sum += values.next().get();
131      }
132
133      String tok = (String) tupleKey.get("Token");
134
135      // check if the second field is a special symbol
136      if (tupleKey.containsSymbol("EvenOrOdd")) {
137        // emit total count
138        output.collect(tupleKey, new FloatWritable(sum));
139        // record total count
140        TotalCounts.put(tok, sum);
141      } else {
142        if (!TotalCounts.containsKey(tok))
143          throw new UnexpectedException("Don't have total counts!");
144
145        // divide sum by total count to obtain conditional probability
146        float p = (float) sum / TotalCounts.get(tok);
147
148        // emit P(EvenOrOdd|Token)
149        output.collect(tupleKey, new FloatWritable(p));
150      }
151    }
152  }
153
154  // partition by first field of the tuple, so that tuples corresponding
155  // to the same token will be sent to the same reducer
156  private static class MyPartitioner implements
157      Partitioner<Tuple, FloatWritable> {
158    public void configure(JobConf job) {
159    }
160
161    public int getPartition(Tuple key, FloatWritable value,
162        int numReduceTasks) {
163      return (key.get("Token").hashCode() & Integer.MAX_VALUE)
164          % numReduceTasks;
165    }
166  }
167
168  // dummy constructor
169  private DemoWordCondProb() {
170  }
171
172  /**
173   * Runs the demo.
174   */
175  public static void main(String[] args) throws IOException {
176    String inPath = "/shared/sample-input/bible+shakes.nopunc.packed";
177    String output1Path = "condprob";
178    int numMapTasks = 20;
179    int numReduceTasks = 10;
180
181    // first MapReduce cycle is to do the tuple counting
182    JobConf conf1 = new JobConf(DemoWordCondProb.class);
183    conf1.setJobName("DemoWordCondProb.MR1");
184
185    conf1.setNumMapTasks(numMapTasks);
186    conf1.setNumReduceTasks(numReduceTasks);
187    //0.16
188//    conf1.setInputPath(new Path(inPath));
189    Convert.setInputPath(conf1, new Path(inPath));
190   
191    conf1.setInputFormat(SequenceFileInputFormat.class);
192   
193    // 0.16
194//    conf1.setOutputPath(new Path(output1Path));
195    Convert.setOutputPath(conf1,new Path(output1Path));
196    conf1.setOutputKeyClass(Tuple.class);
197    conf1.setOutputValueClass(FloatWritable.class);
198    conf1.setOutputFormat(TextOutputFormat.class);
199
200    conf1.setMapperClass(MapClass.class);
201    // this is a potential gotcha! can't use ReduceClass for combine because
202    // we have not collected all the counts yet, so we can't divide through
203    // to compute the conditional probabilities
204    conf1.setCombinerClass(IdentityReducer.class);
205    conf1.setReducerClass(ReduceClass.class);
206    conf1.setPartitionerClass(MyPartitioner.class);
207
208    JobClient.runJob(conf1);
209  }
210}
Note: See TracBrowser for help on using the repository browser.