| 1 | /** | 
|---|
| 2 |  * Program: WordCountIntoHBase.java | 
|---|
| 3 |  * Editor: Waue Chen  | 
|---|
| 4 |  * From :  NCHC, Taiwan | 
|---|
| 5 |  * Last Update Date: 07/02/2008 | 
|---|
| 6 |  */ | 
|---|
| 7 |  | 
|---|
| 8 | /** | 
|---|
| 9 |  * Purpose :  | 
|---|
| 10 |  *  Store every line from $Input_Path to HBase | 
|---|
| 11 |  *  | 
|---|
| 12 |  * HowToUse :  | 
|---|
| 13 |  *  Make sure Hadoop file system and HBase are running correctly. | 
|---|
| 14 |  *  Use Hadoop instruction to add input-text-files to $Input_Path. | 
|---|
| 15 |  *  ($ bin/hadoop dfs -put local_dir hdfs_dir) | 
|---|
| 16 |  *  Then run the program with BuildHTable.java after \ | 
|---|
| 17 |  *  modifying these setup parameters. | 
|---|
| 18 |  *  | 
|---|
| 19 |  * Check Result :  | 
|---|
| 20 |  *  View the result by hbase instruction (hql> select * from $Table_Name).  | 
|---|
| 21 |  *  Or run WordCountFromHBase.java, then inspect http://localhost:60070 in a web browser. | 
|---|
| 22 |  */ | 
|---|
| 23 |  | 
|---|
| 24 | package tw.org.nchc.code; | 
|---|
| 25 |  | 
|---|
| 26 | import java.io.IOException; | 
|---|
| 27 | import java.util.Iterator; | 
|---|
| 28 |  | 
|---|
| 29 | import org.apache.hadoop.fs.Path; | 
|---|
| 30 | import org.apache.hadoop.hbase.io.ImmutableBytesWritable; | 
|---|
| 31 | import org.apache.hadoop.hbase.mapred.TableReduce; | 
|---|
| 32 | import org.apache.hadoop.io.LongWritable; | 
|---|
| 33 | import org.apache.hadoop.io.MapWritable; | 
|---|
| 34 | import org.apache.hadoop.io.Text; | 
|---|
| 35 | import org.apache.hadoop.mapred.JobClient; | 
|---|
| 36 | import org.apache.hadoop.mapred.JobConf; | 
|---|
| 37 | import org.apache.hadoop.mapred.OutputCollector; | 
|---|
| 38 | import org.apache.hadoop.mapred.Reporter; | 
|---|
| 39 | import org.apache.hadoop.mapred.lib.IdentityMapper; | 
|---|
| 40 | import org.apache.hadoop.mapred.lib.IdentityReducer; | 
|---|
| 41 |  | 
|---|
| 42 | public class WordCountIntoHBase { | 
|---|
| 43 |  | 
|---|
| 44 |   /* setup parameters */ | 
|---|
| 45 |   // $Input_Path. Please make sure the path is correct and contains input | 
|---|
| 46 |   // files | 
|---|
| 47 |   static final String Input_Path = "/user/waue/input"; | 
|---|
| 48 |  | 
|---|
| 49 |   // Hbase table name, the program will create it | 
|---|
| 50 |   static final String Table_Name = "word_count5"; | 
|---|
| 51 |  | 
|---|
| 52 |   // column name, the program will create it | 
|---|
| 53 |   static final String colstr = "word:text"; | 
|---|
| 54 |  | 
|---|
| 55 |   // constructor | 
|---|
| 56 |   private WordCountIntoHBase() { | 
|---|
| 57 |   } | 
|---|
| 58 |  | 
|---|
| 59 |   private static class ReduceClass extends TableReduce<LongWritable, Text> { | 
|---|
| 60 |     // set (column_family:column_qualify) | 
|---|
| 61 |     private static final Text col = new Text(WordCountIntoHBase.colstr); | 
|---|
| 62 |  | 
|---|
| 63 |     // this map holds the columns per row | 
|---|
| 64 |     private MapWritable map = new MapWritable(); | 
|---|
| 65 |  | 
|---|
| 66 |     public void reduce(LongWritable key, Iterator<Text> values, | 
|---|
| 67 |         OutputCollector<Text, MapWritable> output, Reporter reporter) | 
|---|
| 68 |         throws IOException { | 
|---|
| 69 |       // contents must be ImmutableBytesWritable | 
|---|
| 70 |       ImmutableBytesWritable bytes = new ImmutableBytesWritable(values | 
|---|
| 71 |           .next().getBytes()); | 
|---|
| 72 |       map.clear(); | 
|---|
| 73 |       // write data | 
|---|
| 74 |       map.put(col, bytes); | 
|---|
| 75 |       // add the row with the key as the row id | 
|---|
| 76 |       output.collect(new Text(key.toString()), map); | 
|---|
| 77 |     } | 
|---|
| 78 |   } | 
|---|
| 79 |  | 
|---|
| 80 |   /** | 
|---|
| 81 |    * Runs the demo. | 
|---|
| 82 |    */ | 
|---|
| 83 |   public static void main(String[] args) throws IOException { | 
|---|
| 84 |     // parse colstr to split column family and column qualify | 
|---|
| 85 |     String tmp[] = colstr.split(":"); | 
|---|
| 86 |     String Column_Family = tmp[0] + ":"; | 
|---|
| 87 |     String CF[] = { Column_Family }; | 
|---|
| 88 |     // check whether create table or not , we don't admit \ | 
|---|
| 89 |     // the same name but different structure | 
|---|
| 90 |     BuildHTable build_table = new BuildHTable(Table_Name, CF); | 
|---|
| 91 |     if (!build_table.checkTableExist(Table_Name)) { | 
|---|
| 92 |       if (!build_table.createTable()) { | 
|---|
| 93 |         System.out.println("create table error !"); | 
|---|
| 94 |       } | 
|---|
| 95 |     } else { | 
|---|
| 96 |       System.out.println("Table \"" + Table_Name | 
|---|
| 97 |           + "\" has already existed !"); | 
|---|
| 98 |     } | 
|---|
| 99 |     int mapTasks = 1; | 
|---|
| 100 |     int reduceTasks = 1; | 
|---|
| 101 |     JobConf conf = new JobConf(WordCountIntoHBase.class); | 
|---|
| 102 |     conf.setJobName(Table_Name); | 
|---|
| 103 |  | 
|---|
| 104 |     // must initialize the TableReduce before running job | 
|---|
| 105 |     TableReduce.initJob(Table_Name, ReduceClass.class, conf); | 
|---|
| 106 |     conf.setNumMapTasks(mapTasks); | 
|---|
| 107 |     conf.setNumReduceTasks(reduceTasks); | 
|---|
| 108 |     conf.setInputPath(new Path(Input_Path));     | 
|---|
| 109 |     conf.setMapperClass(IdentityMapper.class); | 
|---|
| 110 |     conf.setCombinerClass(IdentityReducer.class); | 
|---|
| 111 |     conf.setReducerClass(ReduceClass.class); | 
|---|
| 112 |  | 
|---|
| 113 |     JobClient.runJob(conf); | 
|---|
| 114 |   } | 
|---|
| 115 | } | 
|---|