Program Declaration
- Program: HBaseRecordPro.java
- Editor: Waue Chen
- From: NCHC, Taiwan
- Last Update Date: 07/01/2008
Program Function
- The program parses your record file and creates an HBase table.
- It takes the first line of the file as the column qualifiers.
- Finally, it stores the records into HBase automatically (a minimal sketch of this rule follows the list).
How to Use
- Make sure of two things:
- 1. source_file must be formatted as follows:
-    first line: qualifier1:qualifier2:...:qualifierN
-    other lines: record1:record2:...:recordN
- 2. The source_file path must be correct (a stand-alone check sketch follows this list).
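Before running the job, you can verify both requirements with a small stand-alone check. This is a hypothetical helper, not part of the original program; the hard-coded path is the same default as source_file and should be adjusted to your own file:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class CheckSource {
	public static void main(String[] args) throws IOException {
		String path = "/home/waue/test.txt"; // same default as source_file
		File f = new File(path);
		if (!f.exists()) {
			System.err.println("source_file path is wrong: " + path);
			return;
		}
		BufferedReader br = new BufferedReader(new FileReader(f));
		String header = br.readLine();
		if (header == null) {
			System.err.println("source_file is empty");
			br.close();
			return;
		}
		// the first line defines how many fields every record must have
		int fields = header.split(":").length;
		String line;
		int n = 1;
		while ((line = br.readLine()) != null) {
			n++;
			if (line.split(":").length != fields) {
				System.err.println("line " + n + " does not have " + fields + " fields");
			}
		}
		br.close();
		System.out.println("check finished");
	}
}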
Source File Contents
name:locate:years
waue:taiwan:1981
rock:taiwan:1981
aso:taiwan:1981
jazz:taiwan:1982
Result
Go to the hbase console and type:

hql > select * from t1_table;
08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:name             | waue                    |
+-------------------------+-------------------------+-------------------------+
| 0                       | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:name             | rock                    |
+-------------------------+-------------------------+-------------------------+
| 17                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:name             | aso                     |
+-------------------------+-------------------------+-------------------------+
| 34                      | member:years            | 1981                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:locate           | taiwan                  |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:name             | jazz                    |
+-------------------------+-------------------------+-------------------------+
| 50                      | member:years            | 1982                    |
+-------------------------+-------------------------+-------------------------+
4 row(s) in set. (0.31 sec)
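Two things about this listing are worth noting. First, it was captured with table_name set to t1_table (see the comment in the code below); with the defaults in this page's code the table would be HBaseRecord. Second, the row ids 0, 17, 34, 50 are the byte offsets of each record inside text_tmp: the IdentityMapper passes through the LongWritable keys of the text input format, and the reducer uses them as row ids. A quick check of that arithmetic (assuming one '\n' per record, as parseFirstLine writes):

public class OffsetCheck {
	public static void main(String[] args) {
		String[] records = { "waue:taiwan:1981", "rock:taiwan:1981",
				"aso:taiwan:1981", "jazz:taiwan:1982" };
		long offset = 0;
		for (String r : records) {
			System.out.println(offset + " -> " + r); // prints 0, 17, 34, 50
			offset += r.length() + 1; // +1 for the trailing newline
		}
	}
}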
Code
package tw.org.nchc.code;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class HBaseRecordPro {
	/* Major parameter */
	// a local file system path, not a Hadoop file system path
	final static String source_file = "/home/waue/test.txt";

	/* Minor parameters */
	// column family name
	final static String column_family = "member:";

	// table name
	final static String table_name = "HBaseRecord";

	// separator character
	final static String sp = ":";

	// tmp file for the first line (the column qualifiers)
	final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp";

	// tmp file for the data records
	final static String text_tmp = "/tmp/HBaseRecord.text.tmp";

	// In this sample the map phase is an identity pass-through;
	// the reduce does all the work.
	private static class ReduceClass extends TableReduce<LongWritable, Text> {
		public void reduce(LongWritable key, Iterator<Text> values,
				OutputCollector<Text, MapWritable> output, Reporter reporter)
				throws IOException {
			// read the configuration file holding the first line
			BufferedReader fconf = new BufferedReader(new FileReader(new File(
					conf_tmp)));
			String first_line = fconf.readLine();
			fconf.close();
			// extract the column qualifiers
			String[] cf = first_line.split(sp);
			int length = cf.length;

			// values.next().getBytes() returns the value in byte form
			String stro = new String(values.next().getBytes());
			String[] str = stro.split(sp);

			// column ids are created dynamically
			Text[] col_n = new Text[length];
			byte[][] b_l = new byte[length][];
			// cell contents must be ImmutableBytesWritable
			ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];

			// this map is written to the hbase table; it holds the columns of one row
			MapWritable map = new MapWritable();
			map.clear();

			// prepare to write the data into the map
			for (int i = 0; i < length; i++) {
				col_n[i] = new Text(column_family + cf[i]);
				b_l[i] = str[i].getBytes();
				w_l[i] = new ImmutableBytesWritable(b_l[i]);
				// populate the current row
				map.put(col_n[i], w_l[i]);
			}
			// add the row, with the key as the row id
			output.collect(new Text(key.toString()), map);
		}
	}

	public HBaseRecordPro() {
	}

	// This function splits the source text into two files:
	// conf_tmp keeps the first line and is used to set the column qualifiers;
	// "ou" (text_tmp) keeps the data records to be stored into the table.
	public String parseFirstLine(String in, String ou) throws IOException {
		BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
		BufferedWriter ff = new BufferedWriter(new FileWriter(new File(
				conf_tmp)));
		BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
		String first_line, data;
		first_line = fi.readLine();
		ff.write(first_line);
		ff.flush();
		do {
			data = fi.readLine();
			if (data == null) {
				break;
			} else {
				fw.write(data + "\n");
				fw.flush();
			}
		} while (true);
		fw.close();
		ff.close();
		// close the input reader
		fi.close();
		return first_line;
	}

	// delete a tmp file
	boolean deleteFile(String str) throws IOException {
		File df = new File(str);
		if (df.exists()) {
			if (!df.delete()) {
				System.err.print("delete file error !");
			}
		} else {
			System.out.println("file does not exist!");
		}
		return true;
	}

	/**
	 * Runs the demo.
	 */
	public static void main(String[] args) throws IOException {
		HBaseRecordPro setup = new HBaseRecordPro();
		String[] col_family = { column_family };
		Path text_path = new Path(text_tmp);

		setup.parseFirstLine(source_file, text_tmp);
		// System.out.println(first_line);

		BuildHTable build_table = new BuildHTable(table_name, col_family);
		if (!build_table.checkTableExist(table_name)) {
			if (!build_table.createTable()) {
				System.out.println("create table error !");
			}
		} else {
			System.out.println("Table \"" + table_name
					+ "\" already exists!");
		}
		JobConf conf = new JobConf(HBaseRecordPro.class);
		FileSystem fileconf = FileSystem.get(conf);
		// move the record file from the local file system into HDFS
		fileconf.copyFromLocalFile(true, text_path, text_path);
		// job name; modify it to anything you like
		conf.setJobName("PersonDataBase");
		final int mapTasks = 1;
		final int reduceTasks = 1;
		// the HBase table name must be correct; in our profile it is t1_table
		TableReduce.initJob(table_name, ReduceClass.class, conf);

		// below is the map-reduce profile
		conf.setNumMapTasks(mapTasks);
		conf.setNumReduceTasks(reduceTasks);
		conf.setInputPath(text_path);
		conf.setMapperClass(IdentityMapper.class);
		conf.setCombinerClass(IdentityReducer.class);
		conf.setReducerClass(ReduceClass.class);
		JobClient.runJob(conf);

		// delete the tmp files
		FileSystem.get(conf).delete(text_path);
		setup.deleteFile(conf_tmp);
	}
}
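Note that the program also depends on a helper class BuildHTable from the same package, which this page does not show. Purely as an illustration, here is a plausible sketch of such a helper against the 0.1-era HBase API; the class names (HBaseAdmin, HTableDescriptor, HColumnDescriptor) are real, but the exact packages and signatures vary between HBase versions, so treat this as an assumption rather than the original class:

package tw.org.nchc.code;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;

public class BuildHTable {
	private final String tableName;
	private final String[] families;
	private final HBaseAdmin admin;

	public BuildHTable(String tableName, String[] families) throws IOException {
		this.tableName = tableName;
		this.families = families;
		this.admin = new HBaseAdmin(new HBaseConfiguration());
	}

	// true if the table already exists
	public boolean checkTableExist(String name) throws IOException {
		return admin.tableExists(new Text(name));
	}

	// create the table with the given column families; true on success
	public boolean createTable() {
		try {
			HTableDescriptor desc = new HTableDescriptor(tableName);
			for (String family : families) {
				desc.addFamily(new HColumnDescriptor(family));
			}
			admin.createTable(desc);
			return true;
		} catch (IOException e) {
			return false;
		}
	}
}

In main() the helper is used only to create the table when it does not already exist; if your HBase version differs, the calls on HBaseAdmin are the only part that needs adapting.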