= Purpose =

This program parses your Apache access log and stores the records into HBase.

= How to Use =

 * 1. Upload the Apache logs ( /var/log/apache2/access.log* ) to HDFS (default: /user/waue/apache-log)
{{{
$ bin/hadoop dfs -put /var/log/apache2/ apache-log
}}}
 * 2. The "dir" variable in main() names the HDFS directory that holds the uploaded logs.
 * 3. Manually filter out lines the parser cannot handle (they would raise exceptions), for example:
{{{
::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
}}}

= Results =

1. Run the following command:
{{{
hql > select * from apache-log;
}}}
2. Output:
{{{
+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
|                         |                         | MSIE 4.01; Windows 95)  |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:bytesize           | 318                     |
+-------------------------+-------------------------+-------------------------+
..........(skip)........
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:method             | OPTIONS                 |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:protocol           | HTTP/1.1                |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | referrer:-              | *                       |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | url:*                   | -                       |
+-------------------------+-------------------------+-------------------------+
31 row(s) in set. (0.58 sec)
}}}

= LogParser.java =
{{{
package tw.org.nchc.code;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogParser {
    private String ip;
    private String protocol;
    private String method;
    private String url;
    private String code;
    private String byteSize;
    private String referrer;
    private String agent;
    private long timestamp;

    private static Pattern p = Pattern
            .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\""
                    + " ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\".*");

    public LogParser(String line) throws ParseException {
        Matcher matcher = p.matcher(line);
        if (matcher.matches()) {
            // group(1) is the IP address of the client requesting the web page.
            this.ip = matcher.group(1);
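            // The remaining capture groups follow the Apache combined log format:
            // group(2)/group(3) are the identd and userid fields (unused here),
            // group(4) is the request timestamp, group(5) the request line
            // ("method url protocol"), group(6) the status code, group(7) the
            // response size in bytes, group(8) the referrer and group(9) the
            // user agent.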
            if (isIpAddress(ip)) {
                SimpleDateFormat sdf = new SimpleDateFormat(
                        "dd/MMM/yyyy:HH:mm:ss Z", Locale.US);
                this.timestamp = sdf.parse(matcher.group(4)).getTime();
                String[] http = matcher.group(5).split(" ");
                this.method = http[0];
                this.url = http[1];
                this.protocol = http[2];
                this.code = matcher.group(6);
                this.byteSize = matcher.group(7);
                this.referrer = matcher.group(8);
                this.agent = matcher.group(9);
            }
        }
    }

    public static boolean isIpAddress(String inputString) {
        StringTokenizer tokenizer = new StringTokenizer(inputString, ".");
        if (tokenizer.countTokens() != 4) {
            return false;
        }
        try {
            for (int i = 0; i < 4; i++) {
                String t = tokenizer.nextToken();
                int chunk = Integer.parseInt(t);
                if ((chunk & 255) != chunk) {
                    return false;
                }
            }
        } catch (NumberFormatException e) {
            return false;
        }
        if (inputString.indexOf("..") >= 0) {
            return false;
        }
        return true;
    }

    public String getIp() {
        return ip;
    }

    public String getProtocol() {
        return protocol;
    }

    public String getMethod() {
        return method;
    }

    public String getUrl() {
        return url;
    }

    public String getCode() {
        return code;
    }

    public String getByteSize() {
        return byteSize;
    }

    public String getReferrer() {
        return referrer;
    }

    public String getAgent() {
        return agent;
    }

    public long getTimestamp() {
        return timestamp;
    }
}
}}}
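As a quick sanity check, LogParser can be exercised on a single line outside of Hadoop. The sketch below is illustrative only: the TestLogParser class is not part of this project, and the sample line is composed from values shown in the result table above.

{{{
package tw.org.nchc.code;

// Illustrative stand-alone test of LogParser (not part of the project);
// the sample line is assembled from values seen in the result table above.
public class TestLogParser {
    public static void main(String[] args) throws Exception {
        String line = "118.170.101.250 - - [29/Jun/2008:07:35:15 +0800] "
                + "\"GET / HTTP/1.0\" 200 318 \"-\" "
                + "\"Mozilla/4.0 (compatible; MSIE 4.01; Windows 95)\"";
        LogParser log = new LogParser(line);
        System.out.println("row key       " + log.getIp());
        System.out.println("http:method   " + log.getMethod());
        System.out.println("http:protocol " + log.getProtocol());
        System.out.println("http:code     " + log.getCode());
        System.out.println("http:bytesize " + log.getByteSize());
        System.out.println("http:agent    " + log.getAgent());
        System.out.println("url           " + log.getUrl());
        System.out.println("referrer      " + log.getReferrer());
        System.out.println("timestamp     " + log.getTimestamp());
    }
}
}}}

The labels printed here mirror the columns that the mapper in LogParserGo.java stores for each row.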
= LogParserGo.java =
{{{
/**
 * Program: LogParserGo.java
 * Editor: Waue Chen
 * From: NCHC, Taiwan
 * Last Update Date: 07/02/2008
 */
package tw.org.nchc.code;

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
 * W3CExtended, IISw3cExtended)
 */
public class LogParserGo {
    static HBaseConfiguration conf = new HBaseConfiguration();

    public static final String TABLE = "table.name";

    static String tableName;

    static HTable table = null;

    static void print(String str) {
        System.out.println("STR = " + str);
    }

    public static class MapClass extends MapReduceBase implements Mapper {

        @Override
        public void configure(JobConf job) {
            tableName = job.get(TABLE, "");
        }

        public void map(WritableComparable key, Text value,
                OutputCollector output, Reporter reporter) throws IOException {
            try {
                LogParser log = new LogParser(value.toString());
                if (table == null)
                    table = new HTable(conf, new Text(tableName));
                // One HBase row per client IP; each parsed field becomes a column.
                long lockId = table.startUpdate(new Text(log.getIp()));
                table.put(lockId, new Text("http:protocol"), log.getProtocol()
                        .getBytes());
                table.put(lockId, new Text("http:method"), log.getMethod()
                        .getBytes());
                table.put(lockId, new Text("http:code"), log.getCode()
                        .getBytes());
                table.put(lockId, new Text("http:bytesize"), log.getByteSize()
                        .getBytes());
                table.put(lockId, new Text("http:agent"), log.getAgent()
                        .getBytes());
                table.put(lockId, new Text("url:" + log.getUrl()), log
                        .getReferrer().getBytes());
                table.put(lockId, new Text("referrer:" + log.getReferrer()),
                        log.getUrl().getBytes());
                table.commit(lockId, log.getTimestamp());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Local replacement for the deprecated FileSystem.listPaths().
    static public Path[] listPaths(FileSystem fsm, Path path)
            throws IOException {
        FileStatus[] fss = fsm.listStatus(path);
        int length = fss.length;
        Path[] pi = new Path[length];
        for (int i = 0; i < length; i++) {
            pi[i] = fss[i].getPath();
        }
        return pi;
    }

    public static void runMapReduce(String table, String dir)
            throws IOException {
        Path tempDir = new Path("/tmp/Mylog/");
        Path InputDir = new Path(dir);
        FileSystem fs = FileSystem.get(conf);
        JobConf jobConf = new JobConf(conf, LogParserGo.class);
        jobConf.setJobName("apache log fetcher");
        jobConf.set(TABLE, table);
        // Add the input path itself, or every file up to one directory
        // level below it, as job input.
        Path[] in = listPaths(fs, InputDir);
        if (fs.isFile(InputDir)) {
            jobConf.setInputPath(InputDir);
        } else {
            for (int i = 0; i < in.length; i++) {
                if (fs.isFile(in[i])) {
                    jobConf.addInputPath(in[i]);
                } else {
                    Path[] sub = listPaths(fs, in[i]);
                    for (int j = 0; j < sub.length; j++) {
                        if (fs.isFile(sub[j])) {
                            jobConf.addInputPath(sub[j]);
                        }
                    }
                }
            }
        }
        jobConf.setOutputPath(tempDir);
        jobConf.setMapperClass(MapClass.class);
        JobClient client = new JobClient(jobConf);
        ClusterStatus cluster = client.getClusterStatus();
        // Map-only job: the mapper writes directly to HBase, so no reducers.
        jobConf.setNumMapTasks(cluster.getMapTasks());
        jobConf.setNumReduceTasks(0);
        JobClient.runJob(jobConf);
        fs.delete(tempDir);
        fs.close();
    }
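
    // creatTable() prepares the target HBase table before the job runs.
    // It declares the three column families the mapper writes to ("http:",
    // "url:" and "referrer:"; family names end with a colon in this HBase
    // version) and skips creation when the table already exists.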
    public static void creatTable(String table) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists(new Text(table))) {
            System.out.println("1. " + table
                    + " table creating ... please wait");
            HTableDescriptor tableDesc = new HTableDescriptor(table);
            tableDesc.addFamily(new HColumnDescriptor("http:"));
            tableDesc.addFamily(new HColumnDescriptor("url:"));
            tableDesc.addFamily(new HColumnDescriptor("referrer:"));
            admin.createTable(tableDesc);
        } else {
            System.out.println("1. " + table + " table already exists.");
        }
        System.out.println("2. access_log files fetching using map/reduce");
    }

    public static void main(String[] args) throws IOException {
        String table_name = "apache-log2";
        String dir = "/user/waue/apache-log";
        creatTable(table_name);
        runMapReduce(table_name, dir);
    }
}
}}}
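Besides HQL, a stored row can also be read back programmatically. The following is only a sketch: it assumes the HBase release used above (the same API generation as startUpdate/commit) also exposes HTable.get(Text row, Text column), and the LogParserCheck class name is made up for illustration.

{{{
package tw.org.nchc.code;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.io.Text;

// Illustrative read-back check (not part of the project). Assumes this HBase
// version provides HTable.get(Text row, Text column).
public class LogParserCheck {
    public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        // "apache-log2" is the table name created by LogParserGo.main().
        HTable table = new HTable(conf, new Text("apache-log2"));
        // Row key is the client IP, column is "family:qualifier".
        byte[] value = table.get(new Text("118.170.101.250"),
                new Text("http:agent"));
        if (value != null) {
            System.out.println(new String(value));
        }
    }
}
}}}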