Changes between Version 2 and Version 3 of LogParser


Ignore:
Timestamp:
Jul 4, 2008, 4:37:43 PM (16 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • LogParser

    v2 v3  
    6666
    6767 = LogParser.java =
    68 
    6968{{{
    7069package tw.org.nchc.code;
    7170
    72 
    7371import java.text.ParseException;
    7472
     
    197195
    198196
     197
    199198  public String getIp() {
    200199
     
    204203
    205204
     205
    206206  public String getProtocol() {
    207207
     
    211211
    212212
     213
    213214  public String getMethod() {
    214215
     
    218219
    219220
     221
    220222  public String getUrl() {
    221223
     
    225227
    226228
     229
    227230  public String getCode() {
    228231
     
    232235
    233236
     237
    234238  public String getByteSize() {
    235239
     
    239243
    240244
     245
    241246  public String getReferrer() {
    242247
     
    246251
    247252
     253
    248254  public String getAgent() {
    249255
     
    253259
    254260
     261
    255262  public long getTimestamp() {
    256263
     
    259266  }
    260267
    261 
    262268}
    263269}}}
    264 
    265  = LogParserGo.java =
    266 
    267 {{{
    268 /**
    269  * Program: LogFetcher.java
    270  * Editor: Waue Chen
    271  * From :  NCHC. Taiwn
    272  * Last Update Date: 07/02/2008
    273  */
    274 
    275 package tw.org.nchc.code;
    276 
    277 
    278 import java.io.IOException;
    279 
    280 
    281 import org.apache.hadoop.fs.FileStatus;
    282 
    283 import org.apache.hadoop.fs.FileSystem;
    284 
    285 import org.apache.hadoop.fs.Path;
    286 
    287 import org.apache.hadoop.hbase.HBaseAdmin;
    288 
    289 import org.apache.hadoop.hbase.HBaseConfiguration;
    290 
    291 import org.apache.hadoop.hbase.HColumnDescriptor;
    292 
    293 import org.apache.hadoop.hbase.HTable;
    294 
    295 import org.apache.hadoop.hbase.HTableDescriptor;
    296 
    297 import org.apache.hadoop.io.Text;
    298 
    299 import org.apache.hadoop.io.Writable;
    300 
    301 import org.apache.hadoop.io.WritableComparable;
    302 
    303 import org.apache.hadoop.mapred.ClusterStatus;
    304 
    305 import org.apache.hadoop.mapred.JobClient;
    306 
    307 import org.apache.hadoop.mapred.JobConf;
    308 
    309 import org.apache.hadoop.mapred.MapReduceBase;
    310 
    311 import org.apache.hadoop.mapred.Mapper;
    312 
    313 import org.apache.hadoop.mapred.OutputCollector;
    314 
    315 import org.apache.hadoop.mapred.Reporter;
    316 
    317 
    318 
    319 // import AccessLogParser
    320 
    321 /**
    322 
    323  * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
    324 
    325  * W3CExtended, IISw3cExtended)
    326 
    327  */
    328 
    329 public class LogParserGo {
    330 
    331         static HBaseConfiguration conf = new HBaseConfiguration();
    332 
    333 
    334 
    335         public static final String TABLE = "table.name";
    336 
    337 
    338 
    339         static String tableName;
    340 
    341 
    342 
    343         static HTable table = null;
    344 
    345 
    346 
    347 //      static boolean eclipseRun = false;
    348 
    349         static void print(String str){
    350 
    351                 System.out.println("STR  = "+str);
    352 
    353         }
    354 
    355         public static class MapClass extends MapReduceBase implements
    356 
    357                         Mapper<WritableComparable, Text, Text, Writable> {
    358 
    359 
    360 
    361                 @Override
    362 
    363                 public void configure(JobConf job) {
    364 
    365                         tableName = job.get(TABLE, "");
    366 
    367                 }
    368 
    369 
    370 
    371                 public void map(WritableComparable key, Text value,
    372 
    373                                 OutputCollector<Text, Writable> output, Reporter reporter)
    374 
    375                                 throws IOException {
    376 
    377                        
    378 
    379                         try {
    380 
    381                                 /*
    382 
    383                                 print(value.toString());
    384 
    385                                 FileWriter out = new FileWriter(new File(
    386 
    387                                 "/home/waue/mr-result.txt"));
    388 
    389                                 out.write(value.toString());
    390 
    391                                 out.flush();
    392 
    393                                 out.close();
    394 
    395                                 */
    396 
    397                                 LogParser log = new LogParser(value.toString());
    398 
    399                                
    400 
    401                                 if (table == null)
    402 
    403                                         table = new HTable(conf, new Text(tableName));
    404 
    405                                 long lockId = table.startUpdate(new Text(log.getIp()));
    406 
    407                                 table.put(lockId, new Text("http:protocol"), log.getProtocol()
    408 
    409                                                 .getBytes());
    410 
    411                                 table.put(lockId, new Text("http:method"), log.getMethod()
    412 
    413                                                 .getBytes());
    414 
    415                                 table.put(lockId, new Text("http:code"), log.getCode()
    416 
    417                                                 .getBytes());
    418 
    419                                 table.put(lockId, new Text("http:bytesize"), log.getByteSize()
    420 
    421                                                 .getBytes());
    422 
    423                                 table.put(lockId, new Text("http:agent"), log.getAgent()
    424 
    425                                                 .getBytes());
    426 
    427                                 table.put(lockId, new Text("url:" + log.getUrl()), log
    428 
    429                                                 .getReferrer().getBytes());
    430 
    431                                 table.put(lockId, new Text("referrer:" + log.getReferrer()),
    432 
    433                                                 log.getUrl().getBytes());
    434 
    435                                 table.commit(lockId, log.getTimestamp());
    436 
    437                                
    438 
    439                         } catch (Exception e) {
    440 
    441                                 e.printStackTrace();
    442 
    443                         }
    444 
    445                        
    446 
    447                 }
    448 
    449         }
    450 
    451 
    452 
    453         // do it to resolve warning : FileSystem.listPaths
    454 
    455         static public Path[] listPaths(FileSystem fsm, Path path)
    456 
    457                         throws IOException {
    458 
    459                 FileStatus[] fss = fsm.listStatus(path);
    460 
    461                 int length = fss.length;
    462 
    463                 Path[] pi = new Path[length];
    464 
    465                 for (int i = 0; i < length; i++) {
    466 
    467                         pi[i] = fss[i].getPath();
    468 
    469                 }
    470 
    471                 return pi;
    472 
    473         }
    474 
    475 
    476 
    477         public static void runMapReduce(String table, String dir)
    478 
    479                         throws IOException {
    480 
    481                 Path tempDir = new Path("/tmp/Mylog/");
    482 
    483                 Path InputDir = new Path(dir);
    484 
    485                 FileSystem fs = FileSystem.get(conf);
    486 
    487                 JobConf jobConf = new JobConf(conf, LogParserGo.class);
    488 
    489                 jobConf.setJobName("apache log fetcher");
    490 
    491                 jobConf.set(TABLE, table);
    492 
    493                 Path[] in = listPaths(fs, InputDir);
    494 
    495                 if (fs.isFile(InputDir)) {
    496 
    497                         jobConf.setInputPath(InputDir);
    498 
    499                 } else {
    500 
    501                         for (int i = 0; i < in.length; i++) {
    502 
    503                                 if (fs.isFile(in[i])) {
    504 
    505                                         jobConf.addInputPath(in[i]);
    506 
    507                                 } else {
    508 
    509                                         Path[] sub = listPaths(fs, in[i]);
    510 
    511                                         for (int j = 0; j < sub.length; j++) {
    512 
    513                                                 if (fs.isFile(sub[j])) {
    514 
    515                                                         jobConf.addInputPath(sub[j]);
    516 
    517                                                 }
    518 
    519                                         }
    520 
    521                                 }
    522 
    523                         }
    524 
    525                 }
    526 
    527                 jobConf.setOutputPath(tempDir);
    528 
    529 
    530 
    531                 jobConf.setMapperClass(MapClass.class);
    532 
    533 
    534 
    535                 JobClient client = new JobClient(jobConf);
    536 
    537                 ClusterStatus cluster = client.getClusterStatus();
    538 
    539                 jobConf.setNumMapTasks(cluster.getMapTasks());
    540 
    541                 jobConf.setNumReduceTasks(0);
    542 
    543 
    544 
    545                 JobClient.runJob(jobConf);
    546 
    547 
    548 
    549                 fs.delete(tempDir);
    550 
    551                 fs.close();
    552 
    553         }
    554 
    555 
    556 
    557         public static void creatTable(String table) throws IOException {
    558 
    559                 HBaseAdmin admin = new HBaseAdmin(conf);
    560 
    561                 if (!admin.tableExists(new Text(table))) {
    562 
    563                         System.out.println("1. " + table
    564 
    565                                         + " table creating ... please wait");
    566 
    567                         HTableDescriptor tableDesc = new HTableDescriptor(table);
    568 
    569                         tableDesc.addFamily(new HColumnDescriptor("http:"));
    570 
    571                         tableDesc.addFamily(new HColumnDescriptor("url:"));
    572 
    573                         tableDesc.addFamily(new HColumnDescriptor("referrer:"));
    574 
    575                         admin.createTable(tableDesc);
    576 
    577                 } else {
    578 
    579                         System.out.println("1. " + table + " table already exists.");
    580 
    581                 }
    582 
    583                 System.out.println("2. access_log files fetching using map/reduce");
    584         }
    585         public static void main(String[] args) throws IOException {
    586                 String table_name = "apache-log2";
    587                 String dir = "/user/waue/apache-log";
    588                 creatTable(table_name);
    589                 runMapReduce(table_name, dir);
    590         }
    591 }
    592 
    593 }}}