source: sample/hadoop-0.17/tw/org/nchc/code/WordCountIntoHBase.java @ 229

Last change on this file since 229 was 20, checked in by waue, 16 years ago

將改完的 hadoop 0.17版package 放來備份
目前繼續開發 hadoop 0.16 + hbase 1.3

File size: 3.8 KB
Line 
1/**
2 * Program: WordCountIntoHBase.java
3 * Editor: Waue Chen
4 * From :  NCHC, Taiwan
5 * Last Update Date: 07/02/2008
6 * Upgrade to 0.17
7 */
8
9/**
10 * Purpose :
11 *  Store every line from $Input_Path to HBase
12 *
13 * HowToUse :
14 *  Make sure the Hadoop file system and HBase are running correctly.
15 *  Use the Hadoop command below to copy the input text files to $Input_Path.
16 *  ($ bin/hadoop dfs -put local_dir hdfs_dir)
17 *  Then, after modifying the setup parameters below, run this program
18 *  together with BuildHTable.java.
19 *
20 * Check Result :
21 *  View the result with an HBase shell query (hql> select * from $Table_Name),
22 *  or run WordCountFromHBase.java and then inspect http://localhost:60070 in a web browser.
23 */
24
25package tw.org.nchc.code;
26
27import java.io.IOException;
28import java.util.Iterator;
29
30import org.apache.hadoop.fs.Path;
31import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32import org.apache.hadoop.hbase.mapred.TableReduce;
33import org.apache.hadoop.io.LongWritable;
34import org.apache.hadoop.io.MapWritable;
35import org.apache.hadoop.io.Text;
36import org.apache.hadoop.mapred.JobClient;
37import org.apache.hadoop.mapred.JobConf;
38import org.apache.hadoop.mapred.OutputCollector;
39import org.apache.hadoop.mapred.Reporter;
40import org.apache.hadoop.mapred.lib.IdentityMapper;
41import org.apache.hadoop.mapred.lib.IdentityReducer;
42
43public class WordCountIntoHBase {
44
45  /* setup parameters */
46  // $Input_Path. Please make sure the path is correct and contains input
47  // files
48  static final String Input_Path = "/user/waue/simple";
49
50  // Hbase table name, the program will create it
51  static final String Table_Name = "word_count5";
52
53  // column name, the program will create it
54  static final String colstr = "word:text";
55
56  // constructor
57  private WordCountIntoHBase() {
58  }
59
60  private static class ReduceClass extends TableReduce<LongWritable, Text> {
61    // set (column_family:column_qualify)
62    private static final Text col = new Text(WordCountIntoHBase.colstr);
63
64    // this map holds the columns per row
65    private MapWritable map = new MapWritable();
66
67    public void reduce(LongWritable key, Iterator<Text> values,
68        OutputCollector<Text, MapWritable> output, Reporter reporter)
69        throws IOException {
70      // contents must be ImmutableBytesWritable
71      ImmutableBytesWritable bytes = new ImmutableBytesWritable(values
72          .next().getBytes());
73      map.clear();
74      // write data
75      map.put(col, bytes);
76      // add the row with the key as the row id
77      output.collect(new Text(key.toString()), map);
78    }
79  }
80
81  /**
82   * Runs the demo.
83   */
84  public static void main(String[] args) throws IOException {
85    // parse colstr to split column family and column qualify
86    String tmp[] = colstr.split(":");
87    String Column_Family = tmp[0] + ":";
88    String CF[] = { Column_Family };
89    // check whether create table or not , we don't admit \
90    // the same name but different structure
91    BuildHTable build_table = new BuildHTable(Table_Name, CF);
92    if (!build_table.checkTableExist(Table_Name)) {
93      if (!build_table.createTable()) {
94        System.out.println("create table error !");
95      }
96    } else {
97      System.out.println("Table \"" + Table_Name
98          + "\" has already existed !");
99    }
100    int mapTasks = 1;
101    int reduceTasks = 1;
102    JobConf conf = new JobConf(WordCountIntoHBase.class);
103    conf.setJobName(Table_Name);
104
105    // must initialize the TableReduce before running job
106    TableReduce.initJob(Table_Name, ReduceClass.class, conf);
107    conf.setNumMapTasks(mapTasks);
108    conf.setNumReduceTasks(reduceTasks);
109    // 0.16
110    // conf.setInputPath(new Path(Input_Path));
111    Convert.setInputPath(conf, new Path(Input_Path));
112    conf.setMapperClass(IdentityMapper.class);
113    conf.setCombinerClass(IdentityReducer.class);
114    conf.setReducerClass(ReduceClass.class);
115
116    JobClient.runJob(conf);
117  }
118}
Note: See TracBrowser for help on using the repository browser.