/** * Program: DemoWordCondProb.java * Editor: Waue Chen * From : NCHC. Taiwan * Last Update Date: 07/02/2008 * Re-code from : Cloud9: A MapReduce Library for Hadoop */ /* * Cloud9: A MapReduce Library for Hadoop */ package tw.org.nchc.demo; import java.io.IOException; import java.rmi.UnexpectedException; import java.util.HashMap; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Partitioner; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.lib.IdentityReducer; import tw.org.nchc.code.Convert; import tw.org.nchc.tuple.Schema; import tw.org.nchc.tuple.Tuple; /** *

* Demo that illustrates the use of a Partitioner and special symbols in Tuple * to compute conditional probabilities. Demo builds on * {@link DemoWordCountTuple}, and has similar structure. Input comes from * Bible+Shakespeare sample collection, encoded as single-field tuples; see * {@link DemoPackRecords}. Sample of final output: * *

 * ...
 * (admirable, *)   15.0
 * (admirable, 0)   0.6
 * (admirable, 1)   0.4
 * (admiral, *)     6.0
 * (admiral, 0)     0.33333334
 * (admiral, 1)     0.6666667
 * (admiration, *)  16.0
 * (admiration, 0)  0.625
 * (admiration, 1)  0.375
 * (admire, *)      8.0
 * (admire, 0)      0.625
 * (admire, 1)      0.375
 * (admired, *)     19.0
 * (admired, 0)     0.6315789
 * (admired, 1)     0.36842105
 * ...
 * 
* *

* The first field of the key tuple contains a token. If the second field * contains the special symbol '*', then the value indicates the count of the * token in the collection. Otherwise, the value indicates p(EvenOrOdd|Token), * the probability that a line is odd-length or even-length, given the * occurrence of a token. *

*/ public class DemoWordCondProb { // create the schema for the tuple that will serve as the key private static final Schema KEY_SCHEMA = new Schema(); // define the schema statically static { KEY_SCHEMA.addField("Token", String.class, ""); KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1)); } // mapper that emits tuple as the key, and value '1' for each occurrence private static class MapClass extends MapReduceBase implements Mapper { private final static FloatWritable one = new FloatWritable(1); private Tuple tupleOut = KEY_SCHEMA.instantiate(); public void map(LongWritable key, Tuple tupleIn, OutputCollector output, Reporter reporter) throws IOException { // the input value is a tuple; get field 0 // see DemoPackRecords of how input SequenceFile is generated String line = (String) ((Tuple) tupleIn).get(0); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { String token = itr.nextToken(); // emit key-value pair for either even-length or odd-length line tupleOut.set("Token", token); tupleOut.set("EvenOrOdd", line.length() % 2); output.collect(tupleOut, one); // emit key-value pair for the total count tupleOut.set("Token", token); // use special symbol in field 2 tupleOut.setSymbol("EvenOrOdd", "*"); output.collect(tupleOut, one); } } } // reducer computes conditional probabilities private static class ReduceClass extends MapReduceBase implements Reducer { // HashMap keeps track of total counts private final static HashMap TotalCounts = new HashMap(); public synchronized void reduce(Tuple tupleKey, Iterator values, OutputCollector output, Reporter reporter) throws IOException { // sum values int sum = 0; while (values.hasNext()) { sum += values.next().get(); } String tok = (String) tupleKey.get("Token"); // check if the second field is a special symbol if (tupleKey.containsSymbol("EvenOrOdd")) { // emit total count output.collect(tupleKey, new FloatWritable(sum)); // record total count TotalCounts.put(tok, sum); } else { if 
(!TotalCounts.containsKey(tok)) throw new UnexpectedException("Don't have total counts!"); // divide sum by total count to obtain conditional probability float p = (float) sum / TotalCounts.get(tok); // emit P(EvenOrOdd|Token) output.collect(tupleKey, new FloatWritable(p)); } } } // partition by first field of the tuple, so that tuples corresponding // to the same token will be sent to the same reducer private static class MyPartitioner implements Partitioner { public void configure(JobConf job) { } public int getPartition(Tuple key, FloatWritable value, int numReduceTasks) { return (key.get("Token").hashCode() & Integer.MAX_VALUE) % numReduceTasks; } } // dummy constructor private DemoWordCondProb() { } /** * Runs the demo. */ public static void main(String[] args) throws IOException { String inPath = "/shared/sample-input/bible+shakes.nopunc.packed"; String output1Path = "condprob"; int numMapTasks = 20; int numReduceTasks = 10; // first MapReduce cycle is to do the tuple counting JobConf conf1 = new JobConf(DemoWordCondProb.class); conf1.setJobName("DemoWordCondProb.MR1"); conf1.setNumMapTasks(numMapTasks); conf1.setNumReduceTasks(numReduceTasks); //0.16 // conf1.setInputPath(new Path(inPath)); Convert.setInputPath(conf1, new Path(inPath)); conf1.setInputFormat(SequenceFileInputFormat.class); // 0.16 // conf1.setOutputPath(new Path(output1Path)); Convert.setOutputPath(conf1,new Path(output1Path)); conf1.setOutputKeyClass(Tuple.class); conf1.setOutputValueClass(FloatWritable.class); conf1.setOutputFormat(TextOutputFormat.class); conf1.setMapperClass(MapClass.class); // this is a potential gotcha! can't use ReduceClass for combine because // we have not collected all the counts yet, so we can't divide through // to compute the conditional probabilities conf1.setCombinerClass(IdentityReducer.class); conf1.setReducerClass(ReduceClass.class); conf1.setPartitionerClass(MyPartitioner.class); JobClient.runJob(conf1); } }