YARN-1839. Fixed handling of NMTokens in ResourceManager such that containers launched by AMs running on the same machine as the AM are correctly propagated. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1578631 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-17 22:22:15 +00:00
parent 81a456e638
commit 92317e3459
7 changed files with 93 additions and 21 deletions

View File

@ -505,6 +505,10 @@ Release 2.4.0 - UNRELEASED
manner and thus fix failure of TestResourceTrackerService. (Tsuyoshi Ozawa manner and thus fix failure of TestResourceTrackerService. (Tsuyoshi Ozawa
via vinodkv) via vinodkv)
YARN-1839. Fixed handling of NMTokens in ResourceManager such that containers
launched by AMs running on the same machine as the AM are correctly
propagated. (Jian He via vinodkv)
Release 2.3.1 - UNRELEASED Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -294,13 +294,9 @@ protected void populateNMTokens(List<NMToken> nmTokens) {
for (NMToken token : nmTokens) { for (NMToken token : nmTokens) {
String nodeId = token.getNodeId().toString(); String nodeId = token.getNodeId().toString();
if (getNMTokenCache().containsToken(nodeId)) { if (getNMTokenCache().containsToken(nodeId)) {
if (LOG.isDebugEnabled()) { LOG.info("Replacing token for : " + nodeId);
LOG.debug("Replacing token for : " + nodeId);
}
} else { } else {
if (LOG.isDebugEnabled()) { LOG.info("Received new token for : " + nodeId);
LOG.debug("Received new token for : " + nodeId);
}
} }
getNMTokenCache().setToken(nodeId, token.getToken()); getNMTokenCache().setToken(nodeId, token.getToken());
} }

View File

@ -831,9 +831,18 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
appAttempt.retryFetchingAMContainer(appAttempt); appAttempt.retryFetchingAMContainer(appAttempt);
return RMAppAttemptState.SCHEDULED; return RMAppAttemptState.SCHEDULED;
} }
// Set the masterContainer // Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers() appAttempt.setMasterContainer(amContainerAllocation.getContainers()
.get(0)); .get(0));
// The node set in NMTokenSecrentManager is used for marking whether the
// NMToken has been issued for this node to the AM.
// When AM container was allocated to RM itself, the node which allocates
// this AM container was marked as the NMToken already sent. Thus,
// clear this node set so that the following allocate requests from AM are
// able to retrieve the corresponding NMToken.
appAttempt.rmContext.getNMTokenSecretManager()
.clearNodeSetForAttempt(appAttempt.applicationAttemptId);
appAttempt.getSubmissionContext().setResource( appAttempt.getSubmissionContext().setResource(
appAttempt.getMasterContainer().getResource()); appAttempt.getMasterContainer().getResource());
appAttempt.storeAttempt(); appAttempt.storeAttempt();

View File

