/**
* Program: DemoWordCountTuple2.java
* Editor: Waue Chen
* From : NCHC, Taiwan
* Last Update Date: 07/02/2008
* Upgrade to 0.17
*/
/*
* Cloud9: A MapReduce Library for Hadoop
*/
package tw.org.nchc.demo;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import tw.org.nchc.code.Convert;
import tw.org.nchc.tuple.ListWritable;
import tw.org.nchc.tuple.Schema;
import tw.org.nchc.tuple.Tuple;
/**
*
* Demo that illustrates the use of the tuple library ({@link Tuple} and
* {@link ListWritable} class). Input comes from Bible+Shakespeare sample
* collection, encoded with {@link DemoPackRecords2}. Otherwise, this demo is
* exactly the same as {@link DemoWordCountTuple}.
*
*/
public class DemoWordCountTuple2 {

    /** Schema of the composite key emitted by the mapper: (Token, EvenOrOdd). */
    private static final Schema KEY_SCHEMA = new Schema();

    // Define the schema once, statically; every mapper instance instantiates
    // its reusable output tuple from it.
    static {
        KEY_SCHEMA.addField("Token", String.class, "");
        KEY_SCHEMA.addField("EvenOrOdd", Integer.class, Integer.valueOf(1));
    }

    /**
     * Mapper that emits one (tuple, 1) pair for every token of the input
     * record. The output key tuple holds the token text and the parity
     * (value modulo 2) of the integer stored in field 0 of the input tuple.
     */
    private static class MapClass extends MapReduceBase implements
            Mapper<LongWritable, Tuple, Tuple, IntWritable> {

        // Constant value '1', shared so that no object is created per token.
        private final static IntWritable one = new IntWritable(1);

        // Output key, reused across calls to avoid per-token allocation.
        private Tuple tupleOut = KEY_SCHEMA.instantiate();

        /**
         * Emits (Token, EvenOrOdd) -> 1 for each element of the token list.
         *
         * @param key      record key of the input sequence file (unused)
         * @param tupleIn  input record; field 0 is read as an Integer and
         *                 field 1 as a ListWritable whose elements are Text
         * @param output   collector receiving the (tuple, 1) pairs
         * @param reporter progress reporter (unused)
         * @throws IOException if the collector fails to accept a pair
         */
        public void map(LongWritable key, Tuple tupleIn,
                OutputCollector<Tuple, IntWritable> output, Reporter reporter)
                throws IOException {
            // NOTE(review): ListWritable is kept raw because its declaration
            // is not visible here; parameterize it if the class is generic.
            @SuppressWarnings("unchecked")
            ListWritable list = (ListWritable) tupleIn.get(1);

            for (int i = 0; i < list.size(); i++) {
                Text t = (Text) list.get(i);
                String token = t.toString();

                // put new values into the reused tuple
                tupleOut.set("Token", token);
                tupleOut.set("EvenOrOdd", ((Integer) tupleIn.get(0)) % 2);

                // emit key-value pair
                output.collect(tupleOut, one);
            }
        }
    }

    /**
     * Reducer (also registered as the combiner) that sums the counts
     * collected for each tuple key.
     */
    private static class ReduceClass extends MapReduceBase implements
            Reducer<Tuple, IntWritable, Tuple, IntWritable> {

        // Output value, reused across calls. Held per instance rather than
        // statically so that separate reducer/combiner instances never share
        // one mutable object.
        private final IntWritable sumValue = new IntWritable();

        /**
         * Sums all values of a key and emits (key, sum).
         *
         * @param tupleKey the tuple key; passed through unchanged
         * @param values   partial counts for this key
         * @param output   collector receiving the (key, sum) pair
         * @param reporter progress reporter (unused)
         * @throws IOException if the collector fails to accept the pair
         */
        public synchronized void reduce(Tuple tupleKey,
                Iterator<IntWritable> values,
                OutputCollector<Tuple, IntWritable> output, Reporter reporter)
                throws IOException {
            // sum values
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }

            // keep original tuple key, emit sum of counts as value
            sumValue.set(sum);
            output.collect(tupleKey, sumValue);
        }
    }

    // Not instantiable: the class only hosts the job's entry point.
    private DemoWordCountTuple2() {
    }

    /**
     * Runs the demo: configures and submits the word-count job with
     * hard-coded input/output paths and 20 map / 20 reduce tasks.
     *
     * @param args ignored
     * @throws IOException if the job submission or execution fails
     */
    public static void main(String[] args) throws IOException {
        String inPath = "/shared/sample-input/bible+shakes.nopunc.packed2";
        String outputPath = "word-counts2-tuple";
        int numMapTasks = 20;
        int numReduceTasks = 20;

        JobConf conf = new JobConf(DemoWordCountTuple2.class);
        conf.setJobName("wordcount");
        conf.setNumMapTasks(numMapTasks);
        conf.setNumReduceTasks(numReduceTasks);

        // Hadoop 0.16 equivalent: conf.setInputPath(new Path(inPath))
        Convert.setInputPath(conf, new Path(inPath));
        conf.setInputFormat(SequenceFileInputFormat.class);

        // Hadoop 0.16 equivalent: conf.setOutputPath(new Path(outputPath)).
        // This must set the OUTPUT path; registering it as an input path
        // would leave the job without an output directory.
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(conf,
                new Path(outputPath));
        conf.setOutputKeyClass(Tuple.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);

        conf.setMapperClass(MapClass.class);
        // Summation is associative, so the reducer doubles as the combiner.
        conf.setCombinerClass(ReduceClass.class);
        conf.setReducerClass(ReduceClass.class);

        JobClient.runJob(conf);
    }
}