From 8b9c1e68ab33b8d7720dbca9d9de9e92f9b6b447 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 17 Jul 2013 04:24:44 +0000 Subject: [PATCH] YARN-62. Modified NodeManagers to avoid AMs from abusing container tokens for repetitive container launches. Contributed by Omkar Vinit Joshi. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1503986 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../ContainerManagerImpl.java | 6 +- .../application/ApplicationImpl.java | 3 - .../NMContainerTokenSecretManager.java | 116 ++++++++---------- .../application/TestApplication.java | 45 +++++-- .../server/TestContainerManagerSecurity.java | 30 ++++- 6 files changed, 125 insertions(+), 78 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d48ccfe8e2..9f6a90385d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -68,6 +68,9 @@ Release 2.1.1-beta - UNRELEASED YARN-820. Fixed an invalid state transition in NodeManager caused by failing resource localization. (Mayank Bansal via vinodkv) + YARN-62. Modified NodeManagers to avoid AMs from abusing container tokens for + repetitive container launches. (Omkar Vinit Joshi via vinodkv) + Release 2.1.0-beta - 2013-07-02 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 93addb235c..841e9a7135 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -24,8 +24,11 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -166,7 +169,6 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec, new ContainersMonitorImpl(exec, dispatcher, this.context); addService(this.containersMonitor); - dispatcher.register(ContainerEventType.class, new ContainerEventDispatcher()); dispatcher.register(ApplicationEventType.class, @@ -345,7 +347,7 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, .append(ugi.getUserName()).append(" Found: ") .append(nmTokenIdentifier.getApplicationAttemptId().toString()); } else if (!this.context.getContainerTokenSecretManager() - .isValidStartContainerRequest(containerId)) { + .isValidStartContainerRequest(containerTokenIdentifier)) { // Is the container being relaunched? Or RPC layer let startCall with // tokens generated off old-secret through? unauthorized = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index 7a2ccb8ea6..104896568b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -392,9 +392,6 @@ static class AppCompletelyDoneTransition implements @Override public void transition(ApplicationImpl app, ApplicationEvent event) { - // Inform the ContainerTokenSecretManager - app.context.getContainerTokenSecretManager().appFinished(app.appId); - // Inform the logService app.dispatcher.getEventHandler().handle( new LogHandlerAppFinishedEvent(app.appId)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java index bc349f4960..8860a95252 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java @@ -18,17 +18,17 @@ package org.apache.hadoop.yarn.server.nodemanager.security; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; @@ -48,14 +48,15 @@ public class NMContainerTokenSecretManager extends .getLog(NMContainerTokenSecretManager.class); private MasterKeyData previousMasterKey; + private final TreeMap> recentlyStartedContainerTracker; + - private final Map> oldMasterKeys; private String nodeHostAddr; public NMContainerTokenSecretManager(Configuration conf) { super(conf); - this.oldMasterKeys = - new HashMap>(); + recentlyStartedContainerTracker = + new TreeMap>(); } /** @@ -93,9 +94,6 @@ public synchronized void setMasterKey(MasterKey masterKeyRecord) { public synchronized byte[] retrievePassword( ContainerTokenIdentifier identifier) throws SecretManager.InvalidToken { int keyId = identifier.getMasterKeyId(); - ContainerId containerId = identifier.getContainerID(); - ApplicationId appId = - containerId.getApplicationAttemptId().getApplicationId(); MasterKeyData masterKeyToUse = null; if (this.previousMasterKey != null @@ -107,19 +105,6 @@ public synchronized byte[] retrievePassword( // A container-launch has come in with a token generated off the current // master-key masterKeyToUse = super.currentMasterKey; - } else if (this.oldMasterKeys.containsKey(appId) - && this.oldMasterKeys.get(appId).containsKey(containerId)) { - // This means on the following happened: - // (1) a stopContainer() or a getStatus() happened for a container with - // token generated off a master-key that is neither current nor the - // previous one. - // (2) a container-relaunch has come in with a token generated off a - // master-key that is neither current nor the previous one. - // This basically lets stop and getStatus() calls with old-tokens to pass - // through without any issue, i.e. (1). - // Start-calls for repetitive launches (2) also pass through RPC here, but - // get thwarted at the app-layer as part of startContainer() call. - masterKeyToUse = this.oldMasterKeys.get(appId).get(containerId); } if (nodeHostAddr != null @@ -143,61 +128,64 @@ public synchronized byte[] retrievePassword( } /** - * Container start has gone through. Store the corresponding keys so that - * stopContainer() and getContainerStatus() can be authenticated long after - * the container-start went through. + * Container start has gone through. We need to store the containerId in order + * to block future container start requests with same container token. This + * container token needs to be saved till its container token expires. */ public synchronized void startContainerSuccessful( ContainerTokenIdentifier tokenId) { - int keyId = tokenId.getMasterKeyId(); - if (currentMasterKey.getMasterKey().getKeyId() == keyId) { - addKeyForContainerId(tokenId.getContainerID(), currentMasterKey); - } else if (previousMasterKey != null - && previousMasterKey.getMasterKey().getKeyId() == keyId) { - addKeyForContainerId(tokenId.getContainerID(), previousMasterKey); + + removeAnyContainerTokenIfExpired(); + + Long expTime = tokenId.getExpiryTimeStamp(); + // We might have multiple containers with same expiration time. + if (!recentlyStartedContainerTracker.containsKey(expTime)) { + recentlyStartedContainerTracker + .put(expTime, new ArrayList()); + } + recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID()); + + } + + protected synchronized void removeAnyContainerTokenIfExpired() { + // Trying to remove any container if its container token has expired. + Iterator>> containersI = + this.recentlyStartedContainerTracker.entrySet().iterator(); + Long currTime = System.currentTimeMillis(); + while (containersI.hasNext()) { + Entry> containerEntry = containersI.next(); + if (containerEntry.getKey() < currTime) { + containersI.remove(); + } else { + break; + } } } /** - * Ensure the startContainer call is not using an older cached key. Will - * return false once startContainerSuccessful is called. Does not check - * the actual key being current since that is verified by the security layer - * via retrievePassword. + * Container will be remembered based on expiration time of the container + * token used for starting the container. It is safe to use expiration time + * as there is one to many mapping between expiration time and containerId. + * @return true if the current token identifier is not present in cache. */ public synchronized boolean isValidStartContainerRequest( - ContainerId containerID) { - ApplicationId applicationId = - containerID.getApplicationAttemptId().getApplicationId(); - return !this.oldMasterKeys.containsKey(applicationId) - || !this.oldMasterKeys.get(applicationId).containsKey(containerID); - } + ContainerTokenIdentifier containerTokenIdentifier) { - private synchronized void addKeyForContainerId(ContainerId containerId, - MasterKeyData masterKeyData) { - if (containerId != null) { - ApplicationId appId = - containerId.getApplicationAttemptId().getApplicationId(); - if (!this.oldMasterKeys.containsKey(appId)) { - this.oldMasterKeys.put(appId, - new ConcurrentHashMap()); - } - ConcurrentMap containerIdToKeysMapForThisApp = - this.oldMasterKeys.get(appId); - containerIdToKeysMapForThisApp.put(containerId, masterKeyData); + removeAnyContainerTokenIfExpired(); + + Long expTime = containerTokenIdentifier.getExpiryTimeStamp(); + List containers = + this.recentlyStartedContainerTracker.get(expTime); + if (containers == null + || !containers.contains(containerTokenIdentifier.getContainerID())) { + return true; } else { - LOG.warn("Not adding key for null containerId"); + return false; } } - // Holding on to master-keys corresponding to containers until the app is - // finished due to the multiple ways a container can finish. Avoid - // stopContainer calls seeing unnecessary authorization exceptions. - public synchronized void appFinished(ApplicationId appId) { - this.oldMasterKeys.remove(appId); - } - public synchronized void setNodeId(NodeId nodeId) { nodeHostAddr = nodeId.toString(); LOG.info("Updating node address : " + nodeHostAddr); - } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index a11995525b..63d90c94d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import junit.framework.Assert; @@ -265,9 +266,12 @@ public void testAppFinishedOnRunningContainers() { AuxServicesEventType.APPLICATION_STOP, wa.appId))); wa.appResourcesCleanedup(); - for ( Container container : wa.containers) { + for (Container container : wa.containers) { + ContainerTokenIdentifier identifier = + wa.getContainerTokenIdentifier(container.getContainerId()); + waitForContainerTokenToExpire(identifier); Assert.assertTrue(wa.context.getContainerTokenSecretManager() - .isValidStartContainerRequest(container.getContainerId())); + .isValidStartContainerRequest(identifier)); } assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); @@ -277,6 +281,18 @@ public void testAppFinishedOnRunningContainers() { } } + protected ContainerTokenIdentifier waitForContainerTokenToExpire( + ContainerTokenIdentifier identifier) { + int attempts = 5; + while (System.currentTimeMillis() < identifier.getExpiryTimeStamp() + && attempts-- > 0) { + try { + Thread.sleep(1000); + } catch (Exception e) {} + } + return identifier; + } + @Test @SuppressWarnings("unchecked") public void testAppFinishedOnCompletedContainers() { @@ -306,8 +322,11 @@ public void testAppFinishedOnCompletedContainers() { wa.appResourcesCleanedup(); for ( Container container : wa.containers) { + ContainerTokenIdentifier identifier = + wa.getContainerTokenIdentifier(container.getContainerId()); + waitForContainerTokenToExpire(identifier); Assert.assertTrue(wa.context.getContainerTokenSecretManager() - .isValidStartContainerRequest(container.getContainerId())); + .isValidStartContainerRequest(identifier)); } assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); } finally { @@ -440,7 +459,8 @@ private class WrappedApplication { final String user; final List containers; final Context context; - + final Map containerTokenIdentifierMap; + final ApplicationId appId; final Application app; @@ -448,6 +468,8 @@ private class WrappedApplication { Configuration conf = new Configuration(); dispatcher = new DrainDispatcher(); + containerTokenIdentifierMap = + new HashMap(); dispatcher.init(conf); localizerBus = mock(EventHandler.class); @@ -486,11 +508,15 @@ private class WrappedApplication { Container container = createMockedContainer(this.appId, i); containers.add(container); long currentTime = System.currentTimeMillis(); + ContainerTokenIdentifier identifier = + new ContainerTokenIdentifier(container.getContainerId(), "", "", + null, currentTime + 2000, masterKey.getKeyId(), currentTime); + containerTokenIdentifierMap + .put(identifier.getContainerID(), identifier); context.getContainerTokenSecretManager().startContainerSuccessful( - new ContainerTokenIdentifier(container.getContainerId(), "", - "", null, currentTime + 1000, masterKey.getKeyId(), currentTime)); + identifier); Assert.assertFalse(context.getContainerTokenSecretManager() - .isValidStartContainerRequest(container.getContainerId())); + .isValidStartContainerRequest(identifier)); } dispatcher.start(); @@ -542,6 +568,11 @@ public void appResourcesCleanedup() { ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)); drainDispatcherEvents(); } + + public ContainerTokenIdentifier getContainerTokenIdentifier( + ContainerId containerId) { + return this.containerTokenIdentifierMap.get(containerId); + } } private Container createMockedContainer(ApplicationId appId, int containerId) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java index b78a1e51e1..7781d50c8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java @@ -42,12 +42,12 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; @@ -211,12 +211,25 @@ private void testNMTokens(Configuration conf) throws Exception { Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken, invalidNMToken, true).contains(sb.toString())); - // using correct tokens. nmtoken for appattempt should get saved. + // using correct tokens. nmtoken for app attempt should get saved. + conf.setInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS, + 4 * 60 * 1000); + validContainerToken = + containerTokenSecretManager.createContainerToken(validContainerId, + validNode, user, r); + testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken, validNMToken, false); Assert.assertTrue(nmTokenSecretManagerNM .isAppAttemptNMTokenKeyPresent(validAppAttemptId)); + //Now lets wait till container finishes and is removed from node manager. + waitForContainerToFinishOnNM(validContainerId); + sb = new StringBuilder("Attempt to relaunch the same container with id "); + sb.append(validContainerId); + Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode, + validContainerToken, validNMToken, true).contains(sb.toString())); + // Rolling over master key twice so that we can check whether older keys // are used for authentication. rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); @@ -233,6 +246,19 @@ private void testNMTokens(Configuration conf) throws Exception { } + private void waitForContainerToFinishOnNM(ContainerId containerId) { + Context nmContet = yarnCluster.getNodeManager(0).getNMContext(); + int interval = 4 * 60; // Max time for container token to expire. + while ((interval-- > 0) + && nmContet.getContainers().containsKey(containerId)) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + Assert.assertFalse(nmContet.getContainers().containsKey(containerId)); + } + protected void waitForNMToReceiveNMTokenKey( NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm) throws InterruptedException {