Package org.apache.hadoop.hbase.mapred

Provides HBase MapReduce Input/OutputFormats, a table indexing MapReduce job, and utility methods.

Class Summary

BuildTableIndex - Example table column indexing class.
Driver - Driver for hbase mapreduce jobs.
GroupingTableMap - Extract grouping columns from input record.
IdentityTableMap - Pass the given key and record as-is to reduce.
IdentityTableReduce - Write to table each key, record pair.
IndexConfiguration - Configuration parameters for building a Lucene index.
IndexConfiguration.ColumnConf
IndexOutputFormat - Create a local index, unwrap Lucene documents created by reduce, add them to the index, and copy the index to the destination.
IndexTableReduce - Construct a Lucene document per row, which is consumed by IndexOutputFormat to build a Lucene index.
RowCounter - A job with a map to count rows.
TableInputFormat - Convert HBase tabular data into a format that is consumable by Map/Reduce.
TableInputFormatBase - A base for TableInputFormats.
TableMap<K extends WritableComparable,V extends Writable> - Scan an HBase table to sort by a specified sort column.
TableOutputFormat - Convert Map/Reduce output and write it to an HBase table.
TableReduce<K extends WritableComparable,V extends Writable> - Write a table, sorting by the input key.
TableSplit - A table split corresponds to a key range [low, high).

Package org.apache.hadoop.hbase.mapred Description

Provides HBase MapReduce Input/OutputFormats, a table indexing MapReduce job, and utility methods.

HBase, MapReduce and the CLASSPATH

MapReduce jobs deployed to a MapReduce cluster do not by default have access to the HBase configuration under $HBASE_CONF_DIR nor to HBase classes. You could add hbase-site.xml to $HADOOP_HOME/conf and the hbase-X.X.X.jar to $HADOOP_HOME/lib and then copy these changes across your cluster, but the cleanest way to add the HBase configuration and classes to the cluster CLASSPATH is to uncomment HADOOP_CLASSPATH in $HADOOP_HOME/conf/hadoop-env.sh, add the path to the hbase jar and to the $HBASE_CONF_DIR directory, and then copy the amended configuration around the cluster. You'll probably need to restart the MapReduce cluster for it to notice the new configuration (though you may not have to).

For example, here is how you would amend hadoop-env.sh to add the hbase jar, the hbase conf directory, and the PerformanceEvaluation class from the hbase test classes to the hadoop CLASSPATH:

# Extra Java CLASSPATH elements.  Optional.
# export HADOOP_CLASSPATH=
export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hbase-X.X.X.jar:$HBASE_HOME/build/hbase-X.X.X-test.jar:$HBASE_HOME/conf

Expand $HBASE_HOME appropriately, in accordance with your local environment.

This is how you would run the PerformanceEvaluation MR job to put up 4 clients:

$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4
The PerformanceEvaluation class will be found on the CLASSPATH because you added $HBASE_HOME/build/test to HADOOP_CLASSPATH.

HBase as MapReduce job data source and sink

HBase can be used as a data source, TableInputFormat, and data sink, TableOutputFormat, for MapReduce jobs. When writing MapReduce jobs that read or write HBase, you'll probably want to subclass TableMap and/or TableReduce. See the do-nothing pass-through classes IdentityTableMap and IdentityTableReduce for basic usage. For a more involved example, see BuildTableIndex or review the org.apache.hadoop.hbase.mapred.TestTableMapReduce unit test.

When running MapReduce jobs that use HBase as a source or sink, you'll need to specify the source/sink table and column names in your job configuration.

When reading from HBase, TableInputFormat asks HBase for the list of regions and makes a map task per region. When writing, it's better to have lots of reducers so the load is spread across the HBase cluster.
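
The following sketch shows the write-side configuration. It mirrors the SampleUploader job further down; the table name ("mytable"), the MyMap and MyTableReducer classes, and the reducer count are illustrative assumptions, and the lines belong inside a createSubmittableJob-style method:

// Sketch only: a job whose map emits (Text row, MapWritable columns) and
// whose reduce writes into an HBase table.  Names below are placeholders.
JobConf job = new JobConf(getConf(), MyMap.class);
job.setJobName("upload-to-mytable");
job.setMapperClass(MyMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);
// Point the job's output at the HBase table; see TableReduce.initJob in the
// SampleUploader code below for the same call.
TableReduce.initJob("mytable", MyTableReducer.class, job);
// Spread the write load across the hbase cluster with plenty of reducers.
job.setNumReduceTasks(16);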

Example Code

Sample Row Counter

See RowCounter. You should be able to run it by doing: % ./bin/hadoop jar hbase-X.X.X.jar. This will invoke the hbase MapReduce Driver class. Select 'rowcounter' from the choice of jobs offered.
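
The exact arguments rowcounter expects can vary by version; running it without arguments prints its usage. An illustrative invocation (the output directory, table name, and column are assumptions) might look like:

% ./bin/hadoop jar hbase-X.X.X.jar
% ./bin/hadoop jar hbase-X.X.X.jar rowcounter /tmp/rowcount_out myTable info: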

Sample MR Bulk Uploader

Read the class comment below for specification of inputs, prerequisites, etc.

package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 Sample uploader.
 
 This is EXAMPLE code.  You will need to change it to work for your context.
 
 Uses TableReduce to put the data into hbase. Change the InputFormat to suit
 your data. Use the map to massage the input so it fits hbase; here the map
 just splits each line into a row, a column name, and a cell value.  In the
 reduce, you need to output a row and a map of columns to cells.  Change map
 and reduce to suit your input.
 
 <p>The below is wired up to handle an input which is a text file
 whose lines have the following format:
 <pre>
 row columnname columndata
 </pre>
 
 <p>The table and columnfamily we're to insert into must preexist.
 
 <p> To run, edit your hadoop-env.sh and add hbase classes and conf to your
 HADOOP_CLASSPATH.  For example:
 <pre>
 export HADOOP_CLASSPATH=/Users/stack/Documents/checkouts/hbase/branches/0.1/build/classes:/Users/stack/Documents/checkouts/hbase/branches/0.1/conf
 </pre>
 <p>Restart your MR cluster after making the above change (you need to
 be running in pseudo-distributed mode at a minimum for hadoop to see
 the additions to your CLASSPATH).
 
 <p>Start up your hbase cluster.
 
 <p>Next do the following to start the MR job:
 <pre>
 ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME
 </pre>
 
 <p>This code was written against hbase 0.1 branch.
 */
public class SampleUploader extends MapReduceBase
implements Mapper<LongWritable, Text, Text, MapWritable>, Tool {
  private static final String NAME = "SampleUploader";
  private Configuration conf;

  /**
   * Configure a job that reads text input from args[0], maps with this class,
   * and writes each (row, columns) pair to the table named in args[1].
   */
  public JobConf createSubmittableJob(String[] args) {
    JobConf c = new JobConf(getConf(), SampleUploader.class);
    c.setJobName(NAME);
    c.setInputPath(new Path(args[0]));
    c.setMapperClass(this.getClass());
    c.setMapOutputKeyClass(Text.class);
    c.setMapOutputValueClass(MapWritable.class);
    c.setReducerClass(TableUploader.class);
    TableReduce.initJob(args[1], TableUploader.class, c);
    return c;
  } 

  public void map(LongWritable k, Text v,
    OutputCollector<Text, MapWritable> output, Reporter r)
  throws IOException {
    // Lines are space-delimited; first item is row, next the columnname and
    // then the third the cell value.
    String tmp = v.toString();
    if (tmp.length() == 0) {
      return;
    }
    String [] splits = v.toString().split(" ");
    MapWritable mw = new MapWritable();
    mw.put(new Text(splits[1]),
      new ImmutableBytesWritable(splits[2].getBytes()));
    String row = splits[0];
    r.setStatus("Map emitting " + row + " for record " + k.toString());
    output.collect(new Text(row), mw);
  }

  /**
   * Reducer that forwards each (row, column map) pair to the output table
   * configured by TableReduce.initJob.
   */
  public static class TableUploader
  extends TableReduce<Text, MapWritable> {
    @Override
    public void reduce(Text k, Iterator<MapWritable> v,
      OutputCollector<Text, MapWritable> output, Reporter r)
    throws IOException {
      while (v.hasNext()) {
        r.setStatus("Reducer committing " + k);
        output.collect(k, v.next());
      }
    }
  }

  static int printUsage() {
    System.out.println(NAME + " <input> <table_name>");
    return -1;
  } 

  public int run(String[] args) throws Exception {
    // Make sure there are exactly 2 parameters left.
    if (args.length != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
        args.length + " instead of 2.");
      return printUsage();
    }
    JobClient.runJob(createSubmittableJob(args));
    return 0;
  }

  public Configuration getConf() {
    return this.conf;
  } 

  public void setConf(final Configuration c) {
    this.conf = c;
  }

  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
      args);
    System.exit(errCode);
  }
}
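
As a usage sketch for the uploader above (the column family and values here are assumptions; the table and its column family must already exist), the input is a space-delimited text file such as:

row1 info:name valueA
row2 info:name valueB

Copy it into HDFS and start the job as described in the class comment:

$HADOOP_HOME/bin/hadoop fs -put input.txt /tmp/input.txt
$HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME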



Copyright © 2008 The Apache Software Foundation