YARN-62. Modified NodeManagers to prevent AMs from abusing container tokens for repetitive container launches. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1503986 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-07-17 04:24:44 +00:00
parent 1b6324265d
commit 8b9c1e68ab
6 changed files with 125 additions and 78 deletions

View File

@ -68,6 +68,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-820. Fixed an invalid state transition in NodeManager caused by failing YARN-820. Fixed an invalid state transition in NodeManager caused by failing
resource localization. (Mayank Bansal via vinodkv) resource localization. (Mayank Bansal via vinodkv)
YARN-62. Modified NodeManagers to prevent AMs from abusing container tokens for
repetitive container launches. (Omkar Vinit Joshi via vinodkv)
Release 2.1.0-beta - 2013-07-02 Release 2.1.0-beta - 2013-07-02
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -24,8 +24,11 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -166,7 +169,6 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
new ContainersMonitorImpl(exec, dispatcher, this.context); new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor); addService(this.containersMonitor);
dispatcher.register(ContainerEventType.class, dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher()); new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class, dispatcher.register(ApplicationEventType.class,
@ -345,7 +347,7 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
.append(ugi.getUserName()).append(" Found: ") .append(ugi.getUserName()).append(" Found: ")
.append(nmTokenIdentifier.getApplicationAttemptId().toString()); .append(nmTokenIdentifier.getApplicationAttemptId().toString());
} else if (!this.context.getContainerTokenSecretManager() } else if (!this.context.getContainerTokenSecretManager()
.isValidStartContainerRequest(containerId)) { .isValidStartContainerRequest(containerTokenIdentifier)) {
// Is the container being relaunched? Or RPC layer let startCall with // Is the container being relaunched? Or RPC layer let startCall with
// tokens generated off old-secret through? // tokens generated off old-secret through?
unauthorized = true; unauthorized = true;

View File

@ -392,9 +392,6 @@ static class AppCompletelyDoneTransition implements
@Override @Override
public void transition(ApplicationImpl app, ApplicationEvent event) { public void transition(ApplicationImpl app, ApplicationEvent event) {
// Inform the ContainerTokenSecretManager
app.context.getContainerTokenSecretManager().appFinished(app.appId);
// Inform the logService // Inform the logService
app.dispatcher.getEventHandler().handle( app.dispatcher.getEventHandler().handle(
new LogHandlerAppFinishedEvent(app.appId)); new LogHandlerAppFinishedEvent(app.appId));

View File

@ -18,17 +18,17 @@
package org.apache.hadoop.yarn.server.nodemanager.security; package org.apache.hadoop.yarn.server.nodemanager.security;
import java.util.HashMap; import java.util.ArrayList;
import java.util.Map; import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap; import java.util.List;
import java.util.concurrent.ConcurrentMap; import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@ -48,14 +48,15 @@ public class NMContainerTokenSecretManager extends
.getLog(NMContainerTokenSecretManager.class); .getLog(NMContainerTokenSecretManager.class);
private MasterKeyData previousMasterKey; private MasterKeyData previousMasterKey;
private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker;
private final Map<ApplicationId, ConcurrentMap<ContainerId, MasterKeyData>> oldMasterKeys;
private String nodeHostAddr; private String nodeHostAddr;
public NMContainerTokenSecretManager(Configuration conf) { public NMContainerTokenSecretManager(Configuration conf) {
super(conf); super(conf);
this.oldMasterKeys = recentlyStartedContainerTracker =
new HashMap<ApplicationId, ConcurrentMap<ContainerId, MasterKeyData>>(); new TreeMap<Long, List<ContainerId>>();
} }
/** /**
@ -93,9 +94,6 @@ public synchronized void setMasterKey(MasterKey masterKeyRecord) {
public synchronized byte[] retrievePassword( public synchronized byte[] retrievePassword(
ContainerTokenIdentifier identifier) throws SecretManager.InvalidToken { ContainerTokenIdentifier identifier) throws SecretManager.InvalidToken {
int keyId = identifier.getMasterKeyId(); int keyId = identifier.getMasterKeyId();
ContainerId containerId = identifier.getContainerID();
ApplicationId appId =
containerId.getApplicationAttemptId().getApplicationId();
MasterKeyData masterKeyToUse = null; MasterKeyData masterKeyToUse = null;
if (this.previousMasterKey != null if (this.previousMasterKey != null
@ -107,19 +105,6 @@ public synchronized byte[] retrievePassword(
// A container-launch has come in with a token generated off the current // A container-launch has come in with a token generated off the current
// master-key // master-key
masterKeyToUse = super.currentMasterKey; masterKeyToUse = super.currentMasterKey;
} else if (this.oldMasterKeys.containsKey(appId)
&& this.oldMasterKeys.get(appId).containsKey(containerId)) {
// This means on the following happened:
// (1) a stopContainer() or a getStatus() happened for a container with
// token generated off a master-key that is neither current nor the
// previous one.
// (2) a container-relaunch has come in with a token generated off a
// master-key that is neither current nor the previous one.
// This basically lets stop and getStatus() calls with old-tokens to pass
// through without any issue, i.e. (1).
// Start-calls for repetitive launches (2) also pass through RPC here, but
// get thwarted at the app-layer as part of startContainer() call.
masterKeyToUse = this.oldMasterKeys.get(appId).get(containerId);
} }
if (nodeHostAddr != null if (nodeHostAddr != null
@ -143,59 +128,62 @@ public synchronized byte[] retrievePassword(
} }
/** /**
* Container start has gone through. Store the corresponding keys so that * Container start has gone through. We need to store the containerId in order
* stopContainer() and getContainerStatus() can be authenticated long after * to block future container start requests with same container token. This
* the container-start went through. * container token needs to be saved till its container token expires.
*/ */
public synchronized void startContainerSuccessful( public synchronized void startContainerSuccessful(
ContainerTokenIdentifier tokenId) { ContainerTokenIdentifier tokenId) {
int keyId = tokenId.getMasterKeyId();
if (currentMasterKey.getMasterKey().getKeyId() == keyId) { removeAnyContainerTokenIfExpired();
addKeyForContainerId(tokenId.getContainerID(), currentMasterKey);
} else if (previousMasterKey != null Long expTime = tokenId.getExpiryTimeStamp();
&& previousMasterKey.getMasterKey().getKeyId() == keyId) { // We might have multiple containers with same expiration time.
addKeyForContainerId(tokenId.getContainerID(), previousMasterKey); if (!recentlyStartedContainerTracker.containsKey(expTime)) {
recentlyStartedContainerTracker
.put(expTime, new ArrayList<ContainerId>());
}
recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID());
}
protected synchronized void removeAnyContainerTokenIfExpired() {
// Trying to remove any container if its container token has expired.
Iterator<Entry<Long, List<ContainerId>>> containersI =
this.recentlyStartedContainerTracker.entrySet().iterator();
Long currTime = System.currentTimeMillis();
while (containersI.hasNext()) {
Entry<Long, List<ContainerId>> containerEntry = containersI.next();
if (containerEntry.getKey() < currTime) {
containersI.remove();
} else {
break;
}
} }
} }
/** /**
* Ensure the startContainer call is not using an older cached key. Will * Container will be remembered based on expiration time of the container
* return false once startContainerSuccessful is called. Does not check * token used for starting the container. It is safe to use expiration time
* the actual key being current since that is verified by the security layer * as there is one to many mapping between expiration time and containerId.
* via retrievePassword. * @return true if the current token identifier is not present in cache.
*/ */
public synchronized boolean isValidStartContainerRequest( public synchronized boolean isValidStartContainerRequest(
ContainerId containerID) { ContainerTokenIdentifier containerTokenIdentifier) {
ApplicationId applicationId =
containerID.getApplicationAttemptId().getApplicationId();
return !this.oldMasterKeys.containsKey(applicationId)
|| !this.oldMasterKeys.get(applicationId).containsKey(containerID);
}
private synchronized void addKeyForContainerId(ContainerId containerId, removeAnyContainerTokenIfExpired();
MasterKeyData masterKeyData) {
if (containerId != null) { Long expTime = containerTokenIdentifier.getExpiryTimeStamp();
ApplicationId appId = List<ContainerId> containers =
containerId.getApplicationAttemptId().getApplicationId(); this.recentlyStartedContainerTracker.get(expTime);
if (!this.oldMasterKeys.containsKey(appId)) { if (containers == null
this.oldMasterKeys.put(appId, || !containers.contains(containerTokenIdentifier.getContainerID())) {
new ConcurrentHashMap<ContainerId, MasterKeyData>()); return true;
}
ConcurrentMap<ContainerId, MasterKeyData> containerIdToKeysMapForThisApp =
this.oldMasterKeys.get(appId);
containerIdToKeysMapForThisApp.put(containerId, masterKeyData);
} else { } else {
LOG.warn("Not adding key for null containerId"); return false;
} }
} }
// Holding on to master-keys corresponding to containers until the app is
// finished due to the multiple ways a container can finish. Avoid
// stopContainer calls seeing unnecessary authorization exceptions.
public synchronized void appFinished(ApplicationId appId) {
this.oldMasterKeys.remove(appId);
}
public synchronized void setNodeId(NodeId nodeId) { public synchronized void setNodeId(NodeId nodeId) {
nodeHostAddr = nodeId.toString(); nodeHostAddr = nodeId.toString();
LOG.info("Updating node address : " + nodeHostAddr); LOG.info("Updating node address : " + nodeHostAddr);

View File

@ -29,6 +29,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import junit.framework.Assert; import junit.framework.Assert;
@ -266,8 +267,11 @@ public void testAppFinishedOnRunningContainers() {
wa.appResourcesCleanedup(); wa.appResourcesCleanedup();
for (Container container : wa.containers) { for (Container container : wa.containers) {
ContainerTokenIdentifier identifier =
wa.getContainerTokenIdentifier(container.getContainerId());
waitForContainerTokenToExpire(identifier);
Assert.assertTrue(wa.context.getContainerTokenSecretManager() Assert.assertTrue(wa.context.getContainerTokenSecretManager()
.isValidStartContainerRequest(container.getContainerId())); .isValidStartContainerRequest(identifier));
} }
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
@ -277,6 +281,18 @@ public void testAppFinishedOnRunningContainers() {
} }
} }
/**
 * Waits for the given container token to expire, checking the wall clock
 * against the token's expiry timestamp and sleeping one second between
 * checks. Gives up after five sleeps even if the token is still valid.
 *
 * @param identifier token identifier whose expiry timestamp is awaited
 * @return the same {@code identifier} that was passed in
 */
protected ContainerTokenIdentifier waitForContainerTokenToExpire(
    ContainerTokenIdentifier identifier) {
  int attempts = 5;
  while (System.currentTimeMillis() < identifier.getExpiryTimeStamp()
      && attempts-- > 0) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // Do not swallow the interrupt: restore the flag so the caller can
      // observe it, and stop waiting instead of sleeping again.
      Thread.currentThread().interrupt();
      break;
    }
  }
  return identifier;
}
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testAppFinishedOnCompletedContainers() { public void testAppFinishedOnCompletedContainers() {
@ -306,8 +322,11 @@ public void testAppFinishedOnCompletedContainers() {
wa.appResourcesCleanedup(); wa.appResourcesCleanedup();
for ( Container container : wa.containers) { for ( Container container : wa.containers) {
ContainerTokenIdentifier identifier =
wa.getContainerTokenIdentifier(container.getContainerId());
waitForContainerTokenToExpire(identifier);
Assert.assertTrue(wa.context.getContainerTokenSecretManager() Assert.assertTrue(wa.context.getContainerTokenSecretManager()
.isValidStartContainerRequest(container.getContainerId())); .isValidStartContainerRequest(identifier));
} }
assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState()); assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
} finally { } finally {
@ -440,6 +459,7 @@ private class WrappedApplication {
final String user; final String user;
final List<Container> containers; final List<Container> containers;
final Context context; final Context context;
final Map<ContainerId, ContainerTokenIdentifier> containerTokenIdentifierMap;
final ApplicationId appId; final ApplicationId appId;
final Application app; final Application app;
@ -448,6 +468,8 @@ private class WrappedApplication {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
dispatcher = new DrainDispatcher(); dispatcher = new DrainDispatcher();
containerTokenIdentifierMap =
new HashMap<ContainerId, ContainerTokenIdentifier>();
dispatcher.init(conf); dispatcher.init(conf);
localizerBus = mock(EventHandler.class); localizerBus = mock(EventHandler.class);
@ -486,11 +508,15 @@ private class WrappedApplication {
Container container = createMockedContainer(this.appId, i); Container container = createMockedContainer(this.appId, i);
containers.add(container); containers.add(container);
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
ContainerTokenIdentifier identifier =
new ContainerTokenIdentifier(container.getContainerId(), "", "",
null, currentTime + 2000, masterKey.getKeyId(), currentTime);
containerTokenIdentifierMap
.put(identifier.getContainerID(), identifier);
context.getContainerTokenSecretManager().startContainerSuccessful( context.getContainerTokenSecretManager().startContainerSuccessful(
new ContainerTokenIdentifier(container.getContainerId(), "", identifier);
"", null, currentTime + 1000, masterKey.getKeyId(), currentTime));
Assert.assertFalse(context.getContainerTokenSecretManager() Assert.assertFalse(context.getContainerTokenSecretManager()
.isValidStartContainerRequest(container.getContainerId())); .isValidStartContainerRequest(identifier));
} }
dispatcher.start(); dispatcher.start();
@ -542,6 +568,11 @@ public void appResourcesCleanedup() {
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)); ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
drainDispatcherEvents(); drainDispatcherEvents();
} }
/**
 * Looks up the token identifier stored for a container.
 *
 * @param containerId key into {@code containerTokenIdentifierMap}
 * @return the identifier mapped to {@code containerId}, or {@code null}
 *         when the map has no mapping for it
 */
