YARN-11502. Refactor AMRMProxy#FederationInterceptor#registerApplicationMaster. (#5705)

This commit is contained in:
slfan1989 2023-06-06 06:54:41 +08:00 committed by GitHub
parent e6937d7076
commit 9de13f879a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 56 deletions

View File

@ -91,6 +91,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -570,9 +571,12 @@ private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdenti
* For the same reason, this method needs to be synchronized.
*/
@Override
public synchronized RegisterApplicationMasterResponse
registerApplicationMaster(RegisterApplicationMasterRequest request)
throws YarnException, IOException {
public synchronized RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException, IOException {
if (request == null) {
throw new YarnException("RegisterApplicationMasterRequest can't be null!");
}
// Reset the heartbeat responseId to zero upon register
synchronized (this.lastAllocateResponseLock) {
@ -590,18 +594,9 @@ private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdenti
// Save the registration request. This will be used for registering with
// secondary sub-clusters using UAMs, as well as re-register later
this.amRegistrationRequest = request;
if (getNMStateStore() != null) {
try {
RegisterApplicationMasterRequestPBImpl pb =
(RegisterApplicationMasterRequestPBImpl)
RegisterApplicationMasterRequestPBImpl requestPB = (RegisterApplicationMasterRequestPBImpl)
this.amRegistrationRequest;
getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
NMSS_REG_REQUEST_KEY, pb.getProto().toByteArray());
} catch (Exception e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ this.attemptId, e);
}
}
storeAMRMProxyAppContextEntry(NMSS_REG_REQUEST_KEY, requestPB.getProto().toByteArray());
}
/*
@ -625,47 +620,39 @@ private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdenti
* is running and will break the elasticity feature. The registration with
* the other sub-cluster RM will be done lazily as needed later.
*/
this.amRegistrationResponse =
this.homeRMRelayer.registerApplicationMaster(request);
if (this.amRegistrationResponse
.getContainersFromPreviousAttempts() != null) {
cacheAllocatedContainers(
this.amRegistrationResponse.getContainersFromPreviousAttempts(),
this.homeSubClusterId);
this.amRegistrationResponse = this.homeRMRelayer.registerApplicationMaster(request);
if (this.amRegistrationResponse == null) {
throw new YarnException("RegisterApplicationMasterResponse can't be null!");
}
List<Container> containersFromPreviousAttempts =
this.amRegistrationResponse.getContainersFromPreviousAttempts();
if (containersFromPreviousAttempts != null) {
cacheAllocatedContainers(containersFromPreviousAttempts, this.homeSubClusterId);
}
ApplicationId appId = this.attemptId.getApplicationId();
reAttachUAMAndMergeRegisterResponse(this.amRegistrationResponse, appId);
if (getNMStateStore() != null) {
try {
RegisterApplicationMasterResponsePBImpl pb =
(RegisterApplicationMasterResponsePBImpl)
RegisterApplicationMasterResponsePBImpl responsePB = (RegisterApplicationMasterResponsePBImpl)
this.amRegistrationResponse;
getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
NMSS_REG_RESPONSE_KEY, pb.getProto().toByteArray());
} catch (Exception e) {
LOG.error("Error storing AMRMProxy application context entry for "
+ this.attemptId, e);
}
}
storeAMRMProxyAppContextEntry(NMSS_REG_RESPONSE_KEY, responsePB.getProto().toByteArray());
// the queue this application belongs will be used for getting
// AMRMProxy policy from state store.
String queue = this.amRegistrationResponse.getQueue();
if (queue == null) {
LOG.warn("Received null queue for application " + appId
+ " from home subcluster. Will use default queue name "
+ YarnConfiguration.DEFAULT_QUEUE_NAME
+ " for getting AMRMProxyPolicy");
LOG.warn("Received null queue for application {} from home subcluster. " +
" Will use default queue name {} for getting AMRMProxyPolicy.", appId,
YarnConfiguration.DEFAULT_QUEUE_NAME);
} else {
LOG.info("Application " + appId + " belongs to queue " + queue);
LOG.info("Application {} belongs to queue {}.", appId, queue);
}
// Initialize the AMRMProxyPolicy
try {
this.policyInterpreter =
FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
this.policyInterpreter = FederationPolicyUtils.loadAMRMPolicy(queue, this.policyInterpreter,
getConf(), this.federationFacade, this.homeSubClusterId);
} catch (FederationPolicyInitializationException e) {
throw new YarnRuntimeException(e);
@ -673,6 +660,24 @@ private Map<String, Token<AMRMTokenIdentifier>> recoverSubClusterAMRMTokenIdenti
return this.amRegistrationResponse;
}
/**
 * Best-effort persistence of a single AMRMProxy application context entry
 * for this interceptor's application attempt.
 *
 * <p>If {@code getNMStateStore()} returns {@code null} the call is a no-op.
 * Any exception raised by the state store is logged at error level and is
 * not propagated to the caller.
 *
 * @param key key string under which the entry is stored
 * @param data serialized state data to store for the key
 */
