Changes between Version 15 and Version 16 of LogParser


Ignore:
Timestamp:
Jul 7, 2008, 5:51:16 PM (16 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • LogParser

    v15 v16  
    3636 = !LogParserGo.java =
    3737
     38{{{
     39public class LogParserGo {
     40        static HBaseConfiguration conf = new HBaseConfiguration();
     41        public static final String TABLE = "table.name";
     42        static String tableName;
     43        static HTable table = null;
     44        public static class MapClass;
     45        static public Path[] listPaths(FileSystem fsm, Path path);
     46        public static void runMapReduce(String table, String dir);
     47        public static void creatTable(String table) ;
     48        public static void main(String[] args) ;
     49
     50}}}
     51LogParserGo共宣告了以下幾個全域變數及方法:
     52 1 HBaseConfiguration conf為重要的控制設定參數,其定義了很多方法可以設定或取得map reduce程式運作所需要的值
     53 2 定義 TABLE 為 "table.name",table.name為 name property
     54 3 string tableName 為資料表名稱
     55 4 Htable table 在定義一個HBase的操作變數
     56 5 class MapClass 為實做map的一個內部類別
     57 6 Path[] listPaths 是個可以列出指定路徑下的檔案和目錄,原本0.16 api即宣告 Deprecated,因此為了解決warning在此實做
     58 7 void runMapReduce(String table, String dir) 跑MapReduce的程序
     59 8 void creatTable(String table)  建立hbase的資料表
     60 9 void main(String[] args)  main 函數
     61
     621~4為變數較為單純,之後將說明5~9的函數功能
     63------------------------------------
    3864{{{
    3965        public static class MapClass extends MapReduceBase implements
     
    7197        }
    7298}}}
     99此內部類別繼承了 [http://hadoop.apache.org/core/docs/r0.16.4/api/org/apache/hadoop/mapred/MapReduceBase.html org.apache.hadoop.mapred.MapReduceBase] ,並實做Mapper<WritableComparable, Text, Text, Writable> 介面,
     100不見得所有map reduce程式都需要實做此介面,但若有要讓map能分配工作就需要寫在下面此函數中:[[BR]]
     101map(WritableComparable key, Text value, OutputCollector<Text, Writable> output, Reporter reporter) [[BR]]
     102變數key為hbase中的row key,value則為值,output 可以透過collect() 功能將值寫入hbase的table中。但在此範例中,
     103並沒有用到 output的寫入方式,reporter也沒有用到。[[br]]
     104此方法因為有IO的存取,因此要宣告trows IOException, 且用try來起始。[[br]]
     105首先LogParser log = new LogParser(value.toString()); value的值為要parser的內容的某一行,因為基於hdfs的map-reduce架構上,hadoop會幫我們把資料整合起來,因此程式的邏輯只要處理好這一行即可。LogParser 在下面會介紹到,目前只要知道log物件是原始資料value透過 LogParser 處理過的產物。透過log物件的方法getIP,getProtocol(),...等,我們可以輕易取得需要的資料,用table.put( Row_Key , Column_Qualify_Name , Value) 方法將Value值填入Row_Key中的Column_Qualify_Name欄位中。接著研究table物件。[[br]]
     106table是全域變數之一,由 [http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/HTable.html org.apache.hadoop.hbase.HTable] 類別定義。產生出HTable物件'''必定要'''給兩個初始化的值,一個是另一個全域變數也是重要的設定檔conf,另一個是tableName也就是資料表的名稱
     107
     108configure(jobConf conf) 此為override org.apache.hadoop.mapred.MapReduceBase.configure(JobConf )
     109內容只是取得並回傳Table的名字而已
     110
     111------------------------------
     112{{{
     113        static public Path[] listPaths(FileSystem fsm, Path path)
     114                        throws IOException {
     115                FileStatus[] fss = fsm.listStatus(path);
     116                int length = fss.length;
     117                Path[] pi = new Path[length];
     118                for (int i = 0; i < length; i++) {
     119                        pi[i] = fss[i].getPath();
     120                }
     121                return pi;
     122        }
     123}}}
     124
     125{{{
     126        public static void runMapReduce(String table, String dir)
     127                        throws IOException {
     128                Path tempDir = new Path("/tmp/Mylog/");
     129                Path InputDir = new Path(dir);
     130                FileSystem fs = FileSystem.get(conf);
     131                JobConf jobConf = new JobConf(conf, LogParserGo.class);
     132                jobConf.setJobName("apache log fetcher");
     133                jobConf.set(TABLE, table);
     134                Path[] in = listPaths(fs, InputDir);
     135                if (fs.isFile(InputDir)) {
     136                        jobConf.setInputPath(InputDir);
     137                } else {
     138                        for (int i = 0; i < in.length; i++) {
     139                                if (fs.isFile(in[i])) {
     140                                        jobConf.addInputPath(in[i]);
     141                                } else {
     142                                        Path[] sub = listPaths(fs, in[i]);
     143                                        for (int j = 0; j < sub.length; j++) {
     144                                                if (fs.isFile(sub[j])) {
     145                                                        jobConf.addInputPath(sub[j]);
     146                                                }
     147                                        }
     148                                }
     149                        }
     150                }
     151                jobConf.setOutputPath(tempDir);
     152                jobConf.setMapperClass(MapClass.class);
     153                JobClient client = new JobClient(jobConf);
     154                ClusterStatus cluster = client.getClusterStatus();
     155                jobConf.setNumMapTasks(cluster.getMapTasks());
     156                jobConf.setNumReduceTasks(0);
     157                JobClient.runJob(jobConf);
     158                fs.delete(tempDir);
     159                fs.close();
     160        }
     161}}}
     162
     163{{{
     164        public static void creatTable(String table) throws IOException {
     165                HBaseAdmin admin = new HBaseAdmin(conf);
     166                if (!admin.tableExists(new Text(table))) {
     167                        System.out.println("1. " + table
     168                                        + " table creating ... please wait");
     169                        HTableDescriptor tableDesc = new HTableDescriptor(table);
     170                        tableDesc.addFamily(new HColumnDescriptor("http:"));
     171                        tableDesc.addFamily(new HColumnDescriptor("url:"));
     172                        tableDesc.addFamily(new HColumnDescriptor("referrer:"));
     173                        admin.createTable(tableDesc);
     174                } else {
     175                        System.out.println("1. " + table + " table already exists.");
     176                }
     177                System.out.println("2. access_log files fetching using map/reduce");
     178        }
     179}}}
     180{{{
     181        public static void main(String[] args) throws IOException {
     182                String table_name = "apache-log2";
     183                String dir = "/user/waue/apache-log";
     184                creatTable(table_name);
     185                runMapReduce(table_name, dir);
     186        }
     187}
     188}}}
     189
    73190
    74191 = LogParser.java =