/**
 * Program: WordCountIntoHBase.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 07/02/2008
 * Upgraded to Hadoop 0.17
 */

/**
 * Purpose:
 * Store every line from $Input_Path into HBase.
 *
 * HowToUse:
 * Make sure the Hadoop file system and HBase are running correctly.
 * Use the Hadoop command line to upload input text files to $Input_Path:
 * ($ bin/hadoop dfs -put local_dir hdfs_dir)
 * Then run this program together with BuildHTable.java after
 * modifying the setup parameters below.
 *
 * Check Result:
 * View the result from the HBase shell (hql> select * from $Table_Name;),
 * or run WordCountFromHBase.java and then inspect http://localhost:60070
 * in a web browser.
 */

package tw.org.nchc.code;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class WordCountIntoHBase {

    /* setup parameters */
    // $Input_Path: make sure the path is correct and contains the input files
    static final String Input_Path = "/user/waue/simple";

    // HBase table name; the program will create it if it does not exist
    static final String Table_Name = "word_count5";

    // column name (column_family:column_qualifier); the program will create it
    static final String colstr = "word:text";

    // private constructor: this class is not meant to be instantiated
    private WordCountIntoHBase() {
    }

    private static class ReduceClass extends TableReduce<LongWritable, Text> {

        // the target cell, given as (column_family:column_qualifier)
        private static final Text col = new Text(WordCountIntoHBase.colstr);

        // this map holds the columns for one row
        private MapWritable map = new MapWritable();

        public void reduce(LongWritable key, Iterator<Text> values,
                OutputCollector<Text, MapWritable> output, Reporter reporter)
                throws IOException {
            // Text.getBytes() exposes the internal buffer, which may be longer
            // than the actual content; copy only the valid bytes
            Text value = values.next();
            byte[] content = new byte[value.getLength()];
            System.arraycopy(value.getBytes(), 0, content, 0, value.getLength());

            // cell contents must be wrapped in an ImmutableBytesWritable
            ImmutableBytesWritable bytes = new ImmutableBytesWritable(content);
            map.clear();
            map.put(col, bytes);
            // emit the row, using the key (the line offset) as the row id
            output.collect(new Text(key.toString()), map);
        }
    }

    /**
     * Runs the demo.
     */
    public static void main(String[] args) throws IOException {
        // parse colstr to split the column family from the column qualifier
        String tmp[] = colstr.split(":");
        String Column_Family = tmp[0] + ":";
        String CF[] = { Column_Family };

        // create the table only if it does not exist yet; a table with the
        // same name but a different structure is not allowed
        BuildHTable build_table = new BuildHTable(Table_Name, CF);
        if (!build_table.checkTableExist(Table_Name)) {
            if (!build_table.createTable()) {
                System.out.println("create table error!");
            }
        } else {
            System.out.println("Table \"" + Table_Name + "\" already exists!");
        }

        int mapTasks = 1;
        int reduceTasks = 1;
        JobConf conf = new JobConf(WordCountIntoHBase.class);
        conf.setJobName(Table_Name);

        // TableReduce must be initialized before the job is submitted
        TableReduce.initJob(Table_Name, ReduceClass.class, conf);
        conf.setNumMapTasks(mapTasks);
        conf.setNumReduceTasks(reduceTasks);

        // Hadoop 0.16 style: conf.setInputPath(new Path(Input_Path));
        Convert.setInputPath(conf, new Path(Input_Path));
        conf.setMapperClass(IdentityMapper.class);
        conf.setCombinerClass(IdentityReducer.class);
        conf.setReducerClass(ReduceClass.class);

        JobClient.runJob(conf);
    }
}
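/*
 * Example session, tying together the steps from the header comment
 * (a sketch; the jar name "nchc-examples.jar" is an assumption, not part
 * of the original source):
 *
 *   $ bin/hadoop dfs -put local_dir /user/waue/simple
 *   $ bin/hadoop jar nchc-examples.jar tw.org.nchc.code.WordCountIntoHBase
 *   hql> select * from word_count5;
 */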