From 3b3b63081b39270ba363eb4558c5fb37fd5172d5 Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Mon, 23 May 2016 07:24:31 +0900 Subject: [PATCH] MAPREDUCE-6607. Enable regex pattern matching when mapreduce.task.files.preserve.filepattern is set. Contributed by Kai Sasaki. --- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 32 ++++- .../mapreduce/v2/app/TestStagingCleanup.java | 120 ++++++++++++++++++ 2 files changed, 146 insertions(+), 6 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index aa34fcfaa2..f8d54c57ba 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.nio.file.Paths; import java.security.NoSuchAlgorithmException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; @@ -34,6 +35,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; @@ -567,9 +570,27 @@ protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) { NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf); } - protected boolean keepJobFiles(JobConf conf) { - return (conf.getKeepTaskFilesPattern() != null || conf - .getKeepFailedTaskFiles()); + private boolean isJobNamePatternMatch(JobConf conf, String jobTempDir) { + // Matched staging files should be preserved after job is finished. + if (conf.getKeepTaskFilesPattern() != null && jobTempDir != null) { + String jobFileName = Paths.get(jobTempDir).getFileName().toString(); + Pattern pattern = Pattern.compile(conf.getKeepTaskFilesPattern()); + Matcher matcher = pattern.matcher(jobFileName); + return matcher.find(); + } else { + return false; + } + } + + private boolean isKeepFailedTaskFiles(JobConf conf) { + // TODO: Decide which failed task files that should + // be kept are in application log directory. + return conf.getKeepFailedTaskFiles(); + } + + protected boolean keepJobFiles(JobConf conf, String jobTempDir) { + return isJobNamePatternMatch(conf, jobTempDir) + || isKeepFailedTaskFiles(conf); } /** @@ -592,11 +613,10 @@ protected Credentials getCredentials() { */ public void cleanupStagingDir() throws IOException { /* make sure we clean the staging files */ - String jobTempDir = null; + String jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR); FileSystem fs = getFileSystem(getConfig()); try { - if (!keepJobFiles(new JobConf(getConfig()))) { - jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR); + if (!keepJobFiles(new JobConf(getConfig()), jobTempDir)) { if (jobTempDir == null) { LOG.warn("Job Staging directory is null"); return; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index fc64996a8e..5d5b4fabc4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -76,6 +77,11 @@ public class TestStagingCleanup { private final static RecordFactory recordFactory = RecordFactoryProvider. getRecordFactory(null); + @After + public void tearDown() { + conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, false); + } + @Test public void testDeletionofStagingOnUnregistrationFailure() throws IOException { @@ -245,6 +251,120 @@ public void testDeletionofStagingOnKillLastTry() throws IOException { verify(fs).delete(stagingJobPath, true); } + @Test + public void testByPreserveFailedStaging() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + // TODO: Decide which failed task files that should + // be kept are in application log directory. + // Currently all files are not deleted from staging dir. + conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + //Staging Dir exists + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.FAILED, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry()); + verify(fs, times(0)).delete(stagingJobPath, true); + } + + @Test + public void testPreservePatternMatchedStaging() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + // The staging files that are matched to the pattern + // should not be deleted + conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir"); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + //Staging Dir exists + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry()); + verify(fs, times(0)).delete(stagingJobPath, true); + } + + @Test + public void testNotPreserveNotPatternMatchedStaging() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "NotMatching"); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + //Staging Dir exists + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry()); + //Staging dir should be deleted because it is not matched with + //PRESERVE_FILES_PATTERN + verify(fs, times(1)).delete(stagingJobPath, true); + } + + @Test + public void testPreservePatternMatchedAndFailedStaging() throws IOException { + conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir); + // When RESERVE_FILES_PATTERN and PRESERVE_FAILED_TASK_FILES are set, + // files in staging dir are always kept. + conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir"); + conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true); + fs = mock(FileSystem.class); + when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true); + //Staging Dir exists + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path stagingDir = MRApps.getStagingAreaDir(conf, user); + when(fs.exists(stagingDir)).thenReturn(true); + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 0); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + JobId jobid = recordFactory.newRecordInstance(JobId.class); + jobid.setAppId(appId); + ContainerAllocator mockAlloc = mock(ContainerAllocator.class); + Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1); + MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, + JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + appMaster.init(conf); + appMaster.start(); + appMaster.shutDownJob(); + //test whether notifyIsLastAMRetry called + Assert.assertEquals(true, ((TestMRApp) appMaster).getTestIsLastAMRetry()); + verify(fs, times(0)).delete(stagingJobPath, true); + } + private class TestMRApp extends MRAppMaster { ContainerAllocator allocator; boolean testIsLastAMRetry = false;