| 264 |  |  | 
                          | 265 |  |  = LogParserGo.java =  | 
                          | 266 |  |  | 
                          | 267 |  | {{{ | 
                          | 268 |  | /** | 
                          | 269 |  |  * Program: LogFetcher.java | 
                          | 270 |  |  * Editor: Waue Chen  | 
                          | 271 |  |  * From :  NCHC. Taiwn | 
                          | 272 |  |  * Last Update Date: 07/02/2008 | 
                          | 273 |  |  */ | 
                          | 274 |  |  | 
                          | 275 |  | package tw.org.nchc.code; | 
                          | 276 |  |  | 
                          | 277 |  |  | 
                          | 278 |  | import java.io.IOException; | 
                          | 279 |  |  | 
                          | 280 |  |  | 
                          | 281 |  | import org.apache.hadoop.fs.FileStatus; | 
                          | 282 |  |  | 
                          | 283 |  | import org.apache.hadoop.fs.FileSystem; | 
                          | 284 |  |  | 
                          | 285 |  | import org.apache.hadoop.fs.Path; | 
                          | 286 |  |  | 
                          | 287 |  | import org.apache.hadoop.hbase.HBaseAdmin; | 
                          | 288 |  |  | 
                          | 289 |  | import org.apache.hadoop.hbase.HBaseConfiguration; | 
                          | 290 |  |  | 
                          | 291 |  | import org.apache.hadoop.hbase.HColumnDescriptor; | 
                          | 292 |  |  | 
                          | 293 |  | import org.apache.hadoop.hbase.HTable; | 
                          | 294 |  |  | 
                          | 295 |  | import org.apache.hadoop.hbase.HTableDescriptor; | 
                          | 296 |  |  | 
                          | 297 |  | import org.apache.hadoop.io.Text; | 
                          | 298 |  |  | 
                          | 299 |  | import org.apache.hadoop.io.Writable; | 
                          | 300 |  |  | 
                          | 301 |  | import org.apache.hadoop.io.WritableComparable; | 
                          | 302 |  |  | 
                          | 303 |  | import org.apache.hadoop.mapred.ClusterStatus; | 
                          | 304 |  |  | 
                          | 305 |  | import org.apache.hadoop.mapred.JobClient; | 
                          | 306 |  |  | 
                          | 307 |  | import org.apache.hadoop.mapred.JobConf; | 
                          | 308 |  |  | 
                          | 309 |  | import org.apache.hadoop.mapred.MapReduceBase; | 
                          | 310 |  |  | 
                          | 311 |  | import org.apache.hadoop.mapred.Mapper; | 
                          | 312 |  |  | 
                          | 313 |  | import org.apache.hadoop.mapred.OutputCollector; | 
                          | 314 |  |  | 
                          | 315 |  | import org.apache.hadoop.mapred.Reporter; | 
                          | 316 |  |  | 
                          | 317 |  |  | 
                          | 318 |  |  | 
                          | 319 |  | // import AccessLogParser | 
                          | 320 |  |  | 
                          | 321 |  | /** | 
                          | 322 |  |  | 
                          | 323 |  |  * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default, | 
                          | 324 |  |  | 
                          | 325 |  |  * W3CExtended, IISw3cExtended) | 
                          | 326 |  |  | 
                          | 327 |  |  */ | 
                          | 328 |  |  | 
                          | 329 |  | public class LogParserGo { | 
                          | 330 |  |  | 
                          | 331 |  |         static HBaseConfiguration conf = new HBaseConfiguration(); | 
                          | 332 |  |  | 
                          | 333 |  |  | 
                          | 334 |  |  | 
                          | 335 |  |         public static final String TABLE = "table.name"; | 
                          | 336 |  |  | 
                          | 337 |  |  | 
                          | 338 |  |  | 
                          | 339 |  |         static String tableName; | 
                          | 340 |  |  | 
                          | 341 |  |  | 
                          | 342 |  |  | 
                          | 343 |  |         static HTable table = null; | 
                          | 344 |  |  | 
                          | 345 |  |  | 
                          | 346 |  |  | 
                          | 347 |  | //      static boolean eclipseRun = false; | 
                          | 348 |  |  | 
                          | 349 |  |         static void print(String str){ | 
                          | 350 |  |  | 
                          | 351 |  |                 System.out.println("STR  = "+str); | 
                          | 352 |  |  | 
                          | 353 |  |         } | 
                          | 354 |  |  | 
                          | 355 |  |         public static class MapClass extends MapReduceBase implements | 
                          | 356 |  |  | 
                          | 357 |  |                         Mapper<WritableComparable, Text, Text, Writable> { | 
                          | 358 |  |  | 
                          | 359 |  |  | 
                          | 360 |  |  | 
                          | 361 |  |                 @Override | 
                          | 362 |  |  | 
                          | 363 |  |                 public void configure(JobConf job) { | 
                          | 364 |  |  | 
                          | 365 |  |                         tableName = job.get(TABLE, ""); | 
                          | 366 |  |  | 
                          | 367 |  |                 } | 
                          | 368 |  |  | 
                          | 369 |  |  | 
                          | 370 |  |  | 
                          | 371 |  |                 public void map(WritableComparable key, Text value, | 
                          | 372 |  |  | 
                          | 373 |  |                                 OutputCollector<Text, Writable> output, Reporter reporter) | 
                          | 374 |  |  | 
                          | 375 |  |                                 throws IOException { | 
                          | 376 |  |  | 
                          | 377 |  |                          | 
                          | 378 |  |  | 
                          | 379 |  |                         try { | 
                          | 380 |  |  | 
                          | 381 |  |                                 /* | 
                          | 382 |  |  | 
                          | 383 |  |                                 print(value.toString()); | 
                          | 384 |  |  | 
                          | 385 |  |                                 FileWriter out = new FileWriter(new File( | 
                          | 386 |  |  | 
                          | 387 |  |                                 "/home/waue/mr-result.txt")); | 
                          | 388 |  |  | 
                          | 389 |  |                                 out.write(value.toString()); | 
                          | 390 |  |  | 
                          | 391 |  |                                 out.flush(); | 
                          | 392 |  |  | 
                          | 393 |  |                                 out.close(); | 
                          | 394 |  |  | 
                          | 395 |  |                                 */ | 
                          | 396 |  |  | 
                          | 397 |  |                                 LogParser log = new LogParser(value.toString()); | 
                          | 398 |  |  | 
                          | 399 |  |                                  | 
                          | 400 |  |  | 
                          | 401 |  |                                 if (table == null) | 
                          | 402 |  |  | 
                          | 403 |  |                                         table = new HTable(conf, new Text(tableName)); | 
                          | 404 |  |  | 
                          | 405 |  |                                 long lockId = table.startUpdate(new Text(log.getIp())); | 
                          | 406 |  |  | 
                          | 407 |  |                                 table.put(lockId, new Text("http:protocol"), log.getProtocol() | 
                          | 408 |  |  | 
                          | 409 |  |                                                 .getBytes()); | 
                          | 410 |  |  | 
                          | 411 |  |                                 table.put(lockId, new Text("http:method"), log.getMethod() | 
                          | 412 |  |  | 
                          | 413 |  |                                                 .getBytes()); | 
                          | 414 |  |  | 
                          | 415 |  |                                 table.put(lockId, new Text("http:code"), log.getCode() | 
                          | 416 |  |  | 
                          | 417 |  |                                                 .getBytes()); | 
                          | 418 |  |  | 
                          | 419 |  |                                 table.put(lockId, new Text("http:bytesize"), log.getByteSize() | 
                          | 420 |  |  | 
                          | 421 |  |                                                 .getBytes()); | 
                          | 422 |  |  | 
                          | 423 |  |                                 table.put(lockId, new Text("http:agent"), log.getAgent() | 
                          | 424 |  |  | 
                          | 425 |  |                                                 .getBytes()); | 
                          | 426 |  |  | 
                          | 427 |  |                                 table.put(lockId, new Text("url:" + log.getUrl()), log | 
                          | 428 |  |  | 
                          | 429 |  |                                                 .getReferrer().getBytes()); | 
                          | 430 |  |  | 
                          | 431 |  |                                 table.put(lockId, new Text("referrer:" + log.getReferrer()), | 
                          | 432 |  |  | 
                          | 433 |  |                                                 log.getUrl().getBytes()); | 
                          | 434 |  |  | 
                          | 435 |  |                                 table.commit(lockId, log.getTimestamp()); | 
                          | 436 |  |  | 
                          | 437 |  |                                  | 
                          | 438 |  |  | 
                          | 439 |  |                         } catch (Exception e) { | 
                          | 440 |  |  | 
                          | 441 |  |                                 e.printStackTrace(); | 
                          | 442 |  |  | 
                          | 443 |  |                         } | 
                          | 444 |  |  | 
                          | 445 |  |                          | 
                          | 446 |  |  | 
                          | 447 |  |                 } | 
                          | 448 |  |  | 
                          | 449 |  |         } | 
                          | 450 |  |  | 
                          | 451 |  |  | 
                          | 452 |  |  | 
                          | 453 |  |         // do it to resolve warning : FileSystem.listPaths | 
                          | 454 |  |  | 
                          | 455 |  |         static public Path[] listPaths(FileSystem fsm, Path path) | 
                          | 456 |  |  | 
                          | 457 |  |                         throws IOException { | 
                          | 458 |  |  | 
                          | 459 |  |                 FileStatus[] fss = fsm.listStatus(path); | 
                          | 460 |  |  | 
                          | 461 |  |                 int length = fss.length; | 
                          | 462 |  |  | 
                          | 463 |  |                 Path[] pi = new Path[length]; | 
                          | 464 |  |  | 
                          | 465 |  |                 for (int i = 0; i < length; i++) { | 
                          | 466 |  |  | 
                          | 467 |  |                         pi[i] = fss[i].getPath(); | 
                          | 468 |  |  | 
                          | 469 |  |                 } | 
                          | 470 |  |  | 
                          | 471 |  |                 return pi; | 
                          | 472 |  |  | 
                          | 473 |  |         } | 
                          | 474 |  |  | 
                          | 475 |  |  | 
                          | 476 |  |  | 
                          | 477 |  |         public static void runMapReduce(String table, String dir) | 
                          | 478 |  |  | 
                          | 479 |  |                         throws IOException { | 
                          | 480 |  |  | 
                          | 481 |  |                 Path tempDir = new Path("/tmp/Mylog/"); | 
                          | 482 |  |  | 
                          | 483 |  |                 Path InputDir = new Path(dir); | 
                          | 484 |  |  | 
                          | 485 |  |                 FileSystem fs = FileSystem.get(conf); | 
                          | 486 |  |  | 
                          | 487 |  |                 JobConf jobConf = new JobConf(conf, LogParserGo.class); | 
                          | 488 |  |  | 
                          | 489 |  |                 jobConf.setJobName("apache log fetcher"); | 
                          | 490 |  |  | 
                          | 491 |  |                 jobConf.set(TABLE, table); | 
                          | 492 |  |  | 
                          | 493 |  |                 Path[] in = listPaths(fs, InputDir); | 
                          | 494 |  |  | 
                          | 495 |  |                 if (fs.isFile(InputDir)) { | 
                          | 496 |  |  | 
                          | 497 |  |                         jobConf.setInputPath(InputDir); | 
                          | 498 |  |  | 
                          | 499 |  |                 } else { | 
                          | 500 |  |  | 
                          | 501 |  |                         for (int i = 0; i < in.length; i++) { | 
                          | 502 |  |  | 
                          | 503 |  |                                 if (fs.isFile(in[i])) { | 
                          | 504 |  |  | 
                          | 505 |  |                                         jobConf.addInputPath(in[i]); | 
                          | 506 |  |  | 
                          | 507 |  |                                 } else { | 
                          | 508 |  |  | 
                          | 509 |  |                                         Path[] sub = listPaths(fs, in[i]); | 
                          | 510 |  |  | 
                          | 511 |  |                                         for (int j = 0; j < sub.length; j++) { | 
                          | 512 |  |  | 
                          | 513 |  |                                                 if (fs.isFile(sub[j])) { | 
                          | 514 |  |  | 
                          | 515 |  |                                                         jobConf.addInputPath(sub[j]); | 
                          | 516 |  |  | 
                          | 517 |  |                                                 } | 
                          | 518 |  |  | 
                          | 519 |  |                                         } | 
                          | 520 |  |  | 
                          | 521 |  |                                 } | 
                          | 522 |  |  | 
                          | 523 |  |                         } | 
                          | 524 |  |  | 
                          | 525 |  |                 } | 
                          | 526 |  |  | 
                          | 527 |  |                 jobConf.setOutputPath(tempDir); | 
                          | 528 |  |  | 
                          | 529 |  |  | 
                          | 530 |  |  | 
                          | 531 |  |                 jobConf.setMapperClass(MapClass.class); | 
                          | 532 |  |  | 
                          | 533 |  |  | 
                          | 534 |  |  | 
                          | 535 |  |                 JobClient client = new JobClient(jobConf); | 
                          | 536 |  |  | 
                          | 537 |  |                 ClusterStatus cluster = client.getClusterStatus(); | 
                          | 538 |  |  | 
                          | 539 |  |                 jobConf.setNumMapTasks(cluster.getMapTasks()); | 
                          | 540 |  |  | 
                          | 541 |  |                 jobConf.setNumReduceTasks(0); | 
                          | 542 |  |  | 
                          | 543 |  |  | 
                          | 544 |  |  | 
                          | 545 |  |                 JobClient.runJob(jobConf); | 
                          | 546 |  |  | 
                          | 547 |  |  | 
                          | 548 |  |  | 
                          | 549 |  |                 fs.delete(tempDir); | 
                          | 550 |  |  | 
                          | 551 |  |                 fs.close(); | 
                          | 552 |  |  | 
                          | 553 |  |         } | 
                          | 554 |  |  | 
                          | 555 |  |  | 
                          | 556 |  |  | 
                          | 557 |  |         public static void creatTable(String table) throws IOException { | 
                          | 558 |  |  | 
                          | 559 |  |                 HBaseAdmin admin = new HBaseAdmin(conf); | 
                          | 560 |  |  | 
                          | 561 |  |                 if (!admin.tableExists(new Text(table))) { | 
                          | 562 |  |  | 
                          | 563 |  |                         System.out.println("1. " + table | 
                          | 564 |  |  | 
                          | 565 |  |                                         + " table creating ... please wait"); | 
                          | 566 |  |  | 
                          | 567 |  |                         HTableDescriptor tableDesc = new HTableDescriptor(table); | 
                          | 568 |  |  | 
                          | 569 |  |                         tableDesc.addFamily(new HColumnDescriptor("http:")); | 
                          | 570 |  |  | 
                          | 571 |  |                         tableDesc.addFamily(new HColumnDescriptor("url:")); | 
                          | 572 |  |  | 
                          | 573 |  |                         tableDesc.addFamily(new HColumnDescriptor("referrer:")); | 
                          | 574 |  |  | 
                          | 575 |  |                         admin.createTable(tableDesc); | 
                          | 576 |  |  | 
                          | 577 |  |                 } else { | 
                          | 578 |  |  | 
                          | 579 |  |                         System.out.println("1. " + table + " table already exists."); | 
                          | 580 |  |  | 
                          | 581 |  |                 } | 
                          | 582 |  |  | 
                          | 583 |  |                 System.out.println("2. access_log files fetching using map/reduce"); | 
                          | 584 |  |         } | 
                          | 585 |  |         public static void main(String[] args) throws IOException { | 
                          | 586 |  |                 String table_name = "apache-log2"; | 
                          | 587 |  |                 String dir = "/user/waue/apache-log"; | 
                          | 588 |  |                 creatTable(table_name); | 
                          | 589 |  |                 runMapReduce(table_name, dir); | 
                          | 590 |  |         } | 
                          | 591 |  | } | 
                          | 592 |  |  | 
                          | 593 |  | }}} |