MAPREDUCE-7457: Added support to limit count of spill files (#6155) Contributed by Mudit Sharma.

Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
mudit1289 2023-10-31 04:28:22 +05:30 committed by GitHub
parent 254dbab5a3
commit f1ce273150
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 116 additions and 3 deletions

View File

@ -955,7 +955,10 @@ public static class MapOutputBuffer<K extends Object, V extends Object>
new ArrayList<SpillRecord>(); new ArrayList<SpillRecord>();
private int totalIndexCacheMemory; private int totalIndexCacheMemory;
private int indexCacheMemoryLimit; private int indexCacheMemoryLimit;
private int spillFilesCountLimit;
private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024; private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
private static final int SPILL_FILES_COUNT_LIMIT_DEFAULT = -1;
private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;
private MapTask mapTask; private MapTask mapTask;
private MapOutputFile mapOutputFile; private MapOutputFile mapOutputFile;
@ -984,10 +987,17 @@ public void init(MapOutputCollector.Context context
MRJobConfig.DEFAULT_IO_SORT_MB); MRJobConfig.DEFAULT_IO_SORT_MB);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT); INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
spillFilesCountLimit = job.getInt(JobContext.SPILL_FILES_COUNT_LIMIT,
SPILL_FILES_COUNT_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) { if (spillper > (float)1.0 || spillper <= (float)0.0) {
throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT + throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
"\": " + spillper); "\": " + spillper);
} }
if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
&& spillFilesCountLimit < 0) {
throw new IOException("Invalid value for \"" + JobContext.SPILL_FILES_COUNT_LIMIT + "\", " +
"current value: " + spillFilesCountLimit);
}
if ((sortmb & 0x7FF) != sortmb) { if ((sortmb & 0x7FF) != sortmb) {
throw new IOException( throw new IOException(
"Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb); "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
@ -1698,7 +1708,7 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
} }
LOG.info("Finished spill " + numSpills); LOG.info("Finished spill " + numSpills);
++numSpills; incrementNumSpills();
} finally { } finally {
if (out != null) out.close(); if (out != null) out.close();
if (partitionOut != null) { if (partitionOut != null) {
@ -1774,7 +1784,7 @@ private void spillSingleRecord(final K key, final V value,
totalIndexCacheMemory += totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
} }
++numSpills; incrementNumSpills();
} finally { } finally {
if (out != null) out.close(); if (out != null) out.close();
if (partitionOut != null) { if (partitionOut != null) {
@ -2030,6 +2040,21 @@ private void sameVolRename(Path srcPath,
throw new IOException("Unable to rename " + src + " to " + dst); throw new IOException("Unable to rename " + src + " to " + dst);
} }
} }
/**
* Increments numSpills local counter by taking into consideration
* the max limit on spill files being generated by the job.
* If limit is reached, this function throws an IOException
*/
private void incrementNumSpills() throws IOException {
++numSpills;
if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
&& numSpills > spillFilesCountLimit) {
throw new IOException("Too many spill files got created, control it with " +
"mapreduce.task.spill.files.count.limit, current value: " + spillFilesCountLimit +
", current spill count: " + numSpills);
}
}
} // MapOutputBuffer } // MapOutputBuffer
/** /**

View File

@ -323,6 +323,7 @@ public interface MRJobConfig {
public static final int DEFAULT_IO_SORT_MB = 100; public static final int DEFAULT_IO_SORT_MB = 100;
public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes"; public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
String SPILL_FILES_COUNT_LIMIT = "mapreduce.task.spill.files.count.limit";
public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks"; public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";

View File

@ -62,6 +62,15 @@
set to less than .5</description> set to less than .5</description>
</property> </property>
<property>
<name>mapreduce.task.spill.files.count.limit</name>
<value>-1</value>
<description>Number of spill files that can be created by a MapTask.
After breaching this, task will fail. Default value for this config is -1
which indicates that there is no limit on number of spill files being
created</description>
</property>
<property> <property>
<name>mapreduce.job.local-fs.single-disk-limit.bytes</name> <name>mapreduce.job.local-fs.single-disk-limit.bytes</name>
<value>-1</value> <value>-1</value>

View File

@ -27,14 +27,20 @@
import org.apache.hadoop.mapred.MapTask.MapOutputBuffer; import org.apache.hadoop.mapred.MapTask.MapOutputBuffer;
import org.apache.hadoop.mapred.Task.TaskReporter; import org.apache.hadoop.mapred.Task.TaskReporter;
import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progress;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@ -51,6 +57,9 @@ public void cleanup() throws Exception {
FileUtil.fullyDelete(TEST_ROOT_DIR); FileUtil.fullyDelete(TEST_ROOT_DIR);
} }
@Rule
public ExpectedException exception = ExpectedException.none();
// Verify output files for shuffle have group read permission even when // Verify output files for shuffle have group read permission even when
// the configured umask normally would prevent it. // the configured umask normally would prevent it.
@Test @Test
@ -84,4 +93,73 @@ public void testShufflePermissions() throws Exception {
Assert.assertEquals("Incorrect index file perms", Assert.assertEquals("Incorrect index file perms",
(short)0640, perms.toShort()); (short)0640, perms.toShort());
} }
@Test
public void testSpillFilesCountLimitInvalidValue() throws Exception {
JobConf conf = new JobConf();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, -2);
MapOutputFile mof = new MROutputFiles();
mof.setConf(conf);
TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
MapTask mockTask = mock(MapTask.class);
doReturn(mof).when(mockTask).getMapOutputFile();
doReturn(attemptId).when(mockTask).getTaskID();
doReturn(new Progress()).when(mockTask).getSortPhase();
TaskReporter mockReporter = mock(TaskReporter.class);
doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
exception.expect(IOException.class);
exception.expectMessage("Invalid value for \"mapreduce.task.spill.files.count.limit\", " +
"current value: -2");
mob.init(ctx);
mob.close();
}
@Test
public void testSpillFilesCountBreach() throws Exception {
JobConf conf = new JobConf();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, 2);
MapOutputFile mof = new MROutputFiles();
mof.setConf(conf);
TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
MapTask mockTask = mock(MapTask.class);
doReturn(mof).when(mockTask).getMapOutputFile();
doReturn(attemptId).when(mockTask).getTaskID();
doReturn(new Progress()).when(mockTask).getSortPhase();
TaskReporter mockReporter = mock(TaskReporter.class);
doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
mob.numSpills = 2;
mob.init(ctx);
Method method = mob.getClass().getDeclaredMethod("incrementNumSpills");
method.setAccessible(true);
boolean gotExceptionWithMessage = false;
try {
method.invoke(mob);
} catch(InvocationTargetException e) {
Throwable targetException = e.getTargetException();
if (targetException != null) {
String errorMessage = targetException.getMessage();
if (errorMessage != null) {
if(errorMessage.equals("Too many spill files got created, control it with " +
"mapreduce.task.spill.files.count.limit, current value: 2, current spill count: 3")) {
gotExceptionWithMessage = true;
}
}
}
}
mob.close();
Assert.assertTrue(gotExceptionWithMessage);
}
} }