From 241336ca2b7cf97d7e0bd84dbe0542b72f304dc9 Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Tue, 10 Feb 2015 03:52:42 +0900 Subject: [PATCH] MAPREDUCE-6237. Multiple mappers with DBInputFormat don't work because of reusing connections. Contributed by Kannan Rajah. --- hadoop-mapreduce-project/CHANGES.txt | 15 +++++++++ .../mapreduce/lib/db/DBInputFormat.java | 31 ++++++++++++------- .../lib/db/DataDrivenDBInputFormat.java | 5 ++- .../lib/db/OracleDataDrivenDBInputFormat.java | 2 +- 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 583c6c166c..c71fee8097 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -346,6 +346,21 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-6233. org.apache.hadoop.mapreduce.TestLargeSort.testLargeSort failed in trunk (zxu via rkanter) +Release 2.6.1 - UNRELEASED + + INCOMPATIBLE CHANGES + + NEW FEATURES + + IMPROVEMENTS + + OPTIMIZATIONS + + BUG FIXES + + MAPREDUCE-6237. Multiple mappers with DBInputFormat don't work because of + reusing connections. 
(Kannan Rajah via ozawa) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java index c0530c253a..00fbeda09a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java @@ -159,7 +159,7 @@ public void setConf(Configuration conf) { dbConf = new DBConfiguration(conf); try { - getConnection(); + this.connection = createConnection(); DatabaseMetaData dbMeta = connection.getMetaData(); this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase(); @@ -182,18 +182,25 @@ public DBConfiguration getDBConf() { } public Connection getConnection() { + // TODO Remove this code that handles backward compatibility. + if (this.connection == null) { + this.connection = createConnection(); + } + + return this.connection; + } + + public Connection createConnection() { try { - if (null == this.connection) { - // The connection was closed; reinstantiate it. 
- this.connection = dbConf.getConnection(); - this.connection.setAutoCommit(false); - this.connection.setTransactionIsolation( - Connection.TRANSACTION_SERIALIZABLE); - } + Connection newConnection = dbConf.getConnection(); + newConnection.setAutoCommit(false); + newConnection.setTransactionIsolation( + Connection.TRANSACTION_SERIALIZABLE); + + return newConnection; } catch (Exception e) { throw new RuntimeException(e); } - return connection; } public String getDBProductName() { @@ -210,17 +217,17 @@ protected RecordReader createDBRecordReader(DBInputSplit split, if (dbProductName.startsWith("ORACLE")) { // use Oracle-specific db reader. return new OracleDBRecordReader(split, inputClass, - conf, getConnection(), getDBConf(), conditions, fieldNames, + conf, createConnection(), getDBConf(), conditions, fieldNames, tableName); } else if (dbProductName.startsWith("MYSQL")) { // use MySQL-specific db reader. return new MySQLDBRecordReader(split, inputClass, - conf, getConnection(), getDBConf(), conditions, fieldNames, + conf, createConnection(), getDBConf(), conditions, fieldNames, tableName); } else { // Generic reader. 
return new DBRecordReader(split, inputClass, - conf, getConnection(), getDBConf(), conditions, fieldNames, + conf, createConnection(), getDBConf(), conditions, fieldNames, tableName); } } catch (SQLException ex) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java index 131b7bbbce..753c880587 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DataDrivenDBInputFormat.java @@ -178,7 +178,6 @@ public List getSplits(JobContext job) throws IOException { ResultSet results = null; Statement statement = null; - Connection connection = getConnection(); try { statement = connection.createStatement(); @@ -289,12 +288,12 @@ protected RecordReader createDBRecordReader(DBInputSplit split, if (dbProductName.startsWith("MYSQL")) { // use MySQL-specific db reader. return new MySQLDataDrivenDBRecordReader(split, inputClass, - conf, getConnection(), dbConf, dbConf.getInputConditions(), + conf, createConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(), dbConf.getInputTableName()); } else { // Generic reader. 
return new DataDrivenDBRecordReader(split, inputClass, - conf, getConnection(), dbConf, dbConf.getInputConditions(), + conf, createConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(), dbConf.getInputTableName(), dbProductName); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java index 8fbd473d09..a02471e482 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDataDrivenDBInputFormat.java @@ -84,7 +84,7 @@ protected RecordReader createDBRecordReader(DBInputSplit split, try { // Use Oracle-specific db reader return new OracleDataDrivenDBRecordReader(split, inputClass, - conf, getConnection(), dbConf, dbConf.getInputConditions(), + conf, createConnection(), dbConf, dbConf.getInputConditions(), dbConf.getInputFieldNames(), dbConf.getInputTableName()); } catch (SQLException ex) { throw new IOException(ex.getMessage());