diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
index 3ef6601fbf..a214420df8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultipleOutputs.java
@@ -17,15 +17,39 @@
*/
package org.apache.hadoop.mapred.lib;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.Progressable;
-import java.io.IOException;
-import java.util.*;
-
/**
* The MultipleOutputs class simplifies writing to additional outputs other
* than the job default output via the OutputCollector
passed to
@@ -132,6 +156,7 @@ public class MultipleOutputs {
* Counters group used by the counters of MultipleOutputs.
*/
private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+ private static final Logger LOG = LoggerFactory.getLogger(MultipleOutputs.class);
/**
* Checks if a named output is alreadyDefined or not.
@@ -381,6 +406,11 @@ public static boolean getCountersEnabled(JobConf conf) {
private Map recordWriters;
private boolean countersEnabled;
+ @VisibleForTesting
+ synchronized void setRecordWriters(Map recordWriters) {
+ this.recordWriters = recordWriters;
+ }
+
/**
* Creates and initializes multiple named outputs support, it should be
* instantiated in the Mapper/Reducer configure method.
@@ -528,8 +558,41 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly.
*/
public void close() throws IOException {
+ int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
+ MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
+ AtomicBoolean encounteredException = new AtomicBoolean(false);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
+ .setUncaughtExceptionHandler(((t, e) -> {
+ LOG.error("Thread " + t + " failed unexpectedly", e);
+ encounteredException.set(true);
+ })).build();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
+
+ List> callableList = new ArrayList<>(recordWriters.size());
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(null);
+ callableList.add(() -> {
+ try {
+ writer.close(null);
+ } catch (IOException e) {
+ LOG.error("Error while closing MultipleOutput file", e);
+ encounteredException.set(true);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ LOG.warn("Closing is Interrupted");
+ Thread.currentThread().interrupt();
+ } finally {
+ executorService.shutdown();
+ }
+
+ if (encounteredException.get()) {
+ throw new IOException(
+ "One or more threads encountered exception during close. See prior errors.");
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
index b4d91491e1..8671eb30b9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
@@ -131,5 +131,7 @@ public interface MRConfig {
String MASTER_WEBAPP_UI_ACTIONS_ENABLED =
"mapreduce.webapp.ui-actions.enabled";
boolean DEFAULT_MASTER_WEBAPP_UI_ACTIONS_ENABLED = true;
+ String MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = "mapreduce.multiple-outputs-close-threads";
+ int DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = 10;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
index a4d8acbbbf..3c36dfb8bb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
@@ -19,14 +19,23 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The MultipleOutputs class simplifies writing output data
@@ -191,6 +200,8 @@ public class MultipleOutputs {
* Counters group used by the counters of MultipleOutputs.
*/
private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
+ private static final Logger LOG =
+ LoggerFactory.getLogger(org.apache.hadoop.mapred.lib.MultipleOutputs.class);
/**
* Cache for the taskContexts
@@ -345,6 +356,11 @@ public static boolean getCountersEnabled(JobContext job) {
return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
}
+ @VisibleForTesting
+ synchronized void setRecordWriters(Map> recordWriters) {
+ this.recordWriters = recordWriters;
+ }
+
/**
* Wraps RecordWriter to increment counters.
*/
@@ -568,8 +584,43 @@ public void setStatus(String status) {
*/
@SuppressWarnings("unchecked")
public void close() throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
+ MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
+ AtomicBoolean encounteredException = new AtomicBoolean(false);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
+ .setUncaughtExceptionHandler(((t, e) -> {
+ LOG.error("Thread " + t + " failed unexpectedly", e);
+ encounteredException.set(true);
+ })).build();
+ ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
+
+ List> callableList = new ArrayList<>(recordWriters.size());
+
for (RecordWriter writer : recordWriters.values()) {
- writer.close(context);
+ callableList.add(() -> {
+ try {
+ writer.close(context);
+ } catch (IOException e) {
+ LOG.error("Error while closing MultipleOutput file", e);
+ encounteredException.set(true);
+ }
+ return null;
+ });
+ }
+ try {
+ executorService.invokeAll(callableList);
+ } catch (InterruptedException e) {
+ LOG.warn("Closing is Interrupted");
+ Thread.currentThread().interrupt();
+ } finally {
+ executorService.shutdown();
+ }
+
+ if (encounteredException.get()) {
+ throw new IOException(
+ "One or more threads encountered exception during close. See prior errors.");
}
}
}
+
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
index f3e58930ea..8829a093b1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestMultipleOutputs.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
@@ -46,11 +47,16 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.Arrays;
import java.util.Iterator;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestMultipleOutputs extends HadoopTestCase {
@@ -70,6 +76,19 @@ public void testWithCounters() throws Exception {
_testMOWithJavaSerialization(true);
}
+ @SuppressWarnings("unchecked")
+ @Test(expected = IOException.class)
+ public void testParallelCloseIOException() throws IOException {
+ RecordWriter writer = mock(RecordWriter.class);
+ Map recordWriters = mock(Map.class);
+ when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
+ doThrow(new IOException("test IO exception")).when(writer).close(null);
+ JobConf conf = createJobConf();
+ MultipleOutputs mos = new MultipleOutputs(conf);
+ mos.setRecordWriters(recordWriters);
+ mos.close();
+ }
+
private static final Path ROOT_DIR = new Path("testing/mo");
private static final Path IN_DIR = new Path(ROOT_DIR, "input");
private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
@@ -307,6 +326,7 @@ protected void _testMultipleOutputs(boolean withCounters) throws Exception {
}
+
@SuppressWarnings({"unchecked"})
public static class MOMap implements Mapper {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
index babd20e66c..717163ce24 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMRMultipleOutputs.java
@@ -31,7 +31,9 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -39,10 +41,15 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestMRMultipleOutputs extends HadoopTestCase {
@@ -62,6 +69,20 @@ public void testWithCounters() throws Exception {
_testMOWithJavaSerialization(true);
}
+ @SuppressWarnings("unchecked")
+ @Test(expected = IOException.class)
+ public void testParallelCloseIOException() throws IOException, InterruptedException {
+ RecordWriter writer = mock(RecordWriter.class);
+ Map recordWriters = mock(Map.class);
+ when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
+ Mapper.Context taskInputOutputContext = mock(Mapper.Context.class);
+ when(taskInputOutputContext.getConfiguration()).thenReturn(createJobConf());
+ doThrow(new IOException("test IO exception")).when(writer).close(taskInputOutputContext);
+ MultipleOutputs mos = new MultipleOutputs(taskInputOutputContext);
+ mos.setRecordWriters(recordWriters);
+ mos.close();
+ }
+
private static String localPathRoot =
System.getProperty("test.build.data", "/tmp");
private static final Path ROOT_DIR = new Path(localPathRoot, "testing/mo");
@@ -85,7 +106,7 @@ public void tearDown() throws Exception {
fs.delete(ROOT_DIR, true);
super.tearDown();
}
-
+
protected void _testMOWithJavaSerialization(boolean withCounters) throws Exception {
String input = "a\nb\nc\nd\ne\nc\nd\ne";