From f1ce273150c0282c39b74df4e2a0c067df5a5e5c Mon Sep 17 00:00:00 2001
From: mudit1289 <50325057+mudit1289@users.noreply.github.com>
Date: Tue, 31 Oct 2023 04:28:22 +0530
Subject: [PATCH] MAPREDUCE-7457: Added support to limit count of spill files
 (#6155)

Contributed by Mudit Sharma.

Reviewed-by: Shilun Fan
Signed-off-by: Shilun Fan
---
 .../org/apache/hadoop/mapred/MapTask.java     | 31 +++++++-
 .../apache/hadoop/mapreduce/MRJobConfig.java  |  1 +
 .../src/main/resources/mapred-default.xml     |  9 +++
 .../org/apache/hadoop/mapred/TestMapTask.java | 78 +++++++++++++++++++
 4 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index 06d9fbbe7a..4f86f91283 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -955,7 +955,10 @@ public static class MapOutputBuffer
       new ArrayList<SpillRecord>();
     private int totalIndexCacheMemory;
     private int indexCacheMemoryLimit;
+    private int spillFilesCountLimit;
     private static final int INDEX_CACHE_MEMORY_LIMIT_DEFAULT = 1024 * 1024;
+    private static final int SPILL_FILES_COUNT_LIMIT_DEFAULT = -1;
+    private static final int SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE = -1;
 
     private MapTask mapTask;
     private MapOutputFile mapOutputFile;
@@ -984,10 +987,17 @@ public void init(MapOutputCollector.Context context
           MRJobConfig.DEFAULT_IO_SORT_MB);
       indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                          INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
+      spillFilesCountLimit = job.getInt(JobContext.SPILL_FILES_COUNT_LIMIT,
+          SPILL_FILES_COUNT_LIMIT_DEFAULT);
       if (spillper > (float)1.0 || spillper <= (float)0.0) {
         throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
             "\": " + spillper);
       }
+      if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
+          && spillFilesCountLimit < 0) {
+        throw new IOException("Invalid value for \"" + JobContext.SPILL_FILES_COUNT_LIMIT + "\", " +
+            "current value: " + spillFilesCountLimit);
+      }
       if ((sortmb & 0x7FF) != sortmb) {
         throw new IOException(
             "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
@@ -1698,7 +1708,7 @@ private void sortAndSpill() throws IOException, ClassNotFoundException,
               spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
         }
         LOG.info("Finished spill " + numSpills);
-        ++numSpills;
+        incrementNumSpills();
       } finally {
         if (out != null) out.close();
         if (partitionOut != null) {
@@ -1774,7 +1784,7 @@ private void spillSingleRecord(final K key, final V value,
           totalIndexCacheMemory +=
             spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
         }
-        ++numSpills;
+        incrementNumSpills();
       } finally {
         if (out != null) out.close();
         if (partitionOut != null) {
@@ -2022,7 +2032,7 @@ private void sameVolRename(Path srcPath,
       if (!dst.getParentFile().exists()) {
         if (!dst.getParentFile().mkdirs()) {
           throw new IOException("Unable to rename " + src + " to "
-              + dst + ": couldn't create parent directory");
+                + dst + ": couldn't create parent directory");
         }
       }
 
@@ -2030,6 +2040,21 @@ private void sameVolRename(Path srcPath,
         throw new IOException("Unable to rename " + src + " to " + dst);
       }
     }
+
+    /**
+     * Increments numSpills local counter by taking into consideration
+     * the max limit on spill files being generated by the job.
+     * If limit is reached, this function throws an IOException
+     */
+    private void incrementNumSpills() throws IOException {
+      ++numSpills;
+      if(spillFilesCountLimit != SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE
+          && numSpills > spillFilesCountLimit) {
+        throw new IOException("Too many spill files got created, control it with " +
+            "mapreduce.task.spill.files.count.limit, current value: " + spillFilesCountLimit +
+            ", current spill count: " + numSpills);
+      }
+    }
   } // MapOutputBuffer
 
   /**
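The substance of the MapTask.java change is small: both spill paths, sortAndSpill() and spillSingleRecord(), now route through one guarded increment instead of a bare ++numSpills. Below is a minimal, self-contained sketch of that guard pattern; the class name, constructor, and main() driver are illustrative scaffolding invented for this example, while the increment-then-check logic mirrors the incrementNumSpills() method added above.

    import java.io.IOException;

    public class SpillCounterSketch {
      // -1 plays the role of SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE:
      // with this sentinel, no cap is enforced (the pre-patch behavior).
      private static final int UNBOUNDED = -1;

      private final int spillFilesCountLimit;
      private int numSpills;

      public SpillCounterSketch(int spillFilesCountLimit) {
        this.spillFilesCountLimit = spillFilesCountLimit;
      }

      // Same shape as the patched method: bump the counter first, then
      // fail once the count exceeds the configured limit.
      private void incrementNumSpills() throws IOException {
        ++numSpills;
        if (spillFilesCountLimit != UNBOUNDED && numSpills > spillFilesCountLimit) {
          throw new IOException("Too many spill files got created, control it with "
              + "mapreduce.task.spill.files.count.limit, current value: "
              + spillFilesCountLimit + ", current spill count: " + numSpills);
        }
      }

      public static void main(String[] args) throws IOException {
        SpillCounterSketch counter = new SpillCounterSketch(2);
        counter.incrementNumSpills(); // 1st spill: under the limit
        counter.incrementNumSpills(); // 2nd spill: at the limit, still allowed
        try {
          counter.incrementNumSpills(); // 3rd spill: exceeds the limit
        } catch (IOException expected) {
          System.out.println("Task would fail here: " + expected.getMessage());
        }
      }
    }

Note that the check runs after the increment, so a limit of N allows exactly N spill files and fails on spill N+1; the breach test added below relies on this (limit 2, failure reported at spill count 3).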
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index 8ec984e777..289159ad92 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -323,6 +323,7 @@ public interface MRJobConfig {
   public static final int DEFAULT_IO_SORT_MB = 100;
 
   public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
+  String SPILL_FILES_COUNT_LIMIT = "mapreduce.task.spill.files.count.limit";
 
   public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";
 
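Because SPILL_FILES_COUNT_LIMIT is an ordinary MRJobConfig key, the cap can be enabled per job without touching cluster defaults. A hypothetical submission sketch follows; the class name, the job name, and the value 50 are invented for illustration, and the mapper/reducer wiring is elided:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SpillLimitJobSetup {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Fail any map task that creates more than 50 spill files rather
        // than letting a runaway task fill local disks. Omitting the key,
        // or setting -1, keeps the old unbounded behavior.
        conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, 50);
        Job job = Job.getInstance(conf, "spill-limit-demo");
        // ... set mapper, reducer, input and output paths as usual ...
      }
    }

The same key can also be passed on the command line through the generic -D option for tools that go through GenericOptionsParser.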
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 9b0d8b563d..ca144a7b15 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -62,6 +62,15 @@
   set to less than .5</description>
 </property>
 
+<property>
+  <name>mapreduce.task.spill.files.count.limit</name>
+  <value>-1</value>
+  <description>Number of spill files that can be created by a MapTask.
+  After breaching this, task will fail. Default value for this config is -1
+  which indicates that there is no limit on number of spill files being
+  created
+  </description>
+</property>
 <property>
   <name>mapreduce.job.local-fs.single-disk-limit.bytes</name>
   <value>-1</value>
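The property description above, combined with the init() check added to MapTask.java, pins down the accepted value space: -1 means unbounded, zero and positive values are enforced caps, and any other negative value fails the task at initialization. A small self-contained sketch of that validation, with an illustrative class and method name:

    import java.io.IOException;

    public class SpillLimitValidationSketch {
      // Mirrors SPILL_FILES_COUNT_UNBOUNDED_LIMIT_VALUE from the patch.
      private static final int UNBOUNDED = -1;

      // Same predicate as the check added to MapOutputBuffer.init():
      // reject everything negative except the -1 sentinel.
      static void validate(int spillFilesCountLimit) throws IOException {
        if (spillFilesCountLimit != UNBOUNDED && spillFilesCountLimit < 0) {
          throw new IOException("Invalid value for \"mapreduce.task.spill.files.count.limit\", "
              + "current value: " + spillFilesCountLimit);
        }
      }

      public static void main(String[] args) throws IOException {
        validate(-1); // unbounded: accepted
        validate(0);  // accepted by init(); the first spill would then breach
        validate(10); // cap of 10 spill files: accepted
        validate(-2); // throws, as testSpillFilesCountLimitInvalidValue expects
      }
    }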
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java
index d5164de46d..fef179994f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapTask.java
@@ -27,14 +27,20 @@
 import org.apache.hadoop.mapred.MapTask.MapOutputBuffer;
 import org.apache.hadoop.mapred.Task.TaskReporter;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.Progress;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
@@ -51,6 +57,9 @@ public void cleanup() throws Exception {
     FileUtil.fullyDelete(TEST_ROOT_DIR);
   }
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   // Verify output files for shuffle have group read permission even when
   // the configured umask normally would prevent it.
   @Test
@@ -84,4 +93,73 @@ public void testShufflePermissions() throws Exception {
     Assert.assertEquals("Incorrect index file perms",
         (short)0640, perms.toShort());
   }
+
+  @Test
+  public void testSpillFilesCountLimitInvalidValue() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
+    conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, -2);
+    MapOutputFile mof = new MROutputFiles();
+    mof.setConf(conf);
+    TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
+    MapTask mockTask = mock(MapTask.class);
+    doReturn(mof).when(mockTask).getMapOutputFile();
+    doReturn(attemptId).when(mockTask).getTaskID();
+    doReturn(new Progress()).when(mockTask).getSortPhase();
+    TaskReporter mockReporter = mock(TaskReporter.class);
+    doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
+    MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
+    MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
+
+    exception.expect(IOException.class);
+    exception.expectMessage("Invalid value for \"mapreduce.task.spill.files.count.limit\", " +
+        "current value: -2");
+
+    mob.init(ctx);
+    mob.close();
+  }
+
+  @Test
+  public void testSpillFilesCountBreach() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    conf.set(MRConfig.LOCAL_DIR, TEST_ROOT_DIR.getAbsolutePath());
+    conf.setInt(MRJobConfig.SPILL_FILES_COUNT_LIMIT, 2);
+    MapOutputFile mof = new MROutputFiles();
+    mof.setConf(conf);
+    TaskAttemptID attemptId = new TaskAttemptID("12345", 1, TaskType.MAP, 1, 1);
+    MapTask mockTask = mock(MapTask.class);
+    doReturn(mof).when(mockTask).getMapOutputFile();
+    doReturn(attemptId).when(mockTask).getTaskID();
+    doReturn(new Progress()).when(mockTask).getSortPhase();
+    TaskReporter mockReporter = mock(TaskReporter.class);
+    doReturn(new Counter()).when(mockReporter).getCounter(any(TaskCounter.class));
+    MapOutputCollector.Context ctx = new MapOutputCollector.Context(mockTask, conf, mockReporter);
+    MapOutputBuffer<Object, Object> mob = new MapOutputBuffer<>();
+    mob.numSpills = 2;
+    mob.init(ctx);
+
+    Method method = mob.getClass().getDeclaredMethod("incrementNumSpills");
+    method.setAccessible(true);
+    boolean gotExceptionWithMessage = false;
+    try {
+      method.invoke(mob);
+    } catch(InvocationTargetException e) {
+      Throwable targetException = e.getTargetException();
+      if (targetException != null) {
+        String errorMessage = targetException.getMessage();
+        if (errorMessage != null) {
+          if(errorMessage.equals("Too many spill files got created, control it with " +
+              "mapreduce.task.spill.files.count.limit, current value: 2, current spill count: 3")) {
+            gotExceptionWithMessage = true;
+          }
+        }
+      }
+    }
+
+    mob.close();
+
+    Assert.assertTrue(gotExceptionWithMessage);
+  }
 }
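testSpillFilesCountBreach drives the private incrementNumSpills() through reflection, so the expected IOException surfaces wrapped in an InvocationTargetException, and the test unwraps it and compares the message by hand. On JUnit 4.13+, assertThrows can express the same check more compactly; a sketch of that alternative shape (the class and helper name are invented, not part of the patch):

    import static org.junit.Assert.assertThrows;
    import static org.junit.Assert.assertTrue;

    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    public class SpillBreachAssertionSketch {
      static void assertBreachMessage(Object mob) throws Exception {
        Method method = mob.getClass().getDeclaredMethod("incrementNumSpills");
        method.setAccessible(true);
        // Reflection wraps the thrown IOException in an
        // InvocationTargetException, so assert on the wrapper and then
        // inspect its target exception.
        InvocationTargetException e = assertThrows(
            InvocationTargetException.class, () -> method.invoke(mob));
        assertTrue(e.getTargetException() instanceof IOException);
        assertTrue(e.getTargetException().getMessage()
            .startsWith("Too many spill files got created"));
      }
    }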