264 | | |
265 | | = LogParserGo.java = |
266 | | |
267 | | {{{ |
268 | | /** |
269 | | * Program: LogParserGo.java |
270 | | * Editor: Waue Chen |
271 | | * From : NCHC. Taiwan |
272 | | * Last Update Date: 07/02/2008 |
273 | | */ |
274 | | |
275 | | package tw.org.nchc.code; |
276 | | |
277 | | |
278 | | import java.io.IOException; |
279 | | |
280 | | |
281 | | import org.apache.hadoop.fs.FileStatus; |
282 | | |
283 | | import org.apache.hadoop.fs.FileSystem; |
284 | | |
285 | | import org.apache.hadoop.fs.Path; |
286 | | |
287 | | import org.apache.hadoop.hbase.HBaseAdmin; |
288 | | |
289 | | import org.apache.hadoop.hbase.HBaseConfiguration; |
290 | | |
291 | | import org.apache.hadoop.hbase.HColumnDescriptor; |
292 | | |
293 | | import org.apache.hadoop.hbase.HTable; |
294 | | |
295 | | import org.apache.hadoop.hbase.HTableDescriptor; |
296 | | |
297 | | import org.apache.hadoop.io.Text; |
298 | | |
299 | | import org.apache.hadoop.io.Writable; |
300 | | |
301 | | import org.apache.hadoop.io.WritableComparable; |
302 | | |
303 | | import org.apache.hadoop.mapred.ClusterStatus; |
304 | | |
305 | | import org.apache.hadoop.mapred.JobClient; |
306 | | |
307 | | import org.apache.hadoop.mapred.JobConf; |
308 | | |
309 | | import org.apache.hadoop.mapred.MapReduceBase; |
310 | | |
311 | | import org.apache.hadoop.mapred.Mapper; |
312 | | |
313 | | import org.apache.hadoop.mapred.OutputCollector; |
314 | | |
315 | | import org.apache.hadoop.mapred.Reporter; |
316 | | |
317 | | |
318 | | |
319 | | // import AccessLogParser |
320 | | |
321 | | /** |
322 | | |
323 | | * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default, |
324 | | |
325 | | * W3CExtended, IISw3cExtended) |
326 | | |
327 | | */ |
328 | | |
329 | | public class LogParserGo { |
330 | | |
331 | | static HBaseConfiguration conf = new HBaseConfiguration(); |
332 | | |
333 | | |
334 | | |
335 | | public static final String TABLE = "table.name"; |
336 | | |
337 | | |
338 | | |
339 | | static String tableName; |
340 | | |
341 | | |
342 | | |
343 | | static HTable table = null; |
344 | | |
345 | | |
346 | | |
347 | | // static boolean eclipseRun = false; |
348 | | |
349 | | static void print(String str){ |
350 | | |
351 | | System.out.println("STR = "+str); |
352 | | |
353 | | } |
354 | | |
355 | | public static class MapClass extends MapReduceBase implements |
356 | | |
357 | | Mapper<WritableComparable, Text, Text, Writable> { |
358 | | |
359 | | |
360 | | |
361 | | @Override |
362 | | |
363 | | public void configure(JobConf job) { |
364 | | |
365 | | tableName = job.get(TABLE, ""); |
366 | | |
367 | | } |
368 | | |
369 | | |
370 | | |
371 | | public void map(WritableComparable key, Text value, |
372 | | |
373 | | OutputCollector<Text, Writable> output, Reporter reporter) |
374 | | |
375 | | throws IOException { |
376 | | |
377 | | |
378 | | |
379 | | try { |
380 | | |
381 | | /* |
382 | | |
383 | | print(value.toString()); |
384 | | |
385 | | FileWriter out = new FileWriter(new File( |
386 | | |
387 | | "/home/waue/mr-result.txt")); |
388 | | |
389 | | out.write(value.toString()); |
390 | | |
391 | | out.flush(); |
392 | | |
393 | | out.close(); |
394 | | |
395 | | */ |
396 | | |
397 | | LogParser log = new LogParser(value.toString()); |
398 | | |
399 | | |
400 | | |
401 | | if (table == null) |
402 | | |
403 | | table = new HTable(conf, new Text(tableName)); |
404 | | |
405 | | long lockId = table.startUpdate(new Text(log.getIp())); |
406 | | |
407 | | table.put(lockId, new Text("http:protocol"), log.getProtocol() |
408 | | |
409 | | .getBytes()); |
410 | | |
411 | | table.put(lockId, new Text("http:method"), log.getMethod() |
412 | | |
413 | | .getBytes()); |
414 | | |
415 | | table.put(lockId, new Text("http:code"), log.getCode() |
416 | | |
417 | | .getBytes()); |
418 | | |
419 | | table.put(lockId, new Text("http:bytesize"), log.getByteSize() |
420 | | |
421 | | .getBytes()); |
422 | | |
423 | | table.put(lockId, new Text("http:agent"), log.getAgent() |
424 | | |
425 | | .getBytes()); |
426 | | |
427 | | table.put(lockId, new Text("url:" + log.getUrl()), log |
428 | | |
429 | | .getReferrer().getBytes()); |
430 | | |
431 | | table.put(lockId, new Text("referrer:" + log.getReferrer()), |
432 | | |
433 | | log.getUrl().getBytes()); |
434 | | |
435 | | table.commit(lockId, log.getTimestamp()); |
436 | | |
437 | | |
438 | | |
439 | | } catch (Exception e) { |
440 | | |
441 | | e.printStackTrace(); |
442 | | |
443 | | } |
444 | | |
445 | | |
446 | | |
447 | | } |
448 | | |
449 | | } |
450 | | |
451 | | |
452 | | |
453 | | // do it to resolve warning : FileSystem.listPaths |
454 | | |
455 | | static public Path[] listPaths(FileSystem fsm, Path path) |
456 | | |
457 | | throws IOException { |
458 | | |
459 | | FileStatus[] fss = fsm.listStatus(path); |
460 | | |
461 | | int length = fss.length; |
462 | | |
463 | | Path[] pi = new Path[length]; |
464 | | |
465 | | for (int i = 0; i < length; i++) { |
466 | | |
467 | | pi[i] = fss[i].getPath(); |
468 | | |
469 | | } |
470 | | |
471 | | return pi; |
472 | | |
473 | | } |
474 | | |
475 | | |
476 | | |
477 | | public static void runMapReduce(String table, String dir) |
478 | | |
479 | | throws IOException { |
480 | | |
481 | | Path tempDir = new Path("/tmp/Mylog/"); |
482 | | |
483 | | Path InputDir = new Path(dir); |
484 | | |
485 | | FileSystem fs = FileSystem.get(conf); |
486 | | |
487 | | JobConf jobConf = new JobConf(conf, LogParserGo.class); |
488 | | |
489 | | jobConf.setJobName("apache log fetcher"); |
490 | | |
491 | | jobConf.set(TABLE, table); |
492 | | |
493 | | Path[] in = listPaths(fs, InputDir); |
494 | | |
495 | | if (fs.isFile(InputDir)) { |
496 | | |
497 | | jobConf.setInputPath(InputDir); |
498 | | |
499 | | } else { |
500 | | |
501 | | for (int i = 0; i < in.length; i++) { |
502 | | |
503 | | if (fs.isFile(in[i])) { |
504 | | |
505 | | jobConf.addInputPath(in[i]); |
506 | | |
507 | | } else { |
508 | | |
509 | | Path[] sub = listPaths(fs, in[i]); |
510 | | |
511 | | for (int j = 0; j < sub.length; j++) { |
512 | | |
513 | | if (fs.isFile(sub[j])) { |
514 | | |
515 | | jobConf.addInputPath(sub[j]); |
516 | | |
517 | | } |
518 | | |
519 | | } |
520 | | |
521 | | } |
522 | | |
523 | | } |
524 | | |
525 | | } |
526 | | |
527 | | jobConf.setOutputPath(tempDir); |
528 | | |
529 | | |
530 | | |
531 | | jobConf.setMapperClass(MapClass.class); |
532 | | |
533 | | |
534 | | |
535 | | JobClient client = new JobClient(jobConf); |
536 | | |
537 | | ClusterStatus cluster = client.getClusterStatus(); |
538 | | |
539 | | jobConf.setNumMapTasks(cluster.getMapTasks()); |
540 | | |
541 | | jobConf.setNumReduceTasks(0); |
542 | | |
543 | | |
544 | | |
545 | | JobClient.runJob(jobConf); |
546 | | |
547 | | |
548 | | |
549 | | fs.delete(tempDir); |
550 | | |
551 | | fs.close(); |
552 | | |
553 | | } |
554 | | |
555 | | |
556 | | |
557 | | public static void creatTable(String table) throws IOException { |
558 | | |
559 | | HBaseAdmin admin = new HBaseAdmin(conf); |
560 | | |
561 | | if (!admin.tableExists(new Text(table))) { |
562 | | |
563 | | System.out.println("1. " + table |
564 | | |
565 | | + " table creating ... please wait"); |
566 | | |
567 | | HTableDescriptor tableDesc = new HTableDescriptor(table); |
568 | | |
569 | | tableDesc.addFamily(new HColumnDescriptor("http:")); |
570 | | |
571 | | tableDesc.addFamily(new HColumnDescriptor("url:")); |
572 | | |
573 | | tableDesc.addFamily(new HColumnDescriptor("referrer:")); |
574 | | |
575 | | admin.createTable(tableDesc); |
576 | | |
577 | | } else { |
578 | | |
579 | | System.out.println("1. " + table + " table already exists."); |
580 | | |
581 | | } |
582 | | |
583 | | System.out.println("2. access_log files fetching using map/reduce"); |
584 | | } |
585 | | public static void main(String[] args) throws IOException { |
586 | | String table_name = "apache-log2"; |
587 | | String dir = "/user/waue/apache-log"; |
588 | | creatTable(table_name); |
589 | | runMapReduce(table_name, dir); |
590 | | } |
591 | | } |
592 | | |
593 | | }}} |