YARN-1446. Changed client API to retry killing application till RM acknowledges so as to account for RM crashes/failover. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1551444 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-12-17 02:16:20 +00:00
parent a6754bbb81
commit b774d7b3de
20 changed files with 290 additions and 94 deletions

View File

@ -178,6 +178,9 @@ Release 2.4.0 - UNRELEASED
YARN-1435. Modified Distributed Shell to accept either the command or the
custom script. (Xuan Gong via zjshen)
YARN-1446. Changed client API to retry killing application till RM
acknowledges so as to account for RM crashes/failover. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -26,10 +26,21 @@
import org.apache.hadoop.yarn.util.Records;
/**
* <p>The response sent by the <code>ResourceManager</code> to the client
* aborting a submitted application.</p>
*
* <p>Currently it's empty.</p>
* <p>
* The response sent by the <code>ResourceManager</code> to the client aborting
* a submitted application.
* </p>
* <p>
* The response, includes:
* <ul>
* <li>A flag which indicates that the process of killing the application is
* completed or not.</li>
* </ul>
* Note: user is recommended to wait until this flag becomes true, otherwise if
* the <code>ResourceManager</code> crashes before the process of killing the
* application is completed, the <code>ResourceManager</code> may retry this
* application on recovery.
* </p>
*
* @see ApplicationClientProtocol#forceKillApplication(KillApplicationRequest)
*/
@ -38,9 +49,24 @@
public abstract class KillApplicationResponse {
@Private
@Unstable
public static KillApplicationResponse newInstance() {
public static KillApplicationResponse newInstance(boolean isKillCompleted) {
KillApplicationResponse response =
Records.newRecord(KillApplicationResponse.class);
response.setIsKillCompleted(isKillCompleted);
return response;
}
/**
* Get the flag which indicates that the process of killing application is completed or not.
*/
@Public
@Stable
public abstract boolean getIsKillCompleted();
/**
* Set the flag which indicates that the process of killing application is completed or not.
*/
@Private
@Unstable
public abstract void setIsKillCompleted(boolean isKillCompleted);
}

View File

@ -27,7 +27,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
@ -882,14 +881,22 @@ public class YarnConfiguration extends Configuration {
////////////////////////////////
/**
* Use YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS instead.
* The interval of the yarn client's querying application state after
* application submission. The unit is millisecond.
*/
@Deprecated
public static final String YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
YARN_PREFIX + "client.app-submission.poll-interval";
public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
1000;
/**
* The interval that the yarn client library uses to poll the completion
* status of the asynchronous API of application client protocol.
*/
public static final String YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
YARN_PREFIX + "client.application-client-protocol.poll-interval-ms";
public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
200;
/**
* Max number of threads in NMClientAsync to process container management
* events

View File

@ -116,6 +116,7 @@ message KillApplicationRequestProto {
}
message KillApplicationResponseProto {
optional bool is_kill_completed = 1 [default = false];
}
message GetClusterMetricsRequestProto {

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -79,7 +80,8 @@ public class YarnClientImpl extends YarnClient {
protected ApplicationClientProtocol rmClient;
protected InetSocketAddress rmAddress;
protected long statePollIntervalMillis;
protected long submitPollIntervalMillis;
private long asyncApiPollIntervalMillis;
private static final String ROOT = "root";
@ -92,12 +94,20 @@ private static InetSocketAddress getRmAddress(Configuration conf) {
YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT);
}
@SuppressWarnings("deprecation")
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.rmAddress = getRmAddress(conf);
statePollIntervalMillis = conf.getLong(
asyncApiPollIntervalMillis =
conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
submitPollIntervalMillis = asyncApiPollIntervalMillis;
if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS)
!= null) {
submitPollIntervalMillis = conf.getLong(
YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS);
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
}
super.serviceInit(conf);
}
@ -165,7 +175,7 @@ public YarnClientApplication createApplication()
" is still in " + state);
}
try {
Thread.sleep(statePollIntervalMillis);
Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
@ -179,11 +189,29 @@ public YarnClientApplication createApplication()
@Override
public void killApplication(ApplicationId applicationId)
throws YarnException, IOException {
LOG.info("Killing application " + applicationId);
KillApplicationRequest request =
Records.newRecord(KillApplicationRequest.class);
request.setApplicationId(applicationId);
rmClient.forceKillApplication(request);
try {
int pollCount = 0;
while (true) {
KillApplicationResponse response =
rmClient.forceKillApplication(request);
if (response.getIsKillCompleted()) {
break;
}
if (++pollCount % 10 == 0) {
LOG.info("Watiting for application " + applicationId
+ " to be killed.");
}
Thread.sleep(asyncApiPollIntervalMillis);
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for application " + applicationId
+ " to be killed.");
}
LOG.info("Killed application " + applicationId);
}
@Override

View File

@ -42,6 +42,8 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -89,6 +91,7 @@ public void testClientStop() {
rm.stop();
}
@SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSubmitApplication() {
Configuration conf = new Configuration();
@ -128,6 +131,23 @@ public void testSubmitApplication() {
client.stop();
}
@Test
public void testKillApplication() throws Exception {
MockRM rm = new MockRM();
rm.start();
RMApp app = rm.submitApp(2000);
Configuration conf = new Configuration();
@SuppressWarnings("resource")
final YarnClient client = new MockYarnClient();
client.init(conf);
client.start();
client.killApplication(app.getApplicationId());
verify(((MockYarnClient) client).getRMClient(), times(2))
.forceKillApplication(any(KillApplicationRequest.class));
}
@Test(timeout = 30000)
public void testApplicationType() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
@ -234,6 +254,11 @@ public void start() {
GetApplicationReportRequest.class))).thenReturn(mockResponse);
when(rmClient.getApplications(any(GetApplicationsRequest.class)))
.thenReturn(mockAppResponse);
// return false for 1st kill request, and true for the 2nd.
when(rmClient.forceKillApplication(any(
KillApplicationRequest.class)))
.thenReturn(KillApplicationResponse.newInstance(false)).thenReturn(
KillApplicationResponse.newInstance(true));
} catch (YarnException e) {
Assert.fail("Exception is not expected.");
} catch (IOException e) {
@ -242,6 +267,10 @@ public void start() {
when(mockResponse.getApplicationReport()).thenReturn(mockReport);
}
public ApplicationClientProtocol getRMClient() {
return rmClient;
}
@Override
public List<ApplicationReport> getApplications(
Set<String> applicationTypes, EnumSet<YarnApplicationState> applicationStates)

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.KillApplicationResponseProtoOrBuilder;
import com.google.protobuf.TextFormat;
@ -67,4 +68,24 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = KillApplicationResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public boolean getIsKillCompleted() {
KillApplicationResponseProtoOrBuilder p =
viaProto ? proto : builder;
return p.getIsKillCompleted();
}
@Override
public void setIsKillCompleted(boolean isKillCompleted) {
maybeInitBuilder();
builder.setIsKillCompleted(isKillCompleted);
}
}

View File

@ -945,10 +945,10 @@
<!-- Other configuration -->
<property>
<description>The interval of the yarn client's querying application state
after application submission. The unit is millisecond.</description>
<name>yarn.client.app-submission.poll-interval</name>
<value>1000</value>
<description>The interval that the yarn client library uses to poll the
completion status of the asynchronous API of application client protocol.
</description>
<name>yarn.client.application-client-protocol.poll-interval-ms</name>
<value>200</value>
</property>
</configuration>

View File

@ -292,15 +292,15 @@ public FinishApplicationMasterResponse finishApplicationMaster(
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
rmContext.getDispatcher().getEventHandler().handle(
if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
.isAppSafeToTerminate()) {
return FinishApplicationMasterResponse.newInstance(true);
} else {
// keep sending the unregister event as RM may crash in the meanwhile.
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
.getTrackingUrl(), request.getFinalApplicationStatus(), request
.getDiagnostics()));
if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
.isAppSafeToUnregister()) {
return FinishApplicationMasterResponse.newInstance(true);
} else {
return FinishApplicationMasterResponse.newInstance(false);
}
}

View File

@ -380,14 +380,15 @@ public KillApplicationResponse forceKillApplication(
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
}
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.KILL));
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.KILL_APP_REQUEST, "ClientRMService" , applicationId);
KillApplicationResponse response = recordFactory
.newRecordInstance(KillApplicationResponse.class);
return response;
if (application.isAppSafeToTerminate()) {
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId);
return KillApplicationResponse.newInstance(true);
} else {
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.KILL));
return KillApplicationResponse.newInstance(false);
}
}
@Override

View File

@ -197,13 +197,13 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
String getApplicationType();
/**
* Check whether this application is safe to unregister.
* An application is deemed to be safe to unregister if it is an unmanaged
* AM or its state has been removed from state store.
* Check whether this application is safe to terminate.
* An application is deemed to be safe to terminate if it is an unmanaged
* AM or its state has been saved in state store.
* @return the flag which indicates whether this application is safe to
* unregister.
* terminate.
*/
boolean isAppSafeToUnregister();
boolean isAppSafeToTerminate();
/**
* Create the external user-facing state of ApplicationMaster from the

View File

@ -37,5 +37,4 @@ public enum RMAppEventType {
// Source: RMStateStore
APP_NEW_SAVED,
APP_UPDATE_SAVED,
APP_REMOVED
}

View File

@ -110,10 +110,14 @@ public class RMAppImpl implements RMApp, Recoverable {
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
private static final AppFinishedTransition FINISHED_TRANSITION =
new AppFinishedTransition();
// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
private RMAppState stateBeforeFinalSaving;
private RMAppEvent eventCausingFinalSaving;
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;
Object transitionTodo;
private static final StateMachineFactory<RMAppImpl,
@ -166,10 +170,8 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
new AppRejectedTransition(), RMAppState.FAILED))
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED)
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
RMAppEventType.KILL,
new FinalSavingTransition(
new KillAppAndAttemptTransition(), RMAppState.KILLED))
.addTransition(RMAppState.SUBMITTED, RMAppState.KILLING,
RMAppEventType.KILL,new KillAttemptTransition())
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
@ -180,10 +182,8 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
RMAppEventType.KILL,
new FinalSavingTransition(
new KillAppAndAttemptTransition(), RMAppState.KILLED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
RMAppEventType.KILL,new KillAttemptTransition())
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@ -200,10 +200,8 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
EnumSet.of(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
new AttemptFailedTransition(RMAppState.SUBMITTED))
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
RMAppEventType.KILL,
new FinalSavingTransition(
new KillAppAndAttemptTransition(), RMAppState.KILLED))
.addTransition(RMAppState.RUNNING, RMAppState.KILLING,
RMAppEventType.KILL, new KillAttemptTransition())
// Transitions from FINAL_SAVING state
.addTransition(RMAppState.FINAL_SAVING,
@ -221,11 +219,27 @@ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.KILL, new KillAppAndAttemptTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE))
EnumSet.of(RMAppEventType.NODE_UPDATE,
// ignore Kill as we have already saved the final Finished state in
// state store.
RMAppEventType.KILL))
// Transitions from KILLING state
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
new AppKilledTransition(), RMAppState.KILLED))
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
EnumSet.of(
RMAppEventType.NODE_UPDATE,
RMAppEventType.ATTEMPT_REGISTERED,
RMAppEventType.ATTEMPT_UNREGISTERED,
RMAppEventType.ATTEMPT_FINISHED,
RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.APP_UPDATE_SAVED,
RMAppEventType.KILL))
// Transitions from FINISHED state
// ignorable transitions
@ -249,7 +263,7 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE))
RMAppEventType.NODE_UPDATE))
.installTopology();
@ -419,6 +433,7 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
case ACCEPTED:
case RUNNING:
case FINAL_SAVING:
case KILLING:
return FinalApplicationStatus.UNDEFINED;
// finished without a proper final state is the same as failed
case FINISHING:
@ -681,7 +696,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
}
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
app.createNewAttempt(true);
return RMAppState.SUBMITTED;
@ -811,7 +826,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
diags = getAppAttemptFailedDiagnostics(failedEvent);
break;
case KILL:
case ATTEMPT_KILLED:
diags = getAppKilledDiagnostics();
break;
default:
@ -901,7 +916,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private static class AppKilledTransition extends FinalTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append("Application killed by user.");
app.diagnostics.append(getAppKilledDiagnostics());
super.transition(app, event);
};
}
@ -910,15 +925,16 @@ private static String getAppKilledDiagnostics() {
return "Application killed by user.";
}
private static class KillAppAndAttemptTransition extends AppKilledTransition {
private static class KillAttemptTransition extends RMAppTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.handler.handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL));
super.transition(app, event);
app.stateBeforeKilling = app.getState();
app.handler.handle(new RMAppAttemptEvent(app.currentAttempt
.getAppAttemptId(), RMAppAttemptEventType.KILL));
}
}
private static final class AppRejectedTransition extends
FinalTransition{
public void transition(RMAppImpl app, RMAppEvent event) {
@ -986,7 +1002,7 @@ public String getApplicationType() {
}
@Override
public boolean isAppSafeToUnregister() {
public boolean isAppSafeToTerminate() {
RMAppState state = getState();
return state.equals(RMAppState.FINISHING)
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
@ -1003,6 +1019,9 @@ public YarnApplicationState createApplicationState() {
if (rmAppState.equals(RMAppState.FINAL_SAVING)) {
rmAppState = stateBeforeFinalSaving;
}
if (rmAppState.equals(RMAppState.KILLING)) {
rmAppState = stateBeforeKilling;
}
switch (rmAppState) {
case NEW:
return YarnApplicationState.NEW;

View File

@ -361,6 +361,8 @@ RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.STATUS_UPDATE,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
// ignore Kill as we have already saved the final Finished state in
// state store.
RMAppAttemptEventType.KILL))
// Transitions from FINISHED State

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -277,12 +278,10 @@ public void NMwaitForState(NodeId nodeid, NodeState finalState)
node.getState());
}
public void killApp(ApplicationId appId) throws Exception {
public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
ApplicationClientProtocol client = getClientRMService();
KillApplicationRequest req = Records
.newRecord(KillApplicationRequest.class);
req.setApplicationId(appId);
client.forceKillApplication(req);
KillApplicationRequest req = KillApplicationRequest.newInstance(appId);
return client.forceKillApplication(req);
}
// from AMLauncher

View File

@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -76,8 +77,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -414,10 +416,8 @@ public void testRMRestartAppRunningAMFailed() throws Exception {
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
// assert the previous AM state is loaded back on RM recovery.
RMApp recoveredApp =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
Assert.assertEquals(RMAppAttemptState.FAILED, recoveredApp
.getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
rm1.stop();
rm2.stop();
}
@ -964,8 +964,8 @@ public void testRMRestartOnMaxAppAttempts() throws Exception {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
// Setting AMLivelinessMonitor interval to be 10 Secs.
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
// Setting AMLivelinessMonitor interval to be 3 Secs.
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000);
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
@ -1494,6 +1494,69 @@ public synchronized void checkVersion()
Assert.assertTrue(rm1.getServiceState() == STATE.STOPPED);
}
// This is to test Killing application should be able to wait until app
// reaches killed state and also check that attempt state is saved before app
// state is saved.
@Test
public void testClientRetryOnKillingApplication() throws Exception {
MemoryRMStateStore memStore = new TestMemoryRMStateStore();
memStore.init(conf);
// start RM
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 =
rm1.submitApp(200, "name", "user", null, false, "default", 1, null,
"myType");
MockAM am1 = launchAM(app1, rm1, nm1);
KillApplicationResponse response;
int count = 0;
while (true) {
response = rm1.killApp(app1.getApplicationId());
if (response.getIsKillCompleted()) {
break;
}
Thread.sleep(100);
count++;
}
// we expect at least 2 calls for killApp as the first killApp always return
// false.
Assert.assertTrue(count >= 1);
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.KILLED);
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
Assert.assertEquals(1, ((TestMemoryRMStateStore) memStore).updateAttempt);
Assert.assertEquals(2, ((TestMemoryRMStateStore) memStore).updateApp);
}
public class TestMemoryRMStateStore extends MemoryRMStateStore {
int count = 0;
public int updateApp = 0;
public int updateAttempt = 0;
@Override
public void updateApplicationStateInternal(String appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
updateApp = ++count;
super.updateApplicationStateInternal(appId, appStateData);
}
@Override
public synchronized void
updateApplicationAttemptStateInternal(String attemptIdStr,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception {
updateAttempt = ++count;
super.updateApplicationAttemptStateInternal(attemptIdStr,
attemptStateData);
}
}
public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {

View File

@ -145,7 +145,7 @@ public void setQueue(String name) {
}
@Override
public boolean isAppSafeToUnregister() {
public boolean isAppSafeToTerminate() {
throw new UnsupportedOperationException("Not supported yet.");
}

View File

@ -218,7 +218,7 @@ public String getApplicationType() {
}
@Override
public boolean isAppSafeToUnregister() {
public boolean isAppSafeToTerminate() {
return true;
}

View File

@ -301,12 +301,9 @@ private void assertKilled(RMApp application) {
private void assertAppAndAttemptKilled(RMApp application)
throws InterruptedException {
sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertKilled(application);
// send attempt final state saved event.
application.getCurrentAppAttempt().handle(
new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
.getAppAttemptId(), null));
Assert.assertEquals(RMAppAttemptState.KILLED, application
.getCurrentAppAttempt().getAppAttemptState());
assertAppFinalStateSaved(application);
@ -329,6 +326,12 @@ private void sendAppUpdateSavedEvent(RMApp application) {
rmDispatcher.await();
}
private void sendAttemptUpdateSavedEvent(RMApp application) {
application.getCurrentAppAttempt().handle(
new RMAppAttemptUpdateSavedEvent(application.getCurrentAppAttempt()
.getAppAttemptId(), null));
}
protected RMApp testCreateAppNewSaving(
ApplicationSubmissionContext submissionContext) throws IOException {
RMApp application = createNewTestApp(submissionContext);
@ -624,11 +627,12 @@ public void testAppRunningKill() throws IOException {
rmDispatcher.await();
// Ignore Attempt_Finished if we were supposed to go to Finished.
assertAppState(RMAppState.FINAL_SAVING, application);
assertAppState(RMAppState.KILLING, application);
RMAppEvent finishEvent =
new RMAppFinishedAttemptEvent(application.getApplicationId(), null);
application.handle(finishEvent);
assertAppState(RMAppState.FINAL_SAVING, application);
assertAppState(RMAppState.KILLING, application);
sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertKilled(application);
}
@ -686,8 +690,8 @@ public void testAppRunningFailed() throws IOException {
}
@Test
public void testAppFinishingKill() throws IOException {
LOG.info("--- START: testAppFinishedFinished ---");
public void testAppAtFinishingIgnoreKill() throws IOException {
LOG.info("--- START: testAppAtFinishingIgnoreKill ---");
RMApp application = testCreateAppFinishing(null);
// FINISHING => FINISHED event RMAppEventType.KILL
@ -695,7 +699,7 @@ public void testAppFinishingKill() throws IOException {
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
assertAppState(RMAppState.FINISHED, application);
assertAppState(RMAppState.FINISHING, application);
}
// While App is at FINAL_SAVING, Attempt_Finished event may come before
@ -780,6 +784,7 @@ public void testAppKilledKilled() throws IOException {
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
application.handle(event);
rmDispatcher.await();
sendAttemptUpdateSavedEvent(application);
sendAppUpdateSavedEvent(application);
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
@ -801,14 +806,6 @@ public void testAppKilledKilled() throws IOException {
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
// KILLED => KILLED event RMAppEventType.ATTEMPT_KILLED
event =
new RMAppEvent(application.getApplicationId(),
RMAppEventType.ATTEMPT_KILLED);
application.handle(event);
rmDispatcher.await();
assertTimesAtFinish(application);
assertAppState(RMAppState.KILLED, application);
// KILLED => KILLED event RMAppEventType.KILL
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);