import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
/**
* Test Map/Reduce job over HBase tables. The map/reduce process we're testing
* on our tables is simple - take every row in the table, reverse the value of
* a particular cell, and write it back to the table.
*/
public class TestTableMapReduce extends MultiRegionTable {
private static final Log LOG =
LogFactory.getLog(TestTableMapReduce.class.getName());
static final String MULTI_REGION_TABLE_NAME = "mrtest";
static final String INPUT_COLUMN = "contents:";
static final String OUTPUT_COLUMN = "text:";
private static final byte [][] columns = new byte [][] {
Bytes.toBytes(INPUT_COLUMN),
Bytes.toBytes(OUTPUT_COLUMN)
};
/** constructor */
public TestTableMapReduce() {
super(INPUT_COLUMN);
desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
}
/**
* Pass the given key and processed record reduce
*/
public static class ProcessContentsMapper
extends MapReduceBase
implements TableMap<ImmutableBytesWritable, BatchUpdate> {
/**
* Pass the key, and reversed value to reduce
* @param key
* @param value
* @param output
* @param reporter
* @throws IOException
*/
public void map(ImmutableBytesWritable key, RowResult value,
OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
Reporter reporter)
throws IOException {
if (value.size() != 1) {
throw new IOException("There should only be one input column");
}
byte [][] keys = value.keySet().toArray(new byte[value.size()][]);
if(!Bytes.equals(keys[0], Bytes.toBytes(INPUT_COLUMN))) {
throw new IOException("Wrong input column. Expected: '" + INPUT_COLUMN
+ "' but got: '" + Bytes.toString(keys[0]) + "'");
}
// Get the original value and reverse it
String originalValue =
new String(value.get(keys[0]).getValue(), HConstants.UTF8_ENCODING);
StringBuilder newValue = new StringBuilder();
for(int i = originalValue.length() - 1; i >= 0; i--) {
newValue.append(originalValue.charAt(i));
}
// Now set the value to be collected
BatchUpdate outval = new BatchUpdate(key.get());
outval.put(OUTPUT_COLUMN, Bytes.toBytes(newValue.toString()));
output.collect(key, outval);
}
}
/**
* Test a map/reduce against a multi-region table
* @throws IOException
*/
public void testMultiRegionTable() throws IOException {
runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
}
private void runTestOnTable(HTable table) throws IOException {
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
JobConf jobConf = null;
try {
LOG.info("Before map/reduce startup");
jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf.setJobName("process column contents");
jobConf.setNumReduceTasks(1);
TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
INPUT_COLUMN, ProcessContentsMapper.class,
ImmutableBytesWritable.class, BatchUpdate.class, jobConf);
TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
IdentityTableReduce.class, jobConf);
LOG.info("Started " + Bytes.toString(table.getTableName()));
JobClient.runJob(jobConf);
LOG.info("After map/reduce completion");
// verify map-reduce results
verify(Bytes.toString(table.getTableName()));
} finally {
mrCluster.shutdown();
if (jobConf != null) {
FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
}
}
}
private void verify(String tableName) throws IOException {
HTable table = new HTable(conf, tableName);
boolean verified = false;
long pause = conf.getLong("hbase.client.pause", 5 * 1000);
int numRetries = conf.getInt("hbase.client.retries.number", 5);
for (int i = 0; i < numRetries; i++) {
try {
LOG.info("Verification attempt #" + i);
verifyAttempt(table);
verified = true;
break;
} catch (NullPointerException e) {
// If here, a cell was empty. Presume its because updates came in
// after the scanner had been opened. Wait a while and retry.
LOG.debug("Verification attempt failed: " + e.getMessage());
}
try {
Thread.sleep(pause);
} catch (InterruptedException e) {
// continue
}
}
assertTrue(verified);
}
/**
* Looks at every value of the mapreduce output and verifies that indeed
* the values have been reversed.
* @param table Table to scan.
* @throws IOException
* @throws NullPointerException if we failed to find a cell value
*/
private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
Scan scan = new Scan();
scan.addColumns(columns);
ResultScanner scanner = table.getScanner(scan);
try {
for (Result r : scanner) {
if (LOG.isDebugEnabled()) {
if (r.size() > 2 ) {
throw new IOException("Too many results, expected 2 got " +
r.size());
}
}
byte[] firstValue = null;
byte[] secondValue = null;
int count = 0;
for(Map.Entry<byte [], Cell> e: r.getRowResult().entrySet()) {
if (count == 0) {
firstValue = e.getValue().getValue();
}
if (count == 1) {
secondValue = e.getValue().getValue();
}
count++;
if (count == 2) {
break;
}
}
String first = "";
if (firstValue == null) {
throw new NullPointerException(Bytes.toString(r.getRow()) +
": first value is null");
}
first = new String(firstValue, HConstants.UTF8_ENCODING);
String second = "";
if (secondValue == null) {
throw new NullPointerException(Bytes.toString(r.getRow()) +
": second value is null");
}
byte[] secondReversed = new byte[secondValue.length];
for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
secondReversed[i] = secondValue[j];
}
second = new String(secondReversed, HConstants.UTF8_ENCODING);
if (first.compareTo(second) != 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("second key is not the reverse of first. row=" +
r.getRow() + ", first value=" + first + ", second value=" +
second);
}
fail();
}
}
} finally {
scanner.close();
}
}
}