source: sample/hadoop-0.17/tw/org/nchc/code/HBaseRecord2.java @ 20

Last change on this file since 20 was 20, checked in by waue, 16 years ago

Putting the modified hadoop 0.17 package here as a backup.
Currently continuing development on hadoop 0.16 + hbase 1.3

File size: 5.6 KB
/**
 * Program: HBaseRecord2.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 07/01/2008
 * Upgrade to 0.17
 */

/**
 * Purpose :
 *  Parse your records and store them in HBase.
 *
 * HowToUse :
 *  Make sure the Hadoop file system and HBase are running correctly.
 *  1. Put test.txt into the t1 directory; its content is
 *  ---------------
 *  name:locate:years
 *  waue:taiwan:1981
 *  shellon:taiwan:1981
 *  ---------------
 *  2. hadoop_root/$ bin/hadoop dfs -put t1 t1
 *  ----------------
 * Check Result:
 *  Go to the HBase console and type:
 *    hql > select * from t1_table;
 * 08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
 * +-------------------------+-------------------------+-------------------------+
 * | Row                     | Column                  | Cell                    |
 * +-------------------------+-------------------------+-------------------------+
 * | 0                       | person:locate           | locate                  |
 * +-------------------------+-------------------------+-------------------------+
 * | 0                       | person:name             | name                    |
 * +-------------------------+-------------------------+-------------------------+
 * | 0                       | person:years            | years                   |
 * +-------------------------+-------------------------+-------------------------+
 * | 19                      | person:locate           | taiwan                  |
 * +-------------------------+-------------------------+-------------------------+
 * | 19                      | person:name             | waue                    |
 * +-------------------------+-------------------------+-------------------------+
 * | 19                      | person:years            | 1981                    |
 * +-------------------------+-------------------------+-------------------------+
 * | 36                      | person:locate           | taiwan                  |
 * +-------------------------+-------------------------+-------------------------+
 * | 36                      | person:name             | shellon                 |
 * +-------------------------+-------------------------+-------------------------+
 * | 36                      | person:years            | 1981                    |
 * +-------------------------+-------------------------+-------------------------+
 * 3 row(s) in set. (0.04 sec)
 */
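
/*
 * Note on the sample listing above: with the default TextInputFormat, the
 * map input key is the byte offset of each line (a LongWritable), and
 * IdentityMapper passes it through unchanged, so the row ids 0, 19 and 36
 * are simply the offsets of the three input lines. Also note that the
 * listing was captured from an earlier run; this version writes the column
 * names defined in bf below (person:name, person:local, person:birthyear).
 */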

package tw.org.nchc.code;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class HBaseRecord2 {

  /* Define parameters */
  static String[] bf = { "person:name", "person:local", "person:birthyear" };

  // file path in the Hadoop file system (not the local file system)
  String file_path = "/user/waue/t1/test.txt";

  // HBase table name
  String table_name = "testtable";

  // number of map tasks and reduce tasks
  int mapTasks = 1;
  int reduceTasks = 1;

  private static class ReduceClass extends TableReduce<LongWritable, Text> {

    // In this sample the map phase is a pass-through (IdentityMapper);
    // the reduce phase does all the work.
    public void reduce(LongWritable key, Iterator<Text> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
        throws IOException {
      // this map holds the columns for one row
      MapWritable map = new MapWritable();
      // read one line; use getLength() so that stale bytes in the
      // backing array of the Text are not copied into the String
      Text value = values.next();
      String stro = new String(value.getBytes(), 0, value.getLength());
      String str[] = stro.split(":");

      int length = bf.length;

      // column names are created dynamically
      Text[] col_n = new Text[length];
      byte[][] b_l = new byte[length][];
      // cell contents must be ImmutableBytesWritable
      ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
      map.clear();
      for (int i = 0; i < length; i++) {
        col_n[i] = new Text(bf[i]);
        b_l[i] = str[i].getBytes();
        w_l[i] = new ImmutableBytesWritable(b_l[i]);
        // populate the current row
        map.put(col_n[i], w_l[i]);
      }
      // emit the row, with the key as the row id
      output.collect(new Text(key.toString()), map);
    }
  }
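
  /*
   * A worked example of what reduce() emits (a sketch; values taken from
   * the sample input above): for the line "waue:taiwan:1981" at byte
   * offset 19, the output is
   *
   *   row id "19" -> { person:name      = "waue",
   *                    person:local     = "taiwan",
   *                    person:birthyear = "1981" }
   *
   * i.e. one MapWritable keyed by the Text row id, holding one
   * ImmutableBytesWritable cell per column name in bf.
   */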

  private HBaseRecord2() {
  }

  /**
   * Runs the demo.
   */
  public static void main(String[] args) throws IOException {

    HBaseRecord2 setup = new HBaseRecord2();
    // the column family ("person") is taken from the first column name
    String[] tmp = bf[0].split(":");
    String[] CF = { tmp[0] };
    BuildHTable build_table = new BuildHTable(setup.table_name, CF);
    if (!build_table.checkTableExist(setup.table_name)) {
      if (!build_table.createTable()) {
        System.out.println("create table error !");
      }
    } else {
      System.out.println("Table \"" + setup.table_name
          + "\" already exists!");
    }

    JobConf conf = new JobConf(HBaseRecord2.class);

    // job name; change it to anything you like
    conf.setJobName("PersonDataBase");

    // the HBase table name must match the table created above ("testtable")
    TableReduce.initJob(setup.table_name, ReduceClass.class, conf);

    // MapReduce job settings
    conf.setNumMapTasks(setup.mapTasks);
    conf.setNumReduceTasks(setup.reduceTasks);
    // 0.16 API:
    // conf.setInputPath(new Path(setup.file_path));
    Convert.setInputPath(conf, new Path(setup.file_path));
    conf.setMapperClass(IdentityMapper.class);
    conf.setCombinerClass(IdentityReducer.class);
    conf.setReducerClass(ReduceClass.class);
    JobClient.runJob(conf);
  }
}
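
/*
 * To run the demo (a sketch; the jar name below is hypothetical):
 *   hadoop_root/$ bin/hadoop dfs -put t1 t1
 *   hadoop_root/$ bin/hadoop jar nchc-code.jar tw.org.nchc.code.HBaseRecord2
 * then verify the rows with "select * from testtable;" in the HBase console.
 */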