| 38 | {{{ |
| 39 | public class LogParserGo { |
| 40 | static HBaseConfiguration conf = new HBaseConfiguration(); |
| 41 | public static final String TABLE = "table.name"; |
| 42 | static String tableName; |
| 43 | static HTable table = null; |
| 44 | public static class MapClass; |
| 45 | static public Path[] listPaths(FileSystem fsm, Path path); |
| 46 | public static void runMapReduce(String table, String dir); |
| 47 | public static void creatTable(String table) ; |
| 48 | public static void main(String[] args) ; |
| 49 | |
| 50 | }}} |
| 51 | LogParserGo共宣告了以下幾個全域變數及方法: |
| 52 | 1 HBaseConfiguration conf為重要的控制設定參數,其定義了很多方法可以設定或取得map reduce程式運作所需要的值 |
| 53 | 2 定義 TABLE 為 "table.name",table.name為 name property |
| 54 | 3 string tableName 為資料表名稱 |
| 55 | 4 Htable table 在定義一個HBase的操作變數 |
| 56 | 5 class MapClass 為實做map的一個內部類別 |
| 57 | 6 Path[] listPaths 是個可以列出指定路徑下的檔案和目錄,原本0.16 api即宣告 Deprecated,因此為了解決warning在此實做 |
| 58 | 7 void runMapReduce(String table, String dir) 跑MapReduce的程序 |
| 59 | 8 void creatTable(String table) 建立hbase的資料表 |
| 60 | 9 void main(String[] args) main 函數 |
| 61 | |
| 62 | 1~4為變數較為單純,之後將說明5~9的函數功能 |
| 63 | ------------------------------------ |
| 99 | 此內部類別繼承了 [http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/MapReduceBase.html org.apache.hadoop.mapred.MapReduceBase] ,並實做Mapper<WritableComparable, Text, Text, Writable> 介面, |
| 100 | 不見得所有map reduce程式都需要實做此介面,但若有要讓map能分配工作就需要寫在下面此函數中:[[BR]] |
| 101 | map(WritableComparable key, Text value, OutputCollector<Text, Writable> output, Reporter reporter) [[BR]] |
| 102 | 變數key為hbase中的row key,value則為值,output 可以透過collect() 功能將值寫入hbase的table中。但在此範例中, |
| 103 | 並沒有用到 output的寫入方式,reporter也沒有用到。[[br]] |
| 104 | 此方法因為有IO的存取,因此要宣告trows IOException, 且用try來起始。[[br]] |
| 105 | 首先LogParser log = new LogParser(value.toString()); value的值為要parser的內容的某一行,因為基於hdfs的map-reduce架構上,hadoop會幫我們把資料整合起來,因此程式的邏輯只要處理好這一行即可。LogParser 在下面會介紹到,目前只要知道log物件是原始資料value透過 LogParser 處理過的產物。透過log物件的方法getIP,getProtocol(),...等,我們可以輕易取得需要的資料,用table.put( Row_Key , Column_Qualify_Name , Value) 方法將Value值填入Row_Key中的Column_Qualify_Name欄位中。接著研究table物件。[[br]] |
| 106 | table是全域變數之一,由 [http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html org.apache.hadoop.hbase.HTable] 類別定義。產生出HTable物件'''必定要'''給兩個初始化的值,一個是另一個全域變數也是重要的設定檔conf,另一個是tableName也就是資料表的名稱 |
| 107 | |
| 108 | configure(jobConf conf) 此為override org.apache.hadoop.mapred.MapReduceBase.configure(JobConf ) |
| 109 | 內容只是取得並回傳Table的名字而已 |
| 110 | |
| 111 | ------------------------------ |
| 112 | {{{ |
| 113 | static public Path[] listPaths(FileSystem fsm, Path path) |
| 114 | throws IOException { |
| 115 | FileStatus[] fss = fsm.listStatus(path); |
| 116 | int length = fss.length; |
| 117 | Path[] pi = new Path[length]; |
| 118 | for (int i = 0; i < length; i++) { |
| 119 | pi[i] = fss[i].getPath(); |
| 120 | } |
| 121 | return pi; |
| 122 | } |
| 123 | }}} |
| 124 | |
| 125 | {{{ |
| 126 | public static void runMapReduce(String table, String dir) |
| 127 | throws IOException { |
| 128 | Path tempDir = new Path("/tmp/Mylog/"); |
| 129 | Path InputDir = new Path(dir); |
| 130 | FileSystem fs = FileSystem.get(conf); |
| 131 | JobConf jobConf = new JobConf(conf, LogParserGo.class); |
| 132 | jobConf.setJobName("apache log fetcher"); |
| 133 | jobConf.set(TABLE, table); |
| 134 | Path[] in = listPaths(fs, InputDir); |
| 135 | if (fs.isFile(InputDir)) { |
| 136 | jobConf.setInputPath(InputDir); |
| 137 | } else { |
| 138 | for (int i = 0; i < in.length; i++) { |
| 139 | if (fs.isFile(in[i])) { |
| 140 | jobConf.addInputPath(in[i]); |
| 141 | } else { |
| 142 | Path[] sub = listPaths(fs, in[i]); |
| 143 | for (int j = 0; j < sub.length; j++) { |
| 144 | if (fs.isFile(sub[j])) { |
| 145 | jobConf.addInputPath(sub[j]); |
| 146 | } |
| 147 | } |
| 148 | } |
| 149 | } |
| 150 | } |
| 151 | jobConf.setOutputPath(tempDir); |
| 152 | jobConf.setMapperClass(MapClass.class); |
| 153 | JobClient client = new JobClient(jobConf); |
| 154 | ClusterStatus cluster = client.getClusterStatus(); |
| 155 | jobConf.setNumMapTasks(cluster.getMapTasks()); |
| 156 | jobConf.setNumReduceTasks(0); |
| 157 | JobClient.runJob(jobConf); |
| 158 | fs.delete(tempDir); |
| 159 | fs.close(); |
| 160 | } |
| 161 | }}} |
| 162 | |
| 163 | {{{ |
| 164 | public static void creatTable(String table) throws IOException { |
| 165 | HBaseAdmin admin = new HBaseAdmin(conf); |
| 166 | if (!admin.tableExists(new Text(table))) { |
| 167 | System.out.println("1. " + table |
| 168 | + " table creating ... please wait"); |
| 169 | HTableDescriptor tableDesc = new HTableDescriptor(table); |
| 170 | tableDesc.addFamily(new HColumnDescriptor("http:")); |
| 171 | tableDesc.addFamily(new HColumnDescriptor("url:")); |
| 172 | tableDesc.addFamily(new HColumnDescriptor("referrer:")); |
| 173 | admin.createTable(tableDesc); |
| 174 | } else { |
| 175 | System.out.println("1. " + table + " table already exists."); |
| 176 | } |
| 177 | System.out.println("2. access_log files fetching using map/reduce"); |
| 178 | } |
| 179 | }}} |
| 180 | {{{ |
| 181 | public static void main(String[] args) throws IOException { |
| 182 | String table_name = "apache-log2"; |
| 183 | String dir = "/user/waue/apache-log"; |
| 184 | creatTable(table_name); |
| 185 | runMapReduce(table_name, dir); |
| 186 | } |
| 187 | } |
| 188 | }}} |
| 189 | |