{{{

/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package hbase;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A job with a map to count rows.
 * The map outputs a table row only if the input row has at least one column
 * with content; rows whose cells are all empty are skipped.
 * Uses an {@link IdentityReducer}.
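 *
 * <p>Example invocation (the jar name and paths below are illustrative, not
 * part of the original example):
 * <pre>bin/hadoop jar rowcounter.jar /user/hadoop/out mytable family:qualifier</pre>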
 */
public class RowCounter extends TableMap<ImmutableBytesWritable, RowResult> implements Tool {
  /** Name of this 'program' */
  static final String NAME = "rowcounter";

  private Configuration conf;
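  // Placeholder emitted as the value for every counted row; the reducer is an
  // IdentityReducer, so only the row keys matter in the output.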
  private final RowResult EMPTY_RESULT_VALUE =
    new RowResult(Bytes.toBytes("dummy"), new HbaseMapWritable<byte [], Cell>());
  private static enum Counters {ROWS}

  @Override
  public void map(ImmutableBytesWritable row, RowResult value,
    OutputCollector<ImmutableBytesWritable, RowResult> output,
    Reporter reporter)
  throws IOException {
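    // Scan the row's cells; the row counts only if at least one cell holds a
    // non-empty value.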
    boolean content = false;
    for (Map.Entry<byte [], Cell> e: value.entrySet()) {
      Cell cell = e.getValue();
      if (cell != null && cell.getValue().length > 0) {
        content = true;
        break;
      }
    }
    if (!content) {
      return;
    }
    // Give out same value every time. We're only interested in the row/key
    reporter.incrCounter(Counters.ROWS, 1);
    output.collect(row, EMPTY_RESULT_VALUE);
  }

  /**
   * @param args command-line arguments: output directory, table name, then
   * one or more columns
   * @return the JobConf configured for this job
   * @throws IOException
   */
  @SuppressWarnings({ "unused", "deprecation" })
  public JobConf createSubmittableJob(String[] args) throws IOException {
    JobConf c = new JobConf(getConf(), RowCounter.class);
    c.setJobName(NAME);
    // Columns are space delimited
    StringBuilder sb = new StringBuilder();
    final int columnoffset = 2;
    for (int i = columnoffset; i < args.length; i++) {
      if (i > columnoffset) {
        sb.append(" ");
      }
      sb.append(args[i]);
    }
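    // initJob points the job at the table through HBase's TableInputFormat,
    // sets the scan columns, and registers this class as the mapper.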
    // Second argument is the table name.
    TableMap.initJob(args[1], sb.toString(), this.getClass(),
      ImmutableBytesWritable.class, RowResult.class, c);
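    // The identity reducer passes every (row, placeholder) pair straight
    // through, so the job output holds one record per counted row.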
    c.setReducerClass(IdentityReducer.class);
    // First arg is the output directory.
    FileOutputFormat.setOutputPath(c, new Path(args[0]));
    return c;
  }

  static int printUsage() {
    System.out.println(NAME +
      " <outputdir> <tablename> <column1> [<column2>...]");
    return -1;
  }

  public int run(final String[] args) throws Exception {
    // Make sure there are at least 3 parameters
    if (args.length < 3) {
      System.err.println("ERROR: Wrong number of parameters: " + args.length);
      return printUsage();
    }
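    // runJob blocks until the job finishes and reports its counters,
    // including Counters.ROWS, on the console.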
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  public Configuration getConf() {
    return this.conf;
  }

  public void setConf(final Configuration c) {
    this.conf = c;
  }

  /**
   * @param args command-line arguments; see {@link #printUsage()}
   * @throws Exception
   */
  public static void main(String[] args) throws Exception {
    // Fall back to this example's sample arguments when none are supplied.
    if (args.length == 0) {
      args = new String[] {"/user/waue/hba", "t1", "f2:c1"};
    }
    HBaseConfiguration c = new HBaseConfiguration();
    int errCode = ToolRunner.run(c, new RowCounter(), args);
    System.exit(errCode);
  }
}

}}}