MAPREDUCE-7370. Parallelize MultipleOutputs#close call (#4248). Contributed by Ashutosh Gupta.

Reviewed-by: Akira Ajisaka <aajisaka@apache.org>
Signed-off-by: Chris Nauroth <cnauroth@apache.org>
This commit is contained in:
Ashutosh Gupta 2022-10-06 23:23:05 +01:00 committed by GitHub
parent 8336b91329
commit 062c50db6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 165 additions and 8 deletions

View File

@ -17,15 +17,39 @@
*/ */
package org.apache.hadoop.mapred.lib; package org.apache.hadoop.mapred.lib;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import java.io.IOException;
import java.util.*;
/** /**
* The MultipleOutputs class simplifies writing to additional outputs other * The MultipleOutputs class simplifies writing to additional outputs other
* than the job default output via the <code>OutputCollector</code> passed to * than the job default output via the <code>OutputCollector</code> passed to
@ -132,6 +156,7 @@ public class MultipleOutputs {
* Counters group used by the counters of MultipleOutputs. * Counters group used by the counters of MultipleOutputs.
*/ */
private static final String COUNTERS_GROUP = MultipleOutputs.class.getName(); private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
private static final Logger LOG = LoggerFactory.getLogger(MultipleOutputs.class);
/** /**
* Checks if a named output is alreadyDefined or not. * Checks if a named output is alreadyDefined or not.
@ -381,6 +406,11 @@ public static boolean getCountersEnabled(JobConf conf) {
private Map<String, RecordWriter> recordWriters; private Map<String, RecordWriter> recordWriters;
private boolean countersEnabled; private boolean countersEnabled;
@VisibleForTesting
synchronized void setRecordWriters(Map<String, RecordWriter> recordWriters) {
this.recordWriters = recordWriters;
}
/** /**
* Creates and initializes multiple named outputs support, it should be * Creates and initializes multiple named outputs support, it should be
* instantiated in the Mapper/Reducer configure method. * instantiated in the Mapper/Reducer configure method.
@ -528,8 +558,41 @@ public void collect(Object key, Object value) throws IOException {
* could not be closed properly. * could not be closed properly.
*/ */
public void close() throws IOException { public void close() throws IOException {
int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
AtomicBoolean encounteredException = new AtomicBoolean(false);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
.setUncaughtExceptionHandler(((t, e) -> {
LOG.error("Thread " + t + " failed unexpectedly", e);
encounteredException.set(true);
})).build();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
for (RecordWriter writer : recordWriters.values()) { for (RecordWriter writer : recordWriters.values()) {
writer.close(null); callableList.add(() -> {
try {
writer.close(null);
} catch (IOException e) {
LOG.error("Error while closing MultipleOutput file", e);
encounteredException.set(true);
}
return null;
});
}
try {
executorService.invokeAll(callableList);
} catch (InterruptedException e) {
LOG.warn("Closing is Interrupted");
Thread.currentThread().interrupt();
} finally {
executorService.shutdown();
}
if (encounteredException.get()) {
throw new IOException(
"One or more threads encountered exception during close. See prior errors.");
} }
} }

View File

@ -131,5 +131,7 @@ public interface MRConfig {
String MASTER_WEBAPP_UI_ACTIONS_ENABLED = String MASTER_WEBAPP_UI_ACTIONS_ENABLED =
"mapreduce.webapp.ui-actions.enabled"; "mapreduce.webapp.ui-actions.enabled";
boolean DEFAULT_MASTER_WEBAPP_UI_ACTIONS_ENABLED = true; boolean DEFAULT_MASTER_WEBAPP_UI_ACTIONS_ENABLED = true;
String MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = "mapreduce.multiple-outputs-close-threads";
int DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT = 10;
} }

View File

