source: sample/HBaseRecordPro.java @ 16

Last change on this file since 16 was 16, checked in by waue, 16 years ago

This program works!

beta

File size: 7.8 KB
/**
 * Program: HBaseRecordPro.java
 * Editor: Waue Chen
 * From:   NCHC, Taiwan
 * Last Update Date: 06/01/2008
 */

/**
 * Purpose:
 *  Parse your records and store them in HBase.
 *
 * HowToUse:
 *  Make sure the Hadoop file system and HBase are running correctly.
 *  1. Put test.txt in the t1 directory; its content is
 ---------------
 name:locate:years
 waue:taiwan:1981
 shellon:taiwan:1981
 ---------------
 *  2. hadoop_root/$ bin/hadoop dfs -put t1 t1
 *  3. hbase_root/$ bin/hbase shell
 *  4. hql > create table t1_table("person");
 *  5. Go back to Eclipse and run this code; the database will then hold
 t1_table -> person
 ----------------
 | name    | locate | years |
 | waue    | taiwan | 1981  |
 | shellon | taiwan | 1981  |
 ----------------
 *  (Note: this "Pro" version creates its own table, named "testend", with
 *  the column family taken from the first line of the input file, so
 *  substitute "testend" for "t1_table" when checking the result.)
 * Check Result:
 *  Go to the hbase console and type:
 *    hql > select * from t1_table;
 08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
 +-------------------------+-------------------------+-------------------------+
 | Row                     | Column                  | Cell                    |
 +-------------------------+-------------------------+-------------------------+
 | 0                       | person:locate           | locate                  |
 +-------------------------+-------------------------+-------------------------+
 | 0                       | person:name             | name                    |
 +-------------------------+-------------------------+-------------------------+
 | 0                       | person:years            | years                   |
 +-------------------------+-------------------------+-------------------------+
 | 19                      | person:locate           | taiwan                  |
 +-------------------------+-------------------------+-------------------------+
 | 19                      | person:name             | waue                    |
 +-------------------------+-------------------------+-------------------------+
 | 19                      | person:years            | 1981                    |
 +-------------------------+-------------------------+-------------------------+
 | 36                      | person:locate           | taiwan                  |
 +-------------------------+-------------------------+-------------------------+
 | 36                      | person:name             | shellon                 |
 +-------------------------+-------------------------+-------------------------+
 | 36                      | person:years            | 1981                    |
 +-------------------------+-------------------------+-------------------------+
 3 row(s) in set. (0.04 sec)
 */
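/*
 * Overall data flow (a summary of what the code below does): main() calls
 * parseFirstLine() to split the header line off the local input file,
 * copies the remaining data lines into HDFS, and runs a map/reduce job
 * whose map phase is the identity. ReduceClass then reads the saved header
 * line back from /tmp/fi_conf.tmp, builds one HBase column per header
 * field (prefixed with "person:"), and writes each record into the table
 * as a row keyed by the record's byte offset in the input file.
 */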
package tw.org.nchc.code;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

class ReduceClass extends TableReduce<LongWritable, Text> {
  // In this sample the map phase is an identity pass-through;
  // all of the real work happens here in the reduce phase.
  public void reduce(LongWritable key, Iterator<Text> values,
      OutputCollector<Text, MapWritable> output, Reporter reporter)
      throws IOException {
    String sp = ":";
    String bf = "person:";
    // this map holds the columns per row
    MapWritable map = new MapWritable();
    // take the record for this key and split it into its fields
    String stro = values.next().toString();
    String str[] = stro.split(sp);

    // read back the header line that parseFirstLine() saved, and derive
    // the column names from it
    BufferedReader fconf = new BufferedReader(new FileReader(new File(
        "/tmp/fi_conf.tmp")));
    String first_line = fconf.readLine();
    fconf.close();
    String[] cf = first_line.split(sp);
    int length = cf.length;

    // for debugging: dump the generated column names to a local file
    FileOutputStream out = new FileOutputStream(new File(
        "/home/waue/mr-result.txt"));
    for (int i = 0; i < length; i++) {
      out.write((bf + cf[i] + "\n").getBytes());
    }
    out.close();

    // column ids are created dynamically from the header fields
    Text[] col_n = new Text[length];
    byte[][] b_l = new byte[length][];
    // cell contents must be ImmutableBytesWritable
    ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
    map.clear();

    for (int i = 0; i < length; i++) {
      col_n[i] = new Text(bf + cf[i]);
      b_l[i] = str[i].getBytes();
      w_l[i] = new ImmutableBytesWritable(b_l[i]);
      // populate the current row
      map.put(col_n[i], w_l[i]);
    }
    // add the row with the key as the row id
    output.collect(new Text(key.toString()), map);
  }
}
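
/*
 * Example of what the reducer emits (illustrative, values taken from the
 * sample run in the header comment): with the header line
 * "name:locate:years", the record at byte offset 19, "waue:taiwan:1981",
 * becomes
 *
 *   output.collect(new Text("19"), { person:name   -> waue,
 *                                    person:locate -> taiwan,
 *                                    person:years  -> 1981 })
 *
 * The row id is the record's byte offset in the input file (the key the
 * identity mapper passes through), which is why the sample output shows
 * rows 0, 19 and 36.
 */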

public class HBaseRecordPro {

  /* Define parameters */

  // path of the input file in the local file system (not HDFS)
  final String file_path = "/home/waue/test.txt";

  // column family prefix
  final String bf = "person:";

  // name of the HBase table this program creates and fills
  final String table_name = "testend";

  // field separator of the input records
  final String sp = ":";

  String[] cf;

  String test;

  public HBaseRecordPro() {
  }

  public HBaseRecordPro(String[] st) {
    cf = st;
  }

  static public String parseFirstLine(String in, String ou) throws IOException {
    BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
    // the header (first) line is saved to a temp file so that ReduceClass
    // can read the column names back later
    BufferedWriter ff = new BufferedWriter(new FileWriter(new File(
        "/tmp/fi_conf.tmp")));
    // the remaining data lines go to the output file "ou"
    BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
    String first_line, data;
    first_line = fi.readLine();
    ff.write(first_line);
    ff.flush();
    while ((data = fi.readLine()) != null) {
      fw.write(data + "\n");
      fw.flush();
    }
    fw.close();
    ff.close();
    fi.close();
    return first_line;
  }

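  /*
   * Example (illustrative, using the sample test.txt from the header
   * comment): parseFirstLine("/home/waue/test.txt",
   * "/tmp/HBaseRecord.text.tmp") writes "name:locate:years" to
   * /tmp/fi_conf.tmp, copies the two data lines to
   * /tmp/HBaseRecord.text.tmp, and returns "name:locate:years".
   */
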
  /**
   * Runs the demo.
   */
  public static void main(String[] args) throws IOException {

    String bf = "person:";
    String file_path = "/home/waue/test.txt";

    final String file_tmp = "/tmp/HBaseRecord.text.tmp";
    final int mapTasks = 1;
    final int reduceTasks = 1;
    String[] column_family = { bf };

    HBaseRecordPro setup = new HBaseRecordPro();

    // split the header line off the input file; the data lines end up
    // in file_tmp
    String first_line = parseFirstLine(file_path, file_tmp);
    System.out.println(first_line);

    // create the HBase table if it does not exist yet
    BuildHTable build_table = new BuildHTable(setup.table_name,
        column_family);
    if (!build_table.checkTableExist(setup.table_name)) {
      if (!build_table.createTable()) {
        System.out.println("create table error!");
      }
    } else {
      System.out.println("Table \"" + setup.table_name
          + "\" already exists!");
    }
    JobConf conf = new JobConf(HBaseRecordPro.class);
    FileSystem fileconf = FileSystem.get(conf);
    // copy the parsed data file into HDFS; the "true" flag deletes the
    // local source after copying
    fileconf.copyFromLocalFile(true, new Path(file_tmp), new Path(file_tmp));
    // Job name; modify as you like
    conf.setJobName("PersonDataBase");

    // the HBase table name must match the table created above ("testend")
    TableReduce.initJob(setup.table_name, ReduceClass.class, conf);

    // map/reduce job configuration
    conf.setNumMapTasks(mapTasks);
    conf.setNumReduceTasks(reduceTasks);
    conf.setInputPath(new Path(file_tmp));
    conf.setMapperClass(IdentityMapper.class);
    conf.setCombinerClass(IdentityReducer.class);
    conf.setReducerClass(ReduceClass.class);

    JobClient.runJob(conf);
    // after the job finishes, the result can be checked from the hbase
    // console, e.g.: hql > select * from testend;
  }
}