Changeset 17
- Timestamp: Jul 1, 2008, 11:22:41 AM
- File: sample/HBaseRecordPro.java (1 edited)
Legend:
- Unmodified (no prefix)
- Added (+)
- Removed (-)
- Unchanged lines elided (…)
sample/HBaseRecordPro.java
--- sample/HBaseRecordPro.java (r16)
+++ sample/HBaseRecordPro.java (r17)

  /**
   * Purpose :
-  *  Parse your record and then store in HBase.
+  *  First, the program parses your record file and creates the HBase table.
+  *  Then it takes the first line of the file as the column qualifiers.
+  *  Finally it stores the records into HBase automatically.
   *
   * HowToUse :
-  *  Make sure Hadoop file system and Hbase are running correctly.
-  *  1. put test.txt in t1 directory which content is
-  ---------------
-  name:locate:years
-  waue:taiwan:1981
-  shellon:taiwan:1981
-  ---------------
-  *  2. hadoop_root/$ bin/hadoop dfs -put t1 t1
-  *  3. hbase_root/$ bin/hbase shell
-  *  4. hql > create table t1_table("person");
-  *  5. Come to Eclipse and run this code, and we will let database as that
-  t1_table -> person
-  ----------------
-  | name    | locate | years |
-  | waue    | taiwan | 1981  |
-  | shellon | taiwan | 1981  |
-  ----------------
+  *  Make sure two things:
+  *  1. source_file must be formatted as follows:
+  *       first line:  qualify1:qualify2:...:qualifyN
+  *       other lines: record1:record2:...:recordN
+  *       (the number N of qualifiers must equal the number of fields per record)
+  -----------------
+  name:locate:years
+  waue:taiwan:1981
+  rock:taiwan:1981
+  aso:taiwan:1981
+  jazz:taiwan:1982
+  -----------------
+  *  2. The source_file path must be correct.
+
   * Check Result:
   *  Go to hbase console, type :
   *    hql > select * from t1_table;
   08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
-  +-----+---------------+---------+
-  | Row | Column        | Cell    |
-  +-----+---------------+---------+
-  | 0   | person:locate | locate  |
-  | 0   | person:name   | name    |
-  | 0   | person:years  | years   |
-  | 19  | person:locate | taiwan  |
-  | 19  | person:name   | waue    |
-  | 19  | person:years  | 1981    |
-  | 36  | person:locate | taiwan  |
-  | 36  | person:name   | shellon |
-  | 36  | person:years  | 1981    |
-  +-----+---------------+---------+
-  3 row(s) in set. (0.04 sec)
+  +-----+---------------+--------+
+  | Row | Column        | Cell   |
+  +-----+---------------+--------+
+  | 0   | member:locate | taiwan |
+  | 0   | member:name   | waue   |
+  | 0   | member:years  | 1981   |
+  | 17  | member:locate | taiwan |
+  | 17  | member:name   | rock   |
+  | 17  | member:years  | 1981   |
+  | 34  | member:locate | taiwan |
+  | 34  | member:name   | aso    |
+  | 34  | member:years  | 1981   |
+  | 50  | member:locate | taiwan |
+  | 50  | member:name   | jazz   |
+  | 50  | member:years  | 1982   |
+  +-----+---------------+--------+
+  4 row(s) in set. (0.31 sec)
+
   */
…
  import java.io.BufferedReader;
  import java.io.BufferedWriter;
  import java.io.File;
- import java.io.FileOutputStream;
  import java.io.FileReader;
  import java.io.FileWriter;
  import java.io.IOException;
  import java.util.Iterator;
+
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
…
  import org.apache.hadoop.mapred.lib.IdentityReducer;

- class ReduceClass extends TableReduce<LongWritable, Text> {
-     // on this sample, map is nonuse, we use reduce to handle
-     public void reduce(LongWritable key, Iterator<Text> values,
-             OutputCollector<Text, MapWritable> output, Reporter reporter)
-             throws IOException {
-         String sp = ":";
-         String bf = "person:";
-         // this map holds the columns per row
-         MapWritable map = new MapWritable();
-         // values.next().getByte() can get value and transfer to byte form,
-         String stro = new String(values.next().getBytes());
-         String str[] = stro.split(sp);
-
-         BufferedReader fconf = new BufferedReader(new FileReader(new File("/tmp/fi_conf.tmp")));
-         // int length = cf.length;
-
-         // test for debug
-         FileOutputStream out = new FileOutputStream(new File(
-                 "/home/waue/mr-result.txt"));
-         String first_line = fconf.readLine();
-
-         // test for debug
-         String[] cf = first_line.split(sp);
-         int length = cf.length;
-         for (int i = 0; i < length; i++) {
-             out.write((bf + cf[i] + "\n").getBytes());
-         }
-         out.close();
-         // Column id is created dymanically,
-         Text[] col_n = new Text[length];
-         byte[][] b_l = new byte[length][];
-         // contents must be ImmutableBytesWritable
-         ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
-         map.clear();
-
-         for (int i = 0; i < length; i++) {
-             col_n[i] = new Text(bf + cf[i]);
-             b_l[i] = str[i].getBytes();
-             w_l[i] = new ImmutableBytesWritable(b_l[i]);
-             // populate the current row
-             map.put(col_n[i], w_l[i]);
-         }
-         // add the row with the key as the row id
-         output.collect(new Text(key.toString()), map);
-     }
- }
-
- public class HBaseRecordPro {
-
-     /* Denify parameter */
-
-     // file path in hadoop file system (not phisical file system)
-     final String file_path = "/home/waue/test.txt";
-
-     // setup MapTask and Reduce Task
-
-     final String bf = "person:";
-
-     final String table_name = "testend";
-
-     final String sp = ":";
-
-     String[] cf;
-
-     String test;
-
-     public HBaseRecordPro() {
-     }
-
-     public HBaseRecordPro(String[] st) {
-         cf = st;
-     }
-
-     static public String parseFirstLine(String in, String ou) throws IOException {
+ public class HBaseRecordPro {
+     /* Major parameter */
+     // a local file system path, not an HDFS path
+     final static String source_file = "/home/waue/test.txt";
+
+     /* Minor parameters */
+     // column family name
+     final static String column_family = "member:";
+
+     // table name
+     final static String table_name = "HBaseRecord";
+
+     // separator character
+     final static String sp = ":";
+
+     // temp file that records the first line (the column qualifiers)
+     final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp";
+
+     // temp file that records the data lines
+     final static String text_tmp = "/tmp/HBaseRecord.text.tmp";
+
+     // in this sample the map stage is an identity; the reduce does the work
+     private static class ReduceClass extends TableReduce<LongWritable, Text> {
+         public void reduce(LongWritable key, Iterator<Text> values,
+                 OutputCollector<Text, MapWritable> output, Reporter reporter)
+                 throws IOException {
+
+             // read back the file that holds the column qualifiers
+             BufferedReader fconf = new BufferedReader(new FileReader(new File(
+                     conf_tmp)));
+             String first_line = fconf.readLine();
+             fconf.close();
+             // extract the column qualifiers
+             String[] cf = first_line.split(sp);
+             int length = cf.length;
+
+             // values.next().getBytes() returns the record line in byte form
+             String stro = new String(values.next().getBytes());
+             String str[] = stro.split(sp);
+
+             // column names are created dynamically
+             Text[] col_n = new Text[length];
+             byte[][] b_l = new byte[length][];
+             // cell contents must be ImmutableBytesWritable
+             ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
+
+             // this map holds the columns of one HBase row
+             MapWritable map = new MapWritable();
+             map.clear();
+
+             // prepare to write data into the map
+             for (int i = 0; i < length; i++) {
+                 col_n[i] = new Text(column_family + cf[i]);
+                 b_l[i] = str[i].getBytes();
+                 w_l[i] = new ImmutableBytesWritable(b_l[i]);
+                 // populate the current row
+                 map.put(col_n[i], w_l[i]);
+             }
+             // add the row with the key as the row id
+             output.collect(new Text(key.toString()), map);
+         }
+     }
+
+     public HBaseRecordPro() {
+     }
+
+     // Split the source text into two files: conf_tmp records the first line,
+     // which is used to set the column qualifiers; the file at ou (text_tmp)
+     // records the data to be stored into the table.
+     public String parseFirstLine(String in, String ou) throws IOException {
          BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
-         BufferedWriter ff = new BufferedWriter(new FileWriter(new File("/tmp/fi_conf.tmp")));
+         BufferedWriter ff = new BufferedWriter(new FileWriter(new File(conf_tmp)));
          BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
          String first_line, data;
…
          return first_line;
      }

+     // delete a temporary file
+     boolean deleteFile(String str) throws IOException {
+         File df = new File(str);
+
+         if (df.exists()) {
+             if (!df.delete()) {
+                 System.err.print("delete file error !");
+             }
+         } else {
+             System.out.println("file does not exist!");
+         }
+         return true;
+     }
+
      /**
       * Runs the demo.
…
      public static void main(String[] args) throws IOException {

-         String bf = "person:";
-         String file_path = "/home/waue/test.txt";
+         HBaseRecordPro setup = new HBaseRecordPro();
+         String[] col_family = { column_family };
+         Path text_path = new Path(text_tmp);

-         final String file_tmp = "/tmp/HBaseRecord.text.tmp";
-         final int mapTasks = 1;
-         final int reduceTasks = 1;
-         String[] column_family = { bf };
-
-         HBaseRecordPro setup = new HBaseRecordPro();
-
-         String first_line = parseFirstLine(file_path, file_tmp);
-         System.out.println(first_line);
-         // HBaseRecordPro.cf = first_line.split(sp);
-
-         // test
-         /*
-         for (int i = 0; i < 3; i++) {
-             System.out.println("column[" + i + "]=" + bf + cf[i]);
-         }*/
-
-         BuildHTable build_table = new BuildHTable(setup.table_name,
-                 column_family);
-         if (!build_table.checkTableExist(setup.table_name)) {
+         setup.parseFirstLine(source_file, text_tmp);
+         // System.out.println(first_line);
+
+         BuildHTable build_table = new BuildHTable(table_name,
+                 col_family);
+         if (!build_table.checkTableExist(table_name)) {
              if (!build_table.createTable()) {
                  System.out.println("create table error !");
              }
          } else {
-             System.out.println("Table \"" + setup.table_name
+             System.out.println("Table \"" + table_name
                      + "\" has already existed !");
          }
          JobConf conf = new JobConf(HBaseRecordPro.class);
          FileSystem fileconf = FileSystem.get(conf);
-         fileconf.copyFromLocalFile(true, new Path(file_tmp), new Path(file_tmp));
+         fileconf.copyFromLocalFile(true, text_path, text_path);
          // Job name; you can modify to any you like
          conf.setJobName("PersonDataBase");
+         final int mapTasks = 1;
+         final int reduceTasks = 1;
          // Hbase table name must be correct , in our profile is t1_table
-         TableReduce.initJob(setup.table_name, ReduceClass.class, conf);
+         TableReduce.initJob(table_name, ReduceClass.class, conf);

          // below are map-reduce profile
          conf.setNumMapTasks(mapTasks);
          conf.setNumReduceTasks(reduceTasks);
-         conf.setInputPath(new Path(file_tmp));
+         conf.setInputPath(text_path);
          conf.setMapperClass(IdentityMapper.class);
          conf.setCombinerClass(IdentityReducer.class);
…
          JobClient.runJob(conf);

+         // delete the temporary files
+         FileSystem.get(conf).delete(text_path);
+         setup.deleteFile(conf_tmp);
      }
  }
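The body of parseFirstLine is elided above ("…"), so the split it performs is easy to miss: the first line of the source file goes to conf_tmp, where every reduce task reads it back as the list of column qualifiers, while all remaining lines go to text_tmp, which becomes the job input. Below is a minimal sketch of that split, assuming plain line-by-line java.io copying; SplitSketch, its three-argument signature, and the main driver are illustrative only, not the committed code (the real method hard-codes conf_tmp):

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;

    public class SplitSketch {

        // Copy the first line of `in` to `confOut` and every remaining line
        // to `dataOut`, returning the first line. Mirrors the roles of
        // conf_tmp and text_tmp in the changeset above.
        static String split(String in, String confOut, String dataOut)
                throws IOException {
            BufferedReader fi = new BufferedReader(new FileReader(in));
            BufferedWriter ff = new BufferedWriter(new FileWriter(confOut));
            BufferedWriter fw = new BufferedWriter(new FileWriter(dataOut));
            String first_line = fi.readLine();   // e.g. "name:locate:years"
            ff.write(first_line);
            String data;
            while ((data = fi.readLine()) != null) {
                fw.write(data);                  // e.g. "waue:taiwan:1981"
                fw.newLine();
            }
            fi.close();
            ff.close();
            fw.close();
            return first_line;
        }

        public static void main(String[] args) throws IOException {
            System.out.println(split("/home/waue/test.txt",
                    "/tmp/HBaseRecordPro.firstLine.tmp",
                    "/tmp/HBaseRecord.text.tmp"));
        }
    }

Keeping the qualifiers in a side file rather than in the job input is what lets the otherwise stateless reduce tasks know which column each field belongs to.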
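At the other end, each reduce call turns one record line into one HBase row: a MapWritable keyed by Text column names (family + qualifier) with ImmutableBytesWritable cell values, collected under the line's byte offset as the row id (hence row keys 0, 17, 34, 50 in the expected output). Here is a standalone illustration of that mapping using the sample data from the header comment; the org.apache.hadoop.hbase.io.ImmutableBytesWritable import path is an assumption based on the HBase version of that era, since the diff elides the file's hbase imports:

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;

    public class RowSketch {
        public static void main(String[] args) {
            // the qualifiers read back from conf_tmp (the file's first line)
            String[] cf = "name:locate:years".split(":");
            // one record line from text_tmp
            String[] str = "waue:taiwan:1981".split(":");

            MapWritable map = new MapWritable();
            for (int i = 0; i < cf.length; i++) {
                // column key = family + qualifier, e.g. "member:name";
                // cell values must be wrapped as ImmutableBytesWritable
                map.put(new Text("member:" + cf[i]),
                        new ImmutableBytesWritable(str[i].getBytes()));
            }
            // the reduce emits this map once per row, keyed by the row id
            System.out.println(map.size() + " columns prepared for one row");
        }
    }

Because the IdentityMapper passes keys through unchanged, the row ids are the byte offsets assigned by the input format; since parseFirstLine strips the qualifier line from the job input, row 0 now holds the first real record (waue) instead of the header, which was the bug in r16.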