1 | /** |
---|
2 | * Program: WordCountIntoHBase.java |
---|
3 | * Editor: Waue Chen |
---|
4 | * From : NCHC. Taiwn |
---|
5 | * Last Update Date: 07/02/2008 |
---|
6 | * Upgrade to 0.17 |
---|
7 | */ |
---|
8 | |
---|
9 | /** |
---|
10 | * Purpose : |
---|
11 | * Store every line from $Input_Path to HBase |
---|
12 | * |
---|
13 | * HowToUse : |
---|
14 | * Make sure Hadoop file system and HBase are running correctly. |
---|
15 | * Use Hadoop instruction to add input-text-files to $Input_Path. |
---|
16 | * ($ bin/hadoop dfs -put local_dir hdfs_dir) |
---|
17 | * Then run the program with BuildHTable.java after \ |
---|
18 | * modifying these setup parameters. |
---|
19 | * |
---|
20 | * Check Result : |
---|
21 | * View the result by hbase instruction (hql> select * from $Table_Name). |
---|
22 | * Or run WordCountFromHBase.java then inspect http://localhost:60070 by web explorer; |
---|
23 | */ |
---|
24 | |
---|
25 | package tw.org.nchc.code; |
---|
26 | |
---|
27 | import java.io.IOException; |
---|
28 | import java.util.Iterator; |
---|
29 | |
---|
30 | import org.apache.hadoop.fs.Path; |
---|
31 | import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
---|
32 | import org.apache.hadoop.hbase.mapred.TableReduce; |
---|
33 | import org.apache.hadoop.io.LongWritable; |
---|
34 | import org.apache.hadoop.io.MapWritable; |
---|
35 | import org.apache.hadoop.io.Text; |
---|
36 | import org.apache.hadoop.mapred.JobClient; |
---|
37 | import org.apache.hadoop.mapred.JobConf; |
---|
38 | import org.apache.hadoop.mapred.OutputCollector; |
---|
39 | import org.apache.hadoop.mapred.Reporter; |
---|
40 | import org.apache.hadoop.mapred.lib.IdentityMapper; |
---|
41 | import org.apache.hadoop.mapred.lib.IdentityReducer; |
---|
42 | |
---|
43 | public class WordCountIntoHBase { |
---|
44 | |
---|
45 | /* setup parameters */ |
---|
46 | // $Input_Path. Please make sure the path is correct and contains input |
---|
47 | // files |
---|
48 | static final String Input_Path = "/user/waue/simple"; |
---|
49 | |
---|
50 | // Hbase table name, the program will create it |
---|
51 | static final String Table_Name = "word_count5"; |
---|
52 | |
---|
53 | // column name, the program will create it |
---|
54 | static final String colstr = "word:text"; |
---|
55 | |
---|
56 | // constructor |
---|
57 | private WordCountIntoHBase() { |
---|
58 | } |
---|
59 | |
---|
60 | private static class ReduceClass extends TableReduce<LongWritable, Text> { |
---|
61 | // set (column_family:column_qualify) |
---|
62 | private static final Text col = new Text(WordCountIntoHBase.colstr); |
---|
63 | |
---|
64 | // this map holds the columns per row |
---|
65 | private MapWritable map = new MapWritable(); |
---|
66 | |
---|
67 | public void reduce(LongWritable key, Iterator<Text> values, |
---|
68 | OutputCollector<Text, MapWritable> output, Reporter reporter) |
---|
69 | throws IOException { |
---|
70 | // contents must be ImmutableBytesWritable |
---|
71 | ImmutableBytesWritable bytes = new ImmutableBytesWritable(values |
---|
72 | .next().getBytes()); |
---|
73 | map.clear(); |
---|
74 | // write data |
---|
75 | map.put(col, bytes); |
---|
76 | // add the row with the key as the row id |
---|
77 | output.collect(new Text(key.toString()), map); |
---|
78 | } |
---|
79 | } |
---|
80 | |
---|
81 | /** |
---|
82 | * Runs the demo. |
---|
83 | */ |
---|
84 | public static void main(String[] args) throws IOException { |
---|
85 | // parse colstr to split column family and column qualify |
---|
86 | String tmp[] = colstr.split(":"); |
---|
87 | String Column_Family = tmp[0] + ":"; |
---|
88 | String CF[] = { Column_Family }; |
---|
89 | // check whether create table or not , we don't admit \ |
---|
90 | // the same name but different structure |
---|
91 | BuildHTable build_table = new BuildHTable(Table_Name, CF); |
---|
92 | if (!build_table.checkTableExist(Table_Name)) { |
---|
93 | if (!build_table.createTable()) { |
---|
94 | System.out.println("create table error !"); |
---|
95 | } |
---|
96 | } else { |
---|
97 | System.out.println("Table \"" + Table_Name |
---|
98 | + "\" has already existed !"); |
---|
99 | } |
---|
100 | int mapTasks = 1; |
---|
101 | int reduceTasks = 1; |
---|
102 | JobConf conf = new JobConf(WordCountIntoHBase.class); |
---|
103 | conf.setJobName(Table_Name); |
---|
104 | |
---|
105 | // must initialize the TableReduce before running job |
---|
106 | TableReduce.initJob(Table_Name, ReduceClass.class, conf); |
---|
107 | conf.setNumMapTasks(mapTasks); |
---|
108 | conf.setNumReduceTasks(reduceTasks); |
---|
109 | // 0.16 |
---|
110 | // conf.setInputPath(new Path(Input_Path)); |
---|
111 | Convert.setInputPath(conf, new Path(Input_Path)); |
---|
112 | conf.setMapperClass(IdentityMapper.class); |
---|
113 | conf.setCombinerClass(IdentityReducer.class); |
---|
114 | conf.setReducerClass(ReduceClass.class); |
---|
115 | |
---|
116 | JobClient.runJob(conf); |
---|
117 | } |
---|
118 | } |
---|