YARN-8673. [AMRMProxy] More robust responseId resync after a YarnRM master-slave switch. Contributed by Botong Huang.

This commit is contained in:
Giovanni Matteo Fumarola 2018-08-20 12:22:36 -07:00
parent 65e7469712
commit 8736fc39ac
6 changed files with 190 additions and 57 deletions

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -54,6 +55,10 @@ public final class AMRMClientUtils {
public static final String APP_ALREADY_REGISTERED_MESSAGE = public static final String APP_ALREADY_REGISTERED_MESSAGE =
"Application Master is already registered : "; "Application Master is already registered : ";
// Marker that immediately precedes the expected responseId in the message
// built by assembleInvalidResponseIdExceptionMessage. It is also the opening
// delimiter that parseExpectedResponseIdFromException searches for, so the
// text (including the surrounding spaces) is part of the parsing contract.
public static final String EXPECTED_HB_RESPONSEID_MESSAGE =
" expect responseId to be ";
// Marker that immediately precedes the received responseId in the same
// message; it doubles as the closing delimiter of the expected value when
// the message is parsed.
public static final String RECEIVED_HB_RESPONSEID_MESSAGE = " but get ";
private AMRMClientUtils() { private AMRMClientUtils() {
} }
@ -105,6 +110,48 @@ public final class AMRMClientUtils {
SaslRpcServer.AuthMethod.TOKEN.toString()); SaslRpcServer.AuthMethod.TOKEN.toString());
} }
/**
 * Build the text of the exception raised when an AM heartbeat carries a
 * responseId different from the one RM expects.
 *
 * The expected value is placed between
 * {@link #EXPECTED_HB_RESPONSEID_MESSAGE} and
 * {@link #RECEIVED_HB_RESPONSEID_MESSAGE}, which is the layout that
 * {@link #parseExpectedResponseIdFromException(String)} relies on.
 *
 * @param appAttemptId the app attempt that sent the heartbeat
 * @param expected the responseId value RM expected
 * @param received the responseId value actually received
 * @return the assembled exception message
 */
public static String assembleInvalidResponseIdExceptionMessage(
    ApplicationAttemptId appAttemptId, int expected, int received) {
  StringBuilder message = new StringBuilder(
      "Invalid responseId in AllocateRequest from application attempt: ");
  message.append(appAttemptId);
  message.append(EXPECTED_HB_RESPONSEID_MESSAGE).append(expected);
  message.append(RECEIVED_HB_RESPONSEID_MESSAGE).append(received);
  return message.toString();
}
/**
 * Parse the expected responseId from the exception generated by RM when
 * processing AM heartbeat.
 *
 * The value is read from between {@link #EXPECTED_HB_RESPONSEID_MESSAGE}
 * and the first {@link #RECEIVED_HB_RESPONSEID_MESSAGE} that follows it.
 * This method never throws: any message that does not match that layout
 * yields -1.
 *
 * @param exceptionMessage the exception message thrown by RM, may be null
 * @return the parsed expected responseId, or -1 if the message is null, a
 *         marker is missing, the markers are out of order, or the text
 *         between them is not a valid integer
 */
