diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 52a61809fb..32a8b2df11 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -170,6 +170,9 @@ Release 2.5.0 - UNRELEASED YARN-2159. Better logging in SchedulerNode#allocateContainer. (Ray Chiang via kasha) + YARN-2191. Added a new test to ensure NM will clean up completed applications + in the case of RM restart. (Wangda Tan via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 47d4e37143..83ebdb6ebe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -28,16 +28,20 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import 
org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -303,7 +307,7 @@ private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) @SuppressWarnings("resource") @Test (timeout = 60000) - public void testAppCleanupWhenRestartedAfterAppFinished() throws Exception { + public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); @@ -336,6 +340,65 @@ public void testAppCleanupWhenRestartedAfterAppFinished() throws Exception { rm1.stop(); rm2.stop(); } + + @SuppressWarnings("resource") + @Test(timeout = 60000) + public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception { + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + + // start RM + MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService()); + nm1.registerNode(); + MockNM nm2 = + new MockNM("127.0.0.1:5678", 1024, rm1.getResourceTrackerService()); + nm2.registerNode(); + + // create app and launch the AM
+ RMApp app0 = rm1.submitApp(200); + MockAM am0 = launchAM(app0, rm1, nm1); + + // alloc another container on nm2 + AllocateResponse allocResponse = + am0.allocate(Arrays.asList(ResourceRequest.newInstance( + Priority.newInstance(1), "*", Resource.newInstance(1024, 0), 1)), + null); + while (null == allocResponse.getAllocatedContainers() + || allocResponse.getAllocatedContainers().isEmpty()) { + nm2.nodeHeartbeat(true); + allocResponse = am0.allocate(null, null); + Thread.sleep(1000); + } + + // start new RM + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + + // nm1/nm2 register to rm2, and do a heartbeat + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm1.registerNode(Arrays.asList(NMContainerStatus.newInstance( + ContainerId.newInstance(am0.getApplicationAttemptId(), 1), + ContainerState.COMPLETE, Resource.newInstance(1024, 1), "", 0)), Arrays + .asList(app0.getApplicationId())); + nm2.setResourceTrackerService(rm2.getResourceTrackerService()); + nm2.registerNode(Arrays.asList(app0.getApplicationId())); + + // wait for the recovered app to reach FAILED (only one AM attempt allowed) + rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED); + + // wait for application cleanup message received on NM1 + waitForAppCleanupMessageRecved(nm1, app0.getApplicationId()); + + // wait for application cleanup message received on NM2 + waitForAppCleanupMessageRecved(nm2, app0.getApplicationId()); + + rm1.stop(); + rm2.stop(); + } public static void main(String[] args) throws Exception { TestApplicationCleanup t = new TestApplicationCleanup();