| 1 | {{{ |
| 2 | import java.io.File; |
| 3 | import java.io.IOException; |
| 4 | import java.util.Map; |
| 5 | |
| 6 | import org.apache.commons.logging.Log; |
| 7 | import org.apache.commons.logging.LogFactory; |
| 8 | import org.apache.hadoop.fs.FileUtil; |
| 9 | import org.apache.hadoop.hbase.HColumnDescriptor; |
| 10 | import org.apache.hadoop.hbase.HConstants; |
| 11 | import org.apache.hadoop.hbase.HTableDescriptor; |
| 12 | import org.apache.hadoop.hbase.MultiRegionTable; |
| 13 | import org.apache.hadoop.hbase.client.HTable; |
| 14 | import org.apache.hadoop.hbase.client.Result; |
| 15 | import org.apache.hadoop.hbase.client.Scan; |
| 16 | import org.apache.hadoop.hbase.client.ResultScanner; |
| 17 | import org.apache.hadoop.hbase.io.BatchUpdate; |
| 18 | import org.apache.hadoop.hbase.io.Cell; |
| 19 | import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| 20 | import org.apache.hadoop.hbase.io.RowResult; |
| 21 | import org.apache.hadoop.hbase.util.Bytes; |
| 22 | import org.apache.hadoop.mapred.JobClient; |
| 23 | import org.apache.hadoop.mapred.JobConf; |
| 24 | import org.apache.hadoop.mapred.MapReduceBase; |
| 25 | import org.apache.hadoop.mapred.MiniMRCluster; |
| 26 | import org.apache.hadoop.mapred.OutputCollector; |
| 27 | import org.apache.hadoop.mapred.Reporter; |
| 28 | |
/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple - take every row in the table, reverse the value of
 * a particular cell, and write it back to the table.
 */
| 34 | public class TestTableMapReduce extends MultiRegionTable { |
| 35 | private static final Log LOG = |
| 36 | LogFactory.getLog(TestTableMapReduce.class.getName()); |
| 37 | |
| 38 | static final String MULTI_REGION_TABLE_NAME = "mrtest"; |
| 39 | static final String INPUT_COLUMN = "contents:"; |
| 40 | static final String OUTPUT_COLUMN = "text:"; |
| 41 | |
| 42 | private static final byte [][] columns = new byte [][] { |
| 43 | Bytes.toBytes(INPUT_COLUMN), |
| 44 | Bytes.toBytes(OUTPUT_COLUMN) |
| 45 | }; |
| 46 | |
| 47 | /** constructor */ |
| 48 | public TestTableMapReduce() { |
| 49 | super(INPUT_COLUMN); |
| 50 | desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME); |
| 51 | desc.addFamily(new HColumnDescriptor(INPUT_COLUMN)); |
| 52 | desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN)); |
| 53 | } |
| 54 | |
| 55 | /** |
| 56 | * Pass the given key and processed record reduce |
| 57 | */ |
| 58 | public static class ProcessContentsMapper |
| 59 | extends MapReduceBase |
| 60 | implements TableMap<ImmutableBytesWritable, BatchUpdate> { |
| 61 | /** |
| 62 | * Pass the key, and reversed value to reduce |
| 63 | * @param key |
| 64 | * @param value |
| 65 | * @param output |
| 66 | * @param reporter |
| 67 | * @throws IOException |
| 68 | */ |
| 69 | public void map(ImmutableBytesWritable key, RowResult value, |
| 70 | OutputCollector<ImmutableBytesWritable, BatchUpdate> output, |
| 71 | Reporter reporter) |
| 72 | throws IOException { |
| 73 | if (value.size() != 1) { |
| 74 | throw new IOException("There should only be one input column"); |
| 75 | } |
| 76 | byte [][] keys = value.keySet().toArray(new byte[value.size()][]); |
| 77 | if(!Bytes.equals(keys[0], Bytes.toBytes(INPUT_COLUMN))) { |
| 78 | throw new IOException("Wrong input column. Expected: '" + INPUT_COLUMN |
| 79 | + "' but got: '" + Bytes.toString(keys[0]) + "'"); |
| 80 | } |
| 81 | |
| 82 | // Get the original value and reverse it |
| 83 | |
| 84 | String originalValue = |
| 85 | new String(value.get(keys[0]).getValue(), HConstants.UTF8_ENCODING); |
| 86 | StringBuilder newValue = new StringBuilder(); |
| 87 | for(int i = originalValue.length() - 1; i >= 0; i--) { |
| 88 | newValue.append(originalValue.charAt(i)); |
| 89 | } |
| 90 | |
| 91 | // Now set the value to be collected |
| 92 | |
| 93 | BatchUpdate outval = new BatchUpdate(key.get()); |
| 94 | outval.put(OUTPUT_COLUMN, Bytes.toBytes(newValue.toString())); |
| 95 | output.collect(key, outval); |
| 96 | } |
| 97 | } |
| 98 | |
| 99 | /** |
| 100 | * Test a map/reduce against a multi-region table |
| 101 | * @throws IOException |
| 102 | */ |
| 103 | public void testMultiRegionTable() throws IOException { |
| 104 | runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME)); |
| 105 | } |
| 106 | |
| 107 | private void runTestOnTable(HTable table) throws IOException { |
| 108 | MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1); |
| 109 | |
| 110 | JobConf jobConf = null; |
| 111 | try { |
| 112 | LOG.info("Before map/reduce startup"); |
| 113 | jobConf = new JobConf(conf, TestTableMapReduce.class); |
| 114 | jobConf.setJobName("process column contents"); |
| 115 | jobConf.setNumReduceTasks(1); |
| 116 | TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()), |
| 117 | INPUT_COLUMN, ProcessContentsMapper.class, |
| 118 | ImmutableBytesWritable.class, BatchUpdate.class, jobConf); |
| 119 | TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()), |
| 120 | IdentityTableReduce.class, jobConf); |
| 121 | |
| 122 | LOG.info("Started " + Bytes.toString(table.getTableName())); |
| 123 | JobClient.runJob(jobConf); |
| 124 | LOG.info("After map/reduce completion"); |
| 125 | |
| 126 | // verify map-reduce results |
| 127 | verify(Bytes.toString(table.getTableName())); |
| 128 | } finally { |
| 129 | mrCluster.shutdown(); |
| 130 | if (jobConf != null) { |
| 131 | FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir"))); |
| 132 | } |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | private void verify(String tableName) throws IOException { |
| 137 | HTable table = new HTable(conf, tableName); |
| 138 | boolean verified = false; |
| 139 | long pause = conf.getLong("hbase.client.pause", 5 * 1000); |
| 140 | int numRetries = conf.getInt("hbase.client.retries.number", 5); |
| 141 | for (int i = 0; i < numRetries; i++) { |
| 142 | try { |
| 143 | LOG.info("Verification attempt #" + i); |
| 144 | verifyAttempt(table); |
| 145 | verified = true; |
| 146 | break; |
| 147 | } catch (NullPointerException e) { |
| 148 | // If here, a cell was empty. Presume its because updates came in |
| 149 | // after the scanner had been opened. Wait a while and retry. |
| 150 | LOG.debug("Verification attempt failed: " + e.getMessage()); |
| 151 | } |
| 152 | try { |
| 153 | Thread.sleep(pause); |
| 154 | } catch (InterruptedException e) { |
| 155 | // continue |
| 156 | } |
| 157 | } |
| 158 | assertTrue(verified); |
| 159 | } |
| 160 | |
| 161 | /** |
| 162 | * Looks at every value of the mapreduce output and verifies that indeed |
| 163 | * the values have been reversed. |
| 164 | * @param table Table to scan. |
| 165 | * @throws IOException |
| 166 | * @throws NullPointerException if we failed to find a cell value |
| 167 | */ |
| 168 | private void verifyAttempt(final HTable table) throws IOException, NullPointerException { |
| 169 | Scan scan = new Scan(); |
| 170 | scan.addColumns(columns); |
| 171 | ResultScanner scanner = table.getScanner(scan); |
| 172 | try { |
| 173 | for (Result r : scanner) { |
| 174 | if (LOG.isDebugEnabled()) { |
| 175 | if (r.size() > 2 ) { |
| 176 | throw new IOException("Too many results, expected 2 got " + |
| 177 | r.size()); |
| 178 | } |
| 179 | } |
| 180 | byte[] firstValue = null; |
| 181 | byte[] secondValue = null; |
| 182 | int count = 0; |
| 183 | for(Map.Entry<byte [], Cell> e: r.getRowResult().entrySet()) { |
| 184 | if (count == 0) { |
| 185 | firstValue = e.getValue().getValue(); |
| 186 | } |
| 187 | if (count == 1) { |
| 188 | secondValue = e.getValue().getValue(); |
| 189 | } |
| 190 | count++; |
| 191 | if (count == 2) { |
| 192 | break; |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | String first = ""; |
| 197 | if (firstValue == null) { |
| 198 | throw new NullPointerException(Bytes.toString(r.getRow()) + |
| 199 | ": first value is null"); |
| 200 | } |
| 201 | first = new String(firstValue, HConstants.UTF8_ENCODING); |
| 202 | |
| 203 | String second = ""; |
| 204 | if (secondValue == null) { |
| 205 | throw new NullPointerException(Bytes.toString(r.getRow()) + |
| 206 | ": second value is null"); |
| 207 | } |
| 208 | byte[] secondReversed = new byte[secondValue.length]; |
| 209 | for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) { |
| 210 | secondReversed[i] = secondValue[j]; |
| 211 | } |
| 212 | second = new String(secondReversed, HConstants.UTF8_ENCODING); |
| 213 | |
| 214 | if (first.compareTo(second) != 0) { |
| 215 | if (LOG.isDebugEnabled()) { |
| 216 | LOG.debug("second key is not the reverse of first. row=" + |
| 217 | r.getRow() + ", first value=" + first + ", second value=" + |
| 218 | second); |
| 219 | } |
| 220 | fail(); |
| 221 | } |
| 222 | } |
| 223 | } finally { |
| 224 | scanner.close(); |
| 225 | } |
| 226 | } |
| 227 | } |
| 228 | }}} |