source: sample/HBaseRecordPro.java @ 15

Last change on this file since 15 was 15, checked in by waue, 16 years ago

can't run

File size: 7.0 KB
/**
 * Program: HBaseRecordPro.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 06/01/2008
 */

/**
 * Purpose :
 *  Parse your records and store them in HBase.
 *
 * HowToUse :
 *  Make sure the Hadoop file system and HBase are running correctly.
 *  1. put test.txt into the t1 directory; its content is
 ---------------
 name:locate:years
 waue:taiwan:1981
 shellon:taiwan:1981
 ---------------
 *  2. hadoop_root/$ bin/hadoop dfs -put t1 t1
 *  3. hbase_root/$ bin/hbase shell
 *  4. hql > create table t1_table("person");
 *  5. Return to Eclipse and run this code; the database will then contain
  t1_table -> person
    ----------------------------
    |  name   | locate | years |
    |  waue   | taiwan | 1981  |
    | shellon | taiwan | 1981  |
    ----------------------------
 * Check Result:
 *  Go to the hbase console and type :
 *    hql > select * from t1_table;
08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | person:locate           | locate                  |
+-------------------------+-------------------------+-------------------------+
| 0                       | person:name             | name                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | person:years            | years                   |
+-------------------------+-------------------------+-------------------------+
| 19                      | person:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 19                      | person:name             | waue                    |
+-------------------------+-------------------------+-------------------------+
| 19                      | person:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 36                      | person:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 36                      | person:name             | shellon                 |
+-------------------------+-------------------------+-------------------------+
| 36                      | person:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
3 row(s) in set. (0.04 sec)
 */
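
/*
 * A possible way to run this job outside Eclipse (a sketch; the jar name
 * nchc.jar is illustrative, not from the original source):
 *
 *   hadoop_root/$ bin/hadoop jar nchc.jar tw.org.nchc.code.HBaseRecordPro
 */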
package tw.org.nchc.code;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class HBaseRecordPro {

  /* Define parameters */

  // file path in the local file system (it is copied into HDFS by main)
  private String file_path = "/home/waue/test.txt";

  // column family prefix; every column is stored as "person:<name>"
  private final static String bf = "person:";
  private final String table_name = "testpro";

  // field separator used in the input records
  private final static String sp = ":";
  // column names, parsed from the first line of the input file
  private static String[] cf;

  private static class ReduceClass extends TableReduce<LongWritable, Text> {

    // in this sample the map phase is a pass-through (IdentityMapper);
    // the reduce phase does all the work
    public void reduce(LongWritable key, Iterator<Text> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
        throws IOException {

      // this map holds the columns per row
      MapWritable map = new MapWritable();
      // Text.getBytes() returns the backing array, which may be longer
      // than the content, so bound the conversion by getLength()
      Text value = values.next();
      String stro = new String(value.getBytes(), 0, value.getLength());
      String[] str = stro.split(sp);

      int length = cf.length;

      // column names are created dynamically from the parsed header line
      Text[] col_n = new Text[length];
      byte[][] b_l = new byte[length][];
      // cell contents must be ImmutableBytesWritable
      ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
      for (int i = 0; i < length; i++) {
        col_n[i] = new Text(bf + cf[i]);
        b_l[i] = str[i].getBytes();
        w_l[i] = new ImmutableBytesWritable(b_l[i]);
        // populate the current row
        map.put(col_n[i], w_l[i]);
      }
      // add the row with the key as the row id
      output.collect(new Text(key.toString()), map);
    }
  }
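
  // Worked example (values taken from the header comment above): with
  // cf = {"name", "locate", "years"} and an input value "waue:taiwan:1981",
  // the reducer emits one row whose MapWritable holds
  //   person:name   -> waue
  //   person:locate -> taiwan
  //   person:years  -> 1981
  // with the line's byte offset in the input file as the row id.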

  private HBaseRecordPro() {
  }

  // copy every line of the input file except the first into the output
  // file, and return the first (header) line
  String parseFirstLine(String in, String ou) throws IOException {
    BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
    BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
    String first_line, data;
    first_line = fi.readLine();
    do {
      data = fi.readLine();
      if (data == null) {
        break;
      } else {
        fw.write(data + "\n");
      }
    } while (true);
    fi.close();
    fw.close();
    return first_line;
  }
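
  // Example: for the test.txt shown in the header comment, parseFirstLine
  // returns "name:locate:years" and writes only the two data lines
  // ("waue:taiwan:1981" and "shellon:taiwan:1981") to the output file.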
  /**
   * Runs the demo.
   */
  public static void main(String[] args) throws IOException {

    final String file_tmp = "/tmp/HBaseRecord.text.tmp";
    final int mapTasks = 1;
    final int reduceTasks = 1;
    String[] column_family = { bf };

    HBaseRecordPro setup = new HBaseRecordPro();

    // strip the header line; the remaining lines go to file_tmp
    String first_line = setup.parseFirstLine(setup.file_path, file_tmp);
    System.out.println(first_line);
    HBaseRecordPro.cf = first_line.split(sp);
    // debug: print the parsed column names
    for (int i = 0; i < cf.length; i++) {
      System.out.println("column[" + i + "] " + bf + cf[i]);
    }

    BuildHTable build_table = new BuildHTable(setup.table_name,
        column_family);
    if (!build_table.checkTableExist(setup.table_name)) {
      if (!build_table.createTable()) {
        System.out.println("create table error !");
      }
    } else {
      System.out.println("Table \"" + setup.table_name
          + "\" already exists !");
    }
    JobConf conf = new JobConf(HBaseRecordPro.class);
    FileSystem fileconf = FileSystem.get(conf);
    // copy the temp file into HDFS (the local copy is deleted)
    fileconf.copyFromLocalFile(true, new Path(file_tmp), new Path(file_tmp));
    // Job name; modify it as you like
    conf.setJobName("PersonDataBase");

    // the HBase table name must match the table created above (testpro here)
    TableReduce.initJob(setup.table_name, ReduceClass.class, conf);

    // below is the map-reduce configuration
    conf.setNumMapTasks(mapTasks);
    conf.setNumReduceTasks(reduceTasks);
    conf.setInputPath(new Path(file_tmp));
    conf.setMapperClass(IdentityMapper.class);
    conf.setCombinerClass(IdentityReducer.class);
    conf.setReducerClass(ReduceClass.class);
    JobClient.runJob(conf);

  }
}
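
/*
 * A minimal, self-contained sketch (not part of the original file) of the
 * record-splitting logic used by ReduceClass; the class name SplitDemo and
 * the hard-coded sample strings are illustrative. It can be run on its own:
 */
class SplitDemo {
  public static void main(String[] args) {
    // header line supplies the column names, data line supplies the values
    String[] cf = "name:locate:years".split(":");
    String[] str = "waue:taiwan:1981".split(":");
    for (int i = 0; i < cf.length; i++) {
      // prints person:name -> waue, person:locate -> taiwan, ...
      System.out.println("person:" + cf[i] + " -> " + str[i]);
    }
  }
}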