HBase Programming
Using TableMapper
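
A TableMapper fixes the map input types for you: the key is always ImmutableBytesWritable (the row key) and the value is always Result (the row's cells), so a subclass only declares its two output types. TableMapReduceUtil then wires the table name and the Scan into the job configuration. Before the full examples, here is a minimal map-only sketch of the pattern; the table name myTable and the emitted values are illustrative placeholders, not part of the examples below.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class SkeletonTableMap extends TableMapper<Text, IntWritable> {

  @Override
  public void map(ImmutableBytesWritable row, Result columns, Context context)
      throws IOException, InterruptedException {
    // A real mapper would inspect `columns`; here we just emit the row key.
    context.write(new Text(row.get()), new IntWritable(1));
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new HBaseConfiguration(), "skeleton");
    job.setJarByClass(SkeletonTableMap.class);
    // The Scan controls which rows and columns the mapper will see.
    TableMapReduceUtil.initTableMapperJob("myTable", new Scan(),
        SkeletonTableMap.class, Text.class, IntWritable.class, job);
    // Map-only: no reducer, and discard output instead of writing files.
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}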

Example 1

A complete driver plus empty Map/Reduce stubs: rows of inputTable are scanned through a TableMapper and the results are written to outputTable through a TableReducer.

public static void main(String[] args) throws Exception {
  // Build the job from an HBaseConfiguration so the HBase client settings
  // (e.g. the ZooKeeper quorum) reach the tasks.
  Job myJob = new Job(new HBaseConfiguration());
  myJob.setJobName("myJob");
  myJob.setJarByClass(MyClass.class);

  myJob.setMapOutputKeyClass(Text.class);
  myJob.setMapOutputValueClass(Text.class);

  myJob.setOutputKeyClass(Text.class);
  myJob.setOutputValueClass(Put.class);

  // Scan from the beginning of the table up to (exclusive) row "12345",
  // fetching only the Resume:Text column.
  Scan myScan = new Scan("".getBytes(), "12345".getBytes());
  myScan.addColumn("Resume:Text".getBytes());

  TableMapReduceUtil.initTableMapperJob("inputTable", myScan, Map.class,
      Text.class, Text.class, myJob);
  TableMapReduceUtil.initTableReducerJob("outputTable", Reduce.class, myJob);

  // Note: the two initTable*Job calls above already register the mapper,
  // the reducer, and the Table{Input,Output}Format classes, so the four
  // calls below are redundant and could be dropped.
  myJob.setMapperClass(Map.class);
  myJob.setReducerClass(Reduce.class);
  myJob.setInputFormatClass(TableInputFormat.class);
  myJob.setOutputFormatClass(TableOutputFormat.class);

  myJob.setNumReduceTasks(12);

  // Submit asynchronously, then poll and report progress every ten seconds.
  myJob.submit();
  while (!myJob.isComplete()) {
    Thread.sleep(10000);  // sleep() is static; Thread.currentThread().sleep()
                          // was misleading.
    System.out.println("Map: " + (myJob.mapProgress() * 100)
        + "% ... Reduce: " + (myJob.reduceProgress() * 100) + "%");
  }

  if (myJob.isSuccessful()) {
    System.out.println("Job Successful.");
  } else {
    System.out.println("Job Failed.");
  }
}

public static class Map extends TableMapper<Text, Text> {
  // The parameter must be the generic Context, not the raw Mapper.Context;
  // otherwise this method merely overloads map() and is never called.
  @Override
  public void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
  }
}

// The type parameters must match the map output types (Text, Text); the
// third parameter is the reducer's output key type.
public static class Reduce extends TableReducer<Text, Text, Text> {
  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
  }
}
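
Tip: instead of submitting asynchronously and polling isComplete(), a driver can simply call myJob.waitForCompletion(true), which blocks and reports progress on the console; the Import tool in the next example does exactly that.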

Example 2

The map() method of HBase's bundled Import tool: every Result delivered by the scan is converted back into a Put and written to the output table.

    public void map(ImmutableBytesWritable row, Result value,
      Context context)
    throws IOException {
      try {
        context.write(row, resultToPut(row, value));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

Complete listing:

/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Import data written by {@link Export}.
 */
public class Import {
  final static String NAME = "import";

  /**
   * Maps each exported {@link Result} back into a {@link Put} for the table.
   */
  static class Importer
  extends TableMapper<ImmutableBytesWritable, Put> {
    /**
     * @param row  The current table row key.
     * @param value  The columns.
     * @param context  The current context.
     * @throws IOException When something is broken with the data.
     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, 
     *   org.apache.hadoop.mapreduce.Mapper.Context)
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value,
      Context context)
    throws IOException {
      try {
        context.write(row, resultToPut(row, value));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    private static Put resultToPut(ImmutableBytesWritable key, Result result) 
    throws IOException {
      Put put = new Put(key.get());
      for (KeyValue kv : result.raw()) {
        put.add(kv);
      }
      return put;
    }
  }

  /**
   * Sets up the actual job.
   * 
   * @param conf  The current configuration.
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args) 
  throws IOException {
    String tableName = args[0];
    Path inputDir = new Path(args[1]);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(Importer.class);
    // No reducers.  Just write straight to table.  Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import <tablename> <inputdir>");
  }

  /**
   * Main entry point.
   * 
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      usage("Wrong number of arguments: " + otherArgs.length);
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
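
Import reads the SequenceFiles produced by the companion Export tool, so the second argument must point at such a directory. Because createSubmittableJob is public and static, the job can also be driven from your own code; a minimal sketch, where the table name and input path are placeholder values:

// Placeholder arguments: substitute a real table and an Export output dir.
Configuration conf = new HBaseConfiguration();
Job job = Import.createSubmittableJob(conf,
    new String[] { "myTable", "/export/myTable" });
System.exit(job.waitForCompletion(true) ? 0 : 1);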

Example 3

From Apache Hama's TestCosineSimilarityMatrix: the mapper treats each scanned row as a vector and computes its dot product against every row of a matrix that is loaded in setConf().

  public static class ComputeSimilarity extends
      TableMapper<ImmutableBytesWritable, Put> implements Configurable {
    private Configuration conf = null;
    private DenseMatrix matrix;

    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      DenseVector v = new DenseVector(value);

      Put put = new Put(key.get());
      for (int i = 0; i < matrix.getRows(); i++) {
        double dotProduct = matrix.getRow(i).dot(v);
        if (BytesUtil.getRowIndex(key.get()) == i) {
          dotProduct = 0;
        }
        put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String
            .valueOf(i)), Bytes.toBytes(dotProduct));
      }

      context.write(key, put);
    }
  }

Complete listing:

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.matrix.DenseMatrix;
import org.apache.hama.matrix.DenseVector;
import org.apache.hama.matrix.Matrix;
import org.apache.hama.matrix.Vector;
import org.apache.hama.util.BytesUtil;
import org.apache.log4j.Logger;

public class TestCosineSimilarityMatrix extends HamaCluster {
  static final Logger LOG = Logger.getLogger(TestCosineSimilarityMatrix.class);
  private int SIZE = 10;
  private Matrix m1;
  private Matrix symmetricMatrix;
  private HamaConfiguration conf;

  /**
   * @throws UnsupportedEncodingException
   */
  public TestCosineSimilarityMatrix() throws UnsupportedEncodingException {
    super();
  }

  public void setUp() throws Exception {
    super.setUp();

    conf = getConf();

    m1 = DenseMatrix.random(conf, SIZE, SIZE);
    symmetricMatrix = new DenseMatrix(conf, SIZE, SIZE);
  }

  public void testCosineSimilarity() throws IOException {
    Job job = new Job(conf, "set MR job test");
    job.getConfiguration().set("input.matrix", m1.getPath());

    Scan scan = new Scan();
    scan.addFamily(Constants.COLUMNFAMILY);

    TableMapReduceUtil.initTableMapperJob(m1.getPath(), scan,
        ComputeSimilarity.class, ImmutableBytesWritable.class, Put.class, job);
    TableMapReduceUtil.initTableReducerJob(symmetricMatrix.getPath(),
        IdentityTableReducer.class, job);
    job.setNumReduceTasks(0);
    try {
      job.waitForCompletion(true);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ClassNotFoundException e) {
      e.printStackTrace();
    }

    Vector v1 = m1.getRow(0);
    Vector v2 = m1.getRow(2);
    assertEquals(v1.dot(v2), symmetricMatrix.get(0, 2));
  }

  public static class ComputeSimilarity extends
      TableMapper<ImmutableBytesWritable, Put> implements Configurable {
    private Configuration conf = null;
    private DenseMatrix matrix;

    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      DenseVector v = new DenseVector(value);

      Put put = new Put(key.get());
      for (int i = 0; i < matrix.getRows(); i++) {
        double dotProduct = matrix.getRow(i).dot(v);
        if (BytesUtil.getRowIndex(key.get()) == i) {
          dotProduct = 0;
        }
        put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String
            .valueOf(i)), Bytes.toBytes(dotProduct));
      }

      context.write(key, put);
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
      try {
        matrix = new DenseMatrix(new HamaConfiguration(conf), conf
            .get("input.matrix"));
      } catch (IOException e) {
        e.printStackTrace();
      }

    }
  }
}
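
Note how the mapper receives its side input: testCosineSimilarity() stores the path of m1 under the input.matrix configuration key, and setConf() in each map task reopens that DenseMatrix from the configuration before any rows are processed. This configuration-key handoff is the usual way to give a TableMapper read access to a second table.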