78 | | 宣告了table的名稱,要parser的檔案放在'''hdfs'''當中的哪個路徑下,注意此路徑為hdfs,若給的是local file system的路徑的話,程式跑的時候會產生!NullPointer Exception的錯誤。然後呼叫creatTable函數其功能用來創建table,接著跑runMapReduce函數,而整個程式主體就是在runMapReduce |
79 | | |
80 | | ------------------------------------ |
81 | | {{{ |
82 | | #!java |
83 | | public static class MapClass extends MapReduceBase implements |
84 | | Mapper<WritableComparable, Text, Text, Writable> { |
85 | | public void configure(JobConf job) { |
86 | | tableName = job.get(TABLE, ""); |
87 | | } |
88 | | public void map(WritableComparable key, Text value, |
89 | | OutputCollector<Text, Writable> output, Reporter reporter) |
90 | | throws IOException { |
91 | | try { |
92 | | LogParser log = new LogParser(value.toString()); |
93 | | if (table == null) |
94 | | table = new HTable(conf, new Text(tableName)); |
95 | | long lockId = table.startUpdate(new Text(log.getIp())); |
96 | | table.put(lockId, new Text("http:protocol"), log.getProtocol() |
97 | | .getBytes()); |
98 | | table.put(lockId, new Text("http:method"), log.getMethod() |
99 | | .getBytes()); |
100 | | table.put(lockId, new Text("http:code"), log.getCode() |
101 | | .getBytes()); |
102 | | table.put(lockId, new Text("http:bytesize"), log.getByteSize() |
103 | | .getBytes()); |
104 | | table.put(lockId, new Text("http:agent"), log.getAgent() |
105 | | .getBytes()); |
106 | | table.put(lockId, new Text("url:" + log.getUrl()), log |
107 | | .getReferrer().getBytes()); |
108 | | table.put(lockId, new Text("referrer:" + log.getReferrer()), |
109 | | log.getUrl().getBytes()); |
110 | | table.commit(lockId, log.getTimestamp()); |
111 | | } catch (Exception e) { |
112 | | e.printStackTrace(); |
113 | | } |
114 | | } |
115 | | } |
116 | | }}} |
117 | | 此內部類別繼承了 [http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/MapReduceBase.html org.apache.hadoop.mapred.MapReduceBase] ,並實做Mapper<!WritableComparable, Text, Text, Writable> 介面, |
118 | | 不見得所有map reduce程式都需要實做此介面,但若有要讓map能分配工作就需要寫在下面此函數中:[[BR]] |
119 | | map(!WritableComparable key, Text value,!OutputCollector<Text, Writable> output, Reporter reporter) [[BR]] |
120 | | 變數key為hbase中的row key,value則為值,output 可以透過collect() 功能將值寫入hbase的table中。但在此範例中, |
121 | | 並沒有用到 output的寫入方式,reporter也沒有用到。[[br]] |
122 | | 此方法因為有IO的存取,因此要宣告trows !IOException, 且用try來起始。[[br]][[br]] |
123 | | 首先!LogParser log = new !LogParser(value.toString()); value的值為要parser的內容的某一行,因為基於hdfs的map-reduce架構上,hadoop會幫我們把資料整合起來,因此程式的邏輯只要處理好這一行即可。!LogParser 在下面會介紹到,目前只要知道log物件是原始資料value透過 !LogParser 處理過的產物。透過log物件的方法getIP,getProtocol(),...等,我們可以輕易取得需要的資料,用table.put( Row_Key , Column_Qualify_Name , Value) 方法將Value值填入Row_Key中的Column_Qualify_Name欄位中。接著研究table物件。[[br]] |
124 | | table是全域變數之一,由 [http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html org.apache.hadoop.hbase.HTable] 類別定義。產生出HTable物件'''必定要'''給兩個初始化的值,一個是另一個全域變數也是重要的設定檔conf,另一個是tableName也就是資料表的名稱,當HTable 的 table 物件產生出來之後,我們就可以利用put來放入資料。然而一個新的資料表,要如何給他row_key呢? |
125 | | 因此 table.startUpdate(new Text(log.getIp())) 的功能就是 將 ip設定為table的row_key。有興趣的話可以參考[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html#startUpdate(org.apache.hadoop.io.Text) 官方的startUpdate說明] [[br]][[br]] |
126 | | 用此方法可以回傳一個型態為Long的值,將他宣告成為lockId變數,之後就可以用他當主key,而將值一個一個輸入到對應的欄位中。因此我們可以把結構看成是這樣 |
127 | | || key\欄位 || http:protocol || http:method || http:code || http:bytesize || http:agent || url: || referrer: || http:code || |
128 | | || row_key || value1 || value2 || value3 || value4 || value5 || value6 || value7 || value8 || |
129 | | 需要注意的是,由於Htable 是三維度的結構,row_key、column、timestamp。因此最後commit時間變數則是利用以下方法:[[br]] |
130 | | table.commit(lockId, log.getTimestamp()),將log內取得的時間update到整個row去。[[br]][[br]] |
131 | | |
132 | | configure(jobConf conf) 此為override org.apache.hadoop.mapred.MapReduceBase.configure(JobConf ) |
133 | | 內容只是取得並回傳Table的名字而已 |
134 | | |
135 | | ------------------------------ |
136 | | {{{ |
137 | | #!java |
138 | | static public Path[] listPaths(FileSystem fsm, Path path) |
139 | | throws IOException { |
140 | | FileStatus[] fss = fsm.listStatus(path); |
141 | | int length = fss.length; |
142 | | Path[] pi = new Path[length]; |
143 | | for (int i = 0; i < length; i++) { |
144 | | pi[i] = fss[i].getPath(); |
145 | | } |
146 | | return pi; |
147 | | } |
148 | | }}} |
149 | | |
| 78 | 宣告了table的名稱,要parser的檔案放在'''hdfs'''當中的哪個路徑下,注意此路徑為hdfs,若給的是local file system的路徑的話,程式跑的時候會產生!NullPointer Exception的錯誤。然後呼叫creatTable函數其功能用來創建table,接著跑runMapReduce函數,而整個程式主體就是在runMapReduce. |
| 79 | ------- |
| 80 | {{{ |
| 81 | #!java |
| 82 | public static void creatTable(String table) throws IOException { |
| 83 | HBaseAdmin admin = new HBaseAdmin(conf); |
| 84 | if (!admin.tableExists(new Text(table))) { |
| 85 | System.out.println("1. " + table |
| 86 | + " table creating ... please wait"); |
| 87 | HTableDescriptor tableDesc = new HTableDescriptor(table); |
| 88 | tableDesc.addFamily(new HColumnDescriptor("http:")); |
| 89 | tableDesc.addFamily(new HColumnDescriptor("url:")); |
| 90 | tableDesc.addFamily(new HColumnDescriptor("referrer:")); |
| 91 | admin.createTable(tableDesc); |
| 92 | } else { |
| 93 | System.out.println("1. " + table + " table already exists."); |
| 94 | } |
| 95 | System.out.println("2. access_log files fetching using map/reduce"); |
| 96 | } |
| 97 | }}} |
| 98 | |
| 99 | 此段為建立Hbase 的table,首先建立[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HBaseAdmin.html HBaseAdmin]的 admin物件出來,在用admin的tableExists函數來檢查此資料表是否已經存在,若沒存在的話,則用HTableDescriptor類別建立 tableDesc物件,此物件的參數是table name,可以了解透過addFamily()建立http、url、referrer等column family,最後HBaseAdmin類別中有createTable的函數,將之前的tableDesc物件當參數餵給admin物件的createTable函數則表單建立完成。 |
| 100 | --------- |
188 | | |
189 | | {{{ |
190 | | #!java |
191 | | public static void creatTable(String table) throws IOException { |
192 | | HBaseAdmin admin = new HBaseAdmin(conf); |
193 | | if (!admin.tableExists(new Text(table))) { |
194 | | System.out.println("1. " + table |
195 | | + " table creating ... please wait"); |
196 | | HTableDescriptor tableDesc = new HTableDescriptor(table); |
197 | | tableDesc.addFamily(new HColumnDescriptor("http:")); |
198 | | tableDesc.addFamily(new HColumnDescriptor("url:")); |
199 | | tableDesc.addFamily(new HColumnDescriptor("referrer:")); |
200 | | admin.createTable(tableDesc); |
201 | | } else { |
202 | | System.out.println("1. " + table + " table already exists."); |
203 | | } |
204 | | System.out.println("2. access_log files fetching using map/reduce"); |
205 | | } |
206 | | }}} |
207 | | |
| 139 | 這段function code中,[http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/fs/Path.html Path] 是宣告檔案路徑的類別,可以把Path想做是標準JAVA IO的File類別,用來定義檔案路徑。[ http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/fs/FileSystem.html FileSystem] 的物件生成方式不是用new的方式,而是用get(JobConf) 的方法,裡面填的變數是之前宣告過的全域變數HBase的設定參數conf。[[br]][[br]] |
| 140 | 後面的參數jobConf 則是重要的map reduce 設定檔,由於其生命週期只在此function,因此並不用移到外面作全域變數。[[br]] |
| 141 | JobConf的建構子有很多種型態,詳細可以看[http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/JobConf.html JobConf官網API] ,一般較基本的範例程式都是用JobConf(Class exampleClass) 即可,但此範例用JobConf(Configuration conf, Class exampleClass) ,是因有個HBaseConfiguration 的conf,HBaseConfiguration為Configuration的子類別,因此conf可以放入JonConf當其設定元素,因此用'''JobConf jobConf = new JobConf(conf, LogParserGo.class);''' 。[[br]][[br]] |
| 142 | 接著看到jobConf.set(TABLE, table); jobConf並沒有set的方法,而是其父類別 org.apache.hadoop.conf.Configuration 有,因此用set將環境參數設定進去,按照格式設定table 的名稱。 |
| 143 | || jobConf.set|| (TABLE || table) || |
| 144 | || || name property || value || |
| 145 | || || table.name || table_name || |
| 146 | |
| 147 | 接下來一整段,都是在設定放在hdfs內要parser的log的檔案路徑。listPaths 這個函數原本在 FileSystem 是有定義的,但hadoop 0.16 API已經predecated,而Hadoop 0.17 API已經徹底拿掉,為了消錯也為了日後升級方便,因此在這類別裡寫了listPaths來用,其功能主要是將 path 裡的檔案或路徑都回傳紀錄到path [] 陣列。因此 isFile() 函數用來判斷是否為檔案,若不是檔案則為目錄,setInputPath 可以設定一個輸入路徑,但若有很多個路徑要加入,則可以用addInputPath[[br]][[br]] |
| 148 | |
| 149 | output目錄只是中間的暫時產物,因此當程式跑到最後就 fs.delete 來刪除之。[[br]][[br]] |
| 150 | |
| 151 | client.getClusterStatus(); 此段程式碼是取得環境中有多少個nodes可以使用,並設到設定參數中。[[br]][[br]] |
| 152 | |
| 153 | 怎麼知道程式跑完map-reduce ,JobClient.runJob() 則是 hadoop 呼叫開始跑map reduce的,如同thread 的run() 相似。 |
| 154 | |
| 155 | [[br]][[br]] |
| 156 | |
| 157 | jobConf.setMapperClass(MapClass.class); 是用來設定 mapClass 如何運作,也就是下一段就會介紹到的內部類別 MapClass.class,而像此程式沒有用到reduce,因此不用設定;而若map 或 reduce 其中有一個沒用到其功能的話,也可以設定基本元件 IdentityReducer、IdentityMapper來取代。 |
| 158 | |
| 159 | |
| 160 | ---- |
| 161 | {{{ |
| 162 | #!java |
| 163 | public static class MapClass extends MapReduceBase implements |
| 164 | Mapper<WritableComparable, Text, Text, Writable> { |
| 165 | public void configure(JobConf job) { |
| 166 | tableName = job.get(TABLE, ""); |
| 167 | } |
| 168 | public void map(WritableComparable key, Text value, |
| 169 | OutputCollector<Text, Writable> output, Reporter reporter) |
| 170 | throws IOException { |
| 171 | try { |
| 172 | LogParser log = new LogParser(value.toString()); |
| 173 | if (table == null) |
| 174 | table = new HTable(conf, new Text(tableName)); |
| 175 | long lockId = table.startUpdate(new Text(log.getIp())); |
| 176 | table.put(lockId, new Text("http:protocol"), log.getProtocol() |
| 177 | .getBytes()); |
| 178 | table.put(lockId, new Text("http:method"), log.getMethod() |
| 179 | .getBytes()); |
| 180 | table.put(lockId, new Text("http:code"), log.getCode() |
| 181 | .getBytes()); |
| 182 | table.put(lockId, new Text("http:bytesize"), log.getByteSize() |
| 183 | .getBytes()); |
| 184 | table.put(lockId, new Text("http:agent"), log.getAgent() |
| 185 | .getBytes()); |
| 186 | table.put(lockId, new Text("url:" + log.getUrl()), log |
| 187 | .getReferrer().getBytes()); |
| 188 | table.put(lockId, new Text("referrer:" + log.getReferrer()), |
| 189 | log.getUrl().getBytes()); |
| 190 | table.commit(lockId, log.getTimestamp()); |
| 191 | } catch (Exception e) { |
| 192 | e.printStackTrace(); |
| 193 | } |
| 194 | } |
| 195 | } |
| 196 | }}} |
| 197 | 此內部類別繼承了 [http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/MapReduceBase.html org.apache.hadoop.mapred.MapReduceBase] ,並實做Mapper<!WritableComparable, Text, Text, Writable> 介面, |
| 198 | 不見得所有map reduce程式都需要實做此介面,但若有要讓map能分配工作就需要寫在下面此函數中:[[BR]] |
| 199 | map(!WritableComparable key, Text value,!OutputCollector<Text, Writable> output, Reporter reporter) [[BR]] |
| 200 | 變數key為hbase中的row key,value則為值,output 可以透過collect() 功能將值寫入hbase的table中。但在此範例中, |
| 201 | 並沒有用到 output的寫入方式,reporter也沒有用到。[[br]] |
| 202 | 此方法因為有IO的存取,因此要宣告trows !IOException, 且用try來起始。[[br]][[br]] |
| 203 | 首先!LogParser log = new !LogParser(value.toString()); value的值為要parser的內容的某一行,因為基於hdfs的map-reduce架構上,hadoop會幫我們把資料整合起來,因此程式的邏輯只要處理好這一行即可。!LogParser 在下面會介紹到,目前只要知道log物件是原始資料value透過 !LogParser 處理過的產物。透過log物件的方法getIP,getProtocol(),...等,我們可以輕易取得需要的資料,用table.put( Row_Key , Column_Qualify_Name , Value) 方法將Value值填入Row_Key中的Column_Qualify_Name欄位中。接著研究table物件。[[br]] |
| 204 | table是全域變數之一,由 [http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html org.apache.hadoop.hbase.HTable] 類別定義。產生出HTable物件'''必定要'''給兩個初始化的值,一個是另一個全域變數也是重要的Hbase設定檔conf,另一個是tableName也就是資料表的名稱,當HTable 的 table 物件產生出來之後,我們就可以利用put來放入資料。然而一個新的資料表,要如何給他row_key呢? |
| 205 | 因此 table.startUpdate(new Text(log.getIp())) 的功能就是 將 ip設定為table的row_key。有興趣的話可以參考[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html#startUpdate(org.apache.hadoop.io.Text) 官方的startUpdate說明] [[br]][[br]] |
| 206 | 用此方法可以回傳一個型態為Long的值,將他宣告成為lockId變數,之後就可以用他當主key,而將值一個一個輸入到對應的欄位中。因此我們可以把結構看成是這樣 |
| 207 | || key\欄位 || http:protocol || http:method || http:code || http:bytesize || http:agent || url: || referrer: || http:code || |
| 208 | || row_key || value1 || value2 || value3 || value4 || value5 || value6 || value7 || value8 || |
| 209 | 需要注意的是,由於Htable 是三維度的結構,row_key、column、timestamp。因此最後commit時間變數則是利用以下方法:[[br]] |
| 210 | table.commit(lockId, log.getTimestamp()),將log內取得的時間update到整個row去。[[br]][[br]] |
| 211 | |
| 212 | configure(!JobConf conf) 此為override org.apache.hadoop.mapred.MapReduceBase.configure(!JobConf ) |
| 213 | 內容只是取得並回傳Table的名字而已 |
| 214 | |
| 215 | ---- |
| 216 | {{{ |
| 217 | #!java |
| 218 | static public Path[] listPaths(FileSystem fsm, Path path) |
| 219 | throws IOException { |
| 220 | FileStatus[] fss = fsm.listStatus(path); |
| 221 | int length = fss.length; |
| 222 | Path[] pi = new Path[length]; |
| 223 | for (int i = 0; i < length; i++) { |
| 224 | pi[i] = fss[i].getPath(); |
| 225 | } |
| 226 | return pi; |
| 227 | } |
| 228 | }}} |
| 229 | 此函數用來列出路徑中的目錄或檔案,是利用listStatus實做原本的listPaths。 |
| 230 | ---- |