MAPREDUCE-6246. DBOutputFormat.java appending extra semicolon to query which is incompatible with DB2. Contributed by ramtin and Gergely Novák.

This commit is contained in:
Junping Du 2017-07-07 13:23:43 -07:00
parent f10864a820
commit f484a6ff60
2 changed files with 58 additions and 2 deletions

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@ -51,6 +52,8 @@ public class DBOutputFormat<K extends DBWritable, V>
extends OutputFormat<K,V> {
private static final Log LOG = LogFactory.getLog(DBOutputFormat.class);
public String dbProductName = "DEFAULT";
public void checkOutputSpecs(JobContext context)
throws IOException, InterruptedException {}
@ -158,7 +161,12 @@ public String constructQuery(String table, String[] fieldNames) {
query.append(",");
}
}
if (dbProductName.startsWith("DB2") || dbProductName.startsWith("ORACLE")) {
query.append(")");
} else {
query.append(");");
}
return query.toString();
}
@ -178,6 +186,9 @@ public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
Connection connection = dbConf.getConnection();
PreparedStatement statement = null;
DatabaseMetaData dbMeta = connection.getMetaData();
this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
statement = connection.prepareStatement(
constructQuery(tableName, fieldNames));
return new DBRecordWriter(connection, statement);

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.mapreduce.lib.db;
import java.io.IOException;
import java.lang.reflect.Field;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
@ -26,6 +28,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class TestDBOutputFormat {
private String[] fieldNames = new String[] { "id", "name", "value" };
@ -46,6 +49,48 @@ public void testConstructQuery() {
assertEquals(nullExpected, actual);
}
@Test
public void testDB2ConstructQuery() {
String db2expected = StringUtils.removeEnd(expected, ";");
String db2nullExpected = StringUtils.removeEnd(nullExpected, ";");
try {
Class<?> clazz = this.format.getClass();
Field field = clazz.getDeclaredField("dbProductName");
field.setAccessible(true);
field.set(format, "DB2");
} catch (IllegalAccessException | NoSuchFieldException e) {
fail(e.getMessage());
}
String actual = format.constructQuery("hadoop_output", fieldNames);
assertEquals(db2expected, actual);
actual = format.constructQuery("hadoop_output", nullFieldNames);
assertEquals(db2nullExpected, actual);
}
@Test
public void testORACLEConstructQuery() {
String oracleExpected = StringUtils.removeEnd(expected, ";");
String oracleNullExpected = StringUtils.removeEnd(nullExpected, ";");
try {
Class<?> clazz = this.format.getClass();
Field field = clazz.getDeclaredField("dbProductName");
field.setAccessible(true);
field.set(format, "ORACLE");
} catch (IllegalAccessException | NoSuchFieldException e) {
fail(e.getMessage());
}
String actual = format.constructQuery("hadoop_output", fieldNames);
assertEquals(oracleExpected, actual);
actual = format.constructQuery("hadoop_output", nullFieldNames);
assertEquals(oracleNullExpected, actual);
}
@Test
public void testSetOutput() throws IOException {
Job job = Job.getInstance(new Configuration());