= LogParserGo.java =
{{{
/**
 * Program: LogParserGo.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */

package tw.org.nchc.code;

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// import AccessLogParser
/**
 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
 * W3CExtended, IISw3cExtended)
 */
public class LogParserGo {
    static HBaseConfiguration conf = new HBaseConfiguration();

    public static final String TABLE = "table.name";

    static String tableName;

    static HTable table = null;

    // static boolean eclipseRun = false;

    static void print(String str) {
        System.out.println("STR = " + str);
    }

    /**
     * Map-only job: each access_log line is parsed by LogParser and written
     * directly into the HBase table, so no reducer is required.
     */
    public static class MapClass extends MapReduceBase implements
            Mapper<WritableComparable, Text, Text, Writable> {

        @Override
        public void configure(JobConf job) {
            // read the target table name from the job configuration
            tableName = job.get(TABLE, "");
        }

        public void map(WritableComparable key, Text value,
                OutputCollector<Text, Writable> output, Reporter reporter)
                throws IOException {
            try {
                /*
                 * debug code: dump the raw input line to a local file
                 * print(value.toString());
                 * FileWriter out = new FileWriter(new File(
                 *         "/home/waue/mr-result.txt"));
                 * out.write(value.toString());
                 * out.flush();
                 * out.close();
                 */
                LogParser log = new LogParser(value.toString());

                if (table == null)
                    table = new HTable(conf, new Text(tableName));
                // one row per log line, keyed by the client IP
                long lockId = table.startUpdate(new Text(log.getIp()));
                table.put(lockId, new Text("http:protocol"), log.getProtocol()
                        .getBytes());
                table.put(lockId, new Text("http:method"), log.getMethod()
                        .getBytes());
                table.put(lockId, new Text("http:code"), log.getCode()
                        .getBytes());
                table.put(lockId, new Text("http:bytesize"), log.getByteSize()
                        .getBytes());
                table.put(lockId, new Text("http:agent"), log.getAgent()
                        .getBytes());
                table.put(lockId, new Text("url:" + log.getUrl()), log
                        .getReferrer().getBytes());
                table.put(lockId, new Text("referrer:" + log.getReferrer()),
                        log.getUrl().getBytes());
                // commit with the request time as the cell timestamp
                table.commit(lockId, log.getTimestamp());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // helper to replace the deprecated FileSystem.listPaths()
    static public Path[] listPaths(FileSystem fsm, Path path)
            throws IOException {
        FileStatus[] fss = fsm.listStatus(path);
        int length = fss.length;
        Path[] pi = new Path[length];
        for (int i = 0; i < length; i++) {
            pi[i] = fss[i].getPath();
        }
        return pi;
    }

    public static void runMapReduce(String table, String dir)
            throws IOException {
        Path tempDir = new Path("/tmp/Mylog/");
        Path inputDir = new Path(dir);
        FileSystem fs = FileSystem.get(conf);
        JobConf jobConf = new JobConf(conf, LogParserGo.class);
        jobConf.setJobName("apache log fetcher");
        jobConf.set(TABLE, table);
        // add every log file under the input directory (one level deep)
        Path[] in = listPaths(fs, inputDir);
        if (fs.isFile(inputDir)) {
            jobConf.setInputPath(inputDir);
        } else {
            for (int i = 0; i < in.length; i++) {
                if (fs.isFile(in[i])) {
                    jobConf.addInputPath(in[i]);
                } else {
                    Path[] sub = listPaths(fs, in[i]);
                    for (int j = 0; j < sub.length; j++) {
                        if (fs.isFile(sub[j])) {
                            jobConf.addInputPath(sub[j]);
                        }
                    }
                }
            }
        }
        jobConf.setOutputPath(tempDir);

        jobConf.setMapperClass(MapClass.class);

        JobClient client = new JobClient(jobConf);
        ClusterStatus cluster = client.getClusterStatus();
        jobConf.setNumMapTasks(cluster.getMapTasks());
        jobConf.setNumReduceTasks(0);

        JobClient.runJob(jobConf);

        // the results go straight to HBase, so the temporary output
        // directory can be removed afterwards
        fs.delete(tempDir);
        fs.close();
    }

    public static void createTable(String table) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists(new Text(table))) {
            System.out.println("1. " + table
                    + " table creating ... please wait");
            HTableDescriptor tableDesc = new HTableDescriptor(table);
            tableDesc.addFamily(new HColumnDescriptor("http:"));
            tableDesc.addFamily(new HColumnDescriptor("url:"));
            tableDesc.addFamily(new HColumnDescriptor("referrer:"));
            admin.createTable(tableDesc);
        } else {
            System.out.println("1. " + table + " table already exists.");
        }
        System.out.println("2. access_log files fetching using map/reduce");
    }

    public static void main(String[] args) throws IOException {
        String table_name = "apache-log2";
        String dir = "/user/waue/apache-log";
        createTable(table_name);
        runMapReduce(table_name, dir);
    }
}
}}}
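Note that LogParserGo contains no parsing logic of its own: it relies on a LogParser class that must live in the same package (tw.org.nchc.code, since it is not imported). For orientation only, the outline below restates the accessors that map() above calls on it, inferred from those calls; it is a sketch of the expected interface, not the original implementation, and should not be compiled in place of the real LogParser.java.

{{{
// Sketch only: the interface LogParserGo expects from LogParser, inferred
// from the calls in map() above. The real class parses one Apache access_log
// line in its constructor and fills in these fields.
public class LogParser {
    public LogParser(String line) {
        // parse the access_log line here (the real parser may reject
        // malformed lines, which map() catches as a generic Exception)
    }
    public String getIp() { return null; }        // client IP, used as the row key
    public String getProtocol() { return null; }  // e.g. "HTTP/1.1"
    public String getMethod() { return null; }    // e.g. "GET"
    public String getCode() { return null; }      // HTTP status code
    public String getByteSize() { return null; }  // response size in bytes
    public String getAgent() { return null; }     // User-Agent header
    public String getUrl() { return null; }       // requested URL
    public String getReferrer() { return null; }  // Referer header
    public long getTimestamp() { return 0L; }     // request time, used as the cell timestamp
}
}}}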