Changes between Initial Version and Version 1 of waue/2010/0125


Ignore:
Timestamp:
Jan 25, 2010, 4:29:39 PM (14 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2010/0125

    v1 v1  
     1 = hbase 程式範例 =
     2
     3 = 範例一 =
     4{{{
     5#!java
     6
     7package hbase020;
     8// WordCountHBase
     9//說明:
     10//      此程式碼將輸入路徑的檔案內的字串取出做字數統計
     11//      再將結果塞回HTable內
     12//
     13//運算方法:
     14//      將此程式運作在hadoop 0.20 平台上,用(參考2)的方法加入hbase參數後,將此程式碼打包成XX.jar
     15//  執行:
     16//      ---------------------------
     17//      hadoop jar XX.jar WordCountHBase <hdfs_input>
     18//      ---------------------------
     19//
     20//結果:
     21// ---------------------------
     22//      $ hbase shell
     23//      > scan 'wordcount'
     24//      ROW             COLUMN+CELL
     25//      am              column=content:count, timestamp=1264406245488, value=1
     26//  chen        column=content:count, timestamp=1264406245488, value=1
     27//      hi,             column=content:count, timestamp=1264406245488, value=2
     28//  ......(略)
     29// ---------------------------
     30//注意:
     31//1.    在hdfs 上來源檔案的路徑為 "/user/$YOUR_NAME/input"
     32//      請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾
     33//2.    運算完後,程式將執行結果放在hbase的wordcount資料表內
     34//
     35//參考:
     36//      1.程式碼改編於: http://blog.ring.idv.tw/comment.ser?i=337
     37//  2.hbase 運作 mapreduce 程式的方法參考於:http://wiki.apache.org/hadoop/Hbase/MapReduce
     38
     39import java.io.IOException;
     40
     41import org.apache.hadoop.conf.Configuration;
     42import org.apache.hadoop.fs.Path;
     43import org.apache.hadoop.hbase.HBaseConfiguration;
     44import org.apache.hadoop.hbase.HColumnDescriptor;
     45import org.apache.hadoop.hbase.HTableDescriptor;
     46import org.apache.hadoop.hbase.client.HBaseAdmin;
     47import org.apache.hadoop.hbase.client.Put;
     48import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
     49import org.apache.hadoop.hbase.mapreduce.TableReducer;
     50import org.apache.hadoop.hbase.util.Bytes;
     51import org.apache.hadoop.io.IntWritable;
     52import org.apache.hadoop.io.LongWritable;
     53import org.apache.hadoop.io.Text;
     54import org.apache.hadoop.mapreduce.Job;
     55import org.apache.hadoop.mapreduce.Mapper;
     56import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     57import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     58
     59public class WordCountHBase {
     60        public static class HtMap extends
     61                        Mapper<LongWritable, Text, Text, IntWritable> {
     62                private IntWritable one = new IntWritable(1);
     63
     64                public void map(LongWritable key, Text value, Context context)
     65                                throws IOException, InterruptedException {
     66                        // 輸入的字串先轉換成小寫再用空白區隔
     67                        String s[] = value.toString().toLowerCase().trim().split(" ");
     68                        for (String m : s) {
     69                                // 寫入到輸出串流
     70                                context.write(new Text(m), one);
     71                        }
     72                }
     73        }
     74        // TableReducer<KEYIN,VALUEIN,KEYOUT>
     75        // 原本為 TableReducer<Text, IntWritable, NullWritable >
     76        // 但在此改成 LongWritable 也可以
     77        // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
     78        public static class HtReduce extends
     79                        TableReducer<Text, IntWritable,LongWritable > {
     80               
     81                public void reduce(Text key, Iterable<IntWritable> values,
     82                                Context context) throws IOException, InterruptedException {
     83                        int sum = 0;
     84                        for (IntWritable i : values) {
     85                                sum += i.get();
     86                        }
     87                       
     88                        // org.apache.hadoop.hbase.client.Put
     89                        // Used to perform Put operations for a single row.
     90                        // new Put(byte[] row)
     91                        Put put = new Put(Bytes.toBytes(key.toString()));
     92                       
     93                       
     94                        // add(byte[] family, byte[] qualifier, byte[] value)
     95                        // 在main設定output format class 為 TableOutputFormat 
     96                        // TableReducer 內有定義 output Key class 必須為 Put 或 Delete
     97                        put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes
     98                                        .toBytes(String.valueOf(sum)));
     99                                               
     100                        //NullWritable.get(): Returns the single instance of this class.
     101                        //NullWritable.write(): Serialize the fields of this object to out.
     102                        context.write(new LongWritable(), put);
     103                        // context.write(NullWritable.get(), put)
     104                }
     105        }
     106
     107        public static void createHBaseTable(String tablename) throws IOException {
     108                // HTableDescriptor contains the name of an HTable, and its column
     109                // families
     110                // HTableDescriptor 用來描述table的屬性
     111                HTableDescriptor htd = new HTableDescriptor(tablename);
     112                // HColumnDescriptor HColumnDescriptor contains information about a
     113                // column family such as the number of versions, compression settings,
     114                // etc.
     115                // HTableDescriptor 透過 add() 方法來加入Column family
     116                htd.addFamily(new HColumnDescriptor("content:"));
     117                // HBaseConfiguration 能接收 hbase-site.xml 的設定值
     118                HBaseConfiguration config = new HBaseConfiguration();
     119                // 檔案的操作則使用 HBaseAdmin
     120                HBaseAdmin admin = new HBaseAdmin(config);
     121                // 檢查
     122                if (admin.tableExists(tablename)) {
     123                        // 停止
     124                        admin.disableTable(tablename);
     125                        // 刪除
     126                        admin.deleteTable(tablename);
     127                }
     128                System.out.println("create new table: " + tablename);
     129                // 建立
     130                admin.createTable(htd);
     131        }
     132
     133        public static void main(String args[]) throws Exception {
     134                // debug
     135                String[] argv = { "/user/waue/input" };
     136                args = argv;
     137                String input = args[0];
     138
     139                String tablename = "wordcount";
     140
     141                Configuration conf = new Configuration();
     142                // OUTPUT_TABLE = "hbase.mapred.outputtable"
     143                // conf.set 用於設定 如 core-site.xml 的 name 與 value
     144                // 告訴程式 hbase.mapred.outputtable --> wordcount
     145                conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
     146                // 建立hbase 的table 否則沒先建立會出錯
     147                createHBaseTable(tablename);
     148
     149                Job job = new Job(conf, "WordCount table with " + input);
     150
     151                job.setJarByClass(WordCountHBase.class);
     152                job.setNumReduceTasks(1);
     153                job.setMapperClass(HtMap.class);
     154                job.setReducerClass(HtReduce.class);
     155                // 此範例的輸出為 <Text,IntWritable> 因此其實可以省略宣告
     156                // set{Map|Reduce}Output{Key|Value}Class()
     157                job.setMapOutputKeyClass(Text.class);
     158                job.setMapOutputValueClass(IntWritable.class);
     159                // InputFormat 只有一個子介面
     160                // FileInputFormat <-(SequenceFileInputFormat,TextInputFormat)
     161                // 其中TextInputFormat 最常用 ,預設輸入為 LongWritable,Text
     162                // 另外HBase 則設計了一個子類別 TableInputFormat
     163                job.setInputFormatClass(TextInputFormat.class);
     164                // TAbleOutputFormat
     165                // 宣告此行則可使 reduce 輸出為 HBase 的table
     166                job.setOutputFormatClass(TableOutputFormat.class);
     167
     168                // 原本設定輸入檔案為 Config.setInputPath(Path) 卻改為
     169                // FileInputFormat.addInputPath(Job, Path()) 的設計,
     170                // 猜測應該是考慮某些檔案操作並不需要跑mapreduce的Job,因此提到外面
     171                FileInputFormat.addInputPath(job, new Path(input));
     172
     173                System.exit(job.waitForCompletion(true) ? 0 : 1);
     174        }
     175}
     176}}}
     177
     178
     179 = 範例二 =
     180
     181{{{
     182#!java
     183
     184package hbase020;
     185
     186// from hbase website
     187// http://hadoop.apache.org/hbase/docs/current/api/org/apache/.接.
     188//  hadoop/hbase/client/package-summary.html#package_description
     189
     190import java.io.IOException;
     191
     192import org.apache.hadoop.hbase.HBaseConfiguration;
     193import org.apache.hadoop.hbase.HColumnDescriptor;
     194import org.apache.hadoop.hbase.HTableDescriptor;
     195import org.apache.hadoop.hbase.client.Get;
     196import org.apache.hadoop.hbase.client.HBaseAdmin;
     197import org.apache.hadoop.hbase.client.HTable;
     198import org.apache.hadoop.hbase.client.Put;
     199import org.apache.hadoop.hbase.client.Result;
     200import org.apache.hadoop.hbase.client.ResultScanner;
     201import org.apache.hadoop.hbase.client.Scan;
     202import org.apache.hadoop.hbase.util.Bytes;
     203
     204// Class that has nothing but a main.
     205// Does a Put, Get and a Scan against an hbase table.
     206public class MyLittleHBaseClient {
     207        public static void createHBaseTable(String tablename) throws IOException {
     208                // HTableDescriptor contains the name of an HTable, and its column
     209                // families
     210                // HTableDescriptor 用來描述table的屬性
     211                HTableDescriptor htd = new HTableDescriptor(tablename);
     212                // HColumnDescriptor HColumnDescriptor contains information about a
     213                // column family such as the number of versions, compression settings,
     214                // etc.
     215                // HTableDescriptor 透過 add() 方法來加入Column family
     216                htd.addFamily(new HColumnDescriptor("content:"));
     217                // HBaseConfiguration 能接收 hbase-site.xml 的設定值
     218                HBaseConfiguration config = new HBaseConfiguration();
     219                // 檔案的操作則使用 HBaseAdmin
     220                HBaseAdmin admin = new HBaseAdmin(config);
     221                // 檢查
     222                if (admin.tableExists(tablename)) {
     223                        // 停止
     224                        admin.disableTable(tablename);
     225                        // 刪除
     226                        admin.deleteTable(tablename);
     227                }
     228                System.out.println("create new table: " + tablename);
     229                // 建立
     230                admin.createTable(htd);
     231        }
     232        public static void main(String[] args) throws IOException {
     233                // You need a configuration object to tell the client where to connect.
     234                // When you create a HBaseConfiguration, it reads in whatever you've set
     235                // into your hbase-site.xml and in hbase-default.xml, as long as these
     236                // can
     237                // be found on the CLASSPATH
     238                HBaseConfiguration config = new HBaseConfiguration();
     239
     240                // This instantiates an HTable object that connects you to
     241                // the "myLittleHBaseTable" table.
     242                HTable table = new HTable(config, "myLittleHBaseTable");
     243
     244                // To add to a row, use Put. A Put constructor takes the name of the row
     245                // you want to insert into as a byte array. In HBase, the Bytes class
     246                // has
     247                // utility for converting all kinds of java types to byte arrays. In the
     248                // below, we are converting the String "myLittleRow" into a byte array
     249                // to
     250                // use as a row key for our update. Once you have a Put instance, you
     251                // can
     252                // adorn it by setting the names of columns you want to update on the
     253                // row,
     254                // the timestamp to use in your update, etc.If no timestamp, the server
     255                // applies current time to the edits.
     256                Put p = new Put(Bytes.toBytes("myLittleRow"));
     257
     258                // To set the value you'd like to update in the row 'myRow', specify the
     259                // column family, column qualifier, and value of the table cell you'd
     260                // like
     261                // to update. The column family must already exist in your table schema.
     262                // The qualifier can be anything. All must be specified as byte arrays
     263                // as
     264                // hbase is all about byte arrays. Lets pretend the table
     265                // 'myLittleHBaseTable' was created with a family 'myLittleFamily'.
     266                p.add(Bytes.toBytes("myLittleFamily"), Bytes.toBytes("someQualifier"),
     267                                Bytes.toBytes("Some Value"));
     268
     269                // Once you've adorned your Put instance with all the updates you want
     270                // to
     271                // make, to commit it do the following (The HTable#put method takes the
     272                // Put instance you've been building and pushes the changes you made
     273                // into
     274                // hbase)
     275                table.put(p);
     276
     277                // Now, to retrieve the data we just wrote. The values that come back
     278                // are
     279                // Result instances. Generally, a Result is an object that will package
     280                // up
     281                // the hbase return into the form you find most palatable.
     282                Get g = new Get(Bytes.toBytes("myLittleRow"));
     283                Result r = table.get(g);
     284                byte[] value = r.getValue(Bytes.toBytes("myLittleFamily"), Bytes
     285                                .toBytes("someQualifier"));
     286                // If we convert the value bytes, we should get back 'Some Value', the
     287                // value we inserted at this location.
     288                String valueStr = Bytes.toString(value);
     289                System.out.println("GET: " + valueStr);
     290
     291                // Sometimes, you won't know the row you're looking for. In this case,
     292                // you
     293                // use a Scanner. This will give you cursor-like interface to the
     294                // contents
     295                // of the table. To set up a Scanner, do like you did above making a Put
     296                // and a Get, create a Scan. Adorn it with column names, etc.
     297                Scan s = new Scan();
     298                s.addColumn(Bytes.toBytes("myLittleFamily"), Bytes
     299                                .toBytes("someQualifier"));
     300                ResultScanner scanner = table.getScanner(s);
     301                try {
     302                        // Scanners return Result instances.
     303                        // Now, for the actual iteration. One way is to use a while loop
     304                        // like so:
     305                        for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
     306                                // print out the row we found and the columns we were looking
     307                                // for
     308                                System.out.println("Found row: " + rr);
     309                        }
     310
     311                        // The other approach is to use a foreach loop. Scanners are
     312                        // iterable!
     313                        // for (Result rr : scanner) {
     314                        // System.out.println("Found row: " + rr);
     315                        // }
     316                } finally {
     317                        // Make sure you close your scanners when you are done!
     318                        // Thats why we have it inside a try/finally clause
     319                        scanner.close();
     320                }
     321        }
     322}
     323
     324}}}