YARN-8760. [AMRMProxy] Fix concurrent re-register due to YarnRM failover in AMRMClientRelayer. Contributed by Botong Huang.
This commit is contained in:
parent
cc80ac2315
commit
59d5af21b7
@ -237,6 +237,27 @@ public class AMRMClientRelayer extends AbstractService
|
|||||||
return this.rmClient.registerApplicationMaster(request);
|
return this.rmClient.registerApplicationMaster(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* After an RM failover, there might be more than one
|
||||||
|
* allocate/finishApplicationMaster call thread (due to RPC timeout and retry)
|
||||||
|
* doing the auto re-register concurrently. As a result, we need to swallow
|
||||||
|
* the "application already registered" exception thrown by the new RM.
|
||||||
|
*/
|
||||||
|
private void reRegisterApplicationMaster(
|
||||||
|
RegisterApplicationMasterRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
try {
|
||||||
|
registerApplicationMaster(request);
|
||||||
|
} catch (InvalidApplicationMasterRequestException e) {
|
||||||
|
if (e.getMessage()
|
||||||
|
.contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
|
||||||
|
LOG.info("Concurrent thread successfully re-registered, moving on.");
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FinishApplicationMasterResponse finishApplicationMaster(
|
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||||
FinishApplicationMasterRequest request)
|
FinishApplicationMasterRequest request)
|
||||||
@ -247,7 +268,7 @@ public class AMRMClientRelayer extends AbstractService
|
|||||||
LOG.warn("Out of sync with RM " + rmId
|
LOG.warn("Out of sync with RM " + rmId
|
||||||
+ " for " + this.appId + ", hence resyncing.");
|
+ " for " + this.appId + ", hence resyncing.");
|
||||||
// re register with RM
|
// re register with RM
|
||||||
registerApplicationMaster(this.amRegistrationRequest);
|
reRegisterApplicationMaster(this.amRegistrationRequest);
|
||||||
return finishApplicationMaster(request);
|
return finishApplicationMaster(request);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -381,7 +402,7 @@ public class AMRMClientRelayer extends AbstractService
|
|||||||
}
|
}
|
||||||
|
|
||||||
// re-register with RM, then retry allocate recursively
|
// re-register with RM, then retry allocate recursively
|
||||||
registerApplicationMaster(this.amRegistrationRequest);
|
reRegisterApplicationMaster(this.amRegistrationRequest);
|
||||||
// Reset responseId after re-register
|
// Reset responseId after re-register
|
||||||
allocateRequest.setResponseId(0);
|
allocateRequest.setResponseId(0);
|
||||||
allocateResponse = allocate(allocateRequest);
|
allocateResponse = allocate(allocateRequest);
|
||||||
|
@ -64,6 +64,11 @@ public class TestAMRMClientRelayer {
|
|||||||
// Whether this mockRM will throw failover exception upon next heartbeat
|
// Whether this mockRM will throw failover exception upon next heartbeat
|
||||||
// from AM
|
// from AM
|
||||||
private boolean failover = false;
|
private boolean failover = false;
|
||||||
|
|
||||||
|
// Whether this mockRM will throw an "application already registered" exception
|
||||||
|
// upon next registerApplicationMaster call
|
||||||
|
private boolean throwAlreadyRegister = false;
|
||||||
|
|
||||||
private int responseIdReset = -1;
|
private int responseIdReset = -1;
|
||||||
private List<ResourceRequest> lastAsk;
|
private List<ResourceRequest> lastAsk;
|
||||||
private List<ContainerId> lastRelease;
|
private List<ContainerId> lastRelease;
|
||||||
@ -74,6 +79,11 @@ public class TestAMRMClientRelayer {
|
|||||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||||
RegisterApplicationMasterRequest request)
|
RegisterApplicationMasterRequest request)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
|
if (this.throwAlreadyRegister) {
|
||||||
|
this.throwAlreadyRegister = false;
|
||||||
|
throw new InvalidApplicationMasterRequestException(
|
||||||
|
AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + "appId");
|
||||||
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,6 +128,10 @@ public class TestAMRMClientRelayer {
|
|||||||
this.failover = true;
|
this.failover = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setThrowAlreadyRegister() {
|
||||||
|
this.throwAlreadyRegister = true;
|
||||||
|
}
|
||||||
|
|
||||||
public void setResponseIdReset(int expectedResponseId) {
|
public void setResponseIdReset(int expectedResponseId) {
|
||||||
this.responseIdReset = expectedResponseId;
|
this.responseIdReset = expectedResponseId;
|
||||||
}
|
}
|
||||||
@ -315,4 +329,15 @@ public class TestAMRMClientRelayer {
|
|||||||
response = this.relayer.allocate(getAllocateRequest());
|
response = this.relayer.allocate(getAllocateRequest());
|
||||||
Assert.assertEquals(this.responseId + 1, response.getResponseId());
|
Assert.assertEquals(this.responseId + 1, response.getResponseId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConcurrentReregister() throws YarnException, IOException {
|
||||||
|
|
||||||
|
// Set RM restart and failover flag
|
||||||
|
this.mockAMS.setFailoverFlag();
|
||||||
|
|
||||||
|
this.mockAMS.setThrowAlreadyRegister();
|
||||||
|
|
||||||
|
relayer.finishApplicationMaster(null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -184,6 +184,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||||||
*/
|
*/
|
||||||
private volatile boolean justRecovered;
|
private volatile boolean justRecovered;
|
||||||
|
|
||||||
|
/** If true, allocate will be a no-op, skipping actual processing. */
|
||||||
|
private volatile boolean finishAMCalled;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to keep track of the container Id and the sub cluster RM that created
|
* Used to keep track of the container Id and the sub cluster RM that created
|
||||||
* the container, so that we know which sub-cluster to forward later requests
|
* the container, so that we know which sub-cluster to forward later requests
|
||||||
@ -230,6 +233,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||||||
this.amRegistrationRequest = null;
|
this.amRegistrationRequest = null;
|
||||||
this.amRegistrationResponse = null;
|
this.amRegistrationResponse = null;
|
||||||
this.justRecovered = false;
|
this.justRecovered = false;
|
||||||
|
this.finishAMCalled = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -576,6 +580,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||||||
+ ". AM should re-register and full re-send pending requests.");
|
+ ". AM should re-register and full re-send pending requests.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.finishAMCalled) {
|
||||||
|
LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat "
|
||||||
|
+ "processing and return dummy response" + this.attemptId);
|
||||||
|
return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
|
||||||
|
}
|
||||||
|
|
||||||
// Check responseId and handle duplicate heartbeat exactly same as RM
|
// Check responseId and handle duplicate heartbeat exactly same as RM
|
||||||
synchronized (this.lastAllocateResponseLock) {
|
synchronized (this.lastAllocateResponseLock) {
|
||||||
LOG.info("Heartbeat from " + this.attemptId + " with responseId "
|
LOG.info("Heartbeat from " + this.attemptId + " with responseId "
|
||||||
@ -664,6 +674,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||||||
FinishApplicationMasterRequest request)
|
FinishApplicationMasterRequest request)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
|
|
||||||
|
this.finishAMCalled = true;
|
||||||
|
|
||||||
// TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
|
// TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
|
||||||
boolean failedToUnRegister = false;
|
boolean failedToUnRegister = false;
|
||||||
ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =
|
ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user