YARN-11289. [Federation] Improve NM FederationInterceptor removeAppFromRegistry. (#4836)

This commit is contained in:
slfan1989 2022-09-03 01:41:31 +08:00 committed by GitHub
parent 1965708d49
commit 7bf95d7949
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 148 additions and 5 deletions

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
@ -771,18 +772,25 @@ public FinishApplicationMasterResponse finishApplicationMaster(
if (failedToUnRegister) {
homeResponse.setIsUnregistered(false);
} else {
} else if (checkRequestFinalApplicationStatusSuccess(request)) {
// Clean up UAMs only when the app finishes successfully, so that no more
// attempt will be launched.
this.uamPool.stop();
if (this.registryClient != null) {
this.registryClient
.removeAppFromRegistry(this.attemptId.getApplicationId());
}
removeAppFromRegistry();
}
return homeResponse;
}
private boolean checkRequestFinalApplicationStatusSuccess(
FinishApplicationMasterRequest request) {
if (request != null && request.getFinalApplicationStatus() != null) {
if (request.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED)) {
return true;
}
}
return false;
}
@Override
public void setNextInterceptor(RequestInterceptor next) {
throw new YarnRuntimeException(
@ -818,9 +826,21 @@ public void shutdown() {
this.homeHeartbeartHandler.shutdown();
this.homeRMRelayer.shutdown();
// Shutdown needs to clean up app
removeAppFromRegistry();
super.shutdown();
}
private void removeAppFromRegistry() {
if (this.registryClient != null && this.attemptId != null) {
ApplicationId applicationId = this.attemptId.getApplicationId();
if (applicationId != null) {
this.registryClient.removeAppFromRegistry(applicationId);
}
}
}
/**
* Only for unit test cleanup.
*/

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
@ -71,6 +72,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@ -1119,4 +1121,125 @@ public void testBatchFinishApplicationMaster() throws IOException, InterruptedEx
return null;
});
}
@Test
public void testRemoveAppFromRegistryApplicationSuccess()
throws IOException, InterruptedException {
final RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
// Register the application
RegisterApplicationMasterRequest registerReq1 =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq1.setHost(Integer.toString(testAppId));
registerReq1.setRpcPort(0);
registerReq1.setTrackingUrl("");
// Register ApplicationMaster
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq1);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate the first batch of containers, with sc1 active
registerSubCluster(SubClusterId.newInstance("SC-1"));
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(numberOfContainers, containers.size());
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResp = interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResp);
Assert.assertTrue(finishResp.getIsUnregistered());
FederationRegistryClient client = interceptor.getRegistryClient();
List<String> applications = client.getAllApplications();
Assert.assertNotNull(finishResp);
Assert.assertEquals(0, applications.size());
return null;
});
}
@Test
public void testRemoveAppFromRegistryApplicationFailed()
throws IOException, InterruptedException {
final RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
// Register the application
RegisterApplicationMasterRequest registerReq1 =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq1.setHost(Integer.toString(testAppId));
registerReq1.setRpcPort(0);
registerReq1.setTrackingUrl("");
// Register ApplicationMaster
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq1);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate the first batch of containers, with sc1 active
registerSubCluster(SubClusterId.newInstance("SC-1"));
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(numberOfContainers, containers.size());
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
// Check Registry Applications
// At this time, the Application should not be cleaned up because the state is not SUCCESS.
FederationRegistryClient client = interceptor.getRegistryClient();
List<String> applications = client.getAllApplications();
Assert.assertNotNull(applications);
Assert.assertEquals(1, applications.size());
// interceptor cleanupRegistry
ApplicationId applicationId = interceptor.getAttemptId().getApplicationId();
client.removeAppFromRegistry(applicationId);
applications = client.getAllApplications();
Assert.assertNotNull(applications);
Assert.assertEquals(0, applications.size());
return null;
});
}
}