{{{
#!html
<div style="text-align: center; color:#151B8D"><big style="font-weight: bold;"><big><big>
HBase Programming
</big></big></big></div> <div style="text-align: center; color:#7E2217"><big style="font-weight: bold;"><big>
TableMapper Usage
</big></big></div>
}}}
[[PageOutline]]

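TableMapper and TableReducer are convenience base classes in the org.apache.hadoop.hbase.mapreduce package: TableMapper fixes the map input types to ImmutableBytesWritable (the row key) and Result (the row's cells), while TableReducer emits Put (or Delete) objects that TableOutputFormat writes into an HBase table. TableMapReduceUtil.initTableMapperJob() and initTableReducerJob() wire a Job up to these classes, configuring the scan, the input/output formats, and the target tables. The three examples below show a bare job skeleton, HBase's own Import tool, and a mapper from Apache Hama.
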
= Example 1 =

{{{
#!java
import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class MyClass {

  public static void main(String[] args) throws Exception {
    // Start from an HBaseConfiguration so hbase-site.xml settings are picked up.
    Job myJob = new Job(new HBaseConfiguration(), "myJob");
    myJob.setJarByClass(MyClass.class);

    myJob.setMapOutputKeyClass(Text.class);
    myJob.setMapOutputValueClass(Text.class);

    myJob.setOutputKeyClass(Text.class);
    myJob.setOutputValueClass(Put.class);

    // Scan from the empty start row up to (but not including) row "12345",
    // reading only the Text qualifier of the Resume column family.
    Scan myScan = new Scan("".getBytes(), "12345".getBytes());
    myScan.addColumn("Resume".getBytes(), "Text".getBytes());

    TableMapReduceUtil.initTableMapperJob("inputTable", myScan, Map.class,
        Text.class, Text.class, myJob);
    TableMapReduceUtil.initTableReducerJob("outputTable", Reduce.class, myJob);

    // The util calls above already configure these; they are repeated here to
    // show what initTableMapperJob/initTableReducerJob set up.
    myJob.setMapperClass(Map.class);
    myJob.setReducerClass(Reduce.class);
    myJob.setInputFormatClass(TableInputFormat.class);
    myJob.setOutputFormatClass(TableOutputFormat.class);

    myJob.setNumReduceTasks(12);

    myJob.submit();

    // Poll the job and print progress every ten seconds.
    while (!myJob.isComplete()) {
      Thread.sleep(10000);
      System.out.println("Map: " + (myJob.mapProgress() * 100) + "% ... Reduce: "
          + (myJob.reduceProgress() * 100) + "%");
    }

    if (myJob.isSuccessful()) {
      System.out.println("Job Successful.");
    } else {
      System.out.println("Job Failed.");
    }
  }

  // TableMapper fixes the input types to (ImmutableBytesWritable, Result);
  // its type parameters are the map output key/value types.
  public static class Map extends TableMapper<Text, Text> {
    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
    }
  }

  // TableReducer's third type parameter is the output key type; the value
  // written to the table must be a Put (or Delete).
  public static class Reduce extends TableReducer<Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    }
  }
}
}}}
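
The map() and reduce() bodies above are left empty on purpose. As a minimal sketch of what they could contain (the Resume:Text input column matches the Scan above; the Resume:Copy output column is invented for illustration), the mapper could emit each row's cell as Text and the reducer could pack the values into a Put:

{{{
#!java
// A possible body for Map.map(): read the Resume:Text cell selected by the
// Scan and emit (row key, cell value) as Text/Text.
public void map(ImmutableBytesWritable key, Result value, Context context)
    throws IOException, InterruptedException {
  byte[] text = value.getValue("Resume".getBytes(), "Text".getBytes());
  if (text != null) {
    context.write(new Text(key.get()), new Text(text));
  }
}

// A possible body for Reduce.reduce(): collect the values for one row key
// into a single Put, which TableOutputFormat writes to "outputTable".
// The Resume:Copy output column is made up for this sketch.
public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {
  Put put = new Put(key.toString().getBytes());
  for (Text v : values) {
    put.add("Resume".getBytes(), "Copy".getBytes(), v.toString().getBytes());
  }
  context.write(key, put);
}
}}}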

= Example 2 =

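This is the map() method of HBase's bundled Import tool. Each input row arrives as a Result; resultToPut() (shown in the complete listing below) copies every KeyValue of that row into a Put, which is written straight back out to the target table.
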
{{{
#!java
    public void map(ImmutableBytesWritable row, Result value,
      Context context)
    throws IOException {
      try {
        context.write(row, resultToPut(row, value));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
}}}

== Complete listing ==

{{{
#!java
/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Import data written by {@link Export}.
 */
public class Import {
  final static String NAME = "import";

  /**
   * Write table content out to files in hdfs.
   */
  static class Importer
  extends TableMapper<ImmutableBytesWritable, Put> {
    /**
     * @param row  The current table row key.
     * @param value  The columns.
     * @param context  The current context.
     * @throws IOException When something is broken with the data.
     * @see org.apache.hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN,
     *   org.apache.hadoop.mapreduce.Mapper.Context)
     */
    @Override
    public void map(ImmutableBytesWritable row, Result value,
      Context context)
    throws IOException {
      try {
        context.write(row, resultToPut(row, value));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    private static Put resultToPut(ImmutableBytesWritable key, Result result)
    throws IOException {
      Put put = new Put(key.get());
      for (KeyValue kv : result.raw()) {
        put.add(kv);
      }
      return put;
    }
  }

  /**
   * Sets up the actual job.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
  throws IOException {
    String tableName = args[0];
    Path inputDir = new Path(args[1]);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(Importer.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(Importer.class);
    // No reducers.  Just write straight to table.  Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Import <tablename> <inputdir>");
  }

  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      usage("Wrong number of arguments: " + otherArgs.length);
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
}}}
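
As usage() indicates, the tool takes a table name and an input directory of files previously written by Export. With the driver bundled in the HBase jar it can be invoked as something like {{{hadoop jar hbase-<version>.jar import <tablename> <inputdir>}}} (the exact jar name depends on the installation).
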

= Example 3 =

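This mapper comes from Apache Hama's cosine-similarity test. Each incoming row is turned into a DenseVector and compared, via dot products, against every row of a matrix that setConf() loads from the job configuration; the scores are packed into one Put per row, with the diagonal (a row compared with itself) zeroed out.
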
{{{
#!java
  public static class ComputeSimilarity extends
      TableMapper<ImmutableBytesWritable, Put> implements Configurable {
    private Configuration conf = null;
    private DenseMatrix matrix;

    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      DenseVector v = new DenseVector(value);

      Put put = new Put(key.get());
      for (int i = 0; i < matrix.getRows(); i++) {
        double dotProduct = matrix.getRow(i).dot(v);
        // A row's similarity to itself is not interesting; zero it out.
        if (BytesUtil.getRowIndex(key.get()) == i) {
          dotProduct = 0;
        }
        put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String
            .valueOf(i)), Bytes.toBytes(dotProduct));
      }

      context.write(key, put);
    }

    // getConf()/setConf(), which load the matrix from the job configuration,
    // are omitted here; see the complete listing below.
  }
}}}

== Complete listing ==
{{{
#!java
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.matrix.DenseMatrix;
import org.apache.hama.matrix.DenseVector;
import org.apache.hama.matrix.Matrix;
import org.apache.hama.matrix.Vector;
import org.apache.hama.util.BytesUtil;
import org.apache.log4j.Logger;

public class TestCosineSimilarityMatrix extends HamaCluster {
  static final Logger LOG = Logger.getLogger(TestCosineSimilarityMatrix.class);
  private int SIZE = 10;
  private Matrix m1;
  private Matrix symmetricMatrix;
  private HamaConfiguration conf;

  /**
   * @throws UnsupportedEncodingException
   */
  public TestCosineSimilarityMatrix() throws UnsupportedEncodingException {
    super();
  }

  public void setUp() throws Exception {
    super.setUp();

    conf = getConf();

    m1 = DenseMatrix.random(conf, SIZE, SIZE);
    symmetricMatrix = new DenseMatrix(conf, SIZE, SIZE);
  }

  public void testCosineSimilarity() throws IOException {
    Job job = new Job(conf, "set MR job test");
    job.getConfiguration().set("input.matrix", m1.getPath());

    Scan scan = new Scan();
    scan.addFamily(Constants.COLUMNFAMILY);

    TableMapReduceUtil.initTableMapperJob(m1.getPath(), scan,
        ComputeSimilarity.class, ImmutableBytesWritable.class, Put.class, job);
    TableMapReduceUtil.initTableReducerJob(symmetricMatrix.getPath(),
        IdentityTableReducer.class, job);
    job.setNumReduceTasks(0);
    try {
      job.waitForCompletion(true);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ClassNotFoundException e) {
      e.printStackTrace();
    }

    Vector v1 = m1.getRow(0);
    Vector v2 = m1.getRow(2);
    assertEquals(v1.dot(v2), symmetricMatrix.get(0, 2));
  }

  public static class ComputeSimilarity extends
      TableMapper<ImmutableBytesWritable, Put> implements Configurable {
    private Configuration conf = null;
    private DenseMatrix matrix;

    public void map(ImmutableBytesWritable key, Result value, Context context)
        throws IOException, InterruptedException {
      DenseVector v = new DenseVector(value);

      Put put = new Put(key.get());
      for (int i = 0; i < matrix.getRows(); i++) {
        double dotProduct = matrix.getRow(i).dot(v);
        if (BytesUtil.getRowIndex(key.get()) == i) {
          dotProduct = 0;
        }
        put.add(Constants.COLUMNFAMILY, Bytes.toBytes(String
            .valueOf(i)), Bytes.toBytes(dotProduct));
      }

      context.write(key, put);
    }

    @Override
    public Configuration getConf() {
      return conf;
    }

    @Override
    public void setConf(Configuration conf) {
      this.conf = conf;
      try {
        matrix = new DenseMatrix(new HamaConfiguration(conf), conf
            .get("input.matrix"));
      } catch (IOException e) {
        e.printStackTrace();
      }

    }
  }
}
}}}
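
Note that the test registers IdentityTableReducer through initTableReducerJob() mainly to get TableOutputFormat configured, and then calls setNumReduceTasks(0), so every Put the mapper emits is written directly to the output matrix's table without a reduce phase.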