/** * Program: HBaseRecordPro.java * Editor: Waue Chen * From : NCHC. Taiwn * Last Update Date: 07/02/2008 */ /* * Cloud9: A MapReduce Library for Hadoop */ package tw.org.nchc.demo; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import tw.org.nchc.tuple.ListWritable; import tw.org.nchc.tuple.Schema; import tw.org.nchc.tuple.Tuple; /** *

* Demo that illustrates the use of the tuple library (the {@link Tuple} and
* {@link ListWritable} classes). Input comes from the Bible+Shakespeare sample
* collection, encoded with {@link DemoPackRecords2}. Otherwise, this demo is
* exactly the same as {@link DemoWordCountTuple}.

*/ public class DemoWordCountTuple2 { // create the schema for the tuple that will serve as the key private static final Schema KEY_SCHEMA = new Schema(); // define the schema statically static { KEY_SCHEMA.addField("Token", String.class, ""); KEY_SCHEMA.addField("EvenOrOdd", Integer.class, new Integer(1)); } // mapper that emits tuple as the key, and value '1' for each occurrence private static class MapClass extends MapReduceBase implements Mapper { // define value '1' statically so we can reuse the object, i.e., avoid // unnecessary object creation private final static IntWritable one = new IntWritable(1); // once again, reuse tuples if possible private Tuple tupleOut = KEY_SCHEMA.instantiate(); public void map(LongWritable key, Tuple tupleIn, OutputCollector output, Reporter reporter) throws IOException { @SuppressWarnings("unchecked") ListWritable list = (ListWritable) tupleIn.get(1); for (int i = 0; i < list.size(); i++) { Text t = (Text) list.get(i); String token = t.toString(); // put new values into the tuple tupleOut.set("Token", token); tupleOut.set("EvenOrOdd", ((Integer) tupleIn.get(0)) % 2); // emit key-value pair output.collect(tupleOut, one); } } } // reducer counts up tuple occurrences private static class ReduceClass extends MapReduceBase implements Reducer { private final static IntWritable SumValue = new IntWritable(); public synchronized void reduce(Tuple tupleKey, Iterator values, OutputCollector output, Reporter reporter) throws IOException { // sum values int sum = 0; while (values.hasNext()) { sum += values.next().get(); } // keep original tuple key, emit sum of counts as value SumValue.set(sum); output.collect(tupleKey, SumValue); } } // dummy constructor private DemoWordCountTuple2() { } /** * Runs the demo. 
*/ public static void main(String[] args) throws IOException { String inPath = "/shared/sample-input/bible+shakes.nopunc.packed2"; String outputPath = "word-counts2-tuple"; int numMapTasks = 20; int numReduceTasks = 20; JobConf conf = new JobConf(DemoWordCountTuple2.class); conf.setJobName("wordcount"); conf.setNumMapTasks(numMapTasks); conf.setNumReduceTasks(numReduceTasks); // 0.16 conf.setInputPath(new Path(inPath)); // Convert.setInputPath(conf,new Path(inPath)); conf.setInputFormat(SequenceFileInputFormat.class); // 0.16 conf.setOutputPath(new Path(outputPath)); // Convert.setInputPath(conf, new Path(outputPath)); conf.setOutputKeyClass(Tuple.class); conf.setOutputValueClass(IntWritable.class); conf.setOutputFormat(SequenceFileOutputFormat.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(ReduceClass.class); conf.setReducerClass(ReduceClass.class); JobClient.runJob(conf); } }