wiki:LogParser


Purpose

This program parses your Apache logs and stores them in HBase.

Usage

1. Upload the Apache logs ( /var/log/apache2/access.log* ) to HDFS (default: /user/waue/apache-log):

$ bin/hadoop dfs -put /var/log/apache2/ apache-log

2. The "dir" parameter in main() holds the path to those logs.

3. You should filter out malformed entries by hand beforehand, for example lines such as:

 ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...

Results

1. Run the following query:

	hql > select * from apache-log;

2. The result:

+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
|                         |                         |  MSIE 4.01; Windows 95) |
..........(skip)........
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:method             | OPTIONS                 |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:protocol           | HTTP/1.1                |
31 row(s) in set. (0.58 sec)

LogParserGo.java

public class LogParserGo {
	static HBaseConfiguration conf = new HBaseConfiguration();
	public static final String TABLE = "table.name";
	static String tableName;
	static HTable table = null;
	public static class MapClass { /* ... */ }
	static public Path[] listPaths(FileSystem fsm, Path path) { /* ... */ }
	public static void runMapReduce(String table, String dir) { /* ... */ }
	public static void creatTable(String table) { /* ... */ }
	public static void main(String[] args) { /* ... */ }
}

LogParserGo declares the following global variables and methods:

1. HBaseConfiguration conf is the central configuration object; it defines many methods for setting and retrieving the values a MapReduce program needs to run.
2. TABLE is defined as "table.name", the name of the configuration property that carries the table name.
3. String tableName is the name of the table.
4. HTable table is the handle used to operate on the HBase table.
5. class MapClass is the inner class that implements map.
6. Path[] listPaths lists the files and directories under a given path; the 0.16 API declares the original deprecated, so it is re-implemented here to avoid the warning.
7. void runMapReduce(String table, String dir) runs the MapReduce job.
8. void creatTable(String table) creates the HBase table.
9. void main(String[] args) is the main function.

Items 1-4 are plain variables; the functions in items 5-9 are explained below.


	public static class MapClass extends MapReduceBase implements
			Mapper<WritableComparable, Text, Text, Writable> {
		// Called once per task: read the target table name from the job.
		public void configure(JobConf job) {
			tableName = job.get(TABLE, "");
		}
		public void map(WritableComparable key, Text value,
				OutputCollector<Text, Writable> output, Reporter reporter)
				throws IOException {
			try {
				// Each call receives one log line; LogParser splits it into fields.
				LogParser log = new LogParser(value.toString());
				// Open the HBase table lazily, on the first record.
				if (table == null)
					table = new HTable(conf, new Text(tableName));
				// Begin an update on the row keyed by the client IP.
				long lockId = table.startUpdate(new Text(log.getIp()));
				table.put(lockId, new Text("http:protocol"), log.getProtocol()
						.getBytes());
				table.put(lockId, new Text("http:method"), log.getMethod()
						.getBytes());
				table.put(lockId, new Text("http:code"), log.getCode()
						.getBytes());
				table.put(lockId, new Text("http:bytesize"), log.getByteSize()
						.getBytes());
				table.put(lockId, new Text("http:agent"), log.getAgent()
						.getBytes());
				table.put(lockId, new Text("url:" + log.getUrl()), log
						.getReferrer().getBytes());
				table.put(lockId, new Text("referrer:" + log.getReferrer()),
						log.getUrl().getBytes());
				// Commit all staged cells, stamped with the log entry's time.
				table.commit(lockId, log.getTimestamp());
			} catch (Exception e) {
				// Malformed lines end up here; just log them and keep going.
				e.printStackTrace();
			}
		}
	}

This inner class extends org.apache.hadoop.mapred.MapReduceBase and implements the Mapper<WritableComparable, Text, Text, Writable> interface. Not every MapReduce program has to implement this interface, but the work you want distributed across the map tasks must go into this method:
map(WritableComparable key, Text value, OutputCollector<Text, Writable> output, Reporter reporter)
Here key is the input key handed in by the framework (for plain text input, the byte offset of the line) and value is one line of the log. output could write results through its collect() method, but this example writes to HBase directly, so neither output nor reporter is actually used.
Because the method performs I/O, it declares throws IOException and opens with a try block.
First comes LogParser log = new LogParser(value.toString()); here value is one line of the content to be parsed. Because the MapReduce framework on top of HDFS gathers the data for us, the program logic only has to handle a single line. LogParser is introduced below; for now it is enough to know that the log object is the raw value after LogParser has processed it. Through the log object's methods getIp(), getProtocol(), and so on, we can easily obtain the fields we need. table.startUpdate(row key) then opens an update on that row and returns a lock id; each table.put(lockId, column, value) call stages a value for one column, and table.commit(lockId, timestamp) writes the whole batch. Next, consider the table object.
table is one of the global variables, defined by the org.apache.hadoop.hbase.HTable class. Constructing an HTable object always requires two initial values: one is conf, the other important global configuration variable, and the other is tableName, the name of the table.
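
A minimal standalone sketch of this update cycle, reusing the same 0.1-era HTable calls shown in map() above (the table name, row key, and cell value here are only illustrative):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.io.Text;

public class PutDemo {
	public static void main(String[] args) throws Exception {
		HBaseConfiguration conf = new HBaseConfiguration();
		// Open the table; it must already exist (e.g. via creatTable()).
		HTable t = new HTable(conf, new Text("apache-log"));
		// Begin an update on the row keyed by a client IP; get a lock id back.
		long lockId = t.startUpdate(new Text("140.110.138.176"));
		// Stage one cell: column "http:method", value "GET".
		t.put(lockId, new Text("http:method"), "GET".getBytes());
		// Commit every staged cell in one batch, stamped with the given time.
		t.commit(lockId, System.currentTimeMillis());
	}
}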

configure(JobConf job) overrides org.apache.hadoop.mapred.MapReduceBase.configure(JobConf); its body simply reads the table name out of the job configuration.
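
Side by side, the two halves of that hand-off (both lines appear in the listings on this page):

	// In runMapReduce(), the driver stores the table name in the job:
	jobConf.set(TABLE, table);           // TABLE == "table.name"
	// In MapClass.configure(), each map task reads it back out:
	tableName = job.get(TABLE, "");      // "" is the default when unset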


	// Stand-in for FileSystem.listPaths(), which the 0.16 API marks
	// deprecated: return the paths directly under the given path.
	static public Path[] listPaths(FileSystem fsm, Path path)
			throws IOException {
		FileStatus[] fss = fsm.listStatus(path);
		int length = fss.length;
		Path[] pi = new Path[length];
		for (int i = 0; i < length; i++) {
			pi[i] = fss[i].getPath();
		}
		return pi;
	}
	public static void runMapReduce(String table, String dir)
			throws IOException {
		Path tempDir = new Path("/tmp/Mylog/");
		Path InputDir = new Path(dir);
		FileSystem fs = FileSystem.get(conf);
		JobConf jobConf = new JobConf(conf, LogParserGo.class);
		jobConf.setJobName("apache log fetcher");
		// Hand the table name to every map task (read back in configure()).
		jobConf.set(TABLE, table);
		// Accept a single file, a directory of files, or one extra level
		// of subdirectories as input.
		Path[] in = listPaths(fs, InputDir);
		if (fs.isFile(InputDir)) {
			jobConf.setInputPath(InputDir);
		} else {
			for (int i = 0; i < in.length; i++) {
				if (fs.isFile(in[i])) {
					jobConf.addInputPath(in[i]);
				} else {
					Path[] sub = listPaths(fs, in[i]);
					for (int j = 0; j < sub.length; j++) {
						if (fs.isFile(sub[j])) {
							jobConf.addInputPath(sub[j]);
						}
					}
				}
			}
		}
		jobConf.setOutputPath(tempDir);
		jobConf.setMapperClass(MapClass.class);
		JobClient client = new JobClient(jobConf);
		ClusterStatus cluster = client.getClusterStatus();
		jobConf.setNumMapTasks(cluster.getMapTasks());
		// All writes happen inside map(), so no reduce phase is needed.
		jobConf.setNumReduceTasks(0);
		JobClient.runJob(jobConf);
		// The output directory was only a placeholder; clean it up.
		fs.delete(tempDir);
		fs.close();
	}
	public static void creatTable(String table) throws IOException {
		HBaseAdmin admin = new HBaseAdmin(conf);
		if (!admin.tableExists(new Text(table))) {
			System.out.println("1. " + table
					+ " table creating ... please wait");
			HTableDescriptor tableDesc = new HTableDescriptor(table);
			tableDesc.addFamily(new HColumnDescriptor("http:"));
			tableDesc.addFamily(new HColumnDescriptor("url:"));
			tableDesc.addFamily(new HColumnDescriptor("referrer:"));
			admin.createTable(tableDesc);
		} else {
			System.out.println("1. " + table + " table already exists.");
		}
		System.out.println("2. access_log files fetching using map/reduce");
	}
	public static void main(String[] args) throws IOException {
		String table_name = "apache-log2";
		String dir = "/user/waue/apache-log";
		creatTable(table_name);
		runMapReduce(table_name, dir);
	}
}

LogParser.java

The task of this class is to parse each line of the log file.

  private String ip;
  private String protocol;
  private String method;
  private String url;
  private String code;
  private String byteSize;
  private String referrer;
  private String agent;
  private long timestamp;
  private static Pattern p = Pattern
  .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
                  " ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\".*");

First, a java.util.regex.Pattern object is created. This class has no public constructor, so after declaring it you call compile(String regex) to build an object for the given regular expression. As the API documentation puts it:

Compiles the given regular expression into a pattern.

Passing the regular-expression string as the argument yields the Pattern object p. The regular expression here is:

([^ ]*) ([^ ]*) ([^ ]*) \[([^]]*)\] "([^"]*)" ([^ ]*) ([^ ]*) "([^"]*)" "([^"]*)".*

Given this sample Apache log line:

140.110.138.176 - - [02/Jul/2008:16:55:02 +0800] "GET /hbase-0.1.3.zip HTTP/1.0" 200 10249801 "-" "Wget/1.10.2"

the regular expression lines up with it like this:

([^ ]*) ([^ ]*) ([^ ]*) \[([^]]*)\] "([^"]*)" ([^ ]*) ([^ ]*) "([^"]*)" "([^"]*)".*
ip - - [timestamp] "request" code byte-size "referrer" "agent"
140.110.138.176 - - [02/Jul/2008:16:55:02 +0800] "GET /hbase-0.1.3.zip HTTP/1.0" 200 10249801 "-" "Wget/1.10.2"

You can think of Pattern as a kind of template class: calling compile(regex) produces a template object p whose shape is defined by the given expression.
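
To see the template in action, here is a small standalone sketch (not part of LogParser) that applies the same pattern to the sample line above and prints each captured group:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexDemo {
	public static void main(String[] args) {
		Pattern p = Pattern.compile(
				"([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
				" ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\".*");
		String line = "140.110.138.176 - - [02/Jul/2008:16:55:02 +0800] "
				+ "\"GET /hbase-0.1.3.zip HTTP/1.0\" 200 10249801 \"-\" \"Wget/1.10.2\"";
		Matcher matcher = p.matcher(line);
		if (matcher.matches()) {
			// group(1) is the IP, group(4) the timestamp, group(5) the request...
			for (int i = 1; i <= matcher.groupCount(); i++) {
				System.out.println(i + ": " + matcher.group(i));
			}
		}
	}
}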


  public LogParser(String line) throws ParseException, Exception{ 
	 Matcher matcher = p.matcher(line);
	 if(matcher.matches()){
		 this.ip = matcher.group(1);
		 // IP address of the client requesting the web page.
		 if(isIpAddress(ip)){
			 SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z",Locale.US);
			 this.timestamp = sdf.parse(matcher.group(4)).getTime();
			 String[] http = matcher.group(5).split(" ");
			 this.method = http[0];
			 this.url = http[1];
			 this.protocol = http[2];
			 this.code = matcher.group(6);
			 this.byteSize = matcher.group(7);
			 this.referrer = matcher.group(8);
			 this.agent = matcher.group(9);
		 }
	 }
  }

Now the constructor. It declares a java.util.regex.Matcher, the object that works hand in hand with the Pattern created earlier. The template p provides a matcher(String) method that presses the raw material (the input string) into the template's shape, producing the object called matcher. Afterwards, matcher.group(n) retrieves the content of the n-th segment as a String.
Compare this against the incoming line:

group 1 (ip):          140.110.138.176
groups 2 and 3:        - -
group 4 (timestamp):   02/Jul/2008:16:55:02 +0800
group 5 (request):     GET /hbase-0.1.3.zip HTTP/1.0
group 6 (code):        200
group 7 (byte size):   10249801
group 8 (referrer):    -
group 9 (agent):       Wget/1.10.2

From there it is straightforward: each value is fetched with matcher.group(n) and assigned one by one via this.field. Strictly speaking the code compiles without this; it is just a common habit when a constructor assigns the class's own fields (partly to distinguish them from fields inherited from a parent class). The timestamp needs a small conversion through SimpleDateFormat, and the request portion is broken down further with split().
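
Those two conversions can be tried in isolation; a quick standalone sketch (not part of LogParser):

import java.text.SimpleDateFormat;
import java.util.Locale;

public class ParseDemo {
	public static void main(String[] args) throws Exception {
		// Turn the bracketed timestamp into milliseconds since the epoch.
		SimpleDateFormat sdf =
				new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
		long ts = sdf.parse("02/Jul/2008:16:55:02 +0800").getTime();
		// Split the request line into its three parts.
		String[] http = "GET /hbase-0.1.3.zip HTTP/1.0".split(" ");
		System.out.println(ts);       // epoch ms of 02/Jul/2008 16:55:02 +0800
		System.out.println(http[0]);  // GET
		System.out.println(http[1]);  // /hbase-0.1.3.zip
		System.out.println(http[2]);  // HTTP/1.0
	}
}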

  public static boolean isIpAddress(String inputString) {
    StringTokenizer tokenizer = new StringTokenizer(inputString, ".");
    if (tokenizer.countTokens() != 4) {
      return false;
    }
    try {
      for (int i = 0; i < 4; i++) {
        String t = tokenizer.nextToken();
        int chunk = Integer.parseInt(t);
        // (chunk & 255) != chunk whenever the value is outside 0-255.
        if ((chunk & 255) != chunk) {
          return false;
        }
      }
    } catch (NumberFormatException e) {
      return false;
    }
    if (inputString.indexOf("..") >= 0) {
      return false;
    }
    return true;
  }
}

This function simply checks whether the IP address is in a valid format.
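
For instance (a hypothetical standalone check, assuming the LogParser class above is on the classpath):

public class IpCheckDemo {
	public static void main(String[] args) {
		System.out.println(LogParser.isIpAddress("140.110.138.176")); // true
		System.out.println(LogParser.isIpAddress("::1"));             // false: not four dot-separated numbers
		System.out.println(LogParser.isIpAddress("300.1.1.1"));       // false: 300 falls outside 0-255
	}
}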