Changes between Version 4 and Version 5 of waue/2012/0120


Ignore:
Timestamp:
Jan 20, 2012, 6:32:10 PM (12 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2012/0120

    v4 v5  
    4242 = eclipse =
    4343
    44 由於 hadoop 與 hbase 的 lib 內有大部分的重複,但其中沒重複到的 library 又很重要,沒引入會出現錯誤。
     44
     45 * 由於 hadoop 與 hbase 的 lib 內有大部分的重複,但其中沒重複到的 library 又很重要,沒引入會出現錯誤。
    4546
    4647 1. 引入 hbase => hbase-core.jar
     
    5051 5. 引入 hadoop=> lib/commons-*.jar  (尤其是 commons-configuration-1.6.jar)
    5152
     53{{{
     54#!java
     55// WordCountHBase
     56//說明:
     57//      此程式碼將輸入路徑的檔案內的字串取出做字數統計
     58//      再將結果塞回HTable內
     59//
     60//運算方法:
     61//      將此程式運作在hadoop 1.0.0 + hbase 0.90.5 平台上
     62//
     63//結果:
     64// ---------------------------
     65//      $ hbase shell
     66//      > scan 'wordcount'
     67//      ROW             COLUMN+CELL
     68//      am              column=content:count, timestamp=1264406245488, value=1
     69//  chen        column=content:count, timestamp=1264406245488, value=1
     70//      hi,             column=content:count, timestamp=1264406245488, value=2
     71//  ......(略)
     72// ---------------------------
     73//注意:
     74//1.    在hdfs 上來源檔案的路徑為 "/user/$YOUR_NAME/input"
     75//      請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾
     76//2.    運算完後,程式將執行結果放在hbase的wordcount資料表內
     77//
     78
     79import java.io.IOException;
     80
     81import org.apache.hadoop.conf.Configuration;
     82import org.apache.hadoop.fs.Path;
     83import org.apache.hadoop.hbase.HBaseConfiguration;
     84import org.apache.hadoop.hbase.HColumnDescriptor;
     85import org.apache.hadoop.hbase.HTableDescriptor;
     86import org.apache.hadoop.hbase.MasterNotRunningException;
     87import org.apache.hadoop.hbase.ZooKeeperConnectionException;
     88import org.apache.hadoop.hbase.client.HBaseAdmin;
     89import org.apache.hadoop.hbase.client.Put;
     90import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
     91import org.apache.hadoop.hbase.mapreduce.TableReducer;
     92import org.apache.hadoop.hbase.util.Bytes;
     93import org.apache.hadoop.io.IntWritable;
     94import org.apache.hadoop.io.LongWritable;
     95import org.apache.hadoop.io.Text;
     96import org.apache.hadoop.mapreduce.Job;
     97import org.apache.hadoop.mapreduce.Mapper;
     98import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     99import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
     100
     101public class WordCountHBase {
     102       
     103        public Configuration hbase_config ;
     104        public Configuration hadoop_config;
     105        public HBaseAdmin hbase_admin;
     106        public String input ,tablename;
     107       
     108        public WordCountHBase() throws MasterNotRunningException, ZooKeeperConnectionException{
     109                hbase_config = HBaseConfiguration.create();
     110                hadoop_config = new Configuration();
     111                hbase_admin = new HBaseAdmin(hbase_config);
     112        }
     113       
     114        public void createHBaseTable(String family)
     115                        throws IOException {
     116                // HTableDescriptor contains the name of an HTable, and its column
     117                // families
     118                // HTableDescriptor 用來描述table的屬性
     119                HTableDescriptor htd = new HTableDescriptor(tablename);
     120                // HColumnDescriptor HColumnDescriptor contains information about a
     121                // column family such as the number of versions, compression settings,
     122                // etc.
     123                // HTableDescriptor 透過 add() 方法來加入Column family
     124
     125                htd.addFamily(new HColumnDescriptor(family));
     126
     127                // HBaseConfiguration 能接收 hbase-site.xml 的設定值
     128                // HBaseConfiguration config = new HBaseConfiguration();
     129               
     130
     131                // 檔案的操作則使用 HBaseAdmin
     132               
     133                // 檢查
     134                if (hbase_admin.tableExists(tablename)) {
     135                        System.out.println("Table: " + tablename + "Existed.");
     136                } else {
     137                        System.out.println("create new table: " + tablename);
     138                        // 建立
     139                        hbase_admin.createTable(htd);
     140                }
     141        }
     142
     143        public void drop() {
     144                // HBaseConfiguration conf = new HBaseConfiguration(); // deprecated ,
     145                // v0.90
     146               
     147                try {
     148                        hbase_admin = new HBaseAdmin(hbase_config);
     149                        if (hbase_admin.tableExists(tablename)) {
     150                                hbase_admin.disableTable(tablename);
     151                                hbase_admin.deleteTable(tablename);
     152                                System.out.println("Droped the table [" + tablename + "]");
     153                        } else {
     154                                System.out.println("Table [" + tablename + "] was not found!");
     155                        }
     156
     157                } catch (IOException e) {
     158                        e.printStackTrace();
     159                }
     160        }
     161
     162        public static class HtMap extends
     163                        Mapper<LongWritable, Text, Text, IntWritable> {
     164                private IntWritable one = new IntWritable(1);
     165
     166                public void map(LongWritable key, Text value, Context context)
     167                                throws IOException, InterruptedException {
     168                        // 輸入的字串先轉換成小寫再用空白區隔
     169                        String s[] = value.toString().toLowerCase().trim().split(" ");
     170                        for (String m : s) {
     171                                // 寫入到輸出串流
     172                                context.write(new Text(m), one);
     173                        }
     174                }
     175        }
     176
     177        // TableReducer<KEYIN,VALUEIN,KEYOUT>
     178        // 原本為 TableReducer<Text, IntWritable, NullWritable >
     179        // 但在此改成 LongWritable 也可以
     180        // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
     181        public static class HtReduce extends
     182                        TableReducer<Text, IntWritable, LongWritable> {
     183
     184                public void reduce(Text key, Iterable<IntWritable> values,
     185                                Context context) throws IOException, InterruptedException {
     186                        int sum = 0;
     187                        for (IntWritable i : values) {
     188                                sum += i.get();
     189                        }
     190
     191                        // org.apache.hadoop.hbase.client.Put
     192                        // Used to perform Put operations for a single row.
     193                        // new Put(byte[] row)
     194                        Put put = new Put(Bytes.toBytes(key.toString()));
     195
     196                        // add(byte[] family, byte[] qualifier, byte[] value)
     197                        // 在main設定output format class 為 TableOutputFormat
     198                        // TableReducer 內有定義 output Key class 必須為 Put 或 Delete
     199                        put.add(Bytes.toBytes("content"), Bytes.toBytes("word"),
     200                                        Bytes.toBytes(key.toString()));
     201                        put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
     202                                        Bytes.toBytes(String.valueOf(sum)));
     203
     204                        // NullWritable.get(): Returns the single instance of this class.
     205                        // NullWritable.write(): Serialize the fields of this object to out.
     206                        context.write(new LongWritable(), put);
     207                        // context.write(NullWritable.get(), put)
     208                }
     209        }
     210        public boolean run() throws IOException, InterruptedException, ClassNotFoundException{
     211                Job job = new Job(hadoop_config, "WordCount table with " + input);
     212
     213                job.setJarByClass(WordCountHBase.class);
     214
     215                job.setMapperClass(HtMap.class);
     216                job.setReducerClass(HtReduce.class);
     217                // 此範例的輸出為 <Text,IntWritable> 因此其實可以省略宣告
     218                // set{Map|Reduce}Output{Key|Value}Class()
     219                job.setMapOutputKeyClass(Text.class);
     220                job.setMapOutputValueClass(IntWritable.class);
     221                // InputFormat 只有一個子介面
     222                // FileInputFormat <-(SequenceFileInputFormat,TextInputFormat)
     223                // 其中TextInputFormat 最常用 ,預設輸入為 LongWritable,Text
     224                // 另外HBase 則設計了一個子類別 TableInputFormat
     225                job.setInputFormatClass(TextInputFormat.class);
     226                // TAbleOutputFormat
     227                // 宣告此行則可使 reduce 輸出為 HBase 的table
     228                job.setOutputFormatClass(TableOutputFormat.class);
     229
     230                // 原本設定輸入檔案為 Config.setInputPath(Path) 卻改為
     231                // FileInputFormat.addInputPath(Job, Path()) 的設計,
     232                // 猜測應該是考慮某些檔案操作並不需要跑mapreduce的Job,因此提到外面
     233                FileInputFormat.addInputPath(job, new Path(input));
     234
     235                return job.waitForCompletion(true) ? true : false;
     236        }
     237        public static void main(String argv[]) throws Exception {
     238                // debug
     239                String[] argc = { "/tmp/ProductEmp/input/" };
     240                argv = argc;
     241               
     242                WordCountHBase wchb = new WordCountHBase();
     243               
     244
     245                if (argv.length < 1) {
     246                        System.out.println("CountToHBaseReducer <inHdfsDir> ");
     247                        return;
     248                }
     249
     250                wchb.input = argv[0];
     251
     252                wchb.tablename = "wordcounthbase";
     253
     254                // OUTPUT_TABLE = "hbase.mapred.outputtable"
     255                // conf.set 用於設定 如 core-site.xml 的 name 與 value
     256                // 告訴程式 hbase.mapred.outputtable --> wordcount
     257               
     258                wchb.hadoop_config.set(TableOutputFormat.OUTPUT_TABLE, wchb.tablename);
     259
     260               
     261                // 建立hbase 的table 否則沒先建立會出錯
     262                wchb.drop();
     263                wchb.createHBaseTable( "content");
     264
     265                wchb.run();
     266        }
     267}
     268}}}
     269
    52270 = run =
    53271