Hadoop 與 RDBMS 的支援
Hadoop 0.20 + MySQL 5
說明
- 需先安裝 hadoop 0.20 , apache2 , MySQL 5 server & client , phpmyadmin
- 需將 mysql-connector-java-*.jar 放到 Hadoop 安裝目錄的 lib 目錄下(例如 /opt/hadoop/lib,請依實際安裝路徑調整)
- 在 MySQL 內,先建立一個名為 school 的資料庫,內含 teacher 資料表,並新增一些資料,如下:
-- Recreate the `teacher` table inside the existing `school` database.
-- Any previous copy of the table (and its rows) is dropped first.
DROP TABLE IF EXISTS `school`.`teacher`;

-- Four nullable columns; no primary key or index is declared.
CREATE TABLE `school`.`teacher` (
    `id`           INT(11)  DEFAULT NULL,
    `name`         CHAR(20) DEFAULT NULL,
    `age`          INT(11)  DEFAULT NULL,
    `departmentID` INT(11)  DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
程式碼
DBAccess.java
package db; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapred.lib.db.DBInputFormat; import org.apache.hadoop.mapred.lib.db.DBWritable; public class DBAccess { @SuppressWarnings("deprecation") public static void main(String[] args) throws IOException { String[] argc={"jdbc:mysql://localhost/school","root", "itri"}; argv=argc; try { JobConf conf = new JobConf(DBAccess.class); Class.forName("com.mysql.jdbc.Driver"); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", argv[0], argv[1], argv[2]); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); conf.setInputFormat(DBInputFormat.class); Path dstPath = new Path("dboutput3"); FileOutputFormat.setOutputPath(conf, dstPath); String[] fields = { "id", "name", "age", "departmentID" }; DBInputFormat.setInput(conf, TeacherRecord.class, "teacher", null, "id", fields); conf.setMapperClass(DBAccessMapper.class); conf.setReducerClass(IdentityReducer.class); FileSystem hdfs = dstPath.getFileSystem(conf); if (hdfs.exists(dstPath)) { hdfs.delete(dstPath, true); } JobClient.runJob(conf); } catch(ClassNotFoundException e) { System.err.println("mysql.jdbc.Driver not found"); } } }
DBAccessMapper.java
package db;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/**
 * Mapper that turns each {@link TeacherRecord} into an
 * ({@code id}, text) pair.
 */
public class DBAccessMapper extends MapReduceBase
        implements Mapper<LongWritable, TeacherRecord, LongWritable, Text> {

    /**
     * Emits one output pair per input record.
     *
     * @param key       input key; not used by this mapper
     * @param value     the record to convert
     * @param collector receives the record's {@code id} as key and the
     *                  record's {@code toString()} text as value
     * @param reporter  not used by this mapper
     * @throws IOException if the collector fails to accept the pair
     */
    public void map(LongWritable key, TeacherRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        final LongWritable teacherId = new LongWritable(value.id);
        final Text description = new Text(value.toString());
        collector.collect(teacherId, description);
    }
}
TeacherRecord.java
package db; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.lib.db.DBWritable; public class TeacherRecord implements Writable, DBWritable { int id; String name; int age; int departmentID; @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.id = in.readInt(); this.name = Text.readString(in); this.age = in.readInt(); this.departmentID = in.readInt(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeInt(this.id); Text.writeString(out, this.name); out.writeInt(this.age); out.writeInt(this.departmentID); } @Override public void readFields(ResultSet result) throws SQLException { // TODO Auto-generated method stub this.id = result.getInt(1); this.name = result.getString(2); this.age = result.getInt(3); this.departmentID = result.getInt(4); } @Override public void write(PreparedStatement stmt) throws SQLException { // TODO Auto-generated method stub stmt.setInt(1, this.id); stmt.setString(2, this.name); stmt.setInt(3, this.age); stmt.setInt(4, this.departmentID); } @Override public String toString() { // TODO Auto-generated method stub return new String(this.name + " " + this.age + " " + this.departmentID); } }
執行結果
$ /opt/hadoop/bin/hadoop dfs -cat dboutput3/part-00000
0 waue 29 920
1 rock 30 1231
Last modified 14 years ago
Last modified on Apr 25, 2011, 2:09:50 PM
Attachments (1)
- pd1.png (30.2 KB) - added by waue 14 years ago.
Download all attachments as: .zip