YARN-1793. Fixed ClientRMService#forceKillApplication not killing unmanaged application. Contributed by Karthik Kambatla
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1576023 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d59dbc9e38
commit
7da07461ff
@ -433,6 +433,9 @@ Release 2.4.0 - UNRELEASED
|
||||
YARN-1787. Fixed help messages for applicationattempt and container
|
||||
sub-commands in bin/yarn. (Zhijie Shen via vinodkv)
|
||||
|
||||
YARN-1793. Fixed ClientRMService#forceKillApplication not killing unmanaged
|
||||
application. (Karthik Kambatla via jianhe)
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -351,27 +351,18 @@ public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
RMApp rmApp =
|
||||
rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
|
||||
|
||||
if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
|
||||
// No recovery supported yet for unmanaged AM. Send the unregister event
|
||||
// and (falsely) acknowledge state-store write immediately.
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
|
||||
.getTrackingUrl(), request.getFinalApplicationStatus(), request
|
||||
.getDiagnostics()));
|
||||
if (rmApp.isAppFinalStateStored()) {
|
||||
return FinishApplicationMasterResponse.newInstance(true);
|
||||
}
|
||||
|
||||
// Not an unmanaged-AM.
|
||||
if (rmApp.isAppSafeToTerminate()) {
|
||||
return FinishApplicationMasterResponse.newInstance(true);
|
||||
} else {
|
||||
// keep sending the unregister event as RM may crash in the meanwhile.
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
|
||||
.getTrackingUrl(), request.getFinalApplicationStatus(), request
|
||||
.getDiagnostics()));
|
||||
return FinishApplicationMasterResponse.newInstance(false);
|
||||
}
|
||||
|
||||
// For UnmanagedAMs, return true so they don't retry
|
||||
return FinishApplicationMasterResponse.newInstance(
|
||||
rmApp.getApplicationSubmissionContext().getUnmanagedAM());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -397,15 +397,18 @@ public KillApplicationResponse forceKillApplication(
|
||||
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
||||
}
|
||||
|
||||
if (application.isAppSafeToTerminate()) {
|
||||
if (application.isAppFinalStateStored()) {
|
||||
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||
AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
|
||||
AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
|
||||
return KillApplicationResponse.newInstance(true);
|
||||
} else {
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMAppEvent(applicationId, RMAppEventType.KILL));
|
||||
return KillApplicationResponse.newInstance(false);
|
||||
}
|
||||
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMAppEvent(applicationId, RMAppEventType.KILL));
|
||||
|
||||
// For UnmanagedAMs, return true so they don't retry
|
||||
return KillApplicationResponse.newInstance(
|
||||
application.getApplicationSubmissionContext().getUnmanagedAM());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -204,13 +204,10 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
|
||||
Set<String> getApplicationTags();
|
||||
|
||||
/**
|
||||
* Check whether this application is safe to terminate.
|
||||
* An application is deemed to be safe to terminate if it is an unmanaged
|
||||
* AM or its state has been saved in state store.
|
||||
* @return the flag which indicates whether this application is safe to
|
||||
* terminate.
|
||||
* Check whether this application's state has been saved to the state store.
|
||||
* @return the flag indicating whether the applications's state is stored.
|
||||
*/
|
||||
boolean isAppSafeToTerminate();
|
||||
boolean isAppFinalStateStored();
|
||||
|
||||
/**
|
||||
* Create the external user-facing state of ApplicationMaster from the
|
||||
|
@ -1103,14 +1103,11 @@ public Set<String> getApplicationTags() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAppSafeToTerminate() {
|
||||
public boolean isAppFinalStateStored() {
|
||||
RMAppState state = getState();
|
||||
return state.equals(RMAppState.FINISHING)
|
||||
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
|
||||
|| state.equals(RMAppState.KILLED) ||
|
||||
// If this is an unmanaged AM, we are safe to unregister since unmanaged
|
||||
// AM will immediately go to FINISHED state on AM unregistration
|
||||
getApplicationSubmissionContext().getUnmanagedAM();
|
||||
|| state.equals(RMAppState.KILLED);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,17 +173,28 @@ public GetNewApplicationResponse getNewAppId() throws Exception {
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory) throws Exception {
|
||||
return submitApp(masterMemory, false);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, boolean unmanaged)
|
||||
throws Exception {
|
||||
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
|
||||
.getShortUserName());
|
||||
.getShortUserName(), unmanaged);
|
||||
}
|
||||
|
||||
// client
|
||||
public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
|
||||
return submitApp(masterMemory, name, user, null, false, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
||||
return submitApp(masterMemory, name, user, false);
|
||||
}
|
||||
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
boolean unmanaged)
|
||||
throws Exception {
|
||||
return submitApp(masterMemory, name, user, null, unmanaged, null,
|
||||
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
||||
}
|
||||
|
||||
public RMApp submitApp(int masterMemory, String name, String user,
|
||||
Map<ApplicationAccessType, String> acls) throws Exception {
|
||||
return submitApp(masterMemory, name, user, acls, false, null,
|
||||
|
@ -19,6 +19,8 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
@ -34,6 +36,7 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -61,6 +64,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
@ -218,7 +222,7 @@ public void testGetApplicationReport() throws YarnException {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForceKillApplication() throws YarnException {
|
||||
public void testForceKillNonExistingApplication() throws YarnException {
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
when(rmContext.getRMApps()).thenReturn(
|
||||
new ConcurrentHashMap<ApplicationId, RMApp>());
|
||||
@ -237,6 +241,58 @@ public void testForceKillApplication() throws YarnException {
|
||||
"application " + request.getApplicationId());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testForceKillApplication() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
MockRM rm = new MockRM();
|
||||
rm.init(conf);
|
||||
rm.start();
|
||||
|
||||
ClientRMService rmService = rm.getClientRMService();
|
||||
GetApplicationsRequest getRequest = GetApplicationsRequest.newInstance(
|
||||
EnumSet.of(YarnApplicationState.KILLED));
|
||||
|
||||
RMApp app1 = rm.submitApp(1024);
|
||||
RMApp app2 = rm.submitApp(1024, true);
|
||||
|
||||
assertEquals("Incorrect number of apps in the RM", 0,
|
||||
rmService.getApplications(getRequest).getApplicationList().size());
|
||||
|
||||
KillApplicationRequest killRequest1 =
|
||||
KillApplicationRequest.newInstance(app1.getApplicationId());
|
||||
KillApplicationRequest killRequest2 =
|
||||
KillApplicationRequest.newInstance(app2.getApplicationId());
|
||||
|
||||
int killAttemptCount = 0;
|
||||
for (int i = 0; i < 100; i++) {
|
||||
KillApplicationResponse killResponse1 =
|
||||
rmService.forceKillApplication(killRequest1);
|
||||
killAttemptCount++;
|
||||
if (killResponse1.getIsKillCompleted()) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(10);
|
||||
}
|
||||
assertTrue("Kill attempt count should be greater than 1 for managed AMs",
|
||||
killAttemptCount > 1);
|
||||
assertEquals("Incorrect number of apps in the RM", 1,
|
||||
rmService.getApplications(getRequest).getApplicationList().size());
|
||||
|
||||
KillApplicationResponse killResponse2 =
|
||||
rmService.forceKillApplication(killRequest2);
|
||||
assertTrue("Killing UnmanagedAM should falsely acknowledge true",
|
||||
killResponse2.getIsKillCompleted());
|
||||
for (int i = 0; i < 100; i++) {
|
||||
if (2 ==
|
||||
rmService.getApplications(getRequest).getApplicationList().size()) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(10);
|
||||
}
|
||||
assertEquals("Incorrect number of apps in the RM", 2,
|
||||
rmService.getApplications(getRequest).getApplicationList().size());
|
||||
}
|
||||
|
||||
@Test (expected = ApplicationNotFoundException.class)
|
||||
public void testMoveAbsentApplication() throws YarnException {
|
||||
@ -629,6 +685,12 @@ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
|
||||
|
||||
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
|
||||
String name, String queue, Set<String> tags) {
|
||||
return mockSubmitAppRequest(appId, name, queue, tags, false);
|
||||
}
|
||||
|
||||
private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
|
||||
String name, String queue, Set<String> tags, boolean unmanaged) {
|
||||
|
||||
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
|
||||
|
||||
Resource resource = Resources.createResource(
|
||||
@ -643,6 +705,7 @@ private SubmitApplicationRequest mockSubmitAppRequest(ApplicationId appId,
|
||||
submissionContext.setResource(resource);
|
||||
submissionContext.setApplicationType(appType);
|
||||
submissionContext.setApplicationTags(tags);
|
||||
submissionContext.setUnmanagedAM(unmanaged);
|
||||
|
||||
SubmitApplicationRequest submitRequest =
|
||||
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
|
||||
|
@ -233,7 +233,7 @@ public KillApplicationResponse forceKillApplication(
|
||||
KillApplicationRequest request) throws YarnException {
|
||||
ApplicationId applicationId = request.getApplicationId();
|
||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||
if (application.isAppSafeToTerminate()) {
|
||||
if (application.isAppFinalStateStored()) {
|
||||
return KillApplicationResponse.newInstance(true);
|
||||
} else {
|
||||
return KillApplicationResponse.newInstance(false);
|
||||
|
@ -151,7 +151,7 @@ public void setQueue(String name) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAppSafeToTerminate() {
|
||||
public boolean isAppFinalStateStored() {
|
||||
throw new UnsupportedOperationException("Not supported yet.");
|
||||
}
|
||||
|
||||
|
@ -224,7 +224,7 @@ public Set<String> getApplicationTags() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAppSafeToTerminate() {
|
||||
public boolean isAppFinalStateStored() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user