public static int parseExpectedResponseIdFromException(
    String exceptionMessage) {
  if (exceptionMessage == null) {
    return -1;
  }
  int start = exceptionMessage.indexOf(EXPECTED_HB_RESPONSEID_MESSAGE);
  if (start == -1) {
    return -1;
  }
  start += EXPECTED_HB_RESPONSEID_MESSAGE.length();
  // Search for the closing marker only after the opening one. Searching from
  // the beginning of the message could find an earlier occurrence, leaving
  // end < start and making substring() throw instead of returning -1.
  int end = exceptionMessage.indexOf(RECEIVED_HB_RESPONSEID_MESSAGE, start);
  if (end == -1) {
    return -1;
  }
  try {
    return Integer.parseInt(exceptionMessage.substring(start, end));
  } catch (NumberFormatException ex) {
    return -1;
  }
}
public static void addToOutstandingSchedulingRequests( public static void addToOutstandingSchedulingRequests(
Collection<SchedulingRequest> requests, Collection<SchedulingRequest> requests,
Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) { Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests) {

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
@ -111,13 +113,22 @@ public class AMRMClientRelayer extends AbstractService
new HashMap<>(); new HashMap<>();
private List<SchedulingRequest> schedulingRequest = new ArrayList<>(); private List<SchedulingRequest> schedulingRequest = new ArrayList<>();
private ApplicationId appId;
// Normally -1, otherwise will override responseId with this value in the next
// heartbeat
private volatile int resetResponseId;
public AMRMClientRelayer() { public AMRMClientRelayer() {
super(AMRMClientRelayer.class.getName()); super(AMRMClientRelayer.class.getName());
this.resetResponseId = -1;
} }
public AMRMClientRelayer(ApplicationMasterProtocol rmClient) { public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
ApplicationId appId) {
this(); this();
this.rmClient = rmClient; this.rmClient = rmClient;
this.appId = appId;
} }
@Override @Override
@ -167,19 +178,15 @@ public class AMRMClientRelayer extends AbstractService
try { try {
return this.rmClient.finishApplicationMaster(request); return this.rmClient.finishApplicationMaster(request);
} catch (ApplicationMasterNotRegisteredException e) { } catch (ApplicationMasterNotRegisteredException e) {
LOG.warn("Out of sync with ResourceManager, hence resyncing."); LOG.warn("Out of sync with RM for " + this.appId + ", hence resyncing.");
// re register with RM // re register with RM
registerApplicationMaster(this.amRegistrationRequest); registerApplicationMaster(this.amRegistrationRequest);
return finishApplicationMaster(request); return finishApplicationMaster(request);
} }
} }
@Override private void addNewAllocateRequest(AllocateRequest allocateRequest)
public AllocateResponse allocate(AllocateRequest allocateRequest) throws YarnException {
throws YarnException, IOException {
AllocateResponse allocateResponse = null;
try {
synchronized (this) {
// update the data structures first // update the data structures first
addNewAsks(allocateRequest.getAskList()); addNewAsks(allocateRequest.getAskList());
@ -217,9 +224,17 @@ public class AMRMClientRelayer extends AbstractService
AMRMClientUtils.addToOutstandingSchedulingRequests( AMRMClientUtils.addToOutstandingSchedulingRequests(
allocateRequest.getSchedulingRequests(), allocateRequest.getSchedulingRequests(),
this.remotePendingSchedRequest); this.remotePendingSchedRequest);
this.schedulingRequest this.schedulingRequest.addAll(allocateRequest.getSchedulingRequests());
.addAll(allocateRequest.getSchedulingRequests());
} }
}
@Override
public AllocateResponse allocate(AllocateRequest allocateRequest)
throws YarnException, IOException {
AllocateResponse allocateResponse = null;
try {
synchronized (this) {
addNewAllocateRequest(allocateRequest);
ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size()); ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
for (ResourceRequest r : ask) { for (ResourceRequest r : ask) {
@ -238,13 +253,23 @@ public class AMRMClientRelayer extends AbstractService
.updateRequests(new ArrayList<>(this.change.values())) .updateRequests(new ArrayList<>(this.change.values()))
.schedulingRequests(new ArrayList<>(this.schedulingRequest)) .schedulingRequests(new ArrayList<>(this.schedulingRequest))
.build(); .build();
if (this.resetResponseId != -1) {
LOG.info("Override allocate responseId from "
+ allocateRequest.getResponseId() + " to " + this.resetResponseId
+ " for " + this.appId);
allocateRequest.setResponseId(this.resetResponseId);
}
} }
// Do the actual allocate call // Do the actual allocate call
try { try {
allocateResponse = this.rmClient.allocate(allocateRequest); allocateResponse = this.rmClient.allocate(allocateRequest);
// Heartbeat succeeded, wipe out responseId overriding
this.resetResponseId = -1;
} catch (ApplicationMasterNotRegisteredException e) { } catch (ApplicationMasterNotRegisteredException e) {
LOG.warn("ApplicationMaster is out of sync with ResourceManager," LOG.warn("ApplicationMaster is out of sync with RM for " + this.appId
+ " hence resyncing."); + " hence resyncing.");
synchronized (this) { synchronized (this) {
@ -269,6 +294,25 @@ public class AMRMClientRelayer extends AbstractService
// Reset responseId after re-register // Reset responseId after re-register
allocateRequest.setResponseId(0); allocateRequest.setResponseId(0);
return allocate(allocateRequest); return allocate(allocateRequest);
} catch (Throwable t) {
// If RM is complaining about responseId out of sync, force reset next
// time
if (t instanceof InvalidApplicationMasterRequestException) {
int responseId = AMRMClientUtils
.parseExpectedResponseIdFromException(t.getMessage());
if (responseId != -1) {
this.resetResponseId = responseId;
LOG.info("ResponseId out of sync with RM, expect " + responseId
+ " but " + allocateRequest.getResponseId() + " used by "
+ this.appId + ". Will override in the next allocate.");
} else {
LOG.warn("Failed to parse expected responseId out of exception for "
+ this.appId);
}
}
throw t;
} }
synchronized (this) { synchronized (this) {

View File

@ -193,7 +193,7 @@ public class UnmanagedApplicationManager {
this.applicationId.toString(), UserGroupInformation.getCurrentUser()); this.applicationId.toString(), UserGroupInformation.getCurrentUser());
this.rmProxyRelayer = this.rmProxyRelayer =
new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class, new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
this.conf, this.userUgi, amrmToken)); this.conf, this.userUgi, amrmToken), this.applicationId);
} }
/** /**

View File

@ -40,7 +40,9 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.AMRMClientUtils;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -62,6 +64,7 @@ public class TestAMRMClientRelayer {
// Whether this mockRM will throw failover exception upon next heartbeat // Whether this mockRM will throw failover exception upon next heartbeat
// from AM // from AM
private boolean failover = false; private boolean failover = false;
private int responseIdReset = -1;
private List<ResourceRequest> lastAsk; private List<ResourceRequest> lastAsk;
private List<ContainerId> lastRelease; private List<ContainerId> lastRelease;
private List<String> lastBlacklistAdditions; private List<String> lastBlacklistAdditions;
@ -92,26 +95,40 @@ public class TestAMRMClientRelayer {
this.failover = false; this.failover = false;
throw new ApplicationMasterNotRegisteredException("Mock RM restarted"); throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
} }
if (this.responseIdReset != -1) {
String errorMessage =
AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(null,
this.responseIdReset, request.getResponseId());
this.responseIdReset = -1;
throw new InvalidApplicationMasterRequestException(errorMessage);
}
this.lastAsk = request.getAskList(); this.lastAsk = request.getAskList();
this.lastRelease = request.getReleaseList(); this.lastRelease = request.getReleaseList();
this.lastBlacklistAdditions = this.lastBlacklistAdditions =
request.getResourceBlacklistRequest().getBlacklistAdditions(); request.getResourceBlacklistRequest().getBlacklistAdditions();
this.lastBlacklistRemovals = this.lastBlacklistRemovals =
request.getResourceBlacklistRequest().getBlacklistRemovals(); request.getResourceBlacklistRequest().getBlacklistRemovals();
return AllocateResponse.newInstance(0, null, null, return AllocateResponse.newInstance(request.getResponseId() + 1, null,
new ArrayList<NodeReport>(), Resource.newInstance(0, 0), null, 0, null, new ArrayList<NodeReport>(), Resource.newInstance(0, 0), null,
null, null); 0, null, null);
} }
public void setFailoverFlag() { public void setFailoverFlag() {
this.failover = true; this.failover = true;
} }
/**
 * Arm this mock so that a later request is rejected as having an invalid
 * responseId. While the stored value is not -1, the request handler throws
 * InvalidApplicationMasterRequestException with a message naming this value
 * as the expected responseId, then sets the stored value back to -1.
 *
 * @param expectedResponseId the responseId the mock will claim to expect
 */
