| 1 | {{{ |
| 2 | |
| 3 | /** |
| 4 | * Copyright 2008 The Apache Software Foundation |
| 5 | * |
| 6 | * Licensed to the Apache Software Foundation (ASF) under one |
| 7 | * or more contributor license agreements. See the NOTICE file |
| 8 | * distributed with this work for additional information |
| 9 | * regarding copyright ownership. The ASF licenses this file |
| 10 | * to you under the Apache License, Version 2.0 (the |
| 11 | * "License"); you may not use this file except in compliance |
| 12 | * with the License. You may obtain a copy of the License at |
| 13 | * |
| 14 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 15 | * |
| 16 | * Unless required by applicable law or agreed to in writing, software |
| 17 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 18 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 19 | * See the License for the specific language governing permissions and |
| 20 | * limitations under the License. |
| 21 | */ |
| 22 | package hbase; |
| 23 | |
| 24 | import java.io.IOException; |
| 25 | import java.util.Map; |
| 26 | |
| 27 | import org.apache.hadoop.conf.Configuration; |
| 28 | import org.apache.hadoop.fs.Path; |
| 29 | import org.apache.hadoop.hbase.HBaseConfiguration; |
| 30 | import org.apache.hadoop.hbase.io.Cell; |
| 31 | import org.apache.hadoop.hbase.io.HbaseMapWritable; |
| 32 | import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
| 33 | import org.apache.hadoop.hbase.io.RowResult; |
| 34 | import org.apache.hadoop.hbase.util.Bytes; |
| 35 | import org.apache.hadoop.mapred.FileOutputFormat; |
| 36 | import org.apache.hadoop.mapred.JobClient; |
| 37 | import org.apache.hadoop.mapred.JobConf; |
| 38 | import org.apache.hadoop.mapred.OutputCollector; |
| 39 | import org.apache.hadoop.mapred.Reporter; |
| 40 | import org.apache.hadoop.mapred.lib.IdentityReducer; |
| 41 | import org.apache.hadoop.util.Tool; |
| 42 | import org.apache.hadoop.util.ToolRunner; |
| 43 | |
| 44 | /** |
| 45 | * A job with a map to count rows. |
| 46 | * Map outputs table rows IF the input row has columns that have content. |
| 47 | * Uses an {@link IdentityReducer} |
| 48 | */ |
| 49 | public class RowCounter extends TableMap<ImmutableBytesWritable, RowResult> implements Tool { |
| 50 | /* Name of this 'program' |
| 51 | */ |
| 52 | static final String NAME = "rowcounter"; |
| 53 | |
| 54 | private Configuration conf; |
| 55 | private final RowResult EMPTY_RESULT_VALUE = |
| 56 | new RowResult(Bytes.toBytes("dummy"),new HbaseMapWritable<byte [], Cell>()); |
| 57 | private static enum Counters {ROWS} |
| 58 | |
| 59 | @Override |
| 60 | public void map(ImmutableBytesWritable row, RowResult value, |
| 61 | OutputCollector<ImmutableBytesWritable, RowResult> output, |
| 62 | @SuppressWarnings("unused") Reporter reporter) |
| 63 | throws IOException { |
| 64 | boolean content = false; |
| 65 | for (Map.Entry<byte [], Cell> e: value.entrySet()) { |
| 66 | Cell cell = e.getValue(); |
| 67 | if (cell != null && cell.getValue().length > 0) { |
| 68 | content = true; |
| 69 | break; |
| 70 | } |
| 71 | } |
| 72 | if (!content) { |
| 73 | return; |
| 74 | } |
| 75 | // Give out same value every time. We're only interested in the row/key |
| 76 | reporter.incrCounter(Counters.ROWS, 1); |
| 77 | output.collect(row, EMPTY_RESULT_VALUE); |
| 78 | } |
| 79 | |
| 80 | /** |
| 81 | * @param args |
| 82 | * @return the JobConf |
| 83 | * @throws IOException |
| 84 | */ |
| 85 | @SuppressWarnings({ "unused", "deprecation" }) |
| 86 | public JobConf createSubmittableJob(String[] args) throws IOException { |
| 87 | JobConf c = new JobConf(getConf(), RowCounter.class); |
| 88 | c.setJobName(NAME); |
| 89 | // Columns are space delimited |
| 90 | StringBuilder sb = new StringBuilder(); |
| 91 | final int columnoffset = 2; |
| 92 | for (int i = columnoffset; i < args.length; i++) { |
| 93 | if (i > columnoffset) { |
| 94 | sb.append(" "); |
| 95 | } |
| 96 | sb.append(args[i]); |
| 97 | } |
| 98 | // Second argument is the table name. |
| 99 | TableMap.initJob(args[1], sb.toString(), this.getClass(), |
| 100 | ImmutableBytesWritable.class, RowResult.class, c); |
| 101 | c.setReducerClass(IdentityReducer.class); |
| 102 | // First arg is the output directory. |
| 103 | FileOutputFormat.setOutputPath(c, new Path(args[0])); |
| 104 | return c; |
| 105 | } |
| 106 | |
| 107 | static int printUsage() { |
| 108 | System.out.println(NAME + |
| 109 | " <outputdir> <tablename> <column1> [<column2>...]"); |
| 110 | return -1; |
| 111 | } |
| 112 | |
| 113 | public int run(final String[] args) throws Exception { |
| 114 | // Make sure there are at least 3 parameters |
| 115 | if (args.length < 3) { |
| 116 | System.err.println("ERROR: Wrong number of parameters: " + args.length); |
| 117 | return printUsage(); |
| 118 | } |
| 119 | JobClient.runJob(createSubmittableJob(args)); |
| 120 | return 0; |
| 121 | } |
| 122 | |
| 123 | public Configuration getConf() { |
| 124 | return this.conf; |
| 125 | } |
| 126 | |
| 127 | public void setConf(final Configuration c) { |
| 128 | this.conf = c; |
| 129 | } |
| 130 | |
| 131 | /** |
| 132 | * @param args |
| 133 | * @throws Exception |
| 134 | */ |
| 135 | public static void main(String[] args) throws Exception { |
| 136 | String[] argv = {"/user/waue/hba","t1","f2:c1"}; |
| 137 | args = argv; |
| 138 | HBaseConfiguration c = new HBaseConfiguration(); |
| 139 | int errCode = ToolRunner.run(c, new RowCounter(), args); |
| 140 | System.exit(errCode); |
| 141 | } |
| 142 | } |
| 143 | |
| 144 | }}} |