Changes between Initial Version and Version 1 of waue/2009/0723


Ignore:
Timestamp:
Jul 23, 2009, 6:00:56 PM (15 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2009/0723

    v1 v1  
     1{{{
     2import java.io.File;
     3import java.io.IOException;
     4import java.util.Map;
     5
     6import org.apache.commons.logging.Log;
     7import org.apache.commons.logging.LogFactory;
     8import org.apache.hadoop.fs.FileUtil;
     9import org.apache.hadoop.hbase.HColumnDescriptor;
     10import org.apache.hadoop.hbase.HConstants;
     11import org.apache.hadoop.hbase.HTableDescriptor;
     12import org.apache.hadoop.hbase.MultiRegionTable;
     13import org.apache.hadoop.hbase.client.HTable;
     14import org.apache.hadoop.hbase.client.Result;
     15import org.apache.hadoop.hbase.client.Scan;
     16import org.apache.hadoop.hbase.client.ResultScanner;
     17import org.apache.hadoop.hbase.io.BatchUpdate;
     18import org.apache.hadoop.hbase.io.Cell;
     19import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     20import org.apache.hadoop.hbase.io.RowResult;
     21import org.apache.hadoop.hbase.util.Bytes;
     22import org.apache.hadoop.mapred.JobClient;
     23import org.apache.hadoop.mapred.JobConf;
     24import org.apache.hadoop.mapred.MapReduceBase;
     25import org.apache.hadoop.mapred.MiniMRCluster;
     26import org.apache.hadoop.mapred.OutputCollector;
     27import org.apache.hadoop.mapred.Reporter;
     28
     29/**
     30 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
     31 * on our tables is simple - take every row in the table, reverse the value of
     32 * a particular cell, and write it back to the table.
     33 */
     34public class TestTableMapReduce extends MultiRegionTable {
     35  private static final Log LOG =
     36    LogFactory.getLog(TestTableMapReduce.class.getName());
     37
     38  static final String MULTI_REGION_TABLE_NAME = "mrtest";
     39  static final String INPUT_COLUMN = "contents:";
     40  static final String OUTPUT_COLUMN = "text:";
     41 
     42  private static final byte [][] columns = new byte [][] {
     43    Bytes.toBytes(INPUT_COLUMN),
     44    Bytes.toBytes(OUTPUT_COLUMN)
     45  };
     46
     47  /** constructor */
     48  public TestTableMapReduce() {
     49    super(INPUT_COLUMN);
     50    desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
     51    desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
     52    desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
     53  }
     54
     55  /**
     56   * Pass the given key and processed record reduce
     57   */
     58  public static class ProcessContentsMapper
     59  extends MapReduceBase
     60  implements TableMap<ImmutableBytesWritable, BatchUpdate> {
     61    /**
     62     * Pass the key, and reversed value to reduce
     63     * @param key
     64     * @param value
     65     * @param output
     66     * @param reporter
     67     * @throws IOException
     68     */
     69    public void map(ImmutableBytesWritable key, RowResult value,
     70      OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
     71      Reporter reporter)
     72    throws IOException {
     73      if (value.size() != 1) {
     74        throw new IOException("There should only be one input column");
     75      }
     76      byte [][] keys = value.keySet().toArray(new byte[value.size()][]);
     77      if(!Bytes.equals(keys[0], Bytes.toBytes(INPUT_COLUMN))) {
     78        throw new IOException("Wrong input column. Expected: '" + INPUT_COLUMN
     79          + "' but got: '" + Bytes.toString(keys[0]) + "'");
     80      }
     81
     82      // Get the original value and reverse it
     83     
     84      String originalValue =
     85        new String(value.get(keys[0]).getValue(), HConstants.UTF8_ENCODING);
     86      StringBuilder newValue = new StringBuilder();
     87      for(int i = originalValue.length() - 1; i >= 0; i--) {
     88        newValue.append(originalValue.charAt(i));
     89      }
     90     
     91      // Now set the value to be collected
     92
     93      BatchUpdate outval = new BatchUpdate(key.get());
     94      outval.put(OUTPUT_COLUMN, Bytes.toBytes(newValue.toString()));
     95      output.collect(key, outval);
     96    }
     97  }
     98 
     99  /**
     100   * Test a map/reduce against a multi-region table
     101   * @throws IOException
     102   */
     103  public void testMultiRegionTable() throws IOException {
     104    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
     105  }
     106
     107  private void runTestOnTable(HTable table) throws IOException {
     108    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
     109
     110    JobConf jobConf = null;
     111    try {
     112      LOG.info("Before map/reduce startup");
     113      jobConf = new JobConf(conf, TestTableMapReduce.class);
     114      jobConf.setJobName("process column contents");
     115      jobConf.setNumReduceTasks(1);
     116      TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
     117        INPUT_COLUMN, ProcessContentsMapper.class,
     118        ImmutableBytesWritable.class, BatchUpdate.class, jobConf);
     119      TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
     120        IdentityTableReduce.class, jobConf);
     121           
     122      LOG.info("Started " + Bytes.toString(table.getTableName()));
     123      JobClient.runJob(jobConf);
     124      LOG.info("After map/reduce completion");
     125
     126      // verify map-reduce results
     127      verify(Bytes.toString(table.getTableName()));
     128    } finally {
     129      mrCluster.shutdown();
     130      if (jobConf != null) {
     131        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
     132      }
     133    }
     134  }
     135
     136  private void verify(String tableName) throws IOException {
     137    HTable table = new HTable(conf, tableName);
     138    boolean verified = false;
     139    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
     140    int numRetries = conf.getInt("hbase.client.retries.number", 5);
     141    for (int i = 0; i < numRetries; i++) {
     142      try {
     143        LOG.info("Verification attempt #" + i);
     144        verifyAttempt(table);
     145        verified = true;
     146        break;
     147      } catch (NullPointerException e) {
     148        // If here, a cell was empty.  Presume its because updates came in
     149        // after the scanner had been opened.  Wait a while and retry.
     150        LOG.debug("Verification attempt failed: " + e.getMessage());
     151      }
     152      try {
     153        Thread.sleep(pause);
     154      } catch (InterruptedException e) {
     155        // continue
     156      }
     157    }
     158    assertTrue(verified);
     159  }
     160
     161  /**
     162   * Looks at every value of the mapreduce output and verifies that indeed
     163   * the values have been reversed.
     164   * @param table Table to scan.
     165   * @throws IOException
     166   * @throws NullPointerException if we failed to find a cell value
     167   */
     168  private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
     169    Scan scan = new Scan();
     170    scan.addColumns(columns);
     171    ResultScanner scanner = table.getScanner(scan);
     172    try {
     173      for (Result r : scanner) {
     174        if (LOG.isDebugEnabled()) {
     175          if (r.size() > 2 ) {
     176            throw new IOException("Too many results, expected 2 got " +
     177              r.size());
     178          }
     179        }
     180        byte[] firstValue = null;
     181        byte[] secondValue = null;
     182        int count = 0;
     183        for(Map.Entry<byte [], Cell> e: r.getRowResult().entrySet()) {
     184          if (count == 0) {
     185            firstValue = e.getValue().getValue();
     186          }
     187          if (count == 1) {
     188            secondValue = e.getValue().getValue();
     189          }
     190          count++;
     191          if (count == 2) {
     192            break;
     193          }
     194        }
     195       
     196        String first = "";
     197        if (firstValue == null) {
     198          throw new NullPointerException(Bytes.toString(r.getRow()) +
     199            ": first value is null");
     200        }
     201        first = new String(firstValue, HConstants.UTF8_ENCODING);
     202       
     203        String second = "";
     204        if (secondValue == null) {
     205          throw new NullPointerException(Bytes.toString(r.getRow()) +
     206            ": second value is null");
     207        }
     208        byte[] secondReversed = new byte[secondValue.length];
     209        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
     210          secondReversed[i] = secondValue[j];
     211        }
     212        second = new String(secondReversed, HConstants.UTF8_ENCODING);
     213
     214        if (first.compareTo(second) != 0) {
     215          if (LOG.isDebugEnabled()) {
     216            LOG.debug("second key is not the reverse of first. row=" +
     217                r.getRow() + ", first value=" + first + ", second value=" +
     218                second);
     219          }
     220          fail();
     221        }
     222      }
     223    } finally {
     224      scanner.close();
     225    }
     226  }
     227}
     228}}}