public void setResponseIdReset(int expectedResponseId) {
this.responseIdReset = expectedResponseId;
}
} }
private Configuration conf; private Configuration conf;
private MockApplicationMasterService mockAMS; private MockApplicationMasterService mockAMS;
private AMRMClientRelayer relayer; private AMRMClientRelayer relayer;
private int responseId = 0;
// Buffer of asks that will be sent to RM in the next AM heartbeat // Buffer of asks that will be sent to RM in the next AM heartbeat
private List<ResourceRequest> asks = new ArrayList<>(); private List<ResourceRequest> asks = new ArrayList<>();
private List<ContainerId> releases = new ArrayList<>(); private List<ContainerId> releases = new ArrayList<>();
@ -123,7 +140,7 @@ public class TestAMRMClientRelayer {
this.conf = new Configuration(); this.conf = new Configuration();
this.mockAMS = new MockApplicationMasterService(); this.mockAMS = new MockApplicationMasterService();
this.relayer = new AMRMClientRelayer(this.mockAMS); this.relayer = new AMRMClientRelayer(this.mockAMS, null);
this.relayer.init(conf); this.relayer.init(conf);
this.relayer.start(); this.relayer.start();
@ -150,7 +167,7 @@ public class TestAMRMClientRelayer {
private AllocateRequest getAllocateRequest() { private AllocateRequest getAllocateRequest() {
// Need to create a new one every time because rather than directly // Need to create a new one every time because rather than directly
// referring the lists, the protobuf impl makes a copy of the lists // referring the lists, the protobuf impl makes a copy of the lists
return AllocateRequest.newInstance(0, 0, asks, releases, return AllocateRequest.newInstance(responseId, 0, asks, releases,
ResourceBlacklistRequest.newInstance(blacklistAdditions, ResourceBlacklistRequest.newInstance(blacklistAdditions,
blacklistRemoval)); blacklistRemoval));
} }
@ -272,4 +289,30 @@ public class TestAMRMClientRelayer {
clearAllocateRequestLists(); clearAllocateRequestLists();
} }
/**
 * Checks that after RM rejects a heartbeat for a wrong responseId, the
 * relayer sends the responseId RM asked for in the next heartbeat, and
 * stops overriding once a heartbeat has succeeded.
 */
@Test
public void testResponseIdResync() throws YarnException, IOException {
  // A normal heartbeat: the mock RM answers with responseId + 1
  this.responseId = 10;
  AllocateResponse firstResponse =
      this.relayer.allocate(getAllocateRequest());
  Assert.assertEquals(this.responseId + 1, firstResponse.getResponseId());

  // Make the mock RM reject the next heartbeat, announcing the id it expects
  int rmExpectedId = 5;
  this.mockAMS.setResponseIdReset(rmExpectedId);
  try {
    this.relayer.allocate(getAllocateRequest());
    Assert.fail("Expecting exception from RM");
  } catch (InvalidApplicationMasterRequestException e) {
    // The rejection is the expected outcome of this heartbeat
  }

  // The following heartbeat must carry the id RM asked for
  AllocateResponse resyncedResponse =
      this.relayer.allocate(getAllocateRequest());
  Assert.assertEquals(rmExpectedId + 1, resyncedResponse.getResponseId());

  // Once a heartbeat succeeded, the AM supplied id is used again
  this.responseId = resyncedResponse.getResponseId();
  AllocateResponse finalResponse =
      this.relayer.allocate(getAllocateRequest());
  Assert.assertEquals(this.responseId + 1, finalResponse.getResponseId());
}
} }

