source: sample/hadoop-0.16/tw/org/nchc/code/HBaseRecordPro.java @ 27

Last change on this file since 27 was 27, checked in by waue, 16 years ago

test!

File size: 8.2 KB
/**
 * Program: HBaseRecordPro.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */

/**
 * Purpose :
 *  First, this program parses your record file and creates an HBase table.
 *  Then it uses the first line of the file as the column qualifiers.
 *  Finally it stores the remaining records in HBase automatically.
 *
 * HowToUse :
 *  Make sure of two things :
 *  1. source_file must be formatted as follows:
 *    first line: qualifier1:qualifier2:...:qualifierN
 *    other lines: record1:record2:...:recordN
 *     (the number of qualifiers N must match the number of record fields)
-----------------
name:locate:years
waue:taiwan:1981
rock:taiwan:1981
aso:taiwan:1981
jazz:taiwan:1982
-----------------
 *  2. source_file path must be correct.

 * Check Result:
 *  Go to the hbase console and type :
 *    hql > select * from t1_table;
 08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key

+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:name             | waue                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:name             | rock                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:name             | aso                     |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:name             | jazz                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:years            | 1982                    |
+-------------------------+-------------------------+-------------------------+
4 row(s) in set. (0.31 sec)

 */
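
/*
 * Example invocation (a sketch, not from the original source; it assumes the
 * compiled classes are packed into a jar named nchc.jar and that the HBase
 * jars are on the Hadoop classpath):
 *   $ bin/hadoop jar nchc.jar tw.org.nchc.code.HBaseRecordPro
 */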

package tw.org.nchc.code;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class HBaseRecordPro {
  /* Major parameter */
  // this is a local filesystem path, not an HDFS path
  final static String source_file = "/home/waue/test.txt";

  /* Minor parameter */
  // column family name
  final static String column_family = "member:";

  // table name
  final static String table_name = "HBaseRecord";

  // field separator character
  final static String sp = ":";

  // tmp file that records the first line (the column qualifiers)
  final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp";

  // tmp file that records the data rows
  final static String text_tmp = "/tmp/HBaseRecord.text.tmp";
  // in this sample the map phase is a pass-through (IdentityMapper);
  // all of the work is done in the reduce phase
  private static class ReduceClass extends TableReduce<LongWritable, Text> {
    public void reduce(LongWritable key, Iterator<Text> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
        throws IOException {

      // read the qualifier file written by parseFirstLine()
      BufferedReader fconf = new BufferedReader(new FileReader(new File(
          conf_tmp)));
      String first_line = fconf.readLine();
      fconf.close();
      // extract the column qualifiers
      String[] cf = first_line.split(sp);
      int length = cf.length;

      // Text.getBytes() may return a padded buffer, so use toString()
      // to get exactly the record contents
      String stro = values.next().toString();
      String[] str = stro.split(sp);

      // column names are created dynamically from the qualifiers
      Text[] col_n = new Text[length];
      byte[][] b_l = new byte[length][];
      // cell contents must be ImmutableBytesWritable
      ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];

      // this map holds the columns of one HBase row
      MapWritable map = new MapWritable();
      map.clear();

      // write the data into the map
      for (int i = 0; i < length; i++) {
        col_n[i] = new Text(column_family + cf[i]);
        b_l[i] = str[i].getBytes();
        w_l[i] = new ImmutableBytesWritable(b_l[i]);
        // populate the current row
        map.put(col_n[i], w_l[i]);
      }
      // emit the row, using the input key (the line's byte offset) as row id
      output.collect(new Text(key.toString()), map);
    }
  }
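
  // For example, with qualifier line "name:locate:years", the record
  // "waue:taiwan:1981" (key = its byte offset, 0) becomes row "0" with
  // columns member:name=waue, member:locate=taiwan, member:years=1981,
  // matching the hql output shown in the header.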

  public HBaseRecordPro() {
  }

  // This method splits the source text into two files:
  //  conf_tmp records the first line, which supplies the column qualifiers;
  //  the file named by ou records the data rows to be stored in the table.
  public String parseFirstLine(String in, String ou)
      throws IOException {

    BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
    BufferedWriter ff = new BufferedWriter(new FileWriter(new File(conf_tmp)));
    BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
    // the first line holds the qualifiers; write it to conf_tmp
    String first_line = fi.readLine();
    ff.write(first_line);
    // copy every remaining line to the data file
    String data;
    while ((data = fi.readLine()) != null) {
      fw.write(data + "\n");
    }
    fw.close();
    ff.close();
    fi.close();
    return first_line;
  }
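
  // For the sample file above, parseFirstLine() leaves conf_tmp holding
  // "name:locate:years", writes the four record lines to text_tmp, and
  // returns the qualifier line.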
  // delete a tmp file from the local filesystem
  boolean deleteFile(String str) throws IOException {
    File df = new File(str);

    if (df.exists()) {
      if (!df.delete()) {
        System.err.print("delete file error !");
      }
    } else {
      System.out.println("file does not exist!");
    }
    return true;
  }
  /**
   * Runs the demo.
   */
  public static void main(String[] args) throws IOException {

    HBaseRecordPro setup = new HBaseRecordPro();
    String[] col_family = { column_family };
    Path text_path = new Path(text_tmp);

    // split the source file into the qualifier file and the data file
    String first_line = setup.parseFirstLine(source_file, text_tmp);
    // System.out.println(first_line);

    // create the table if it does not exist yet
    BuildHTable build_table = new BuildHTable(table_name,
        col_family);
    if (!build_table.checkTableExist(table_name)) {
      if (!build_table.createTable()) {
        System.out.println("create table error !");
      }
    } else {
      System.out.println("Table \"" + table_name
          + "\" already exists !");
    }
    JobConf conf = new JobConf(HBaseRecordPro.class);
    // copy the tmp data file into HDFS; the "true" flag deletes the local source
    FileSystem fileconf = FileSystem.get(conf);
    fileconf.copyFromLocalFile(true, text_path, text_path);
    // job name; change it to anything you like
    conf.setJobName("PersonDataBase");
    final int mapTasks = 1;
    final int reduceTasks = 1;
    // bind the reduce output to the HBase table
    // (the table name must be correct; in the sample run above it was t1_table)
    TableReduce.initJob(table_name, ReduceClass.class, conf);

    // map-reduce job settings
    conf.setNumMapTasks(mapTasks);
    conf.setNumReduceTasks(reduceTasks);

    conf.setInputPath(text_path);

    conf.setMapperClass(IdentityMapper.class);
    conf.setCombinerClass(IdentityReducer.class);
    conf.setReducerClass(ReduceClass.class);

    JobClient.runJob(conf);

    // delete the tmp files
    // (FileSystem.delete(Path) is the Hadoop 0.16 API)
    FileSystem.get(conf).delete(text_path);

    setup.deleteFile(conf_tmp);
  }
}