Changeset 16
- Timestamp: Jun 30, 2008, 1:16:09 PM
- File: sample/HBaseRecordPro.java (1 edited)
Legend: unchanged context lines are shown with no marker; lines added in r16 are prefixed with "+"; lines removed from r15 are prefixed with "-".
sample/HBaseRecordPro.java (r15 → r16)
The file's header comment (lines 13-56 in both revisions, unchanged apart from whitespace) documents the demo setup:

     * Make sure the Hadoop file system and HBase are running correctly.
     * 1. Put test.txt into the t1 directory; its content is:
     *        ---------------
     *        name:locate:years
     *        waue:taiwan:1981
     *        shellon:taiwan:1981
     *        ---------------
     * 2. hadoop_root/$ bin/hadoop dfs -put t1 t1
     * 3. hbase_root/$ bin/hbase shell
     * 4. hql > create table t1_table("person");
     * 5. Return to Eclipse and run this code; the table should then contain:
     *        t1_table -> person
     *        ---------------------------
     *        | name    | locate | years |
     *        | waue    | taiwan | 1981  |
     *        | shellon | taiwan | 1981  |
     *        ---------------------------
     * Check the result: go to the hbase console and type:
     * hql > select * from t1_table;
    
    08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
    +-----+----------------+----------+
    | Row | Column         | Cell     |
    +-----+----------------+----------+
    | 0   | person:locate  | locate   |
    | 0   | person:name    | name     |
    | 0   | person:years   | years    |
    | 19  | person:locate  | taiwan   |
    | 19  | person:name    | waue     |
    | 19  | person:years   | 1981     |
    | 36  | person:locate  | taiwan   |
    | 36  | person:name    | shellon  |
    | 36  | person:years   | 1981     |
    +-----+----------------+----------+
    3 row(s) in set. (0.04 sec)
    */
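The first line of test.txt doubles as the schema: each colon-separated field of the header becomes a column qualifier under the person: column family. A minimal standalone sketch of that mapping (hypothetical; not part of the changeset):

    // Hypothetical sketch: how the header row of test.txt turns into
    // HBase column names in this sample.
    public class ColumnNameDemo {
        public static void main(String[] args) {
            String firstLine = "name:locate:years"; // header row of test.txt
            String bf = "person:";                  // column family prefix
            for (String qualifier : firstLine.split(":")) {
                // prints person:name, person:locate, person:years
                System.out.println(bf + qualifier);
            }
        }
    }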
The code changes between r15 and r16, shown as a unified diff with unchanged context elided ("…"). A new import is added:

 package tw.org.nchc.code;
 …
 import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
 …

The reducer moves out of HBaseRecordPro to become a top-level class; instead of taking the column names from a static field, it re-reads them from /tmp/fi_conf.tmp on every reduce() call and dumps them to a local debug file:

+class ReduceClass extends TableReduce<LongWritable, Text> {
+	// In this sample the map phase is unused; the reduce phase does the work.
+	public void reduce(LongWritable key, Iterator<Text> values,
+			OutputCollector<Text, MapWritable> output, Reporter reporter)
+			throws IOException {
+		String sp = ":";
+		String bf = "person:";
+		// this map holds the columns of one row
+		MapWritable map = new MapWritable();
+		// values.next().getBytes() returns the record value in byte form
+		String stro = new String(values.next().getBytes());
+		String str[] = stro.split(sp);
+
+		// read the column names back from the temp file written by
+		// parseFirstLine() in the driver
+		BufferedReader fconf = new BufferedReader(new FileReader(
+				new File("/tmp/fi_conf.tmp")));
+		String first_line = fconf.readLine();
+		String[] cf = first_line.split(sp);
+		int length = cf.length;
+
+		// for debugging: dump the generated column names to a local file
+		FileOutputStream out = new FileOutputStream(new File(
+				"/home/waue/mr-result.txt"));
+		for (int i = 0; i < length; i++) {
+			out.write((bf + cf[i] + "\n").getBytes());
+		}
+		out.close();
+
+		// column ids are created dynamically
+		Text[] col_n = new Text[length];
+		byte[][] b_l = new byte[length][];
+		// contents must be ImmutableBytesWritable
+		ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
+		map.clear();
+		for (int i = 0; i < length; i++) {
+			col_n[i] = new Text(bf + cf[i]);
+			b_l[i] = str[i].getBytes();
+			w_l[i] = new ImmutableBytesWritable(b_l[i]);
+			// populate the current row
+			map.put(col_n[i], w_l[i]);
+		}
+		// add the row, with the input key as the row id
+		output.collect(new Text(key.toString()), map);
+	}
+}
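Passing the column names through /tmp/fi_conf.tmp only works while the reduce task runs on the same machine as main(), as in a local or pseudo-distributed setup; on a real cluster the task JVM may start on a node where that file does not exist. It is presumably also why r16 drops the r15 approach of a static cf field, which is set in the driver JVM but never in the task JVMs. The conventional pattern is to hand such values to the tasks through the job configuration. A minimal sketch of that pattern, an assumption on my part rather than anything in this changeset (the key name is hypothetical, and the import paths assume the HBase 0.1-era API this sample appears to use):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.TableReduce;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    class ConfiguredReduce extends TableReduce<LongWritable, Text> {
        private String[] cf;

        // configure() runs in each task JVM before reduce() is called,
        // so a value set on the JobConf by the driver is visible here.
        public void configure(JobConf job) {
            super.configure(job);
            cf = job.get("hbase.record.columns").split(":");
        }

        public void reduce(LongWritable key, Iterator<Text> values,
                OutputCollector<Text, MapWritable> output, Reporter reporter)
                throws IOException {
            String[] str = values.next().toString().split(":");
            MapWritable map = new MapWritable();
            for (int i = 0; i < cf.length; i++) {
                map.put(new Text("person:" + cf[i]),
                        new ImmutableBytesWritable(str[i].getBytes()));
            }
            output.collect(new Text(key.toString()), map);
        }
    }

The driver would then call conf.set("hbase.record.columns", first_line) after parseFirstLine(), and no temp file or shared local path would be needed.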
Within HBaseRecordPro itself, the fields lose their private/static modifiers, the table name changes from "testpro" to "testend", a pair of public constructors replaces the private one, and parseFirstLine() becomes static, now also saving the header line to /tmp/fi_conf.tmp:

 public class HBaseRecordPro {

 	/* Define parameters */
 	// file path in the hadoop file system (not the physical file system)
-	private String file_path = "/home/waue/test.txt";
+	final String file_path = "/home/waue/test.txt";
 	// setup for MapTask and ReduceTask
-	private final static String bf = "person:";
-	private final String table_name = "testpro";
-	private final static String sp = ":";
-	private static String[] cf;
+	final String bf = "person:";
+	final String table_name = "testend";
+	final String sp = ":";
+	String[] cf;
+	String test;

-	private static class ReduceClass extends TableReduce<LongWritable, Text> {
-		// ... body as in the new top-level ReduceClass above, except that
-		// the column count came from the static field: int length = cf.length;
-	}
-
-	private HBaseRecordPro() {
-	}
+	public HBaseRecordPro() {
+	}
+
+	public HBaseRecordPro(String[] st) {
+		cf = st;
+	}

-	String parseFirstLine(String in, String ou) throws IOException {
+	static public String parseFirstLine(String in, String ou)
+			throws IOException {
 		BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
+		BufferedWriter ff = new BufferedWriter(new FileWriter(
+				new File("/tmp/fi_conf.tmp")));
 		BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
 		String first_line, data;
 		first_line = fi.readLine();
+		ff.write(first_line);
+		ff.flush();
 		do {
 			data = fi.readLine();
 			if (data == null) {
 				break;
 			} else {
 				fw.write(data + "\n");
+				fw.flush();
 			}
 		} while (true);
 		fw.close();
+		ff.close();
 		return first_line;
 	}

In main(), the job now calls the static parseFirstLine() with local variables instead of instance fields, and the old static-field setup is commented out:

 	public static void main(String[] args) throws IOException {
+		String bf = "person:";
+		String file_path = "/home/waue/test.txt";
+
 		final String file_tmp = "/tmp/HBaseRecord.text.tmp";
 		final int mapTasks = 1;
 		final int reduceTasks = 1;
 		String[] column_family = { bf };

 		HBaseRecordPro setup = new HBaseRecordPro();
-		String first_line = setup.parseFirstLine(setup.file_path, file_tmp);
+		String first_line = parseFirstLine(file_path, file_tmp);
 		System.out.println(first_line);
-		HBaseRecordPro.cf = first_line.split(sp);
-		// test
-		for (int i = 0; i < cf.length; i++) {
-			System.out.println("column[" + i + "]" + bf + cf[i]);
-		}
+		// HBaseRecordPro.cf = first_line.split(sp);
+		// test
+		/*
+		for (int i = 0; i < 3; i++) {
+			System.out.println("column[" + i + "]=" + bf + cf[i]);
+		}
+		*/

-		BuildHTable build_table = new BuildHTable(setup.table_name, column_family);
+		BuildHTable build_table = new BuildHTable(setup.table_name,
+				column_family);
 		if (!build_table.checkTableExist(setup.table_name)) {
 			if (!build_table.createTable()) {
 …
 			System.out.println("Table \"" + setup.table_name
 					+ "\" has already existed !");
 		}
 		JobConf conf = new JobConf(HBaseRecordPro.class);
 		FileSystem fileconf = FileSystem.get(conf);
 		fileconf.copyFromLocalFile(true, new Path(file_tmp), new Path(file_tmp));
 		// Job name; modify it to anything you like
 		conf.setJobName("PersonDataBase");
 …
 		conf.setCombinerClass(IdentityReducer.class);
 		conf.setReducerClass(ReduceClass.class);

 		JobClient.runJob(conf);
 …
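As a quick check of what the new static parseFirstLine() produces, a hypothetical smoke test against the sample test.txt from the header comment (not part of the changeset):

    import tw.org.nchc.code.HBaseRecordPro;

    public class ParseFirstLineDemo {
        public static void main(String[] args) throws java.io.IOException {
            String first = HBaseRecordPro.parseFirstLine(
                    "/home/waue/test.txt", "/tmp/HBaseRecord.text.tmp");
            System.out.println(first); // prints: name:locate:years
            // /tmp/HBaseRecord.text.tmp now holds only the data rows
            // (waue:taiwan:1981, shellon:taiwan:1981) and becomes the
            // MapReduce input; /tmp/fi_conf.tmp holds the header line
            // for the reduce tasks to read back.
        }
    }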