View File

@ -249,8 +249,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeSubClusterId = this.homeSubClusterId =
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext, this.homeRMRelayer = new AMRMClientRelayer(
ApplicationMasterProtocol.class, this.appOwner)); createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
this.appOwner),
getApplicationContext().getApplicationAttemptId().getApplicationId());
this.federationFacade = FederationStateStoreFacade.getInstance(); this.federationFacade = FederationStateStoreFacade.getInstance();
this.subClusterResolver = this.federationFacade.getSubClusterResolver(); this.subClusterResolver = this.federationFacade.getSubClusterResolver();

View File

@ -420,12 +420,9 @@ public class ApplicationMasterService extends AbstractService implements
// heartbeat one step old, simply return lastReponse // heartbeat one step old, simply return lastReponse
return lastResponse; return lastResponse;
} else if (request.getResponseId() != lastResponse.getResponseId()) { } else if (request.getResponseId() != lastResponse.getResponseId()) {
String message = throw new InvalidApplicationMasterRequestException(AMRMClientUtils
"Invalid responseId in AllocateRequest from application attempt: " .assembleInvalidResponseIdExceptionMessage(appAttemptId,
+ appAttemptId + ", expect responseId to be " lastResponse.getResponseId(), request.getResponseId()));
+ lastResponse.getResponseId() + ", but get "
+ request.getResponseId();
throw new InvalidApplicationMasterRequestException(message);
} }
AllocateResponse response = AllocateResponse response =