Changeset 17


Ignore:
Timestamp:
Jul 1, 2008, 11:22:41 AM (16 years ago)
Author:
waue
Message:

More comments; much more readable

File:
1 edited

Legend:

Unmodified
Added
Removed
  • sample/HBaseRecordPro.java

    r16 r17  
    88/**
    99 * Purpose :
    10  *  Parse your record and then store in HBase.
     10 *  First, the program parses your record file and creates the HBase table.\
     11 *  Then it uses the first line as the column qualifiers.\
     12 *  Finally, it stores the records in HBase automatically.
    1113 *
    1214 * HowToUse :
    13  *  Make sure Hadoop file system and Hbase are running correctly.
    14  *  1. put test.txt in t1 directory which content is
    15  ---------------
    16  name:locate:years
    17  waue:taiwan:1981
    18  shellon:taiwan:1981
    19  ---------------
    20  *  2. hadoop_root/$ bin/hadoop dfs -put t1 t1
    21  *  3. hbase_root/$ bin/hbase shell
    22  *  4. hql > create table t1_table("person");
    23  *  5. Come to Eclipse and run this code, and we will let database as that
    24  t1_table -> person
    25  ----------------
    26  |  name | locate | years |
    27  | waue  | taiwan | 1981 |
    28  | shellon | taiwan | 1981 |
    29  ----------------
     15 *  Make sure of two things:
     16 *  1. source_file must be formatted as follows:
     17 *    first line: qualifier1:qualifier2:...:qualifierN
     18 *    other lines: record1:record2:...:recordN
     19 *     (each record line must have the same number N of fields as the qualifier line)
     20-----------------
     21name:locate:years
     22waue:taiwan:1981
     23rock:taiwan:1981
     24aso:taiwan:1981
     25jazz:taiwan:1982
     26-----------------
     27 *  2. source_file path must be correct.
     28
    3029 * Check Result:
    3130 *  Go to hbase console, type :
    3231 *    hql > select * from t1_table;
    3332 08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
    34  +-------------------------+-------------------------+-------------------------+
    35  | Row                     | Column                  | Cell                    |
    36  +-------------------------+-------------------------+-------------------------+
    37  | 0                       | person:locate           | locate                  |
    38  +-------------------------+-------------------------+-------------------------+
    39  | 0                       | person:name             | name                    |
    40  +-------------------------+-------------------------+-------------------------+
    41  | 0                       | person:years            | years                   |
    42  +-------------------------+-------------------------+-------------------------+
    43  | 19                      | person:locate           | taiwan                  |
    44  +-------------------------+-------------------------+-------------------------+
    45  | 19                      | person:name             | waue                    |
    46  +-------------------------+-------------------------+-------------------------+
    47  | 19                      | person:years            | 1981                    |
    48  +-------------------------+-------------------------+-------------------------+
    49  | 36                      | person:locate           | taiwan                  |
    50  +-------------------------+-------------------------+-------------------------+
    51  | 36                      | person:name             | shellon                 |
    52  +-------------------------+-------------------------+-------------------------+
    53  | 36                      | person:years            | 1981                    |
    54  +-------------------------+-------------------------+-------------------------+
    55  3 row(s) in set. (0.04 sec)
     33
     34+-------------------------+-------------------------+-------------------------+
     35| Row                     | Column                  | Cell                    |
     36+-------------------------+-------------------------+-------------------------+
     37| 0                       | member:locate           | taiwan                  |
     38+-------------------------+-------------------------+-------------------------+
     39| 0                       | member:name             | waue                    |
     40+-------------------------+-------------------------+-------------------------+
     41| 0                       | member:years            | 1981                    |
     42+-------------------------+-------------------------+-------------------------+
     43| 17                      | member:locate           | taiwan                  |
     44+-------------------------+-------------------------+-------------------------+
     45| 17                      | member:name             | rock                    |
     46+-------------------------+-------------------------+-------------------------+
     47| 17                      | member:years            | 1981                    |
     48+-------------------------+-------------------------+-------------------------+
     49| 34                      | member:locate           | taiwan                  |
     50+-------------------------+-------------------------+-------------------------+
     51| 34                      | member:name             | aso                     |
     52+-------------------------+-------------------------+-------------------------+
     53| 34                      | member:years            | 1981                    |
     54+-------------------------+-------------------------+-------------------------+
     55| 50                      | member:locate           | taiwan                  |
     56+-------------------------+-------------------------+-------------------------+
     57| 50                      | member:name             | jazz                    |
     58+-------------------------+-------------------------+-------------------------+
     59| 50                      | member:years            | 1982                    |
     60+-------------------------+-------------------------+-------------------------+
     614 row(s) in set. (0.31 sec)
     62
    5663 */
    5764
     
    6168import java.io.BufferedWriter;
    6269import java.io.File;
    63 import java.io.FileOutputStream;
    6470import java.io.FileReader;
    6571import java.io.FileWriter;
    6672import java.io.IOException;
    6773import java.util.Iterator;
     74
    6875import org.apache.hadoop.fs.FileSystem;
    6976import org.apache.hadoop.fs.Path;
     
    8087import org.apache.hadoop.mapred.lib.IdentityReducer;
    8188
    82 
    83 
    84 class ReduceClass extends TableReduce<LongWritable, Text> {
     89public class HBaseRecordPro {
     90  /* Major parameter */
     91  // this is a local file system path, not a Hadoop file system path
     92  final static String source_file = "/home/waue/test.txt";
     93
     94  /* Minor parameter */
     95  // column family name
     96  final static String column_family = "member:";
     97
     98  // table name
     99  final static String table_name = "HBaseRecord";
     100
     101  // separator character
     102  final static String sp = ":";
     103 
     104  // temp file holding the first line, i.e. the column qualifiers
     105  final static String conf_tmp = "/tmp/HBaseRecordPro.firstLine.tmp";
     106 
     107  // data source tmp
     108  final static String text_tmp = "/tmp/HBaseRecord.text.tmp";
     109
    85110  // in this sample the map phase does no work; everything is handled in reduce
    86   public void reduce(LongWritable key, Iterator<Text> values,
    87       OutputCollector<Text, MapWritable> output, Reporter reporter)
     111  private static class ReduceClass extends TableReduce<LongWritable, Text> {
     112    public void reduce(LongWritable key, Iterator<Text> values,
     113        OutputCollector<Text, MapWritable> output, Reporter reporter)
     114        throws IOException {
     115
     116      // read the configure file
     117      BufferedReader fconf = new BufferedReader(new FileReader(new File(
     118          conf_tmp)));
     119      String first_line = fconf.readLine();
     120      fconf.close();
     121      // extract cf data
     122      String[] cf = first_line.split(sp);
     123      int length = cf.length;
     124       
     125      // values.next().getBytes() returns the value as bytes, which are turned back into a String,
     126      String stro = new String(values.next().getBytes());
     127      String str[] = stro.split(sp);
     128
     129      // Column id is created dynamically,
     130      Text[] col_n = new Text[length];
     131      byte[][] b_l = new byte[length][];
     132      // contents must be ImmutableBytesWritable
     133      ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
     134
     135      // This map connect to hbase table and holds the columns per row
     136      MapWritable map = new MapWritable();
     137      map.clear();
     138
     139      // prepare to write data into map
     140      for (int i = 0; i < length; i++) {
     141        col_n[i] = new Text(column_family + cf[i]);
     142        b_l[i] = str[i].getBytes();
     143        w_l[i] = new ImmutableBytesWritable(b_l[i]);
     144        // populate the current row
     145        map.put(col_n[i], w_l[i]);
     146      }
     147      // add the row with the key as the row id
     148      output.collect(new Text(key.toString()), map);
     149    }
     150  }
     151
     152  public HBaseRecordPro() {
     153  }
     154 
     155  // This function splits the source text into two files: \
     156  //  conf_tmp records the first line, which is used to set the column qualifiers; \
     157  //  text_tmp (ou) records the data, which is stored into the table.
     158  public String parseFirstLine(String in, String ou)
    88159      throws IOException {
    89     String sp = ":";
    90     String bf = "person:";
    91     // this map holds the columns per row
    92     MapWritable map = new MapWritable();
    93     // values.next().getByte() can get value and transfer to byte form,
    94     String stro = new String(values.next().getBytes());
    95     String str[] = stro.split(sp);
    96    
    97     BufferedReader fconf = new BufferedReader(new FileReader(new File("/tmp/fi_conf.tmp")));
    98     // int length = cf.length;
    99    
    100    
    101    
    102     // test for debug
    103     FileOutputStream out = new FileOutputStream(new File(
    104         "/home/waue/mr-result.txt"));
    105     String first_line = fconf.readLine();
    106    
    107     // test for debug
    108     String[] cf = first_line.split(sp);
    109     int length = cf.length;
    110     for(int i=0 ; i<length; i ++){
    111       out.write((bf + cf[i]+"\n").getBytes());
    112      
    113     }
    114     out.close();
    115     // Column id is created dymanically,
    116     Text[] col_n = new Text[length];
    117     byte[][] b_l = new byte[length][];
    118     // contents must be ImmutableBytesWritable
    119     ImmutableBytesWritable[] w_l = new ImmutableBytesWritable[length];
    120     map.clear();
    121 
    122     for (int i = 0; i < length; i++) {
    123       col_n[i] = new Text(bf + cf[i]);
    124       b_l[i] = str[i].getBytes();
    125       w_l[i] = new ImmutableBytesWritable(b_l[i]);
    126       // populate the current row
    127       map.put(col_n[i], w_l[i]);
    128     }
    129     // add the row with the key as the row id
    130     output.collect(new Text(key.toString()), map);
    131   }
    132 }
    133 
    134 public class HBaseRecordPro {
    135 
    136   /* Denify parameter */
    137 
    138   // file path in hadoop file system (not phisical file system)
    139   final String file_path = "/home/waue/test.txt";
    140 
    141   // setup MapTask and Reduce Task
    142 
    143   final String bf = "person:";
    144 
    145   final String table_name = "testend";
    146 
    147   final String sp = ":";
    148 
    149   String[] cf ;
    150  
    151   String test;
    152 
    153   public HBaseRecordPro() {
    154 
    155 
    156   }
    157   public HBaseRecordPro(String[] st) {
    158     cf = st;
    159   }
    160 
    161   static public String parseFirstLine(String in, String ou) throws IOException {
     160
    162161    BufferedReader fi = new BufferedReader(new FileReader(new File(in)));
    163     BufferedWriter ff = new BufferedWriter(new FileWriter(new File("/tmp/fi_conf.tmp")));
     162    BufferedWriter ff = new BufferedWriter(new FileWriter(new File(conf_tmp)));
    164163    BufferedWriter fw = new BufferedWriter(new FileWriter(new File(ou)));
    165164    String first_line, data;
     
    180179    return first_line;
    181180  }
    182 
     181  // tmp file delete
     182  boolean deleteFile(String str)throws IOException{
     183    File df = new File(str);
     184   
     185    if(df.exists()){
     186      if(!df.delete()){
     187        System.err.print("delete file error !");
     188      }
     189    }else{
     190      System.out.println("file not exit!");
     191    }
     192    return true;
     193  }
    183194  /**
    184195   * Runs the demo.
     
    186197  public static void main(String[] args) throws IOException {
    187198
    188     String bf = "person:";
    189     String file_path = "/home/waue/test.txt";
     199    HBaseRecordPro setup = new HBaseRecordPro();
     200    String[] col_family = {column_family};
     201    Path text_path = new Path(text_tmp);
    190202   
    191     final String file_tmp = "/tmp/HBaseRecord.text.tmp";
    192     final int mapTasks = 1;
    193     final int reduceTasks = 1;
    194     String[] column_family = { bf };
    195    
    196     HBaseRecordPro setup = new HBaseRecordPro();
    197    
    198     String first_line = parseFirstLine(file_path, file_tmp);
    199     System.out.println(first_line);
    200 //    HBaseRecordPro.cf = first_line.split(sp);
    201 
    202    
    203     // test
    204     /*
    205     for (int i = 0; i < 3; i++) {
    206       System.out.println("column[" + i + "]=" + bf + cf[i]);
    207     }*/
    208 
    209     BuildHTable build_table = new BuildHTable(setup.table_name,
    210         column_family);
    211     if (!build_table.checkTableExist(setup.table_name)) {
     203    setup.parseFirstLine(source_file, text_tmp);
     204//    System.out.println(first_line);
     205
     206    BuildHTable build_table = new BuildHTable(table_name,
     207        col_family);
     208    if (!build_table.checkTableExist(table_name)) {
    212209      if (!build_table.createTable()) {
    213210        System.out.println("create table error !");
    214211      }
    215212    } else {
    216       System.out.println("Table \"" + setup.table_name
     213      System.out.println("Table \"" + table_name
    217214          + "\" has already existed !");
    218215    }
    219216    JobConf conf = new JobConf(HBaseRecordPro.class);
    220217    FileSystem fileconf = FileSystem.get(conf);
    221     fileconf.copyFromLocalFile(true, new Path(file_tmp), new Path(file_tmp));
     218    fileconf.copyFromLocalFile(true, text_path, text_path);
    222219    // Job name; you can modify to any you like
    223220    conf.setJobName("PersonDataBase");
    224 
     221    final int mapTasks = 1;
     222    final int reduceTasks = 1;
    225223    // HBase table name must be correct; it must match the table created above
    226     TableReduce.initJob(setup.table_name, ReduceClass.class, conf);
    227    
     224    TableReduce.initJob(table_name, ReduceClass.class, conf);
     225
    228226    // below are map-reduce profile
    229227    conf.setNumMapTasks(mapTasks);
    230228    conf.setNumReduceTasks(reduceTasks);
    231     conf.setInputPath(new Path(file_tmp));
     229    conf.setInputPath(text_path);
    232230    conf.setMapperClass(IdentityMapper.class);
    233231    conf.setCombinerClass(IdentityReducer.class);
     
    236234    JobClient.runJob(conf);
    237235   
     236    // delete tmp file
     237    FileSystem.get(conf).delete(text_path);
     238    setup.deleteFile(conf_tmp);
    238239  }
    239240}
Note: See TracChangeset for help on using the changeset viewer.