source: sample/hadoop-0.17/tw/org/nchc/code/HBaseRecordPro.java @ 20

Last change on this file since 20 was 20, checked in by waue, 16 years ago

Moved the modified Hadoop 0.17 package here for backup.
Currently continuing development on Hadoop 0.16 + HBase 1.3.

File size: 8.3 KB
/**
 * Program: HBaseRecordPro.java
 * Editor: Waue Chen
 * From : NCHC, Taiwan
 * Last Update Date: 07/02/2008
 * Upgrade to 0.17
 */

/**
 * Purpose :
 *  First, this program parses your record file and creates an HBase table.
 *  Then it takes the first line of the file as the column qualifiers.
 *  Finally it stores the remaining records into HBase automatically.
 *
 * HowToUse :
 *  Make sure of two things :
 *  1. source_file must be formatted as follows:
 *    first line:  qualifier1:qualifier2:...:qualifierN
 *    other lines: record1:record2:...:recordN
 *     (the number N of qualifiers must match the number of record fields)
-----------------
name:locate:years
waue:taiwan:1981
rock:taiwan:1981
aso:taiwan:1981
jazz:taiwan:1982
-----------------
 *  2. The source_file path must be correct.
 *
 * Check Result:
 *  Go to the hbase console and type :
 *    hql > select * from t1_table;
 08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key

+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:name             | waue                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:name             | rock                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:name             | aso                     |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:name             | jazz                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:years            | 1982                    |
+-------------------------+-------------------------+-------------------------+
4 row(s) in set. (0.31 sec)

 */
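/*
 * Example invocation (a minimal sketch, not part of the original project):
 * compile the tw.org.nchc.code classes against the Hadoop 0.17 and HBase jars,
 * package them into a jar (the name "nchc-examples.jar" below is only an
 * assumption), and launch the job with the hadoop runner:
 *
 *   hadoop jar nchc-examples.jar tw.org.nchc.code.HBaseRecordPro
 *
 * source_file, table_name and the other parameters are taken from the
 * constants defined in this class; no command-line arguments are read.
 */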
package tw.org.nchc.code;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
public class HBaseRecordPro {
  /* Major parameter */
  // local filesystem path of the source file (not an HDFS path)
  final static String source_file = "/home/waue/test.txt";

  /* Minor parameters */
  // column family name
  final static String column_family = "member:";

  // table name
  final static String table_name = "HBaseRecord";

  // field separator character
  final static String sp = ":";

  // temporary file holding the column qualifiers (the first line of the source)
  final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp";

  // temporary file holding the remaining data records
  final static String text_tmp = "/tmp/HBaseRecord.text.tmp";
  // In this example the map phase is an identity pass-through;
  // the reduce phase does all of the real work.
  private static class ReduceClass extends TableReduce<LongWritable, Text> {
    public void reduce(LongWritable key, Iterator<Text> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
        throws IOException {

      // read the qualifier file written by parseFirstLine()
      BufferedReader fconf = new BufferedReader(new FileReader(new File(
          conf_tmp)));
      String first_line = fconf.readLine();
      fconf.close();
      // split the first line into the column qualifiers
      String[] cf = first_line.split(sp);
      int length = cf.length;

      // convert the record value to a String and split it into fields
      String stro = values.next().toString();
      String[] str = stro.split(sp);

      // column names are built dynamically from the qualifiers
      Text[] col_n = new Text[length];
      byte[][] b_l = new byte[length][];
      // cell contents must be wrapped in ImmutableBytesWritable
      ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];

      // this map holds the column/value pairs for one HBase row
      MapWritable map = new MapWritable();
      map.clear();

      // prepare to write data into map
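      // e.g. with qualifiers "name:locate:years" and the record
      // "waue:taiwan:1981", the loop below puts three entries into the map:
      //   member:name -> waue, member:locate -> taiwan, member:years -> 1981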
      for (int i = 0; i < length; i++) {
        col_n[i] = new Text(column_family + cf[i]);
        b_l[i] = str[i].getBytes();
        w_l[i] = new ImmutableBytesWritable(b_l[i]);
        // populate the current row
        map.put(col_n[i], w_l[i]);
      }
      // emit the row, using the input key (the record's byte offset in the
      // source file, as produced by TextInputFormat) as the HBase row id
      output.collect(new Text(key.toString()), map);
    }
  }
  public HBaseRecordPro() {
  }

  // Split the source text into two files:
  //   conf_tmp - holds the first line, which supplies the column qualifiers
  //   ou       - holds the remaining data records, which are later loaded into the table
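  // For the sample file shown in the header, conf_tmp ends up holding
  // "name:locate:years", the four data lines are copied to the output file
  // (text_tmp when called from main), and the first line is returned.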
  public String parseFirstLine(String in, String ou) throws IOException {

    BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
    BufferedWriter ff = new BufferedWriter(new FileWriter(new File(conf_tmp)));
    BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
    String first_line, data;
    first_line = fi.readLine();
    ff.write(first_line);
    ff.flush();
    do {
      data = fi.readLine();
      if (data == null) {
        break;
      } else {
        fw.write(data + "\n");
        fw.flush();
      }
    } while (true);
    fw.close();
    ff.close();
    fi.close();
    return first_line;
  }
  // delete a temporary file
  boolean deleteFile(String str) throws IOException {
    File df = new File(str);

    if (df.exists()) {
      if (!df.delete()) {
        System.err.print("delete file error !");
      }
    } else {
      System.out.println("file does not exist!");
    }
    return true;
  }
  /**
   * Runs the demo.
   */
  public static void main(String[] args) throws IOException {
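    // Overall flow: split off the first line of source_file, create the
    // HBase table if needed, copy the data file into HDFS, run the
    // MapReduce job that loads the rows, and finally clean up the
    // temporary files.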
    HBaseRecordPro setup = new HBaseRecordPro();
    String[] col_family = { column_family };
    Path text_path = new Path(text_tmp);

    setup.parseFirstLine(source_file, text_tmp);
    // System.out.println(first_line);

    // BuildHTable is a companion class in this package that creates the table
    BuildHTable build_table = new BuildHTable(table_name, col_family);
    if (!build_table.checkTableExist(table_name)) {
      if (!build_table.createTable()) {
        System.out.println("create table error !");
      }
    } else {
      System.out.println("Table \"" + table_name + "\" already exists !");
    }
    JobConf conf = new JobConf(HBaseRecordPro.class);
    FileSystem fileconf = FileSystem.get(conf);
    // copy the local data file into HDFS (the local copy is deleted afterwards)
    fileconf.copyFromLocalFile(true, text_path, text_path);
    // Job name; change it to anything you like
    conf.setJobName("PersonDataBase");
    final int mapTasks = 1;
    final int reduceTasks = 1;
    // the HBase table name given here must be correct (table_name above)
    TableReduce.initJob(table_name, ReduceClass.class, conf);

    // MapReduce job settings
    conf.setNumMapTasks(mapTasks);
    conf.setNumReduceTasks(reduceTasks);
    // 0.16:
    // conf.setInputPath(text_path);
    Convert.setInputPath(conf, text_path);

    conf.setMapperClass(IdentityMapper.class);
    conf.setCombinerClass(IdentityReducer.class);
    conf.setReducerClass(ReduceClass.class);

    JobClient.runJob(conf);

    // delete the temporary files
    // 0.16:
    // FileSystem.get(conf).delete(text_path);
    FileSystem.get(conf).delete(text_path, true);

    setup.deleteFile(conf_tmp);
  }
}