Changes between Initial Version and Version 1 of LogParser


Ignore:
Timestamp:
Jul 4, 2008, 4:33:50 PM (16 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • LogParser

    v1 v1  
     1 = 目的 =
     2 This program will parse your apache log and store it into Hbase.
     3 
     4
     5 = 如何使用 =
     6 1. Upload apache logs ( /var/log/apache2/access.log* ) to hdfs (default: /user/waue/apache-log) \
     7
     8{{{ $ bin/hadoop dfs -put /var/log/apache2/ apache-log
     9 }}}
     10 2. parameter "dir" in main contains the logs.
     11
     12 3. you should filter the exception contents manually,
     13{{{ ex:  ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
     14  }}}
     15}}}
     16= 結果 =
     17 1. 執行以下指令
     18{{{
     19        hql > select * from apache-log;
     20
     21}}}
     22 2. 結果
     23
     24{{{
     25+-------------------------+-------------------------+-------------------------+
     26
     27| Row                     | Column                  | Cell                    |
     28
     29+-------------------------+-------------------------+-------------------------+
     30
     31| 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
     32
     33|                         |                         |  MSIE 4.01; Windows 95) |
     34
     35+-------------------------+-------------------------+-------------------------+
     36
     37| 118.170.101.250         | http:bytesize           | 318                     |
     38
     39+-------------------------+-------------------------+-------------------------+
     40
     41..........(skip)........
     42
     43+-------------------------+-------------------------+-------------------------+
     44
     45| 87.65.93.58             | http:method             | OPTIONS                 |
     46
     47+-------------------------+-------------------------+-------------------------+
     48
     49| 87.65.93.58             | http:protocol           | HTTP/1.1                |
     50
     51+-------------------------+-------------------------+-------------------------+
     52
     53| 87.65.93.58             | referrer:-              | *                       |
     54
     55+-------------------------+-------------------------+-------------------------+
     56
     57| 87.65.93.58             | url:*                   | -                       |
     58
     59+-------------------------+-------------------------+-------------------------+
     60
     6131 row(s) in set. (0.58 sec)
     62
     63
     64}}}
     65
     66 = LogParser.java =
     67
     68{{{
     69package tw.org.nchc.code;
     70
     71
     72import java.text.ParseException;
     73
     74import java.text.SimpleDateFormat;
     75
     76import java.util.Locale;
     77
     78import java.util.StringTokenizer;
     79
     80import java.util.regex.Matcher;
     81
     82import java.util.regex.Pattern;
     83
     84
     85
     86
     87public class LogParser {
     88
     89  private String ip;
     90
     91  private String protocol;
     92
     93  private String method;
     94
     95  private String url;
     96
     97  private String code;
     98
     99  private String byteSize;
     100
     101  private String referrer;
     102
     103  private String agent;
     104
     105  private long timestamp;
     106
     107
     108  private static Pattern p = Pattern
     109
     110  .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
     111
     112                  " ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\".*");
     113
     114  public LogParser(String line) throws ParseException, Exception{
     115
     116         Matcher matcher = p.matcher(line);
     117
     118         if(matcher.matches()){
     119
     120                 this.ip = matcher.group(1);
     121
     122                 // IP address of the client requesting the web page.
     123
     124                 if(isIpAddress(ip)){
     125
     126                         SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z",Locale.US);
     127
     128                         this.timestamp = sdf.parse(matcher.group(4)).getTime();
     129
     130                         String[] http = matcher.group(5).split(" ");
     131
     132                         this.method = http[0];
     133
     134                         this.url = http[1];
     135
     136                         this.protocol = http[2];
     137
     138                         this.code = matcher.group(6);
     139
     140                         this.byteSize = matcher.group(7);
     141
     142                         this.referrer = matcher.group(8);
     143
     144                         this.agent = matcher.group(9);
     145
     146                 }
     147
     148         }
     149
     150
     151
     152  }
     153
     154
     155  public static boolean isIpAddress(String inputString) {
     156
     157    StringTokenizer tokenizer = new StringTokenizer(inputString, ".");
     158
     159    if (tokenizer.countTokens() != 4) {
     160
     161      return false;
     162
     163    }
     164
     165    try {
     166
     167      for (int i = 0; i < 4; i++) {
     168
     169        String t = tokenizer.nextToken();
     170
     171        int chunk = Integer.parseInt(t);
     172
     173        if ((chunk & 255) != chunk) {
     174
     175          return false;
     176
     177        }
     178
     179      }
     180
     181    } catch (NumberFormatException e) {
     182
     183      return false;
     184
     185    }
     186
     187    if (inputString.indexOf("..") >= 0) {
     188
     189      return false;
     190
     191    }
     192
     193    return true;
     194
     195  }
     196
     197
     198  public String getIp() {
     199
     200    return ip;
     201
     202  }
     203
     204
     205  public String getProtocol() {
     206
     207    return protocol;
     208
     209  }
     210
     211
     212  public String getMethod() {
     213
     214    return method;
     215
     216  }
     217
     218
     219  public String getUrl() {
     220
     221    return url;
     222
     223  }
     224
     225
     226  public String getCode() {
     227
     228    return code;
     229
     230  }
     231
     232
     233  public String getByteSize() {
     234
     235    return byteSize;
     236
     237  }
     238
     239
     240  public String getReferrer() {
     241
     242    return referrer;
     243
     244  }
     245
     246
     247  public String getAgent() {
     248
     249    return agent;
     250
     251  }
     252
     253
     254  public long getTimestamp() {
     255
     256    return timestamp;
     257
     258  }
     259
     260
     261}
     262}}}
     263
     264 = LogParserGo.java =
     265
     266{{{
     267/**
     268 * Program: LogFetcher.java
     269 * Editor: Waue Chen
     270 * From :  NCHC. Taiwn
     271 * Last Update Date: 07/02/2008
     272 */
     273
     274package tw.org.nchc.code;
     275
     276
     277import java.io.IOException;
     278
     279
     280import org.apache.hadoop.fs.FileStatus;
     281
     282import org.apache.hadoop.fs.FileSystem;
     283
     284import org.apache.hadoop.fs.Path;
     285
     286import org.apache.hadoop.hbase.HBaseAdmin;
     287
     288import org.apache.hadoop.hbase.HBaseConfiguration;
     289
     290import org.apache.hadoop.hbase.HColumnDescriptor;
     291
     292import org.apache.hadoop.hbase.HTable;
     293
     294import org.apache.hadoop.hbase.HTableDescriptor;
     295
     296import org.apache.hadoop.io.Text;
     297
     298import org.apache.hadoop.io.Writable;
     299
     300import org.apache.hadoop.io.WritableComparable;
     301
     302import org.apache.hadoop.mapred.ClusterStatus;
     303
     304import org.apache.hadoop.mapred.JobClient;
     305
     306import org.apache.hadoop.mapred.JobConf;
     307
     308import org.apache.hadoop.mapred.MapReduceBase;
     309
     310import org.apache.hadoop.mapred.Mapper;
     311
     312import org.apache.hadoop.mapred.OutputCollector;
     313
     314import org.apache.hadoop.mapred.Reporter;
     315
     316
     317
     318// import AccessLogParser
     319
     320/**
     321
     322 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
     323
     324 * W3CExtended, IISw3cExtended)
     325
     326 */
     327
     328public class LogParserGo {
     329
     330        static HBaseConfiguration conf = new HBaseConfiguration();
     331
     332
     333
     334        public static final String TABLE = "table.name";
     335
     336
     337
     338        static String tableName;
     339
     340
     341
     342        static HTable table = null;
     343
     344
     345
     346//      static boolean eclipseRun = false;
     347
     348        static void print(String str){
     349
     350                System.out.println("STR  = "+str);
     351
     352        }
     353
     354        public static class MapClass extends MapReduceBase implements
     355
     356                        Mapper<WritableComparable, Text, Text, Writable> {
     357
     358
     359
     360                @Override
     361
     362                public void configure(JobConf job) {
     363
     364                        tableName = job.get(TABLE, "");
     365
     366                }
     367
     368
     369
     370                public void map(WritableComparable key, Text value,
     371
     372                                OutputCollector<Text, Writable> output, Reporter reporter)
     373
     374                                throws IOException {
     375
     376                       
     377
     378                        try {
     379
     380                                /*
     381
     382                                print(value.toString());
     383
     384                                FileWriter out = new FileWriter(new File(
     385
     386                                "/home/waue/mr-result.txt"));
     387
     388                                out.write(value.toString());
     389
     390                                out.flush();
     391
     392                                out.close();
     393
     394                                */
     395
     396                                LogParser log = new LogParser(value.toString());
     397
     398                               
     399
     400                                if (table == null)
     401
     402                                        table = new HTable(conf, new Text(tableName));
     403
     404                                long lockId = table.startUpdate(new Text(log.getIp()));
     405
     406                                table.put(lockId, new Text("http:protocol"), log.getProtocol()
     407
     408                                                .getBytes());
     409
     410                                table.put(lockId, new Text("http:method"), log.getMethod()
     411
     412                                                .getBytes());
     413
     414                                table.put(lockId, new Text("http:code"), log.getCode()
     415
     416                                                .getBytes());
     417
     418                                table.put(lockId, new Text("http:bytesize"), log.getByteSize()
     419
     420                                                .getBytes());
     421
     422                                table.put(lockId, new Text("http:agent"), log.getAgent()
     423
     424                                                .getBytes());
     425
     426                                table.put(lockId, new Text("url:" + log.getUrl()), log
     427
     428                                                .getReferrer().getBytes());
     429
     430                                table.put(lockId, new Text("referrer:" + log.getReferrer()),
     431
     432                                                log.getUrl().getBytes());
     433
     434                                table.commit(lockId, log.getTimestamp());
     435
     436                               
     437
     438                        } catch (Exception e) {
     439
     440                                e.printStackTrace();
     441
     442                        }
     443
     444                       
     445
     446                }
     447
     448        }
     449
     450
     451
     452        // do it to resolve warning : FileSystem.listPaths
     453
     454        static public Path[] listPaths(FileSystem fsm, Path path)
     455
     456                        throws IOException {
     457
     458                FileStatus[] fss = fsm.listStatus(path);
     459
     460                int length = fss.length;
     461
     462                Path[] pi = new Path[length];
     463
     464                for (int i = 0; i < length; i++) {
     465
     466                        pi[i] = fss[i].getPath();
     467
     468                }
     469
     470                return pi;
     471
     472        }
     473
     474
     475
     476        public static void runMapReduce(String table, String dir)
     477
     478                        throws IOException {
     479
     480                Path tempDir = new Path("/tmp/Mylog/");
     481
     482                Path InputDir = new Path(dir);
     483
     484                FileSystem fs = FileSystem.get(conf);
     485
     486                JobConf jobConf = new JobConf(conf, LogParserGo.class);
     487
     488                jobConf.setJobName("apache log fetcher");
     489
     490                jobConf.set(TABLE, table);
     491
     492                Path[] in = listPaths(fs, InputDir);
     493
     494                if (fs.isFile(InputDir)) {
     495
     496                        jobConf.setInputPath(InputDir);
     497
     498                } else {
     499
     500                        for (int i = 0; i < in.length; i++) {
     501
     502                                if (fs.isFile(in[i])) {
     503
     504                                        jobConf.addInputPath(in[i]);
     505
     506                                } else {
     507
     508                                        Path[] sub = listPaths(fs, in[i]);
     509
     510                                        for (int j = 0; j < sub.length; j++) {
     511
     512                                                if (fs.isFile(sub[j])) {
     513
     514                                                        jobConf.addInputPath(sub[j]);
     515
     516                                                }
     517
     518                                        }
     519
     520                                }
     521
     522                        }
     523
     524                }
     525
     526                jobConf.setOutputPath(tempDir);
     527
     528
     529
     530                jobConf.setMapperClass(MapClass.class);
     531
     532
     533
     534                JobClient client = new JobClient(jobConf);
     535
     536                ClusterStatus cluster = client.getClusterStatus();
     537
     538                jobConf.setNumMapTasks(cluster.getMapTasks());
     539
     540                jobConf.setNumReduceTasks(0);
     541
     542
     543
     544                JobClient.runJob(jobConf);
     545
     546
     547
     548                fs.delete(tempDir);
     549
     550                fs.close();
     551
     552        }
     553
     554
     555
     556        public static void creatTable(String table) throws IOException {
     557
     558                HBaseAdmin admin = new HBaseAdmin(conf);
     559
     560                if (!admin.tableExists(new Text(table))) {
     561
     562                        System.out.println("1. " + table
     563
     564                                        + " table creating ... please wait");
     565
     566                        HTableDescriptor tableDesc = new HTableDescriptor(table);
     567
     568                        tableDesc.addFamily(new HColumnDescriptor("http:"));
     569
     570                        tableDesc.addFamily(new HColumnDescriptor("url:"));
     571
     572                        tableDesc.addFamily(new HColumnDescriptor("referrer:"));
     573
     574                        admin.createTable(tableDesc);
     575
     576                } else {
     577
     578                        System.out.println("1. " + table + " table already exists.");
     579
     580                }
     581
     582                System.out.println("2. access_log files fetching using map/reduce");
     583
     584        }
     585
     586
     587
     588        public static void main(String[] args) throws IOException {
     589
     590                String table_name = "apache-log2";
     591
     592                String dir = "/user/waue/apache-log";
     593
     594
     595                creatTable(table_name);
     596
     597                runMapReduce(table_name, dir);
     598
     599
     600
     601        }
     602
     603
     604
     605}
     606
     607}}}