LogParser

Purpose

This program parses your Apache access log and stores the entries into HBase.

How to Use

  • 1. Upload the Apache logs ( /var/log/apache2/access.log* ) to HDFS (default: /user/waue/apache-log):
{{{
$ bin/hadoop dfs -put /var/log/apache2/ apache-log
}}}
  • 2. Make sure the "dir" parameter in main() points to the directory that holds the logs.
  • 3. Filter out malformed entries by hand before running the job (a pre-filter sketch follows this list), e.g. IPv6 lines such as:
{{{
::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
}}}
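
The parser only accepts complete combined-format lines with an IPv4 client address, so one way to do step 3 is a small pre-filter pass over the raw logs. The sketch below is only illustrative (the LogFilter class name and the input/output file arguments are hypothetical); it keeps exactly the lines the LogParser regular expression would match and drops IPv6 entries like the one above.
{{{
// Hypothetical pre-filter: keep only lines that LogParser (below) would
// accept: full combined format with an IPv4 client address.
// Assumed usage: java LogFilter access.log access.log.clean
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.regex.Pattern;

public class LogFilter {
	// Same pattern as LogParser, plus a dotted-quad check on the first
	// field so IPv6 entries like "::1" are dropped.
	private static final Pattern LINE = Pattern.compile(
			"([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\""
			+ " ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\".*");
	private static final Pattern IPV4 = Pattern.compile(
			"\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3} .*");

	public static void main(String[] args) throws IOException {
		BufferedReader in = new BufferedReader(new FileReader(args[0]));
		PrintWriter out = new PrintWriter(new FileWriter(args[1]));
		String line;
		while ((line = in.readLine()) != null) {
			// keep only well-formed IPv4 combined-format lines
			if (LINE.matcher(line).matches() && IPV4.matcher(line).matches()) {
				out.println(line);
			}
		}
		in.close();
		out.close();
	}
}
}}}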

Results

1. Run the following command:
{{{
hql > select * from apache-log;
}}}

2. Output:

{{{
+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
|                         |                         |  MSIE 4.01; Windows 95) |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:bytesize           | 318                     |
+-------------------------+-------------------------+-------------------------+
..........(skip)........
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:method             | OPTIONS                 |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:protocol           | HTTP/1.1                |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | referrer:-              | *                       |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | url:*                   | -                       |
+-------------------------+-------------------------+-------------------------+
31 row(s) in set. (0.58 sec)
}}}
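
In this schema each parsed log line becomes one HBase row keyed by the client IP: the fixed fields go into http: columns, the URL and referrer are encoded into the url: and referrer: column names, and the request time becomes the cell timestamp. As a quick sanity check you can read one cell back outside the hql shell. This is only a sketch against the HBase 0.1-era client API that the code below uses (HTable.get(Text, Text)); the row and column values are simply taken from the sample output above.
{{{
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.io.Text;

public class LogCheck {
	public static void main(String[] args) throws Exception {
		HTable table = new HTable(new HBaseConfiguration(),
				new Text("apache-log"));
		// Row and column taken from the sample output above.
		byte[] cell = table.get(new Text("87.65.93.58"),
				new Text("http:method"));
		System.out.println(new String(cell)); // expected: OPTIONS
	}
}
}}}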


LogParser.java

{{{
package tw.org.nchc.code;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogParser {
	private String ip;
	private String protocol;
	private String method;
	private String url;
	private String code;
	private String byteSize;
	private String referrer;
	private String agent;
	private long timestamp;

	// Combined log format: host, identity, user, [time], "request",
	// status, bytes, "referrer", "user agent"
	private static Pattern p = Pattern
			.compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\""
					+ " ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\".*");

	public LogParser(String line) throws ParseException {
		Matcher matcher = p.matcher(line);
		if (matcher.matches()) {
			// IP address of the client requesting the web page.
			this.ip = matcher.group(1);
			if (isIpAddress(ip)) {
				SimpleDateFormat sdf = new SimpleDateFormat(
						"dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
				this.timestamp = sdf.parse(matcher.group(4)).getTime();
				// The request line, e.g. "GET /index.html HTTP/1.1"
				String[] http = matcher.group(5).split(" ");
				this.method = http[0];
				this.url = http[1];
				this.protocol = http[2];
				this.code = matcher.group(6);
				this.byteSize = matcher.group(7);
				this.referrer = matcher.group(8);
				this.agent = matcher.group(9);
			}
		}
	}

	public static boolean isIpAddress(String inputString) {
		StringTokenizer tokenizer = new StringTokenizer(inputString, ".");
		if (tokenizer.countTokens() != 4) {
			return false;
		}
		try {
			for (int i = 0; i < 4; i++) {
				String t = tokenizer.nextToken();
				int chunk = Integer.parseInt(t);
				// each octet must fit in 0..255
				if ((chunk & 255) != chunk) {
					return false;
				}
			}
		} catch (NumberFormatException e) {
			return false;
		}
		// reject empty octets such as "1..2.3"
		if (inputString.indexOf("..") >= 0) {
			return false;
		}
		return true;
	}

	public String getIp() {
		return ip;
	}

	public String getProtocol() {
		return protocol;
	}

	public String getMethod() {
		return method;
	}

	public String getUrl() {
		return url;
	}

	public String getCode() {
		return code;
	}

	public String getByteSize() {
		return byteSize;
	}

	public String getReferrer() {
		return referrer;
	}

	public String getAgent() {
		return agent;
	}

	public long getTimestamp() {
		return timestamp;
	}
}
}}}
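
A quick usage sketch of the class above; the log line here is made up for illustration, and the demo class is assumed to sit in the same tw.org.nchc.code package:
{{{
public class LogParserDemo {
	public static void main(String[] args) throws Exception {
		// A made-up combined-format line for illustration.
		LogParser log = new LogParser(
				"140.110.138.176 - - [02/Jul/2008:16:55:02 +0800] "
				+ "\"GET /index.html HTTP/1.1\" 200 318 \"-\" \"Mozilla/4.0\"");
		System.out.println(log.getIp());        // 140.110.138.176
		System.out.println(log.getMethod());    // GET
		System.out.println(log.getUrl());       // /index.html
		System.out.println(log.getCode());      // 200
		System.out.println(log.getTimestamp()); // request time in epoch millis
	}
}
}}}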

LogParserGo.java

{{{
/**
 * Program: LogParserGo.java
 * Editor: Waue Chen
 * From :  NCHC. Taiwan
 * Last Update Date: 07/02/2008
 */
package tw.org.nchc.code;

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
 * W3CExtended, IISw3cExtended)
 */
public class LogParserGo {
	static HBaseConfiguration conf = new HBaseConfiguration();

	public static final String TABLE = "table.name";

	static String tableName;

	static HTable table = null;

	static void print(String str) {
		System.out.println("STR = " + str);
	}

	// Map-only job: each map() call parses one log line and writes it
	// straight into HBase; there is no reduce phase.
	public static class MapClass extends MapReduceBase implements
			Mapper<WritableComparable, Text, Text, Writable> {

		@Override
		public void configure(JobConf job) {
			tableName = job.get(TABLE, "");
		}

		public void map(WritableComparable key, Text value,
				OutputCollector<Text, Writable> output, Reporter reporter)
				throws IOException {
			try {
				LogParser log = new LogParser(value.toString());
				if (table == null)
					table = new HTable(conf, new Text(tableName));
				// Row key is the client IP; one cell per parsed field.
				long lockId = table.startUpdate(new Text(log.getIp()));
				table.put(lockId, new Text("http:protocol"), log
						.getProtocol().getBytes());
				table.put(lockId, new Text("http:method"), log.getMethod()
						.getBytes());
				table.put(lockId, new Text("http:code"), log.getCode()
						.getBytes());
				table.put(lockId, new Text("http:bytesize"), log
						.getByteSize().getBytes());
				table.put(lockId, new Text("http:agent"), log.getAgent()
						.getBytes());
				table.put(lockId, new Text("url:" + log.getUrl()), log
						.getReferrer().getBytes());
				table.put(lockId, new Text("referrer:" + log.getReferrer()),
						log.getUrl().getBytes());
				// The request time becomes the cell timestamp.
				table.commit(lockId, log.getTimestamp());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	// Replacement for the deprecated FileSystem.listPaths
	static public Path[] listPaths(FileSystem fsm, Path path)
			throws IOException {
		FileStatus[] fss = fsm.listStatus(path);
		int length = fss.length;
		Path[] pi = new Path[length];
		for (int i = 0; i < length; i++) {
			pi[i] = fss[i].getPath();
		}
		return pi;
	}

	public static void runMapReduce(String table, String dir)
			throws IOException {
		Path tempDir = new Path("/tmp/Mylog/");
		Path inputDir = new Path(dir);
		FileSystem fs = FileSystem.get(conf);
		JobConf jobConf = new JobConf(conf, LogParserGo.class);
		jobConf.setJobName("apache log fetcher");
		jobConf.set(TABLE, table);
		// Add every log file under dir (one level of subdirectories deep).
		if (fs.isFile(inputDir)) {
			jobConf.setInputPath(inputDir);
		} else {
			Path[] in = listPaths(fs, inputDir);
			for (int i = 0; i < in.length; i++) {
				if (fs.isFile(in[i])) {
					jobConf.addInputPath(in[i]);
				} else {
					Path[] sub = listPaths(fs, in[i]);
					for (int j = 0; j < sub.length; j++) {
						if (fs.isFile(sub[j])) {
							jobConf.addInputPath(sub[j]);
						}
					}
				}
			}
		}
		jobConf.setOutputPath(tempDir);

		jobConf.setMapperClass(MapClass.class);

		JobClient client = new JobClient(jobConf);
		ClusterStatus cluster = client.getClusterStatus();
		jobConf.setNumMapTasks(cluster.getMapTasks());
		jobConf.setNumReduceTasks(0);

		JobClient.runJob(jobConf);

		fs.delete(tempDir);
		fs.close();
	}

	public static void createTable(String table) throws IOException {
		HBaseAdmin admin = new HBaseAdmin(conf);
		if (!admin.tableExists(new Text(table))) {
			System.out.println("1. " + table
					+ " table creating ... please wait");
			HTableDescriptor tableDesc = new HTableDescriptor(table);
			tableDesc.addFamily(new HColumnDescriptor("http:"));
			tableDesc.addFamily(new HColumnDescriptor("url:"));
			tableDesc.addFamily(new HColumnDescriptor("referrer:"));
			admin.createTable(tableDesc);
		} else {
			System.out.println("1. " + table + " table already exists.");
		}
		System.out.println("2. access_log files fetching using map/reduce");
	}

	public static void main(String[] args) throws IOException {
		String table_name = "apache-log2";
		String dir = "/user/waue/apache-log";
		createTable(table_name);
		runMapReduce(table_name, dir);
	}
}
}}}
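
To double-check the load without the hql shell, a full scan prints roughly what "select * from apache-log2" shows. This is only a sketch against the same HBase 0.1-era client API as the code above; obtainScanner and HScannerInterface are from that generation of HBase, so verify them against your version.
{{{
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HScannerInterface;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.io.Text;

public class LogScan {
	public static void main(String[] args) throws IOException {
		HTable table = new HTable(new HBaseConfiguration(),
				new Text("apache-log2"));
		// Scan the whole http: family from the first row onward.
		HScannerInterface scanner = table.obtainScanner(
				new Text[] { new Text("http:") }, new Text(""));
		HStoreKey key = new HStoreKey();
		TreeMap<Text, byte[]> columns = new TreeMap<Text, byte[]>();
		while (scanner.next(key, columns)) {
			for (Text col : columns.keySet()) {
				System.out.println(key.getRow() + " | " + col + " | "
						+ new String(columns.get(col)));
			}
			columns.clear();
		}
		scanner.close();
	}
}
}}}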