MAPREDUCE-6237. Multiple mappers with DBInputFormat don't work because of reusing conections. Contributed by Kannan Rajah.

This commit is contained in:
Tsuyoshi Ozawa 2015-02-10 03:52:42 +09:00
parent 7e42088abf
commit 241336ca2b
4 changed files with 37 additions and 16 deletions

View File

@ -346,6 +346,21 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6233. org.apache.hadoop.mapreduce.TestLargeSort.testLargeSort MAPREDUCE-6233. org.apache.hadoop.mapreduce.TestLargeSort.testLargeSort
failed in trunk (zxu via rkanter) failed in trunk (zxu via rkanter)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
MAPREDUCE-6237. Multiple mappers with DBInputFormat don't work because of
reusing conections. (Kannan Rajah via ozawa)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -159,7 +159,7 @@ public void setConf(Configuration conf) {
dbConf = new DBConfiguration(conf); dbConf = new DBConfiguration(conf);
try { try {
getConnection(); this.connection = createConnection();
DatabaseMetaData dbMeta = connection.getMetaData(); DatabaseMetaData dbMeta = connection.getMetaData();
this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase(); this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
@ -182,18 +182,25 @@ public DBConfiguration getDBConf() {
} }
public Connection getConnection() { public Connection getConnection() {
// TODO Remove this code that handles backward compatibility.
if (this.connection == null) {
this.connection = createConnection();
}
return this.connection;
}
public Connection createConnection() {
try { try {
if (null == this.connection) { Connection newConnection = dbConf.getConnection();
// The connection was closed; reinstantiate it. newConnection.setAutoCommit(false);
this.connection = dbConf.getConnection(); newConnection.setTransactionIsolation(
this.connection.setAutoCommit(false); Connection.TRANSACTION_SERIALIZABLE);
this.connection.setTransactionIsolation(
Connection.TRANSACTION_SERIALIZABLE); return newConnection;
}
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return connection;
} }
public String getDBProductName() { public String getDBProductName() {
@ -210,17 +217,17 @@ protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
if (dbProductName.startsWith("ORACLE")) { if (dbProductName.startsWith("ORACLE")) {
// use Oracle-specific db reader. // use Oracle-specific db reader.
return new OracleDBRecordReader<T>(split, inputClass, return new OracleDBRecordReader<T>(split, inputClass,
conf, getConnection(), getDBConf(), conditions, fieldNames, conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName); tableName);
} else if (dbProductName.startsWith("MYSQL")) { } else if (dbProductName.startsWith("MYSQL")) {
// use MySQL-specific db reader. // use MySQL-specific db reader.
return new MySQLDBRecordReader<T>(split, inputClass, return new MySQLDBRecordReader<T>(split, inputClass,
conf, getConnection(), getDBConf(), conditions, fieldNames, conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName); tableName);
} else { } else {
// Generic reader. // Generic reader.
return new DBRecordReader<T>(split, inputClass, return new DBRecordReader<T>(split, inputClass,
conf, getConnection(), getDBConf(), conditions, fieldNames, conf, createConnection(), getDBConf(), conditions, fieldNames,
tableName); tableName);
} }
} catch (SQLException ex) { } catch (SQLException ex) {

View File

@ -178,7 +178,6 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
ResultSet results = null; ResultSet results = null;
Statement statement = null; Statement statement = null;
Connection connection = getConnection();
try { try {
statement = connection.createStatement(); statement = connection.createStatement();
@ -289,12 +288,12 @@ protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
if (dbProductName.startsWith("MYSQL")) { if (dbProductName.startsWith("MYSQL")) {
// use MySQL-specific db reader. // use MySQL-specific db reader.
return new MySQLDataDrivenDBRecordReader<T>(split, inputClass, return new MySQLDataDrivenDBRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(), conf, createConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName()); dbConf.getInputFieldNames(), dbConf.getInputTableName());
} else { } else {
// Generic reader. // Generic reader.
return new DataDrivenDBRecordReader<T>(split, inputClass, return new DataDrivenDBRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(), conf, createConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName(), dbConf.getInputFieldNames(), dbConf.getInputTableName(),
dbProductName); dbProductName);
} }

View File

@ -84,7 +84,7 @@ protected RecordReader<LongWritable, T> createDBRecordReader(DBInputSplit split,
try { try {
// Use Oracle-specific db reader // Use Oracle-specific db reader
return new OracleDataDrivenDBRecordReader<T>(split, inputClass, return new OracleDataDrivenDBRecordReader<T>(split, inputClass,
conf, getConnection(), dbConf, dbConf.getInputConditions(), conf, createConnection(), dbConf, dbConf.getInputConditions(),
dbConf.getInputFieldNames(), dbConf.getInputTableName()); dbConf.getInputFieldNames(), dbConf.getInputTableName());
} catch (SQLException ex) { } catch (SQLException ex) {
throw new IOException(ex.getMessage()); throw new IOException(ex.getMessage());