Purpose
This program parses your Apache access log and stores each request into HBase.
How to Use
 1. Upload the Apache logs ( /var/log/apache2/access.log* ) to HDFS (default: /user/waue/apache-log):
{{{
$ bin/hadoop dfs -put /var/log/apache2/ apache-log
}}}
 2. Set the "dir" variable in main() to the HDFS directory that holds the logs.
 3. Filter out entries that do not match the expected log format manually, e.g.:
{{{
ex: ::1 - - [29/Jun/2008:07:35:15 +0800] "GET / HTTP/1.0" 200 729 "...
}}}
Results
 1. Run the following query:
{{{
hql > select * from apache-log;
}}}
 2. Result:
{{{
+-------------------------+-------------------------+-------------------------+
| Row                     | Column                  | Cell                    |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:agent              | Mozilla/4.0 (compatible;|
|                         |                         | MSIE 4.01; Windows 95)  |
+-------------------------+-------------------------+-------------------------+
| 118.170.101.250         | http:bytesize           | 318                     |
+-------------------------+-------------------------+-------------------------+
..........(skip)........
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:method             | OPTIONS                 |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | http:protocol           | HTTP/1.1                |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | referrer:-              | *                       |
+-------------------------+-------------------------+-------------------------+
| 87.65.93.58             | url:*                   | -                       |
+-------------------------+-------------------------+-------------------------+
31 row(s) in set. (0.58 sec)
}}}
LogParser.java
{{{
package tw.org.nchc.code;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class LogParser {
private String ip;
private String protocol;
private String method;
private String url;
private String code;
private String byteSize;
private String referrer;
private String agent;
private long timestamp;
private static Pattern p = Pattern
.compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\"" +
" ([^ ]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\".*");
public LogParser(String line) throws ParseException {
Matcher matcher = p.matcher(line);
if(matcher.matches()){
this.ip = matcher.group(1);
// IP address of the client requesting the web page.
if(isIpAddress(ip)){
SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z",Locale.US);
this.timestamp = sdf.parse(matcher.group(4)).getTime();
String[] http = matcher.group(5).split(" ");
this.method = http[0];
this.url = http[1];
this.protocol = http[2];
this.code = matcher.group(6);
this.byteSize = matcher.group(7);
this.referrer = matcher.group(8);
this.agent = matcher.group(9);
}
}
}
public static boolean isIpAddress(String inputString) {
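// Dotted-quad IPv4 check: exactly four numeric tokens, each 0-255.
// StringTokenizer skips empty tokens, so a string like "1.2.3.4.."
// still yields four tokens; the indexOf("..") test below rejects it.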
StringTokenizer tokenizer = new StringTokenizer(inputString, ".");
if (tokenizer.countTokens() != 4) {
return false;
}
try {
for (int i = 0; i < 4; i++) {
String t = tokenizer.nextToken();
int chunk = Integer.parseInt(t);
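// a valid octet fits in 8 bits, so masking with 255 must leave
// the value unchanged (this also rejects negative numbers)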
if ((chunk & 255) != chunk) {
return false;
}
}
} catch (NumberFormatException e) {
return false;
}
if (inputString.indexOf("..") >= 0) {
return false;
}
return true;
}
public String getIp() {
return ip;
}
public String getProtocol() {
return protocol;
}
public String getMethod() {
return method;
}
public String getUrl() {
return url;
}
public String getCode() {
return code;
}
public String getByteSize() {
return byteSize;
}
public String getReferrer() {
return referrer;
}
public String getAgent() {
return agent;
}
public long getTimestamp() {
return timestamp;
}
}
}}}
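For reference, here is a minimal local smoke test of LogParser. It is a sketch only: the class name LogParserTest and the sample log line are made up for illustration.
{{{
package tw.org.nchc.code;

// Hypothetical smoke test: parse one fabricated Apache "combined"
// log line and print every field LogParser extracts from it.
public class LogParserTest {
	public static void main(String[] args) throws Exception {
		String line = "140.110.138.176 - - [29/Jun/2008:07:35:15 +0800] "
			+ "\"GET /index.html HTTP/1.0\" 200 729 "
			+ "\"http://example.com/\" \"Mozilla/4.0 (compatible)\"";
		LogParser log = new LogParser(line);
		System.out.println("ip        = " + log.getIp());
		System.out.println("method    = " + log.getMethod());
		System.out.println("url       = " + log.getUrl());
		System.out.println("protocol  = " + log.getProtocol());
		System.out.println("code      = " + log.getCode());
		System.out.println("bytesize  = " + log.getByteSize());
		System.out.println("referrer  = " + log.getReferrer());
		System.out.println("agent     = " + log.getAgent());
		System.out.println("timestamp = " + log.getTimestamp());
	}
}
}}}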
LogParserGo.java
{{{
/**
 * Program: LogParserGo.java
 * Editor: Waue Chen
 * From : NCHC. Taiwan
 * Last Update Date: 07/02/2008
 */
package tw.org.nchc.code;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
// uses the LogParser class defined above to parse each log line
/**
* Access_log fetcher. TODO: FgnStatLog, Error_log, Access_log (Default,
* W3CExtended, IISw3cExtended)
*/
public class LogParserGo {
static HBaseConfiguration conf = new HBaseConfiguration();
public static final String TABLE = "table.name";
static String tableName;
static HTable table = null;
// static boolean eclipseRun = false;
static void print(String str){
System.out.println("STR = "+str);
}
public static class MapClass extends MapReduceBase implements
Mapper<WritableComparable, Text, Text, Writable> {
@Override
public void configure(JobConf job) {
tableName = job.get(TABLE, "");
}
public void map(WritableComparable key, Text value,
OutputCollector<Text, Writable> output, Reporter reporter)
throws IOException {
try {
/*
print(value.toString());
FileWriter out = new FileWriter(new File(
"/home/waue/mr-result.txt"));
out.write(value.toString());
out.flush();
out.close();
*/
LogParser log = new LogParser(value.toString());
if (table == null)
table = new HTable(conf, new Text(tableName));
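// HBase 0.1-style update: startUpdate() opens a batch for the row
// (keyed by client IP), put() stages one cell per field, and
// commit() writes them using the request time as the cell timestamp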
long lockId = table.startUpdate(new Text(log.getIp()));
table.put(lockId, new Text("http:protocol"), log.getProtocol()
.getBytes());
table.put(lockId, new Text("http:method"), log.getMethod()
.getBytes());
table.put(lockId, new Text("http:code"), log.getCode()
.getBytes());
table.put(lockId, new Text("http:bytesize"), log.getByteSize()
.getBytes());
table.put(lockId, new Text("http:agent"), log.getAgent()
.getBytes());
table.put(lockId, new Text("url:" + log.getUrl()), log
.getReferrer().getBytes());
table.put(lockId, new Text("referrer:" + log.getReferrer()),
log.getUrl().getBytes());
table.commit(lockId, log.getTimestamp());
} catch (Exception e) {
e.printStackTrace();
}
}
}
// local replacement for the deprecated FileSystem.listPaths()
static public Path[] listPaths(FileSystem fsm, Path path)
throws IOException {
FileStatus[] fss = fsm.listStatus(path);
int length = fss.length;
Path[] pi = new Path[length];
for (int i = 0; i < length; i++) {
pi[i] = fss[i].getPath();
}
return pi;
}
public static void runMapReduce(String table, String dir)
throws IOException {
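// tempDir is a dummy output directory: all real output goes to
// HBase, and the directory is deleted after the job completes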
Path tempDir = new Path("/tmp/Mylog/");
Path InputDir = new Path(dir);
FileSystem fs = FileSystem.get(conf);
JobConf jobConf = new JobConf(conf, LogParserGo.class);
jobConf.setJobName("apache log fetcher");
jobConf.set(TABLE, table);
if (fs.isFile(InputDir)) {
jobConf.setInputPath(InputDir);
} else {
// a directory: add its files, descending one level into
// any subdirectories
Path[] in = listPaths(fs, InputDir);
for (int i = 0; i < in.length; i++) {
if (fs.isFile(in[i])) {
jobConf.addInputPath(in[i]);
} else {
Path[] sub = listPaths(fs, in[i]);
for (int j = 0; j < sub.length; j++) {
if (fs.isFile(sub[j])) {
jobConf.addInputPath(sub[j]);
}
}
}
}
}
jobConf.setOutputPath(tempDir);
jobConf.setMapperClass(MapClass.class);
JobClient client = new JobClient(jobConf);
ClusterStatus cluster = client.getClusterStatus();
jobConf.setNumMapTasks(cluster.getMapTasks());
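// map-only job: each mapper writes straight to HBase, so no
// reduce phase is needed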
jobConf.setNumReduceTasks(0);
JobClient.runJob(jobConf);
fs.delete(tempDir);
fs.close();
}
public static void createTable(String table) throws IOException {
HBaseAdmin admin = new HBaseAdmin(conf);
if (!admin.tableExists(new Text(table))) {
System.out.println("1. " + table
+ " table creating ... please wait");
HTableDescriptor tableDesc = new HTableDescriptor(table);
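// the families must cover every column written by MapClass:
// http:*, url:*, referrer:*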
tableDesc.addFamily(new HColumnDescriptor("http:"));
tableDesc.addFamily(new HColumnDescriptor("url:"));
tableDesc.addFamily(new HColumnDescriptor("referrer:"));
admin.createTable(tableDesc);
} else {
System.out.println("1. " + table + " table already exists.");
}
System.out.println("2. access_log files fetching using map/reduce");
}
public static void main(String[] args) throws IOException {
String table_name = "apache-log2";
String dir = "/user/waue/apache-log";
createTable(table_name);
runMapReduce(table_name, dir);
}
}
}}}
