From 088156de43abb07bec590a3fcd1a5af2feb02cd2 Mon Sep 17 00:00:00 2001
From: Xuan
Date: Fri, 8 May 2015 15:10:43 -0700
Subject: [PATCH] YARN-2331. Distinguish shutdown during supervision vs. shutdown for rolling upgrade. Contributed by Jason Lowe

---
 hadoop-yarn-project/CHANGES.txt                    |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java        |   4 +
 .../src/main/resources/yarn-default.xml            |   9 ++
 .../ContainerManagerImpl.java                      |   7 +-
 .../logaggregation/LogAggregationService.java      |   5 +-
 .../TestContainerManagerRecovery.java              | 119 +++++++++++++++---
 6 files changed, 124 insertions(+), 23 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 47f689c67b..2183d3f9bf 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -206,6 +206,9 @@ Release 2.8.0 - UNRELEASED
     yarn.scheduler.capacity.node-locality-delay in code and default xml file.
     (Nijel SF via vinodkv)
 
+    YARN-2331. Distinguish shutdown during supervision vs. shutdown for
+    rolling upgrade. (Jason Lowe via xgong)
+
   OPTIMIZATIONS
 
   YARN-3339. TestDockerContainerExecutor should pull a single image and not
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5291ff22de..0851f3c554 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1158,6 +1158,10 @@ private static void addDeprecatedKeys() {
   public static final String NM_RECOVERY_DIR = NM_RECOVERY_PREFIX + "dir";
 
+  public static final String NM_RECOVERY_SUPERVISED =
+      NM_RECOVERY_PREFIX + "supervised";
+  public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;
+
   ////////////////////////////////
   // Web Proxy Configs
   ////////////////////////////////
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e1e0ebd3d7..4d74f7622f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1192,6 +1192,15 @@
     <value>${hadoop.tmp.dir}/yarn-nm-recovery</value>
   </property>
 
+  <property>
+    <description>Whether the nodemanager is running under supervision. A
+      nodemanager that supports recovery and is running under supervision
+      will not try to clean up containers as it exits, with the assumption
+      that it will be restarted immediately and recover the containers.
+    </description>
+    <name>yarn.nodemanager.recovery.supervised</name>
+    <value>false</value>
+  </property>
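Reviewer note (illustrative, not part of this patch): on a node where the NodeManager runs under an external supervisor, the new property is set alongside the existing recovery settings in yarn-site.xml. The recovery directory below is only an example value; the property names and defaults come from this patch and from yarn-default.xml.

  <!-- Illustrative yarn-site.xml fragment: enable NM restart recovery and
       mark the NM as supervised so work is preserved across a shutdown. -->
  <property>
    <name>yarn.nodemanager.recovery.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.nodemanager.recovery.dir</name>
    <!-- example path; default is ${hadoop.tmp.dir}/yarn-nm-recovery -->
    <value>/var/lib/hadoop-yarn/nm-recovery</value>
  </property>
  <property>
    <name>yarn.nodemanager.recovery.supervised</name>
    <value>true</value>
  </property>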
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index c48df64bd9..494fa8fbd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -530,8 +530,11 @@ public void cleanUpApplicationsOnNMShutDown() {
 
     if (this.context.getNMStateStore().canRecover()
         && !this.context.getDecommissioned()) {
-      // do not cleanup apps as they can be recovered on restart
-      return;
+      if (getConfig().getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED,
+          YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED)) {
+        // do not cleanup apps as they can be recovered on restart
+        return;
+      }
     }
 
     List<ApplicationId> appIds =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 0018d56421..dbbfcd5deb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -145,10 +145,13 @@ protected void serviceStop() throws Exception {
 
   private void stopAggregators() {
     threadPool.shutdown();
+    boolean supervised = getConfig().getBoolean(
+        YarnConfiguration.NM_RECOVERY_SUPERVISED,
+        YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
     // if recovery on restart is supported then leave outstanding aggregations
     // to the next restart
     boolean shouldAbort = context.getNMStateStore().canRecover()
-        && !context.getDecommissioned();
+        && !context.getDecommissioned() && supervised;
     // politely ask to finish
     for (AppLogAggregator aggregator : appLogAggregators.values()) {
       if (shouldAbort) {
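Reviewer note (sketch only, not part of this patch): after these two hunks, containers and outstanding log aggregations survive an NM shutdown only when recovery is possible, the node is not decommissioned, and the NM is supervised. A rough sketch of the combined condition, using a hypothetical helper name:

  // Illustrative sketch; shouldPreserveWorkOnShutdown is a hypothetical helper,
  // not a method introduced by this patch.
  private boolean shouldPreserveWorkOnShutdown(Context context, Configuration conf) {
    // 1) a recoverable state store is in use (NM restart recovery is enabled),
    // 2) the node has not been decommissioned, and
    // 3) the NM runs under supervision, so an immediate restart is expected.
    return context.getNMStateStore().canRecover()
        && !context.getDecommissioned()
        && conf.getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED,
            YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
  }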
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index c45ffbb93d..781950e08d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -22,7 +22,11 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -68,6 +72,7 @@
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@@ -82,27 +87,18 @@ public class TestContainerManagerRecovery {
   public void testApplicationRecovery() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
     conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
     NMStateStoreService stateStore = new NMMemoryStateStoreService();
     stateStore.init(conf);
     stateStore.start();
-    Context context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore);
+    Context context = createContext(conf, stateStore);
     ContainerManagerImpl cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
 
-    // simulate registration with RM
-    MasterKey masterKey = new MasterKeyPBImpl();
-    masterKey.setKeyId(123);
-    masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
-        .byteValue() }));
-    context.getContainerTokenSecretManager().setMasterKey(masterKey);
-    context.getNMTokenSecretManager().setMasterKey(masterKey);
-
     // add an application by starting a container
     String appUser = "app_user1";
     String modUser = "modify_user1";
@@ -155,9 +151,7 @@ public void testApplicationRecovery() throws Exception {
 
     // reset container manager and verify app recovered with proper acls
     cm.stop();
-    context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore);
+    context = createContext(conf, stateStore);
     cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
@@ -201,9 +195,7 @@ public void testApplicationRecovery() throws Exception {
 
     // restart and verify app is marked for finishing
     cm.stop();
-    context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore);
+    context = createContext(conf, stateStore);
     cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
@@ -233,9 +225,7 @@ public void testApplicationRecovery() throws Exception {
 
     // restart and verify app is no longer present after recovery
     cm.stop();
-    context = new NMContext(new NMContainerTokenSecretManager(
-        conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore);
+    context = createContext(conf, stateStore);
     cm = createContainerManager(context);
     cm.init(conf);
     cm.start();
@@ -243,6 +233,95 @@ public void testApplicationRecovery() throws Exception {
     cm.stop();
   }
 
+  @Test
+  public void testContainerCleanupOnShutdown() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+    Map<String, LocalResource> localResources = Collections.emptyMap();
+    Map<String, String> containerEnv = Collections.emptyMap();
+    List<String> containerCmds = Collections.emptyList();
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+    Credentials containerCreds = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    containerCreds.writeTokenStorageToStream(dob);
+    ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
+        dob.getLength());
+    Map<ApplicationAccessType, String> acls = Collections.emptyMap();
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, containerEnv, containerCmds, serviceData,
+        containerTokens, acls);
+    // create the logAggregationContext
+    LogAggregationContext logAggregationContext =
+        LogAggregationContext.newInstance("includePattern", "excludePattern");
+
+    // verify containers are stopped on shutdown without recovery
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
+    conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
+    Context context = createContext(conf, new NMNullStateStoreService());
+    ContainerManagerImpl cm = spy(createContainerManager(context));
+    cm.init(conf);
+    cm.start();
+    StartContainersResponse startResponse = startContainer(context, cm, cid,
+        clc, logAggregationContext);
+    assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+    cm.stop();
+    verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
+
+    // verify containers are stopped on shutdown with unsupervised recovery
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
+    NMMemoryStateStoreService memStore = new NMMemoryStateStoreService();
+    memStore.init(conf);
+    memStore.start();
+    context = createContext(conf, memStore);
+    cm = spy(createContainerManager(context));
+    cm.init(conf);
+    cm.start();
+    startResponse = startContainer(context, cm, cid,
+        clc, logAggregationContext);
+    assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+    cm.stop();
+    memStore.close();
+    verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
+
+    // verify containers are not stopped on shutdown with supervised recovery
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+    memStore = new NMMemoryStateStoreService();
+    memStore.init(conf);
+    memStore.start();
+    context = createContext(conf, memStore);
+    cm = spy(createContainerManager(context));
+    cm.init(conf);
+    cm.start();
+    startResponse = startContainer(context, cm, cid,
+        clc, logAggregationContext);
+    assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
+    cm.stop();
+    memStore.close();
+    verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
+  }
+
+  private NMContext createContext(YarnConfiguration conf,
+      NMStateStoreService stateStore) {
+    NMContext context = new NMContext(new NMContainerTokenSecretManager(
+        conf), new NMTokenSecretManagerInNM(), null,
+        new ApplicationACLsManager(conf), stateStore);
+
+    // simulate registration with RM
+    MasterKey masterKey = new MasterKeyPBImpl();
+    masterKey.setKeyId(123);
+    masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
+        .byteValue() }));
+    context.getContainerTokenSecretManager().setMasterKey(masterKey);
+    context.getNMTokenSecretManager().setMasterKey(masterKey);
+    return context;
+  }
+
   private StartContainersResponse startContainer(Context context,
       final ContainerManagerImpl cm, ContainerId cid,
       ContainerLaunchContext clc, LogAggregationContext logAggregationContext)