private void storeAMRMProxyAppContextEntry(String key, byte[] data) {
  final NMStateStoreService stateStore = getNMStateStore();
  if (stateStore == null) {
    // No state store is available, so there is nothing to persist.
    return;
  }

  try {
    stateStore.storeAMRMProxyAppContextEntry(this.attemptId, key, data);
  } catch (Exception e) {
    // Deliberately swallowed after logging: a failed write must not fail the caller.
    LOG.error("Error storing AMRMProxy application context entry[{}] for {}.",
        key, this.attemptId, e);
  }
}
/**
* Sends the heart beats to the home RM and the secondary sub-cluster RMs that
* are being used by the application.

View File

@ -195,10 +195,8 @@ private void deRegisterSubCluster(SubClusterId subClusterId)
private List<Container> getContainersAndAssert(int numberOfResourceRequests,
int numberOfAllocationExcepted) throws Exception {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
List<Container> containers =
new ArrayList<Container>(numberOfResourceRequests);
List<ResourceRequest> askList =
new ArrayList<ResourceRequest>(numberOfResourceRequests);
List<Container> containers = new ArrayList<>(numberOfResourceRequests);
List<ResourceRequest> askList = new ArrayList<>(numberOfResourceRequests);
for (int id = 0; id < numberOfResourceRequests; id++) {
askList.add(createResourceRequest("test-node-" + Integer.toString(id),
6000, 2, id % 5, 1));
@ -269,8 +267,8 @@ private void releaseContainersAndAssert(List<Container> containers)
List<ContainerId> newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
LOG.info("Number of containers received in the original request: "
+ Integer.toString(newlyFinished.size()));
LOG.info("Number of containers received in the original request: {}",
newlyFinished.size());
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
@ -290,10 +288,9 @@ private void releaseContainersAndAssert(List<Container> containers)
newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
LOG.info("Number of containers received in this request: "
+ Integer.toString(newlyFinished.size()));
LOG.info("Total number of containers received: "
+ Integer.toString(containersForReleasedContainerIds.size()));
LOG.info("Number of containers received in this request: {}.", newlyFinished.size());
LOG.info("Total number of containers received: {}.",
containersForReleasedContainerIds.size());
Thread.sleep(10);
}
@ -431,7 +428,7 @@ public Object run() throws Exception {
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertTrue(finishResponse.getIsUnregistered());
return null;
}
});
@ -624,7 +621,7 @@ public Object run() throws Exception {
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertTrue(finishResponse.getIsUnregistered());
// After the application succeeds, the registry/NMSS entry should be
// cleaned up
@ -805,7 +802,7 @@ public Object run() throws Exception {
Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
// Generate a duplicate heartbeat from AM, so that it won't really
// trigger an heartbeat to all SC
// trigger a heartbeat to all SC
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
// Set to lastResponseId - 1 so that it will be considered a duplicate
@ -904,7 +901,7 @@ public Object run() throws Exception {
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertTrue(finishResponse.getIsUnregistered());
// After the application succeeds, the registry entry should be deleted
if (interceptor.getRegistryClient() != null) {