source: sample/hadoop-0.16/tw/org/nchc/code/WordCountIntoHBase.java @ 27

Last change on this file was in revision 27, checked in by waue, 16 years ago

test!

File size: 3.7 KB
/**
 * Program: WordCountIntoHBase.java
 * Editor: Waue Chen
 * From :  NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */

/**
 * Purpose :
 *  Store every line from $Input_Path into HBase
 *
 * HowToUse :
 *  Make sure the Hadoop file system and HBase are running correctly.
 *  Use the Hadoop shell to add input text files to $Input_Path
 *  ($ bin/hadoop dfs -put local_dir hdfs_dir),
 *  then run this program (which uses BuildHTable.java) after
 *  modifying the setup parameters below.
 *
 * Check Result :
 *  View the result with the HBase shell (hql> select * from $Table_Name),
 *  or run WordCountFromHBase.java and then inspect http://localhost:60070
 *  in a web browser.
 */

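/*
 * Example session (a sketch only: the local directory and jar name below
 * are hypothetical; $Input_Path and $Table_Name use the defaults set in
 * this file):
 *
 *   $ bin/hadoop dfs -put my_input_dir /user/waue/simple
 *   $ bin/hadoop jar nchc-code.jar tw.org.nchc.code.WordCountIntoHBase
 *   hql> select * from word_count5;
 */
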
package tw.org.nchc.code;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class WordCountIntoHBase {

  /* setup parameters */
  // $Input_Path: make sure the path is correct and contains the input files
  static final String Input_Path = "/user/waue/simple";

  // HBase table name; the program will create it
  static final String Table_Name = "word_count5";

  // column name (family:qualifier); the program will create it
  static final String colstr = "word:text";

  // constructor
  private WordCountIntoHBase() {
  }

  private static class ReduceClass extends TableReduce<LongWritable, Text> {
    // set (column_family:column_qualifier)
    private static final Text col = new Text(WordCountIntoHBase.colstr);

    // this map holds the columns per row
    private MapWritable map = new MapWritable();

    public void reduce(LongWritable key, Iterator<Text> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
        throws IOException {
      // contents must be ImmutableBytesWritable; copy only the valid
      // bytes, since Text.getBytes() returns the (possibly reused and
      // oversized) backing buffer
      Text value = values.next();
      byte[] valid = new byte[value.getLength()];
      System.arraycopy(value.getBytes(), 0, valid, 0, value.getLength());
      ImmutableBytesWritable bytes = new ImmutableBytesWritable(valid);
      map.clear();
      // write the data into the single column
      map.put(col, bytes);
      // add the row with the key (the line's byte offset) as the row id
      output.collect(new Text(key.toString()), map);
    }
  }

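  /*
   * Illustrative result (hypothetical input): if a file under $Input_Path
   * contains the line "hello hbase" starting at byte offset 0, the reducer
   * above stores one HBase row:
   *
   *   row id "0", column word:text = "hello hbase"
   */
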
  /**
   * Runs the demo.
   */
  public static void main(String[] args) throws IOException {
    // parse colstr to split the column family and the column qualifier
    String tmp[] = colstr.split(":");
    String Column_Family = tmp[0] + ":";
    String CF[] = { Column_Family };
    // create the table only if it does not already exist; we don't allow
    // the same name with a different structure
    BuildHTable build_table = new BuildHTable(Table_Name, CF);
    if (!build_table.checkTableExist(Table_Name)) {
      if (!build_table.createTable()) {
        System.out.println("create table error!");
      }
    } else {
      System.out.println("Table \"" + Table_Name
          + "\" already exists!");
    }
    int mapTasks = 1;
    int reduceTasks = 1;
    JobConf conf = new JobConf(WordCountIntoHBase.class);
    conf.setJobName(Table_Name);

    // must initialize the TableReduce before running the job
    TableReduce.initJob(Table_Name, ReduceClass.class, conf);
    conf.setNumMapTasks(mapTasks);
    conf.setNumReduceTasks(reduceTasks);
    conf.setInputPath(new Path(Input_Path));
    conf.setMapperClass(IdentityMapper.class);
    conf.setCombinerClass(IdentityReducer.class);
    conf.setReducerClass(ReduceClass.class);

    JobClient.runJob(conf);
  }
}
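
/*
 * Verification sketch (an assumption, not part of the original program):
 * besides the hql shell, the stored rows can be read back with the
 * HBase 0.1-era scanner API, roughly as below. The class and method names
 * (HTable, HBaseConfiguration, HScannerInterface, HStoreKey) follow that
 * old API as far as known and may need adjusting for other versions.
 *
 *   HBaseConfiguration hconf = new HBaseConfiguration();
 *   HTable table = new HTable(hconf, new Text(Table_Name));
 *   Text[] cols = { new Text(colstr) };
 *   HScannerInterface scanner = table.obtainScanner(cols, new Text(""));
 *   HStoreKey key = new HStoreKey();
 *   SortedMap<Text, byte[]> row = new TreeMap<Text, byte[]>();
 *   while (scanner.next(key, row)) {
 *     System.out.println(key.getRow() + " => " + new String(row.get(cols[0])));
 *   }
 *   scanner.close();
 */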