// WordCountHBase
// Description:
//   Reads the text files under an input path, counts the words they
//   contain, and writes the results back into an HTable.
//
// Environment:
//   Runs on Hadoop 1.0.0 + HBase 0.90.5.
//
// Result:
//   ---------------------------
//   $ hbase shell
//   > scan 'wordcounthbase'
//   ROW     COLUMN+CELL
//   am      column=content:count, timestamp=1264406245488, value=1
//   chen    column=content:count, timestamp=1264406245488, value=1
//   hi,     column=content:count, timestamp=1264406245488, value=2
//   ...... (truncated)
//   ---------------------------
// Notes:
//   1. The source files live on HDFS under "/user/$YOUR_NAME/input".
//      Upload your data into that HDFS directory first; it must contain
//      only files, not subdirectories.
//   2. When the job completes, the results are stored in the HBase
//      table "wordcounthbase".
//
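// A typical way to launch the job (illustrative: the jar name and the
// classpath setup below are assumptions, not part of the original notes):
//   $ HADOOP_CLASSPATH=`hbase classpath` hadoop jar WordCountHBase.jar \
//         WordCountHBase /user/$YOUR_NAME/input
//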
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
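// The three client classes below are used only by the printCount()
// read-back sketch near the end of this file (an illustrative addition).
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;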
public class WordCountHBase {
    public Configuration hbase_config;
    public Configuration hadoop_config;
    public HBaseAdmin hbase_admin;
    public String input, tablename;
public WordCountHBase() throws MasterNotRunningException, ZooKeeperConnectionException{
hbase_config = HBaseConfiguration.create();
hadoop_config = new Configuration();
hbase_admin = new HBaseAdmin(hbase_config);
}
    public void createHBaseTable(String family)
            throws IOException {
        // HTableDescriptor describes a table: its name and its column
        // families.
        HTableDescriptor htd = new HTableDescriptor(tablename);
        // HColumnDescriptor contains information about a column family,
        // such as the number of versions, compression settings, etc.
        // Column families are added to the HTableDescriptor via addFamily().
        htd.addFamily(new HColumnDescriptor(family));
        // HBaseConfiguration picks up the settings in hbase-site.xml;
        // administrative operations on tables go through HBaseAdmin.
        // Check whether the table already exists.
        if (hbase_admin.tableExists(tablename)) {
            System.out.println("Table: " + tablename + " already exists.");
        } else {
            System.out.println("Creating new table: " + tablename);
            // Create it.
            hbase_admin.createTable(htd);
        }
    }
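    // The HBase shell equivalent of createHBaseTable("content") would be:
    //   > create 'wordcounthbase', 'content'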
    public void drop() {
        // new HBaseConfiguration() is deprecated as of HBase 0.90;
        // use HBaseConfiguration.create() instead.
        try {
            hbase_admin = new HBaseAdmin(hbase_config);
            if (hbase_admin.tableExists(tablename)) {
                hbase_admin.disableTable(tablename);
                hbase_admin.deleteTable(tablename);
                System.out.println("Dropped the table [" + tablename + "]");
            } else {
                System.out.println("Table [" + tablename + "] was not found!");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
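    // The HBase shell equivalent of drop() would be:
    //   > disable 'wordcounthbase'
    //   > drop 'wordcounthbase'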
    public static class HtMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Lowercase the input line, then split it on whitespace.
            String s[] = value.toString().toLowerCase().trim().split("\\s+");
            for (String m : s) {
                // Skip empty tokens (e.g. from blank lines); an empty
                // row key would make the reducer's Put throw.
                if (m.isEmpty()) {
                    continue;
                }
                // Write each word to the output stream with a count of 1.
                context.write(new Text(m), one);
            }
        }
    }
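    // For example, a line such as "hi, chen am hi," maps to the pairs
    // ("hi,", 1), ("chen", 1), ("am", 1), ("hi,", 1); HtReduce below then
    // sums the counts per word, giving the scan output shown in the header.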
    // TableReducer<KEYIN, VALUEIN, KEYOUT>
    // This was originally TableReducer<Text, IntWritable, NullWritable>,
    // but LongWritable works just as well: TableOutputFormat ignores the
    // output key, so any Writable class from org.apache.hadoop.io.* should do.
    public static class HtReduce extends
            TableReducer<Text, IntWritable, LongWritable> {
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            // org.apache.hadoop.hbase.client.Put performs a Put operation
            // on a single row; new Put(byte[] row) takes the row key.
            Put put = new Put(Bytes.toBytes(key.toString()));
            // add(byte[] family, byte[] qualifier, byte[] value)
            // main() sets TableOutputFormat as the output format class,
            // and TableReducer requires the output *value* to be a Put
            // or a Delete.
            put.add(Bytes.toBytes("content"), Bytes.toBytes("word"),
                    Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));
            // The output key is ignored, so an empty LongWritable is fine;
            // NullWritable.get(), the singleton NullWritable instance,
            // would work too:
            // context.write(NullWritable.get(), put);
            context.write(new LongWritable(), put);
        }
    }
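    // For example, reducing key "hi," over values [1, 1] produces a Put for
    // row "hi," with content:word = "hi," and content:count = "2", which is
    // exactly the row shown in the scan output in the header.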
    public boolean run() throws IOException, InterruptedException,
            ClassNotFoundException {
        Job job = new Job(hadoop_config, "WordCount table with " + input);
        job.setJarByClass(WordCountHBase.class);
        job.setMapperClass(HtMap.class);
        job.setReducerClass(HtReduce.class);
        // The map output types <Text, IntWritable> differ from the job's
        // default output types, so they must be declared explicitly with
        // setMapOutput{Key|Value}Class().
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // FileInputFormat is the common base class of the file-based
        // input formats (SequenceFileInputFormat, TextInputFormat, ...).
        // TextInputFormat is the most common one; its default input
        // key/value types are LongWritable and Text. HBase also ships
        // its own subclass, TableInputFormat.
        job.setInputFormatClass(TextInputFormat.class);
        // TableOutputFormat
        // This line makes the reducer write its output into an HBase table.
        job.setOutputFormatClass(TableOutputFormat.class);
        // Input paths are added with FileInputFormat.addInputPath(Job, Path)
        // rather than a setter on the configuration itself, presumably
        // because file handling is also useful outside a MapReduce job.
        FileInputFormat.addInputPath(job, new Path(input));
        return job.waitForCompletion(true);
    }
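    // A minimal read-back sketch (an illustrative addition, not part of
    // the original program): fetch the stored count for a single word
    // once the job has finished, using the HBase 0.90 client API.
    // Usage (hypothetical): wchb.printCount("hi,") after wchb.run().
    public void printCount(String word) throws IOException {
        // HTable is the client-side handle for one table.
        HTable table = new HTable(hbase_config, tablename);
        Get get = new Get(Bytes.toBytes(word));
        Result result = table.get(get);
        // The reducer stored the count as a string under content:count.
        byte[] raw = result.getValue(Bytes.toBytes("content"),
                Bytes.toBytes("count"));
        if (raw != null) {
            System.out.println(word + " = " + Bytes.toString(raw));
        } else {
            System.out.println(word + " not found in " + tablename);
        }
        table.close();
    }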
    public static void main(String argv[]) throws Exception {
        // For local debugging, hard-code the input path instead:
        // String[] argc = { "/tmp/ProductEmp/input/" };
        // argv = argc;
        WordCountHBase wchb = new WordCountHBase();
        if (argv.length < 1) {
            System.out.println("Usage: WordCountHBase <inHdfsDir>");
            return;
        }
        wchb.input = argv[0];
        wchb.tablename = "wordcounthbase";
        // OUTPUT_TABLE = "hbase.mapred.outputtable"
        // conf.set() stores a name/value pair, just like an entry in
        // core-site.xml; this tells TableOutputFormat to write into the
        // table "wordcounthbase".
        wchb.hadoop_config.set(TableOutputFormat.OUTPUT_TABLE, wchb.tablename);
        // (Re)create the HBase table first; the job fails if it is missing.
        wchb.drop();
        wchb.createHBaseTable("content");
        wchb.run();
    }
}