@ -19,14 +19,23 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* The MultipleOutputs class simplifies writing output data * The MultipleOutputs class simplifies writing output data
@ -191,6 +200,8 @@ public class MultipleOutputs<KEYOUT, VALUEOUT> {
* Counters group used by the counters of MultipleOutputs. * Counters group used by the counters of MultipleOutputs.
*/ */
private static final String COUNTERS_GROUP = MultipleOutputs.class.getName(); private static final String COUNTERS_GROUP = MultipleOutputs.class.getName();
private static final Logger LOG =
LoggerFactory.getLogger(org.apache.hadoop.mapred.lib.MultipleOutputs.class);
/** /**
* Cache for the taskContexts * Cache for the taskContexts
@ -345,6 +356,11 @@ public static boolean getCountersEnabled(JobContext job) {
return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false); return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
} }
@VisibleForTesting
synchronized void setRecordWriters(Map<String, RecordWriter<?, ?>> recordWriters) {
this.recordWriters = recordWriters;
}
/** /**
* Wraps RecordWriter to increment counters. * Wraps RecordWriter to increment counters.
*/ */
@ -568,8 +584,43 @@ public void setStatus(String status) {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void close() throws IOException, InterruptedException { public void close() throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
int nThreads = conf.getInt(MRConfig.MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT,
MRConfig.DEFAULT_MULTIPLE_OUTPUTS_CLOSE_THREAD_COUNT);
AtomicBoolean encounteredException = new AtomicBoolean(false);
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("MultipleOutputs-close")
.setUncaughtExceptionHandler(((t, e) -> {
LOG.error("Thread " + t + " failed unexpectedly", e);
encounteredException.set(true);
})).build();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
List<Callable<Object>> callableList = new ArrayList<>(recordWriters.size());
for (RecordWriter writer : recordWriters.values()) { for (RecordWriter writer : recordWriters.values()) {
writer.close(context); callableList.add(() -> {
try {
writer.close(context);
} catch (IOException e) {
LOG.error("Error while closing MultipleOutput file", e);
encounteredException.set(true);
}
return null;
});
}
try {
executorService.invokeAll(callableList);
} catch (InterruptedException e) {
LOG.warn("Closing is Interrupted");
Thread.currentThread().interrupt();
} finally {
executorService.shutdown();
}
if (encounteredException.get()) {
throw new IOException(
"One or more threads encountered exception during close. See prior errors.");
} }
} }
} }

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.RunningJob;
@ -46,11 +47,16 @@
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestMultipleOutputs extends HadoopTestCase { public class TestMultipleOutputs extends HadoopTestCase {
@ -70,6 +76,19 @@ public void testWithCounters() throws Exception {
_testMOWithJavaSerialization(true); _testMOWithJavaSerialization(true);
} }
@SuppressWarnings("unchecked")
@Test(expected = IOException.class)
public void testParallelCloseIOException() throws IOException {
RecordWriter writer = mock(RecordWriter.class);
Map<String, RecordWriter> recordWriters = mock(Map.class);
when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
doThrow(new IOException("test IO exception")).when(writer).close(null);
JobConf conf = createJobConf();
MultipleOutputs mos = new MultipleOutputs(conf);
mos.setRecordWriters(recordWriters);
mos.close();
}
private static final Path ROOT_DIR = new Path("testing/mo"); private static final Path ROOT_DIR = new Path("testing/mo");
private static final Path IN_DIR = new Path(ROOT_DIR, "input"); private static final Path IN_DIR = new Path(ROOT_DIR, "input");
private static final Path OUT_DIR = new Path(ROOT_DIR, "output"); private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
@ -307,6 +326,7 @@ protected void _testMultipleOutputs(boolean withCounters) throws Exception {
} }
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
public static class MOMap implements Mapper<LongWritable, Text, LongWritable, public static class MOMap implements Mapper<LongWritable, Text, LongWritable,
Text> { Text> {

View File

@ -31,7 +31,9 @@
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapReduceTestUtil; import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -39,10 +41,15 @@
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Map;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestMRMultipleOutputs extends HadoopTestCase { public class TestMRMultipleOutputs extends HadoopTestCase {
@ -62,6 +69,20 @@ public void testWithCounters() throws Exception {
_testMOWithJavaSerialization(true); _testMOWithJavaSerialization(true);
} }
@SuppressWarnings("unchecked")
@Test(expected = IOException.class)
public void testParallelCloseIOException() throws IOException, InterruptedException {
RecordWriter writer = mock(RecordWriter.class);
Map recordWriters = mock(Map.class);
when(recordWriters.values()).thenReturn(Arrays.asList(writer, writer));
Mapper.Context taskInputOutputContext = mock(Mapper.Context.class);
when(taskInputOutputContext.getConfiguration()).thenReturn(createJobConf());
doThrow(new IOException("test IO exception")).when(writer).close(taskInputOutputContext);
MultipleOutputs<Long, String> mos = new MultipleOutputs<Long, String>(taskInputOutputContext);
mos.setRecordWriters(recordWriters);
mos.close();
}
private static String localPathRoot = private static String localPathRoot =
System.getProperty("test.build.data", "/tmp"); System.getProperty("test.build.data", "/tmp");
private static final Path ROOT_DIR = new Path(localPathRoot, "testing/mo"); private static final Path ROOT_DIR = new Path(localPathRoot, "testing/mo");