Changes between Initial Version and Version 1 of hadoop_hbase_sample2


Ignore:
Timestamp:
Jun 6, 2008, 1:52:57 PM (16 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • hadoop_hbase_sample2

    v1 v1  
     1
     2{{{
     3/*
     4
     5 *  NCHC Hbase with map reduce sample code
     6
     7 *  DemoHBaseSlink.java
     8
     9 */
     10
     11
     12
     13package tw.org.nchc.demo;
     14
     15
     16
     17import java.io.IOException;
     18
     19import java.util.Iterator;
     20
     21
     22
     23import org.apache.hadoop.fs.Path;
     24
     25import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
     26
     27import org.apache.hadoop.hbase.mapred.TableReduce;
     28
     29import org.apache.hadoop.io.LongWritable;
     30
     31import org.apache.hadoop.io.MapWritable;
     32
     33import org.apache.hadoop.io.Text;
     34
     35import org.apache.hadoop.mapred.JobClient;
     36
     37import org.apache.hadoop.mapred.JobConf;
     38
     39import org.apache.hadoop.mapred.OutputCollector;
     40
     41import org.apache.hadoop.mapred.Reporter;
     42
     43import org.apache.hadoop.mapred.lib.IdentityMapper;
     44
     45import org.apache.hadoop.mapred.lib.IdentityReducer;
     46
     47
     48
     49/**
     50
     51 * This sample code will put the indicate data to Hbase.
     52
     53 * 1. put test.txt in t1 directory which content is
     54
     55---------------
     56
     57name:locate:years
     58
     59waue:taiwan:1981
     60
     61shellon:taiwan:1981
     62
     63---------------
     64
     65 * 2. hadoop_root/$ bin/hadoop dfs -put t1 t1
     66
     67 * 3. hbase_root/$ bin/hbase shell
     68
     69 * 4. hql > create table t1_table("person");
     70
     71 * 5. Come to Eclipse and run this code, and we will let database as that
     72
     73 t1_table -> person
     74
     75  ----------------
     76
     77  |  name | locate | years |
     78
     79  ----------------
     80
     81  | waue  | taiwan | 1981 |
     82
     83  ----------------
     84
     85  | shellon | taiwan | 1981 |
     86
     87  * 6. Go to hbase console, type : hql > select * from t1_table;
     88
     89     
     90
     9108/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key
     92
     93+-------------------------+-------------------------+-------------------------+
     94
     95| Row                     | Column                  | Cell                    |
     96
     97+-------------------------+-------------------------+-------------------------+
     98
     99| 0                       | person:locate           | locate                  |
     100
     101+-------------------------+-------------------------+-------------------------+
     102
     103| 0                       | person:name             | name                    |
     104
     105+-------------------------+-------------------------+-------------------------+
     106
     107| 0                       | person:years            | years                   |
     108
     109+-------------------------+-------------------------+-------------------------+
     110
     111| 19                      | person:locate           | taiwan                  |
     112
     113+-------------------------+-------------------------+-------------------------+
     114
     115| 19                      | person:name             | waue                    |
     116
     117+-------------------------+-------------------------+-------------------------+
     118
     119| 19                      | person:years            | 1981                    |
     120
     121+-------------------------+-------------------------+-------------------------+
     122
     123| 36                      | person:locate           | taiwan                  |
     124
     125+-------------------------+-------------------------+-------------------------+
     126
     127| 36                      | person:name             | shellon                 |
     128
     129+-------------------------+-------------------------+-------------------------+
     130
     131| 36                      | person:years            | 1981                    |
     132
     133+-------------------------+-------------------------+-------------------------+
     134
     1353 row(s) in set. (0.04 sec)
     136
     137 **/
     138
     139public class DemoHBaseSink {
     140
     141
     142
     143        private static class ReduceClass extends TableReduce<LongWritable, Text> {
     144
     145
     146
     147                // Column id is created dymanically,
     148
     149                private static final Text col_name = new Text("person:name");
     150
     151                private static final Text col_local = new Text("person:locate");
     152
     153                private static final Text col_year = new Text("person:years");
     154
     155               
     156
     157                // this map holds the columns per row
     158
     159                private MapWritable map = new MapWritable();   
     160
     161               
     162
     163                // on this sample, map is nonuse, we use reduce to handle
     164
     165                public void reduce(LongWritable key, Iterator<Text> values,
     166
     167                                OutputCollector<Text, MapWritable> output, Reporter reporter)
     168
     169                                throws IOException {
     170
     171
     172
     173                        // values.next().getByte() can get value and transfer to byte form, there is an other way that let decode()
     174
     175                        // to substitude getByte()
     176
     177                        String stro = new String(values.next().getBytes());
     178
     179                        String str[] = stro.split(":");
     180
     181                        byte b_local[] = str[0].getBytes();
     182
     183                        byte b_name[] = str[1].getBytes();
     184
     185                        byte b_year[] = str[2].getBytes();
     186
     187                       
     188
     189                        // contents must be ImmutableBytesWritable
     190
     191                        ImmutableBytesWritable w_local = new ImmutableBytesWritable( b_local);
     192
     193                        ImmutableBytesWritable w_name = new ImmutableBytesWritable( b_name );
     194
     195                        ImmutableBytesWritable w_year = new ImmutableBytesWritable( b_year );
     196
     197
     198
     199                        // populate the current row
     200
     201                        map.clear();
     202
     203                        map.put(col_name, w_local);
     204
     205                        map.put(col_local, w_name);
     206
     207                        map.put(col_year, w_year);
     208
     209
     210
     211                        // add the row with the key as the row id
     212
     213                        output.collect(new Text(key.toString()), map);
     214
     215                }
     216
     217        }
     218
     219
     220
     221        private DemoHBaseSink() {
     222
     223        }
     224
     225
     226
     227        /**
     228
     229         * Runs the demo.
     230
     231         */
     232
     233        public static void main(String[] args) throws IOException {
     234
     235                // which path of input files in Hadoop file system
     236
     237                String file_path = "/user/waue/t1";
     238
     239
     240
     241                int mapTasks = 1;
     242
     243                int reduceTasks = 1;
     244
     245
     246
     247                JobConf conf = new JobConf(DemoHBaseSink.class);
     248
     249
     250
     251                //Job name; you can modify to any you like 
     252
     253                conf.setJobName("DemoPersonBase");
     254
     255
     256
     257                // Hbase table name must be correct , in our profile is t1_table
     258
     259                TableReduce.initJob("t1_table", ReduceClass.class, conf);
     260
     261               
     262
     263                // below are map-reduce profile
     264
     265                conf.setNumMapTasks(mapTasks);
     266
     267                conf.setNumReduceTasks(reduceTasks);
     268
     269                conf.setInputPath(new Path(file_path));
     270
     271                conf.setMapperClass(IdentityMapper.class);
     272
     273                conf.setCombinerClass(IdentityReducer.class);
     274
     275                conf.setReducerClass(ReduceClass.class);
     276
     277                JobClient.runJob(conf);
     278
     279        }
     280
     281}
     282
     283}}}