YARN-8980. Mapreduce application container start fail after AM restart. (#5975) Contributed by Chenyu Zheng.

Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
zhengchenyu 2023-09-09 09:50:53 +08:00 committed by GitHub
parent bf605c8acc
commit c5e9510b54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 176 additions and 12 deletions

View File

@ -98,6 +98,7 @@ import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -260,6 +261,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private final MonotonicClock clock = new MonotonicClock();
/*
* For UAM, keepContainersAcrossApplicationAttempts is always true.
* When re-register to RM, RM will clear node set and regenerate NMToken for transferred
* container. But If keepContainersAcrossApplicationAttempts of AM is false, AM may not
* called getNMTokensFromPreviousAttempts, so the NMToken which is pass from
* RegisterApplicationMasterResponse will be missing. Here we cache these NMToken,
* then pass to AM in allocate stage.
* */
private Set<NMToken> nmTokenMapFromRegisterSecondaryCluster;
/**
* Creates an instance of the FederationInterceptor class.
*/
@ -278,6 +289,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.finishAMCalled = false;
this.lastSCResponseTime = new ConcurrentHashMap<>();
this.lastAMHeartbeatTime = this.clock.getTime();
this.nmTokenMapFromRegisterSecondaryCluster = new ConcurrentHashSet<>();
}
/**
@ -453,6 +465,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// RegisterApplicationMaster
RegisterApplicationMasterResponse response =
this.uamPool.registerApplicationMaster(keyScId, this.amRegistrationRequest);
nmTokenMapFromRegisterSecondaryCluster.addAll(response.getNMTokensFromPreviousAttempts());
// Set sub-cluster to be timed out initially
lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
@ -1096,6 +1109,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (registerResponse != null) {
LOG.info("Merging register response for {}", appId);
mergeRegisterResponse(homeResponse, registerResponse);
nmTokenMapFromRegisterSecondaryCluster.addAll(
registerResponse.getNMTokensFromPreviousAttempts());
}
} catch (Exception e) {
LOG.warn("Reattaching UAM failed for ApplicationId: " + appId, e);
@ -1434,6 +1449,17 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
}
// When re-register RM, client may not cache the NMToken from register response.
// Here we pass these NMToken in allocate stage.
if (nmTokenMapFromRegisterSecondaryCluster.size() > 0) {
List<NMToken> duplicateNmToken = new ArrayList(nmTokenMapFromRegisterSecondaryCluster);
nmTokenMapFromRegisterSecondaryCluster.removeAll(duplicateNmToken);
if (!isNullOrEmpty(mergedResponse.getNMTokens())) {
mergedResponse.getNMTokens().addAll(duplicateNmToken);
} else {
mergedResponse.setNMTokens(duplicateNmToken);
}
}
}
/**

View File

@ -168,14 +168,13 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
// and corresponding NM tokens.
if (app.getApplicationSubmissionContext()
.getKeepContainersAcrossApplicationAttempts()) {
// Clear the node set remembered by the secret manager. Necessary
// for UAM restart because we use the same attemptId.
rmContext.getNMTokenSecretManager().clearNodeSetForAttempt(applicationAttemptId);
List<Container> transferredContainers = getScheduler()
.getTransferredContainers(applicationAttemptId);
if (!transferredContainers.isEmpty()) {
response.setContainersFromPreviousAttempts(transferredContainers);
// Clear the node set remembered by the secret manager. Necessary
// for UAM restart because we use the same attemptId.
rmContext.getNMTokenSecretManager()
.clearNodeSetForAttempt(applicationAttemptId);
List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Container container : transferredContainers) {

View File

@ -21,15 +21,19 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
@ -71,6 +75,7 @@ public class TestWorkPreservingUnmanagedAM
MockNM nm =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm.registerNode();
Set<NodeId> tokenCacheClientSide = new HashSet();
// create app and launch the UAM
boolean unamanged = true;
@ -98,14 +103,19 @@ public class TestWorkPreservingUnmanagedAM
// Allocate two containers to UAM
int numContainers = 3;
List<Container> conts = am.allocate("127.0.0.1", 1000, numContainers,
new ArrayList<ContainerId>()).getAllocatedContainers();
AllocateResponse allocateResponse =
am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
List<Container> conts = allocateResponse.getAllocatedContainers();
while (conts.size() < numContainers) {
nm.nodeHeartbeat(true);
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
allocateResponse =
am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
conts.addAll(allocateResponse.getAllocatedContainers());
Thread.sleep(100);
}
checkNMTokenForContainer(tokenCacheClientSide, conts);
// Release one container
List<ContainerId> releaseList =
@ -127,6 +137,10 @@ public class TestWorkPreservingUnmanagedAM
RegisterApplicationMasterResponse response = null;
try {
response = am.registerAppAttempt(false);
// When AM restart, it means nmToken in client side should be missing
tokenCacheClientSide.clear();
response.getNMTokensFromPreviousAttempts()
.forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
} catch (InvalidApplicationMasterRequestException e) {
Assert.assertEquals(false, keepContainers);
return;
@ -142,14 +156,124 @@ public class TestWorkPreservingUnmanagedAM
numContainers = 1;
am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
conts = allocateResponse.getAllocatedContainers();
while (conts.size() < numContainers) {
nm.nodeHeartbeat(true);
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
allocateResponse =
am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
conts.addAll(allocateResponse.getAllocatedContainers());
Thread.sleep(100);
}
checkNMTokenForContainer(tokenCacheClientSide, conts);
rm.stop();
}
protected void testUAMRestartWithoutTransferContainer(boolean keepContainers) throws Exception {
// start RM
MockRM rm = new MockRM();
rm.start();
MockNM nm =
new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
nm.registerNode();
Set<NodeId> tokenCacheClientSide = new HashSet();
// create app and launch the UAM
boolean unamanged = true;
int maxAttempts = 1;
boolean waitForAccepted = true;
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(200, rm)
.withAppName("")
.withUser(UserGroupInformation.getCurrentUser().getShortUserName())
.withAcls(null)
.withUnmanagedAM(unamanged)
.withQueue(null)
.withMaxAppAttempts(maxAttempts)
.withCredentials(null)
.withAppType(null)
.withWaitForAppAcceptedState(waitForAccepted)
.withKeepContainers(keepContainers)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
MockAM am = MockRM.launchUAM(app, rm, nm);
// Register for the first time
am.registerAppAttempt();
// Allocate two containers to UAM
int numContainers = 3;
AllocateResponse allocateResponse =
am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
List<Container> conts = allocateResponse.getAllocatedContainers();
while (conts.size() < numContainers) {
nm.nodeHeartbeat(true);
allocateResponse =
am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
conts.addAll(allocateResponse.getAllocatedContainers());
Thread.sleep(100);
}
checkNMTokenForContainer(tokenCacheClientSide, conts);
// Release all containers, then there are no transfer containfer app attempt
List<ContainerId> releaseList = new ArrayList();
releaseList.add(conts.get(0).getId());
releaseList.add(conts.get(1).getId());
releaseList.add(conts.get(2).getId());
List<ContainerStatus> finishedConts =
am.allocate(new ArrayList<ResourceRequest>(), releaseList)
.getCompletedContainersStatuses();
while (finishedConts.size() < releaseList.size()) {
nm.nodeHeartbeat(true);
finishedConts
.addAll(am
.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>())
.getCompletedContainersStatuses());
Thread.sleep(100);
}
// Register for the second time
RegisterApplicationMasterResponse response = null;
try {
response = am.registerAppAttempt(false);
// When AM restart, it means nmToken in client side should be missing
tokenCacheClientSide.clear();
response.getNMTokensFromPreviousAttempts()
.forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
} catch (InvalidApplicationMasterRequestException e) {
Assert.assertEquals(false, keepContainers);
return;
}
Assert.assertEquals("RM should not allow second register"
+ " for UAM without keep container flag ", true, keepContainers);
// Expecting the zero running containers previously
Assert.assertEquals(0, response.getContainersFromPreviousAttempts().size());
Assert.assertEquals(0, response.getNMTokensFromPreviousAttempts().size());
// Allocate one more containers to UAM, just to be safe
numContainers = 1;
am.allocate("127.0.0.1", 1000, numContainers, new ArrayList<ContainerId>());
nm.nodeHeartbeat(true);
allocateResponse = am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
conts = allocateResponse.getAllocatedContainers();
while (conts.size() < numContainers) {
nm.nodeHeartbeat(true);
allocateResponse =
am.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
allocateResponse.getNMTokens().forEach(token -> tokenCacheClientSide.add(token.getNodeId()));
conts.addAll(allocateResponse.getAllocatedContainers());
Thread.sleep(100);
}
checkNMTokenForContainer(tokenCacheClientSide, conts);
rm.stop();
}
@ -164,4 +288,19 @@ public class TestWorkPreservingUnmanagedAM
testUAMRestart(false);
}
@Test(timeout = 600000)
public void testUAMRestartKeepContainersWithoutTransferContainer() throws Exception {
testUAMRestartWithoutTransferContainer(true);
}
@Test(timeout = 600000)
public void testUAMRestartNoKeepContainersWithoutTransferContainer() throws Exception {
testUAMRestartWithoutTransferContainer(false);
}
private void checkNMTokenForContainer(Set<NodeId> cacheToken, List<Container> containers) {
for (Container container : containers) {
Assert.assertTrue(cacheToken.contains(container.getNodeId()));
}
}
}