public ContainerTokenIdentifier getContainerTokenIdentifier(
    ContainerId containerId) {
  final ContainerTokenIdentifier tokenIdentifier =
      containerTokenIdentifierMap.get(containerId);
  return tokenIdentifier;
}
} }
private Container createMockedContainer(ApplicationId appId, int containerId) { private Container createMockedContainer(ApplicationId appId, int containerId) {

View File

@ -42,12 +42,12 @@
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
@ -212,11 +212,24 @@ private void testNMTokens(Configuration conf) throws Exception {
validContainerToken, invalidNMToken, true).contains(sb.toString())); validContainerToken, invalidNMToken, true).contains(sb.toString()));
// using correct tokens. nmtoken for app attempt should get saved. // using correct tokens. nmtoken for app attempt should get saved.
conf.setInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
4 * 60 * 1000);
validContainerToken =
containerTokenSecretManager.createContainerToken(validContainerId,
validNode, user, r);
testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken, testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken,
validNMToken, false); validNMToken, false);
Assert.assertTrue(nmTokenSecretManagerNM Assert.assertTrue(nmTokenSecretManagerNM
.isAppAttemptNMTokenKeyPresent(validAppAttemptId)); .isAppAttemptNMTokenKeyPresent(validAppAttemptId));
// Now let's wait until the container finishes and is removed from the node manager.
waitForContainerToFinishOnNM(validContainerId);
sb = new StringBuilder("Attempt to relaunch the same container with id ");
sb.append(validContainerId);
Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode,
validContainerToken, validNMToken, true).contains(sb.toString()));
// Rolling over master key twice so that we can check whether older keys // Rolling over master key twice so that we can check whether older keys
// are used for authentication. // are used for authentication.
rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM);
@ -233,6 +246,19 @@ private void testNMTokens(Configuration conf) throws Exception {
} }
/**
 * Polls the context of the NodeManager at index 0 once per second until it
 * no longer lists the given container, giving up after 4 * 60 polls, and
 * then asserts that the container is gone.
 *
 * @param containerId container expected to disappear from the NM context
 */
private void waitForContainerToFinishOnNM(ContainerId containerId) {
  Context nmContext = yarnCluster.getNodeManager(0).getNMContext();
  int interval = 4 * 60; // Max time for container token to expire.
  while ((interval-- > 0)
      && nmContext.getContainers().containsKey(containerId)) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // Do not swallow the interrupt: restore the flag and stop polling so
      // an interrupted test does not keep sleeping for up to four minutes.
      Thread.currentThread().interrupt();
      break;
    }
  }
  Assert.assertFalse(nmContext.getContainers().containsKey(containerId));
}
protected void waitForNMToReceiveNMTokenKey( protected void waitForNMToReceiveNMTokenKey(
NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm) NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm)
throws InterruptedException { throws InterruptedException {