diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 03ebbdb0bb..17caf5a0de 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -310,6 +310,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4040. History links should use hostname rather than IP address. (Bhallamudi Venkata Siva Kamesh via sseth) + MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory + after the history service is stopped. (Jason Lowe via sseth) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 680872c986..f76ae5a9db 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -285,6 +285,11 @@ public void init(final Configuration conf) { addIfService(containerLauncher); dispatcher.register(ContainerLauncher.EventType.class, containerLauncher); + // Add the staging directory cleaner before the history server but after + // the container allocator so the staging directory is cleaned after + // the history has been flushed but before unregistering with the RM. + addService(createStagingDirCleaningService()); + // Add the JobHistoryEventHandler last so that it is properly stopped first. // This will guarantee that all history-events are flushed before AM goes // ahead with shutdown. @@ -406,13 +411,6 @@ public void handle(JobFinishEvent event) { e.printStackTrace(); } - // Cleanup staging directory - try { - cleanupStagingDir(); - } catch(IOException io) { - LOG.warn("Failed to delete staging dir", io); - } - try { // Stop all services // This will also send the final report to the ResourceManager @@ -512,6 +510,10 @@ protected EventHandler createJobHistoryHandler( return this.jobHistoryEventHandler; } + protected AbstractService createStagingDirCleaningService() { + return new StagingDirCleaningService(); + } + protected Speculator createSpeculator(Configuration conf, AppContext context) { Class speculatorClass; @@ -710,6 +712,22 @@ public synchronized void stop() { } } + private final class StagingDirCleaningService extends AbstractService { + StagingDirCleaningService() { + super(StagingDirCleaningService.class.getName()); + } + + @Override + public synchronized void stop() { + try { + cleanupStagingDir(); + } catch (IOException io) { + LOG.error("Failed to cleanup staging dir: ", io); + } + super.stop(); + } + } + private class RunningAppContext implements AppContext { private final Map jobs = new ConcurrentHashMap(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index feb62724da..5bf26fed0f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -428,9 +428,13 @@ protected void attemptLaunched(TaskAttemptId attemptID) { @Override protected ContainerAllocator createContainerAllocator( ClientService clientService, final AppContext context) { - return new ContainerAllocator(){ - private int containerCount; - @Override + return new MRAppContainerAllocator(); + } + + protected class MRAppContainerAllocator implements ContainerAllocator { + private int containerCount; + + @Override public void handle(ContainerAllocatorEvent event) { ContainerId cId = recordFactory.newRecordInstance(ContainerId.class); cId.setApplicationAttemptId(getContext().getApplicationAttemptId()); @@ -452,7 +456,6 @@ public void handle(ContainerAllocatorEvent event) { new TaskAttemptContainerAssignedEvent(event.getAttemptID(), container, null)); } - }; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index 68d07a7ef2..3ca9c24bad 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -18,11 +18,10 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.spy; -import java.io.IOException; import java.util.Iterator; import junit.framework.Assert; @@ -36,14 +35,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; -import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.YarnException; import org.junit.Test; /** @@ -237,71 +233,6 @@ public void checkTaskStateTypeConversion() { } } - private final class MRAppTestCleanup extends MRApp { - boolean hasStopped; - boolean cleanedBeforeStopped; - - public MRAppTestCleanup(int maps, int reduces, boolean autoComplete, - String testName, boolean cleanOnStart) { - super(maps, reduces, autoComplete, testName, cleanOnStart); - hasStopped = false; - cleanedBeforeStopped = false; - } - - @Override - protected Job createJob(Configuration conf) { - UserGroupInformation currentUser = null; - try { - currentUser = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - throw new YarnException(e); - } - Job newJob = new TestJob(getJobId(), getAttemptID(), conf, - getDispatcher().getEventHandler(), - getTaskAttemptListener(), getContext().getClock(), - getCommitter(), isNewApiCommitter(), - currentUser.getUserName(), getContext()); - ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); - - getDispatcher().register(JobFinishEvent.Type.class, - createJobFinishEventHandler()); - - return newJob; - } - - @Override - public void cleanupStagingDir() throws IOException { - cleanedBeforeStopped = !hasStopped; - } - - @Override - public synchronized void stop() { - hasStopped = true; - super.stop(); - } - - @Override - protected void sysexit() { - } - } - - @Test - public void testStagingCleanupOrder() throws Exception { - MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, - this.getClass().getName(), true); - JobImpl job = (JobImpl)app.submit(new Configuration()); - app.waitForState(job, JobState.SUCCEEDED); - app.verifyCompleted(); - - int waitTime = 20 * 1000; - while (waitTime > 0 && !app.cleanedBeforeStopped) { - Thread.sleep(100); - waitTime -= 100; - } - Assert.assertTrue("Staging directory not cleaned before notifying RM", - app.cleanedBeforeStopped); - } - public static void main(String[] args) throws Exception { TestMRApp t = new TestMRApp(); t.testMapReduce(); @@ -310,6 +241,5 @@ public static void main(String[] args) throws Exception { t.testCompletedMapsForReduceSlowstart(); t.testJobError(); t.testCountersOnJobFinish(); - t.testStagingCleanupOrder(); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java index e0dbac97b6..063fcfa2cf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java @@ -26,6 +26,7 @@ import java.io.IOException; +import junit.framework.Assert; import junit.framework.TestCase; import org.apache.commons.logging.Log; @@ -35,12 +36,21 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.Test; @@ -103,4 +113,89 @@ public Configuration getConfig() { } } + private final class MRAppTestCleanup extends MRApp { + boolean stoppedContainerAllocator; + boolean cleanedBeforeContainerAllocatorStopped; + + public MRAppTestCleanup(int maps, int reduces, boolean autoComplete, + String testName, boolean cleanOnStart) { + super(maps, reduces, autoComplete, testName, cleanOnStart); + stoppedContainerAllocator = false; + cleanedBeforeContainerAllocatorStopped = false; + } + + @Override + protected Job createJob(Configuration conf) { + UserGroupInformation currentUser = null; + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + throw new YarnException(e); + } + Job newJob = new TestJob(getJobId(), getAttemptID(), conf, + getDispatcher().getEventHandler(), + getTaskAttemptListener(), getContext().getClock(), + getCommitter(), isNewApiCommitter(), + currentUser.getUserName(), getContext()); + ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob); + + getDispatcher().register(JobFinishEvent.Type.class, + createJobFinishEventHandler()); + + return newJob; + } + + @Override + protected ContainerAllocator createContainerAllocator( + ClientService clientService, AppContext context) { + return new TestCleanupContainerAllocator(); + } + + private class TestCleanupContainerAllocator extends AbstractService + implements ContainerAllocator { + private MRAppContainerAllocator allocator; + + TestCleanupContainerAllocator() { + super(TestCleanupContainerAllocator.class.getName()); + allocator = new MRAppContainerAllocator(); + } + + @Override + public void handle(ContainerAllocatorEvent event) { + allocator.handle(event); + } + + @Override + public synchronized void stop() { + stoppedContainerAllocator = true; + super.stop(); + } + } + + @Override + public void cleanupStagingDir() throws IOException { + cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator; + } + + @Override + protected void sysexit() { + } + } + + @Test + public void testStagingCleanupOrder() throws Exception { + MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true, + this.getClass().getName(), true); + JobImpl job = (JobImpl)app.submit(new Configuration()); + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + + int waitTime = 20 * 1000; + while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) { + Thread.sleep(100); + waitTime -= 100; + } + Assert.assertTrue("Staging directory not cleaned before notifying RM", + app.cleanedBeforeContainerAllocatorStopped); + } } \ No newline at end of file