@ -902,7 +902,8 @@ private synchronized void completedContainer(RMContainer rmContainer,
} }
@Lock(Lock.NoLock.class) @Lock(Lock.NoLock.class)
FiCaSchedulerApp getApplicationAttempt( @VisibleForTesting
public FiCaSchedulerApp getApplicationAttempt(
ApplicationAttemptId applicationAttemptId) { ApplicationAttemptId applicationAttemptId) {
SchedulerApplication app = SchedulerApplication app =
applications.get(applicationAttemptId.getApplicationId()); applications.get(applicationAttemptId.getApplicationId());

View File

@ -138,6 +138,19 @@ public void activateNextMasterKey() {
} }
} }
public void clearNodeSetForAttempt(ApplicationAttemptId attemptId) {
super.writeLock.lock();
try {
HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(attemptId);
if (nodeSet != null) {
LOG.info("Clear node set for " + attemptId);
nodeSet.clear();
}
} finally {
super.writeLock.unlock();
}
}
private void clearApplicationNMTokenKeys() { private void clearApplicationNMTokenKeys() {
// We should clear all node entries from this set. // We should clear all node entries from this set.
// TODO : Once we have per node master key then it will change to only // TODO : Once we have per node master key then it will change to only
@ -184,22 +197,13 @@ public NMToken createAndGetNMToken(String applicationSubmitter,
NMToken nmToken = null; NMToken nmToken = null;
if (nodeSet != null) { if (nodeSet != null) {
if (!nodeSet.contains(container.getNodeId())) { if (!nodeSet.contains(container.getNodeId())) {
if (LOG.isDebugEnabled()) { LOG.info("Sending NMToken for nodeId : " + container.getNodeId()
LOG.debug("Sending NMToken for nodeId : " + " for container : " + container.getId());
+ container.getNodeId().toString()
+ " for application attempt : " + appAttemptId.toString());
}
Token token = Token token =
createNMToken(container.getId().getApplicationAttemptId(), createNMToken(container.getId().getApplicationAttemptId(),
container.getNodeId(), applicationSubmitter); container.getNodeId(), applicationSubmitter);
nmToken = NMToken.newInstance(container.getNodeId(), token); nmToken = NMToken.newInstance(container.getNodeId(), token);
// The node set here is used for differentiating whether the NMToken nodeSet.add(container.getNodeId());
// has been issued for this node from the client's perspective. If
// this is an AM container, the NMToken is issued only for RM and so
// we should not update the node set.
if (container.getId().getId() != 1) {
nodeSet.add(container.getNodeId());
}
} }
} }
return nmToken; return nmToken;

View File

@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
@ -171,7 +172,60 @@ public void testAppOnMultiNode() throws Exception {
rm.stop(); rm.stop();
} }
// Test even if AM container is allocated with containerId not equal to 1, the
// following allocate requests from AM should be able to retrieve the
// corresponding NM Token.
@Test (timeout = 20000)
public void testNMTokenSentForNormalContainer() throws Exception {
MockRM rm = new MockRM();
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
RMApp app = rm.submitApp(2000);
RMAppAttempt attempt = app.getCurrentAppAttempt();
// Call getNewContainerId to increase container Id so that the AM container
// Id doesn't equal to one.
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
cs.getApplicationAttempt(attempt.getAppAttemptId()).getNewContainerId();
// kick the scheduling
nm1.nodeHeartbeat(true);
MockAM am = MockRM.launchAM(app, rm, nm1);
// am container Id not equal to 1.
Assert.assertTrue(attempt.getMasterContainer().getId().getId() != 1);
// NMSecretManager doesn't record the node on which the am is allocated.
Assert.assertFalse(rm.getRMNMTokenSecretManager()
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm1.getNodeId()));
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
int NUM_CONTAINERS = 1;
List<Container> containers = new ArrayList<Container>();
// nmTokens keeps track of all the nmTokens issued in the allocate call.
List<NMToken> expectedNMTokens = new ArrayList<NMToken>();
// am1 allocate 1 container on nm1.
while (true) {
AllocateResponse response =
am.allocate("127.0.0.1", 2000, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
containers.addAll(response.getAllocatedContainers());
expectedNMTokens.addAll(response.getNMTokens());
if (containers.size() == NUM_CONTAINERS) {
break;
}
Thread.sleep(200);
System.out.println("Waiting for container to be allocated.");
}
NodeId nodeId = expectedNMTokens.get(0).getNodeId();
// NMToken is sent for the allocated container.
Assert.assertEquals(nm1.getNodeId(), nodeId);
}
@Test (timeout = 40000) @Test (timeout = 40000)
public void testNMToken() throws Exception { public void testNMToken() throws Exception {
MockRM rm = new MockRM(); MockRM rm = new MockRM();

View File

@ -133,6 +133,8 @@ public class TestRMAppAttemptTransitions {
private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf)); private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
private ClientToAMTokenSecretManagerInRM clientToAMTokenManager = private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
spy(new ClientToAMTokenSecretManagerInRM()); spy(new ClientToAMTokenSecretManagerInRM());
private NMTokenSecretManagerInRM nmTokenManager =
spy(new NMTokenSecretManagerInRM(conf));
private boolean transferStateFromPreviousAttempt = false; private boolean transferStateFromPreviousAttempt = false;
private final class TestApplicationAttemptEventDispatcher implements private final class TestApplicationAttemptEventDispatcher implements
@ -224,7 +226,7 @@ public void setUp() throws Exception {
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, amRMTokenManager, null, amRMTokenManager,
new RMContainerTokenSecretManager(conf), new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf), nmTokenManager,
clientToAMTokenManager, clientToAMTokenManager,
writer); writer);
@ -443,6 +445,8 @@ private void testAppAttemptAllocatedState(Container amContainer) {
any( any(
ApplicationAttemptId.class), any(List.class), any(List.class), ApplicationAttemptId.class), any(List.class), any(List.class),
any(List.class), any(List.class)); any(List.class), any(List.class));
verify(nmTokenManager).clearNodeSetForAttempt(
applicationAttempt.getAppAttemptId());
} }
/** /**