Changes between Version 5 and Version 6 of waue/2012/0120


Ignore:
Timestamp:
Jan 20, 2012, 6:33:13 PM (12 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2012/0120

    v5 v6  
    5151 5. 引入 hadoop=> lib/commons-*.jar  (尤其是 commons-configuration-1.6.jar)
    5252
    53 {{{
    54 #!java
    55 // WordCountHBase
    56 //說明:
    57 //      此程式碼將輸入路徑的檔案內的字串取出做字數統計
    58 //      再將結果塞回HTable內
    59 //
    60 //運算方法:
    61 //      將此程式運作在hadoop 1.0.0 + hbase 0.90.5 平台上
    62 //
    63 //結果:
    64 // ---------------------------
    65 //      $ hbase shell
    66 //      > scan 'wordcount'
    67 //      ROW             COLUMN+CELL
    68 //      am              column=content:count, timestamp=1264406245488, value=1
    69 //  chen        column=content:count, timestamp=1264406245488, value=1
    70 //      hi,             column=content:count, timestamp=1264406245488, value=2
    71 //  ......(略)
    72 // ---------------------------
    73 //注意:
    74 //1.    在hdfs 上來源檔案的路徑為 "/user/$YOUR_NAME/input"
    75 //      請注意必須先放資料到此hdfs上的資料夾內,且此資料夾內只能放檔案,不可再放資料夾
    76 //2.    運算完後,程式將執行結果放在hbase的wordcount資料表內
    77 //
    78 
    79 import java.io.IOException;
    80 
    81 import org.apache.hadoop.conf.Configuration;
    82 import org.apache.hadoop.fs.Path;
    83 import org.apache.hadoop.hbase.HBaseConfiguration;
    84 import org.apache.hadoop.hbase.HColumnDescriptor;
    85 import org.apache.hadoop.hbase.HTableDescriptor;
    86 import org.apache.hadoop.hbase.MasterNotRunningException;
    87 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
    88 import org.apache.hadoop.hbase.client.HBaseAdmin;
    89 import org.apache.hadoop.hbase.client.Put;
    90 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    91 import org.apache.hadoop.hbase.mapreduce.TableReducer;
    92 import org.apache.hadoop.hbase.util.Bytes;
    93 import org.apache.hadoop.io.IntWritable;
    94 import org.apache.hadoop.io.LongWritable;
    95 import org.apache.hadoop.io.Text;
    96 import org.apache.hadoop.mapreduce.Job;
    97 import org.apache.hadoop.mapreduce.Mapper;
    98 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    99 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    100 
    101 public class WordCountHBase {
    102        
    103         public Configuration hbase_config ;
    104         public Configuration hadoop_config;
    105         public HBaseAdmin hbase_admin;
    106         public String input ,tablename;
    107        
    108         public WordCountHBase() throws MasterNotRunningException, ZooKeeperConnectionException{
    109                 hbase_config = HBaseConfiguration.create();
    110                 hadoop_config = new Configuration();
    111                 hbase_admin = new HBaseAdmin(hbase_config);
    112         }
    113        
    114         public void createHBaseTable(String family)
    115                         throws IOException {
    116                 // HTableDescriptor contains the name of an HTable, and its column
    117                 // families
    118                 // HTableDescriptor 用來描述table的屬性
    119                 HTableDescriptor htd = new HTableDescriptor(tablename);
    120                 // HColumnDescriptor HColumnDescriptor contains information about a
    121                 // column family such as the number of versions, compression settings,
    122                 // etc.
    123                 // HTableDescriptor 透過 add() 方法來加入Column family
    124 
    125                 htd.addFamily(new HColumnDescriptor(family));
    126 
    127                 // HBaseConfiguration 能接收 hbase-site.xml 的設定值
    128                 // HBaseConfiguration config = new HBaseConfiguration();
    129                
    130 
    131                 // 檔案的操作則使用 HBaseAdmin
    132                
    133                 // 檢查
    134                 if (hbase_admin.tableExists(tablename)) {
    135                         System.out.println("Table: " + tablename + "Existed.");
    136                 } else {
    137                         System.out.println("create new table: " + tablename);
    138                         // 建立
    139                         hbase_admin.createTable(htd);
    140                 }
    141         }
    142 
    143         public void drop() {
    144                 // HBaseConfiguration conf = new HBaseConfiguration(); // deprecated ,
    145                 // v0.90
    146                
    147                 try {
    148                         hbase_admin = new HBaseAdmin(hbase_config);
    149                         if (hbase_admin.tableExists(tablename)) {
    150                                 hbase_admin.disableTable(tablename);
    151                                 hbase_admin.deleteTable(tablename);
    152                                 System.out.println("Droped the table [" + tablename + "]");
    153                         } else {
    154                                 System.out.println("Table [" + tablename + "] was not found!");
    155                         }
    156 
    157                 } catch (IOException e) {
    158                         e.printStackTrace();
    159                 }
    160         }
    161 
    162         public static class HtMap extends
    163                         Mapper<LongWritable, Text, Text, IntWritable> {
    164                 private IntWritable one = new IntWritable(1);
    165 
    166                 public void map(LongWritable key, Text value, Context context)
    167                                 throws IOException, InterruptedException {
    168                         // 輸入的字串先轉換成小寫再用空白區隔
    169                         String s[] = value.toString().toLowerCase().trim().split(" ");
    170                         for (String m : s) {
    171                                 // 寫入到輸出串流
    172                                 context.write(new Text(m), one);
    173                         }
    174                 }
    175         }
    176 
    177         // TableReducer<KEYIN,VALUEIN,KEYOUT>
    178         // 原本為 TableReducer<Text, IntWritable, NullWritable >
    179         // 但在此改成 LongWritable 也可以
    180         // 因此證明在此的Class可以很多,org.apache.hadoop.io.* 內有write()的Writable class應該皆可
    181         public static class HtReduce extends
    182                         TableReducer<Text, IntWritable, LongWritable> {
    183 
    184                 public void reduce(Text key, Iterable<IntWritable> values,
    185                                 Context context) throws IOException, InterruptedException {
    186                         int sum = 0;
    187                         for (IntWritable i : values) {
    188                                 sum += i.get();
    189                         }
    190 
    191                         // org.apache.hadoop.hbase.client.Put
    192                         // Used to perform Put operations for a single row.
    193                         // new Put(byte[] row)
    194                         Put put = new Put(Bytes.toBytes(key.toString()));
    195 
    196                         // add(byte[] family, byte[] qualifier, byte[] value)
    197                         // 在main設定output format class 為 TableOutputFormat
    198                         // TableReducer 內有定義 output Key class 必須為 Put 或 Delete
    199                         put.add(Bytes.toBytes("content"), Bytes.toBytes("word"),
    200                                         Bytes.toBytes(key.toString()));
    201                         put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
    202                                         Bytes.toBytes(String.valueOf(sum)));
    203 
    204                         // NullWritable.get(): Returns the single instance of this class.
    205                         // NullWritable.write(): Serialize the fields of this object to out.
    206                         context.write(new LongWritable(), put);
    207                         // context.write(NullWritable.get(), put)
    208                 }
    209         }
    210         public boolean run() throws IOException, InterruptedException, ClassNotFoundException{
    211                 Job job = new Job(hadoop_config, "WordCount table with " + input);
    212 
    213                 job.setJarByClass(WordCountHBase.class);
    214 
    215                 job.setMapperClass(HtMap.class);
    216                 job.setReducerClass(HtReduce.class);
    217                 // 此範例的輸出為 <Text,IntWritable> 因此其實可以省略宣告
    218                 // set{Map|Reduce}Output{Key|Value}Class()
    219                 job.setMapOutputKeyClass(Text.class);
    220                 job.setMapOutputValueClass(IntWritable.class);
    221                 // InputFormat 只有一個子介面
    222                 // FileInputFormat <-(SequenceFileInputFormat,TextInputFormat)
    223                 // 其中TextInputFormat 最常用 ,預設輸入為 LongWritable,Text
    224                 // 另外HBase 則設計了一個子類別 TableInputFormat
    225                 job.setInputFormatClass(TextInputFormat.class);
    226                 // TAbleOutputFormat
    227                 // 宣告此行則可使 reduce 輸出為 HBase 的table
    228                 job.setOutputFormatClass(TableOutputFormat.class);
    229 
    230                 // 原本設定輸入檔案為 Config.setInputPath(Path) 卻改為
    231                 // FileInputFormat.addInputPath(Job, Path()) 的設計,
    232                 // 猜測應該是考慮某些檔案操作並不需要跑mapreduce的Job,因此提到外面
    233                 FileInputFormat.addInputPath(job, new Path(input));
    234 
    235                 return job.waitForCompletion(true) ? true : false;
    236         }
    237         public static void main(String argv[]) throws Exception {
    238                 // debug
    239                 String[] argc = { "/tmp/ProductEmp/input/" };
    240                 argv = argc;
    241                
    242                 WordCountHBase wchb = new WordCountHBase();
    243                
    244 
    245                 if (argv.length < 1) {
    246                         System.out.println("CountToHBaseReducer <inHdfsDir> ");
    247                         return;
    248                 }
    249 
    250                 wchb.input = argv[0];
    251 
    252                 wchb.tablename = "wordcounthbase";
    253 
    254                 // OUTPUT_TABLE = "hbase.mapred.outputtable"
    255                 // conf.set 用於設定 如 core-site.xml 的 name 與 value
    256                 // 告訴程式 hbase.mapred.outputtable --> wordcount
    257                
    258                 wchb.hadoop_config.set(TableOutputFormat.OUTPUT_TABLE, wchb.tablename);
    259 
    260                
    261                 // 建立hbase 的table 否則沒先建立會出錯
    262                 wchb.drop();
    263                 wchb.createHBaseTable( "content");
    264 
    265                 wchb.run();
    266         }
    267 }
    268 }}}
     53 * WordCountHBase.java
    26954
    27055 = run =