wiki:NCHCCloudCourse100928_MYSQL

Version 5 (modified by waue, 14 years ago) (diff)

--

Hadoop 與 RDBMS 的支援
Hadoop 0.20 + MySQL 5

說明

DROP TABLE IF EXISTS `school`.`teacher`;
CREATE TABLE  `school`.`teacher` (
  `id` int(11) default NULL,
  `name` char(20) default NULL,
  `age` int(11) default NULL,
  `departmentID` int(11) default NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
  • MySQL 內,先建立一個school 的資料庫,內含 teacher 的table ,並新增一些資料,如下:

程式碼

DBAccess.java

package db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;
public class DBAccess {
  @SuppressWarnings("deprecation")
  public static void main(String[] args) throws IOException {
    
    try {
      
      JobConf conf = new JobConf(DBAccess.class);
        Class.forName("com.mysql.jdbc.Driver");
      DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
          "jdbc:mysql://localhost/school", "user", "xxxxxxxx");
      conf.setOutputKeyClass(LongWritable.class);
      conf.setOutputValueClass(Text.class);
      conf.setInputFormat(DBInputFormat.class);
      Path dstPath = new Path("dboutput3");
      FileOutputFormat.setOutputPath(conf, dstPath);
      String[] fields = { "id", "name", "age", "departmentID" };
      DBInputFormat.setInput(conf, TeacherRecord.class, "teacher", null,
          "id", fields);
      conf.setMapperClass(DBAccessMapper.class);
      conf.setReducerClass(IdentityReducer.class);
      
      FileSystem hdfs = dstPath.getFileSystem(conf);
      if (hdfs.exists(dstPath)) {
        hdfs.delete(dstPath, true);
      }
      JobClient.runJob(conf);
    }
    catch(ClassNotFoundException e) {
        System.err.println("mysql.jdbc.Driver not found");  
    } 
  }
}

DBAccessMapper.java

package db;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class DBAccessMapper extends MapReduceBase implements
    Mapper<LongWritable, TeacherRecord, LongWritable, Text> {
  public void map(LongWritable key, TeacherRecord value,
      OutputCollector<LongWritable, Text> collector, Reporter reporter)
      throws IOException {
    collector.collect(new LongWritable(value.id),
    new Text(value.toString()));
  }
}

TeacherRecord.java

package db;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
public class TeacherRecord implements Writable, DBWritable {
  int id;
  String name;
  int age;
  int departmentID;
  @Override
  public void readFields(DataInput in) throws IOException {
    // TODO Auto-generated method stub
    this.id = in.readInt();
    this.name = Text.readString(in);
    this.age = in.readInt();
    this.departmentID = in.readInt();
  }
  @Override
  public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
    out.writeInt(this.id);
    Text.writeString(out, this.name);
    out.writeInt(this.age);
    out.writeInt(this.departmentID);
  }
  @Override
  public void readFields(ResultSet result) throws SQLException {
    // TODO Auto-generated method stub
    this.id = result.getInt(1);
    this.name = result.getString(2);
    this.age = result.getInt(3);
    this.departmentID = result.getInt(4);
  }
  @Override
  public void write(PreparedStatement stmt) throws SQLException {
    // TODO Auto-generated method stub
    stmt.setInt(1, this.id);
    stmt.setString(2, this.name);
    stmt.setInt(3, this.age);
    stmt.setInt(4, this.departmentID);
  }
  @Override
  public String toString() {
    // TODO Auto-generated method stub
    return new String(this.name + " " + this.age + " " + this.departmentID);
  }
}

執行結果

$ /opt/hadoop/bin/hadoop dfs -cat dboutput/part-00000
0	waue 29 920
1	rock 30 1231
1	2 3 4

Attachments (1)

Download all attachments as: .zip