MAPREDUCE-2953. Fix a race condition on submission which caused client to incorrectly assume application was gone by making submission synchronous for RMAppManager. Contributed by Thomas Graves.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1166968 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a0ef2d7503
commit
ca853445e9
@ -1247,6 +1247,10 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-2937. Ensure reason for application failure is displayed to the
|
||||
user. (mahadev via acmurthy)
|
||||
|
||||
MAPREDUCE-2953. Fix a race condition on submission which caused client to
|
||||
incorrectly assume application was gone by making submission synchronous
|
||||
for RMAppManager. (Thomas Graves via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -91,6 +91,7 @@ public class ClientRMService extends AbstractService implements
|
||||
final private YarnScheduler scheduler;
|
||||
final private RMContext rmContext;
|
||||
private final AMLivelinessMonitor amLivelinessMonitor;
|
||||
private final RMAppManager rmAppManager;
|
||||
|
||||
private String clientServiceBindAddress;
|
||||
private Server server;
|
||||
@ -100,11 +101,13 @@ public class ClientRMService extends AbstractService implements
|
||||
private ApplicationACLsManager aclsManager;
|
||||
private Map<ApplicationACL, AccessControlList> applicationACLs;
|
||||
|
||||
public ClientRMService(RMContext rmContext, YarnScheduler scheduler) {
|
||||
public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
|
||||
RMAppManager rmAppManager) {
|
||||
super(ClientRMService.class.getName());
|
||||
this.scheduler = scheduler;
|
||||
this.rmContext = rmContext;
|
||||
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
||||
this.rmAppManager = rmAppManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -201,8 +204,10 @@ public SubmitApplicationResponse submitApplication(
|
||||
throw new IOException("Application with id " + applicationId
|
||||
+ " is already present! Cannot add a duplicate!");
|
||||
}
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppManagerSubmitEvent(submissionContext));
|
||||
// This needs to be synchronous as the client can query
|
||||
// immediately following the submission to get the application status.
|
||||
// So call handle directly and do not send an event.
|
||||
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext));
|
||||
|
||||
LOG.info("Application with id " + applicationId.getId() +
|
||||
" submitted by user " + user + " with " + submissionContext);
|
||||
|
@ -210,7 +210,7 @@ protected synchronized void checkAppNumCompletedLimit() {
|
||||
}
|
||||
}
|
||||
|
||||
protected void submitApplication(ApplicationSubmissionContext submissionContext) {
|
||||
protected synchronized void submitApplication(ApplicationSubmissionContext submissionContext) {
|
||||
ApplicationId applicationId = submissionContext.getApplicationId();
|
||||
RMApp application = null;
|
||||
try {
|
||||
|
@ -99,7 +99,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||
protected NMLivelinessMonitor nmLivelinessMonitor;
|
||||
protected NodesListManager nodesListManager;
|
||||
private SchedulerEventDispatcher schedulerDispatcher;
|
||||
private RMAppManager rmAppManager;
|
||||
protected RMAppManager rmAppManager;
|
||||
|
||||
private final AtomicBoolean shutdown = new AtomicBoolean(false);
|
||||
private WebApp webApp;
|
||||
@ -176,13 +176,13 @@ public synchronized void init(Configuration conf) {
|
||||
masterService = createApplicationMasterService();
|
||||
addService(masterService) ;
|
||||
|
||||
clientRM = createClientRMService();
|
||||
addService(clientRM);
|
||||
|
||||
this.rmAppManager = createRMAppManager();
|
||||
// Register event handler for RMAppManagerEvents
|
||||
this.rmDispatcher.register(RMAppManagerEventType.class,
|
||||
this.rmAppManager);
|
||||
|
||||
clientRM = createClientRMService();
|
||||
addService(clientRM);
|
||||
|
||||
adminService = createAdminService();
|
||||
addService(adminService);
|
||||
@ -441,7 +441,7 @@ protected ResourceTrackerService createResourceTrackerService() {
|
||||
}
|
||||
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(this.rmContext, scheduler);
|
||||
return new ClientRMService(this.rmContext, scheduler, this.rmAppManager);
|
||||
}
|
||||
|
||||
protected ApplicationMasterService createApplicationMasterService() {
|
||||
|
@ -60,13 +60,9 @@ public MockRM(Configuration conf) {
|
||||
|
||||
public void waitForState(ApplicationId appId, RMAppState finalState)
|
||||
throws Exception {
|
||||
RMApp app = getRMContext().getRMApps().get(appId);
|
||||
Assert.assertNotNull("app shouldn't be null", app);
|
||||
int timeoutSecs = 0;
|
||||
RMApp app = null;
|
||||
while ((app == null) && timeoutSecs++ < 20) {
|
||||
app = getRMContext().getRMApps().get(appId);
|
||||
Thread.sleep(500);
|
||||
}
|
||||
timeoutSecs = 0;
|
||||
while (!finalState.equals(app.getState()) &&
|
||||
timeoutSecs++ < 20) {
|
||||
System.out.println("App State is : " + app.getState() +
|
||||
@ -95,6 +91,7 @@ public RMApp submitApp(int masterMemory) throws Exception {
|
||||
req.setApplicationSubmissionContext(sub);
|
||||
|
||||
client.submitApplication(req);
|
||||
// make sure app is immediately available after submit
|
||||
waitForState(appId, RMAppState.ACCEPTED);
|
||||
return getRMContext().getRMApps().get(appId);
|
||||
}
|
||||
@ -131,7 +128,7 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) throws Excepti
|
||||
|
||||
@Override
|
||||
protected ClientRMService createClientRMService() {
|
||||
return new ClientRMService(getRMContext(), getResourceScheduler()) {
|
||||
return new ClientRMService(getRMContext(), getResourceScheduler(), rmAppManager) {
|
||||
@Override
|
||||
public void start() {
|
||||
//override to not start rpc handler
|
||||
|
Loading…
Reference in New Issue
Block a user