YARN-540. Race condition causing RM to potentially relaunch already unregistered AMs on RM restart (Jian He via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1523376 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7bc9a5d068
commit
ec010a2936
@ -45,6 +45,7 @@
|
|||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
@ -57,6 +58,8 @@
|
|||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
|
||||||
|
import com.sun.research.ws.wadl.Response;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers/unregisters to RM and sends heartbeats to RM.
|
* Registers/unregisters to RM and sends heartbeats to RM.
|
||||||
*/
|
*/
|
||||||
@ -194,7 +197,15 @@ protected void unregister() {
|
|||||||
FinishApplicationMasterRequest request =
|
FinishApplicationMasterRequest request =
|
||||||
FinishApplicationMasterRequest.newInstance(finishState,
|
FinishApplicationMasterRequest.newInstance(finishState,
|
||||||
sb.toString(), historyUrl);
|
sb.toString(), historyUrl);
|
||||||
scheduler.finishApplicationMaster(request);
|
while (true) {
|
||||||
|
FinishApplicationMasterResponse response =
|
||||||
|
scheduler.finishApplicationMaster(request);
|
||||||
|
if (response.getIsUnregistered()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
LOG.info("Waiting for application to be successfully unregistered.");
|
||||||
|
Thread.sleep(rmPollInterval);
|
||||||
|
}
|
||||||
} catch(Exception are) {
|
} catch(Exception are) {
|
||||||
LOG.error("Exception while unregistering ", are);
|
LOG.error("Exception while unregistering ", are);
|
||||||
}
|
}
|
||||||
|
@ -200,6 +200,9 @@ Release 2.1.1-beta - UNRELEASED
|
|||||||
YARN-1189. NMTokenSecretManagerInNM is not being told when applications
|
YARN-1189. NMTokenSecretManagerInNM is not being told when applications
|
||||||
have finished (Omkar Vinit Joshi via jlowe)
|
have finished (Omkar Vinit Joshi via jlowe)
|
||||||
|
|
||||||
|
YARN-540. Race condition causing RM to potentially relaunch already
|
||||||
|
unregistered AMs on RM restart (Jian He via bikas)
|
||||||
|
|
||||||
Release 2.1.0-beta - 2013-08-22
|
Release 2.1.0-beta - 2013-08-22
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -26,21 +26,52 @@
|
|||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>The response sent by the <code>ResourceManager</code> to a
|
* <p>
|
||||||
* <code>ApplicationMaster</code> on it's completion.</p>
|
* The response sent by the <code>ResourceManager</code> to a
|
||||||
|
* <code>ApplicationMaster</code> on its completion.
|
||||||
|
* </p>
|
||||||
*
|
*
|
||||||
* <p>Currently, this is empty.</p>
|
* <p>
|
||||||
|
* The response includes:
|
||||||
|
* <ul>
|
||||||
|
* <li>A flag which indicates that the application has successfully unregistered
|
||||||
|
* with the RM and the application can safely stop.</li>
|
||||||
|
* </ul>
|
||||||
|
* </p>
|
||||||
|
* Note: The flag indicates whether the application has successfully
|
||||||
|
* unregistered and is safe to stop. The application may stop after the flag is
|
||||||
|
* true. If the application stops before the flag is true then the RM may retry
|
||||||
|
* the application.
|
||||||
*
|
*
|
||||||
* @see ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)
|
* @see ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
public abstract class FinishApplicationMasterResponse {
|
public abstract class FinishApplicationMasterResponse {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public static FinishApplicationMasterResponse newInstance() {
|
public static FinishApplicationMasterResponse newInstance(
|
||||||
|
boolean isRemovedFromRMStateStore) {
|
||||||
FinishApplicationMasterResponse response =
|
FinishApplicationMasterResponse response =
|
||||||
Records.newRecord(FinishApplicationMasterResponse.class);
|
Records.newRecord(FinishApplicationMasterResponse.class);
|
||||||
|
response.setIsUnregistered(isRemovedFromRMStateStore);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the flag which indicates that the application has successfully
|
||||||
|
* unregistered with the RM and the application can safely stop.
|
||||||
|
*/
|
||||||
|
@Public
|
||||||
|
@Stable
|
||||||
|
public abstract boolean getIsUnregistered();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the flag which indicates that the application has successfully
|
||||||
|
* unregistered with the RM and the application can safely stop.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public abstract void setIsUnregistered(boolean isUnregistered);
|
||||||
}
|
}
|
||||||
|
@ -52,6 +52,7 @@ message FinishApplicationMasterRequestProto {
|
|||||||
}
|
}
|
||||||
|
|
||||||
message FinishApplicationMasterResponseProto {
|
message FinishApplicationMasterResponseProto {
|
||||||
|
optional bool isUnregistered = 1 [default = false];
|
||||||
}
|
}
|
||||||
|
|
||||||
message AllocateRequestProto {
|
message AllocateRequestProto {
|
||||||
|
@ -44,6 +44,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
@ -300,11 +301,24 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
|
|||||||
String appMessage, String appTrackingUrl) throws YarnException,
|
String appMessage, String appTrackingUrl) throws YarnException,
|
||||||
IOException {
|
IOException {
|
||||||
Preconditions.checkArgument(appStatus != null,
|
Preconditions.checkArgument(appStatus != null,
|
||||||
"AppStatus should not be null.");
|
"AppStatus should not be null.");
|
||||||
FinishApplicationMasterRequest request =
|
FinishApplicationMasterRequest request =
|
||||||
FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
|
FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
|
||||||
appTrackingUrl);
|
appTrackingUrl);
|
||||||
rmClient.finishApplicationMaster(request);
|
try {
|
||||||
|
while (true) {
|
||||||
|
FinishApplicationMasterResponse response =
|
||||||
|
rmClient.finishApplicationMaster(request);
|
||||||
|
if (response.getIsUnregistered()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
LOG.info("Waiting for application to be successfully unregistered.");
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.info("Interrupted while waiting for application"
|
||||||
|
+ " to be removed from RMStateStore");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -22,7 +22,9 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProtoOrBuilder;
|
||||||
|
|
||||||
import com.google.protobuf.TextFormat;
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
@ -67,4 +69,24 @@ public boolean equals(Object other) {
|
|||||||
public String toString() {
|
public String toString() {
|
||||||
return TextFormat.shortDebugString(getProto());
|
return TextFormat.shortDebugString(getProto());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = FinishApplicationMasterResponseProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getIsUnregistered() {
|
||||||
|
FinishApplicationMasterResponseProtoOrBuilder p =
|
||||||
|
viaProto ? proto : builder;
|
||||||
|
return p.getIsUnregistered();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setIsUnregistered(boolean isUnregistered) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setIsUnregistered(isUnregistered);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,6 +72,7 @@
|
|||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||||
@ -303,9 +304,12 @@ public FinishApplicationMasterResponse finishApplicationMaster(
|
|||||||
.getTrackingUrl(), request.getFinalApplicationStatus(), request
|
.getTrackingUrl(), request.getFinalApplicationStatus(), request
|
||||||
.getDiagnostics()));
|
.getDiagnostics()));
|
||||||
|
|
||||||
FinishApplicationMasterResponse response = recordFactory
|
if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
|
||||||
.newRecordInstance(FinishApplicationMasterResponse.class);
|
.isAppSafeToUnregister()) {
|
||||||
return response;
|
return FinishApplicationMasterResponse.newInstance(true);
|
||||||
|
} else {
|
||||||
|
return FinishApplicationMasterResponse.newInstance(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -414,8 +414,8 @@ public GetApplicationsResponse getApplications(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (applicationStates != null && !applicationStates.isEmpty()) {
|
if (applicationStates != null && !applicationStates.isEmpty()) {
|
||||||
if (!applicationStates.contains(RMServerUtils
|
if (!applicationStates.contains(application
|
||||||
.createApplicationState(application.getState()))) {
|
.createApplicationState())) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -186,10 +186,6 @@ protected synchronized void finishApplication(ApplicationId applicationId) {
|
|||||||
|
|
||||||
completedApps.add(applicationId);
|
completedApps.add(applicationId);
|
||||||
writeAuditLog(applicationId);
|
writeAuditLog(applicationId);
|
||||||
|
|
||||||
// application completely done. Remove from state
|
|
||||||
RMStateStore store = rmContext.getStateStore();
|
|
||||||
store.removeApplication(rmContext.getRMApps().get(applicationId));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,27 +115,4 @@ public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static YarnApplicationState createApplicationState(RMAppState rmAppState) {
|
|
||||||
switch(rmAppState) {
|
|
||||||
case NEW:
|
|
||||||
return YarnApplicationState.NEW;
|
|
||||||
case NEW_SAVING:
|
|
||||||
return YarnApplicationState.NEW_SAVING;
|
|
||||||
case SUBMITTED:
|
|
||||||
return YarnApplicationState.SUBMITTED;
|
|
||||||
case ACCEPTED:
|
|
||||||
return YarnApplicationState.ACCEPTED;
|
|
||||||
case RUNNING:
|
|
||||||
return YarnApplicationState.RUNNING;
|
|
||||||
case FINISHING:
|
|
||||||
case FINISHED:
|
|
||||||
return YarnApplicationState.FINISHED;
|
|
||||||
case KILLED:
|
|
||||||
return YarnApplicationState.KILLED;
|
|
||||||
case FAILED:
|
|
||||||
return YarnApplicationState.FAILED;
|
|
||||||
}
|
|
||||||
throw new YarnRuntimeException("Unknown state passed!");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
@ -108,7 +109,9 @@ public synchronized void storeApplicationAttemptState(String attemptIdStr,
|
|||||||
|
|
||||||
ApplicationState appState = state.getApplicationState().get(
|
ApplicationState appState = state.getApplicationState().get(
|
||||||
attemptState.getAttemptId().getApplicationId());
|
attemptState.getAttemptId().getApplicationId());
|
||||||
assert appState != null;
|
if (appState == null) {
|
||||||
|
throw new YarnRuntimeException("Application doesn't exist");
|
||||||
|
}
|
||||||
|
|
||||||
if (appState.attempts.containsKey(attemptState.getAttemptId())) {
|
if (appState.attempts.containsKey(attemptState.getAttemptId())) {
|
||||||
Exception e = new IOException("Attempt: " +
|
Exception e = new IOException("Attempt: " +
|
||||||
@ -125,7 +128,9 @@ public synchronized void removeApplicationState(ApplicationState appState)
|
|||||||
throws Exception {
|
throws Exception {
|
||||||
ApplicationId appId = appState.getAppId();
|
ApplicationId appId = appState.getAppId();
|
||||||
ApplicationState removed = state.appState.remove(appId);
|
ApplicationState removed = state.appState.remove(appId);
|
||||||
assert removed != null;
|
if (removed == null) {
|
||||||
|
throw new YarnRuntimeException("Removing non-exsisting application state");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -51,6 +51,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
|
||||||
|
|
||||||
@ -482,12 +483,15 @@ private synchronized void handleStoreEvent(RMStateStoreEvent event) {
|
|||||||
ApplicationState appState =
|
ApplicationState appState =
|
||||||
((RMStateStoreRemoveAppEvent) event).getAppState();
|
((RMStateStoreRemoveAppEvent) event).getAppState();
|
||||||
ApplicationId appId = appState.getAppId();
|
ApplicationId appId = appState.getAppId();
|
||||||
|
Exception removedException = null;
|
||||||
LOG.info("Removing info for app: " + appId);
|
LOG.info("Removing info for app: " + appId);
|
||||||
try {
|
try {
|
||||||
removeApplicationState(appState);
|
removeApplicationState(appState);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error removing app: " + appId, e);
|
LOG.error("Error removing app: " + appId, e);
|
||||||
|
removedException = e;
|
||||||
|
} finally {
|
||||||
|
notifyDoneRemovingApplcation(appId, removedException);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -521,7 +525,18 @@ private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId,
|
|||||||
rmDispatcher.getEventHandler().handle(
|
rmDispatcher.getEventHandler().handle(
|
||||||
new RMAppAttemptStoredEvent(attemptId, storedException));
|
new RMAppAttemptStoredEvent(attemptId, storedException));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
/**
|
||||||
|
* This is to notify RMApp that this application has been removed from
|
||||||
|
* RMStateStore
|
||||||
|
*/
|
||||||
|
private void notifyDoneRemovingApplcation(ApplicationId appId,
|
||||||
|
Exception removedException) {
|
||||||
|
rmDispatcher.getEventHandler().handle(
|
||||||
|
new RMAppRemovedEvent(appId, removedException));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* EventHandler implementation which forward events to the FSRMStateStore
|
* EventHandler implementation which forward events to the FSRMStateStore
|
||||||
* This hides the EventHandle methods of the store from its public interface
|
* This hides the EventHandle methods of the store from its public interface
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
@ -194,4 +195,20 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
|
|||||||
* @return the application type.
|
* @return the application type.
|
||||||
*/
|
*/
|
||||||
String getApplicationType();
|
String getApplicationType();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether this application is safe to unregister.
|
||||||
|
* An application is deemed to be safe to unregister if it is an unmanaged
|
||||||
|
* AM or its state has been removed from state store.
|
||||||
|
* @return the flag which indicates whether this application is safe to
|
||||||
|
* unregister.
|
||||||
|
*/
|
||||||
|
boolean isAppSafeToUnregister();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the external user-facing state of ApplicationMaster from the
|
||||||
|
* current state of the {@link RMApp}.
|
||||||
|
* @return the external user-facing state of ApplicationMaster.
|
||||||
|
*/
|
||||||
|
YarnApplicationState createApplicationState();
|
||||||
}
|
}
|
||||||
|
@ -27,11 +27,14 @@ public enum RMAppEventType {
|
|||||||
// Source: RMAppAttempt
|
// Source: RMAppAttempt
|
||||||
APP_REJECTED,
|
APP_REJECTED,
|
||||||
APP_ACCEPTED,
|
APP_ACCEPTED,
|
||||||
APP_SAVED,
|
|
||||||
ATTEMPT_REGISTERED,
|
ATTEMPT_REGISTERED,
|
||||||
ATTEMPT_FINISHING,
|
ATTEMPT_UNREGISTERED,
|
||||||
ATTEMPT_FINISHED, // Will send the final state
|
ATTEMPT_FINISHED, // Will send the final state
|
||||||
ATTEMPT_FAILED,
|
ATTEMPT_FAILED,
|
||||||
ATTEMPT_KILLED,
|
ATTEMPT_KILLED,
|
||||||
NODE_UPDATE
|
NODE_UPDATE,
|
||||||
|
|
||||||
|
// Source: RMStateStore
|
||||||
|
APP_SAVED,
|
||||||
|
APP_REMOVED
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
@ -56,6 +57,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
@ -109,6 +111,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
|
private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
|
||||||
private static final AppFinishedTransition FINISHED_TRANSITION =
|
private static final AppFinishedTransition FINISHED_TRANSITION =
|
||||||
new AppFinishedTransition();
|
new AppFinishedTransition();
|
||||||
|
private boolean isAppRemovalRequestSent = false;
|
||||||
|
private RMAppState previousStateAtRemoving;
|
||||||
|
|
||||||
private static final StateMachineFactory<RMAppImpl,
|
private static final StateMachineFactory<RMAppImpl,
|
||||||
RMAppState,
|
RMAppState,
|
||||||
@ -167,8 +171,9 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
|||||||
// Transitions from RUNNING state
|
// Transitions from RUNNING state
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
|
.addTransition(RMAppState.RUNNING, RMAppState.REMOVING,
|
||||||
RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
|
RMAppEventType.ATTEMPT_UNREGISTERED,
|
||||||
|
new RMAppRemovingTransition())
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
|
||||||
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
||||||
.addTransition(RMAppState.RUNNING,
|
.addTransition(RMAppState.RUNNING,
|
||||||
@ -178,6 +183,17 @@ RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
|
|||||||
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
|
.addTransition(RMAppState.RUNNING, RMAppState.KILLED,
|
||||||
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||||
|
|
||||||
|
// Transitions from REMOVING state
|
||||||
|
.addTransition(RMAppState.REMOVING, RMAppState.FINISHING,
|
||||||
|
RMAppEventType.APP_REMOVED, new RMAppFinishingTransition())
|
||||||
|
.addTransition(RMAppState.REMOVING, RMAppState.FINISHED,
|
||||||
|
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
||||||
|
.addTransition(RMAppState.REMOVING, RMAppState.KILLED,
|
||||||
|
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||||
|
// ignorable transitions
|
||||||
|
.addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
|
||||||
|
RMAppEventType.NODE_UPDATE)
|
||||||
|
|
||||||
// Transitions from FINISHING state
|
// Transitions from FINISHING state
|
||||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
|
||||||
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
|
||||||
@ -185,36 +201,34 @@ RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
|||||||
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
RMAppEventType.KILL, new KillAppAndAttemptTransition())
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
|
||||||
RMAppEventType.NODE_UPDATE)
|
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED))
|
||||||
|
|
||||||
// Transitions from FINISHED state
|
// Transitions from FINISHED state
|
||||||
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
|
||||||
RMAppEventType.KILL)
|
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
|
||||||
EnumSet.of(
|
EnumSet.of(
|
||||||
RMAppEventType.NODE_UPDATE,
|
RMAppEventType.NODE_UPDATE,
|
||||||
RMAppEventType.ATTEMPT_FINISHING,
|
RMAppEventType.ATTEMPT_UNREGISTERED,
|
||||||
RMAppEventType.ATTEMPT_FINISHED))
|
RMAppEventType.ATTEMPT_FINISHED,
|
||||||
|
RMAppEventType.KILL,
|
||||||
|
RMAppEventType.APP_REMOVED))
|
||||||
|
|
||||||
// Transitions from FAILED state
|
// Transitions from FAILED state
|
||||||
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
|
||||||
EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED))
|
|
||||||
// ignorable transitions
|
// ignorable transitions
|
||||||
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
|
||||||
RMAppEventType.NODE_UPDATE)
|
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
|
||||||
|
RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
|
||||||
|
|
||||||
// Transitions from KILLED state
|
// Transitions from KILLED state
|
||||||
|
// ignorable transitions
|
||||||
.addTransition(
|
.addTransition(
|
||||||
RMAppState.KILLED,
|
RMAppState.KILLED,
|
||||||
RMAppState.KILLED,
|
RMAppState.KILLED,
|
||||||
EnumSet.of(RMAppEventType.APP_ACCEPTED,
|
EnumSet.of(RMAppEventType.APP_ACCEPTED,
|
||||||
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
|
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
|
||||||
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
|
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
|
||||||
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED))
|
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE,
|
||||||
// ignorable transitions
|
RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
|
||||||
.addTransition(RMAppState.KILLED, RMAppState.KILLED,
|
|
||||||
RMAppEventType.NODE_UPDATE)
|
|
||||||
|
|
||||||
.installTopology();
|
.installTopology();
|
||||||
|
|
||||||
@ -384,6 +398,7 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
|
|||||||
case SUBMITTED:
|
case SUBMITTED:
|
||||||
case ACCEPTED:
|
case ACCEPTED:
|
||||||
case RUNNING:
|
case RUNNING:
|
||||||
|
case REMOVING:
|
||||||
return FinalApplicationStatus.UNDEFINED;
|
return FinalApplicationStatus.UNDEFINED;
|
||||||
// finished without a proper final state is the same as failed
|
// finished without a proper final state is the same as failed
|
||||||
case FINISHING:
|
case FINISHING:
|
||||||
@ -475,7 +490,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
|
|||||||
return BuilderUtils.newApplicationReport(this.applicationId,
|
return BuilderUtils.newApplicationReport(this.applicationId,
|
||||||
currentApplicationAttemptId, this.user, this.queue,
|
currentApplicationAttemptId, this.user, this.queue,
|
||||||
this.name, host, rpcPort, clientToAMToken,
|
this.name, host, rpcPort, clientToAMToken,
|
||||||
RMServerUtils.createApplicationState(this.stateMachine.getCurrentState()), diags,
|
createApplicationState(), diags,
|
||||||
trackingUrl, this.startTime, this.finishTime, finishState,
|
trackingUrl, this.startTime, this.finishTime, finishState,
|
||||||
appUsageReport, origTrackingUrl, progress, this.applicationType,
|
appUsageReport, origTrackingUrl, progress, this.applicationType,
|
||||||
amrmToken);
|
amrmToken);
|
||||||
@ -657,6 +672,15 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class RMAppRemovingTransition extends RMAppTransition {
|
||||||
|
@Override
|
||||||
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
|
LOG.info("Removing application with id " + app.applicationId);
|
||||||
|
app.removeApplicationState();
|
||||||
|
app.previousStateAtRemoving = app.getState();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class AppFinishedTransition extends FinalTransition {
|
private static class AppFinishedTransition extends FinalTransition {
|
||||||
public void transition(RMAppImpl app, RMAppEvent event) {
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
RMAppFinishedAttemptEvent finishedEvent =
|
RMAppFinishedAttemptEvent finishedEvent =
|
||||||
@ -712,6 +736,9 @@ public void transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
if (app.getState() != RMAppState.FINISHING) {
|
if (app.getState() != RMAppState.FINISHING) {
|
||||||
app.finishTime = System.currentTimeMillis();
|
app.finishTime = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
// application completely done and remove from state store.
|
||||||
|
app.removeApplicationState();
|
||||||
|
|
||||||
app.handler.handle(
|
app.handler.handle(
|
||||||
new RMAppManagerEvent(app.applicationId,
|
new RMAppManagerEvent(app.applicationId,
|
||||||
RMAppManagerEventType.APP_COMPLETED));
|
RMAppManagerEventType.APP_COMPLETED));
|
||||||
@ -764,4 +791,52 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
|
|||||||
public String getApplicationType() {
|
public String getApplicationType() {
|
||||||
return this.applicationType;
|
return this.applicationType;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAppSafeToUnregister() {
|
||||||
|
RMAppState state = getState();
|
||||||
|
return state.equals(RMAppState.FINISHING)
|
||||||
|
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
|
||||||
|
|| state.equals(RMAppState.KILLED) ||
|
||||||
|
// If this is an unmanaged AM, we are safe to unregister since unmanaged
|
||||||
|
// AM will immediately go to FINISHED state on AM unregistration
|
||||||
|
getApplicationSubmissionContext().getUnmanagedAM();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public YarnApplicationState createApplicationState() {
|
||||||
|
RMAppState rmAppState = getState();
|
||||||
|
// If App is in REMOVING state, return its previous state.
|
||||||
|
if (rmAppState.equals(RMAppState.REMOVING)) {
|
||||||
|
rmAppState = previousStateAtRemoving;
|
||||||
|
}
|
||||||
|
switch (rmAppState) {
|
||||||
|
case NEW:
|
||||||
|
return YarnApplicationState.NEW;
|
||||||
|
case NEW_SAVING:
|
||||||
|
return YarnApplicationState.NEW_SAVING;
|
||||||
|
case SUBMITTED:
|
||||||
|
return YarnApplicationState.SUBMITTED;
|
||||||
|
case ACCEPTED:
|
||||||
|
return YarnApplicationState.ACCEPTED;
|
||||||
|
case RUNNING:
|
||||||
|
return YarnApplicationState.RUNNING;
|
||||||
|
case FINISHING:
|
||||||
|
case FINISHED:
|
||||||
|
return YarnApplicationState.FINISHED;
|
||||||
|
case KILLED:
|
||||||
|
return YarnApplicationState.KILLED;
|
||||||
|
case FAILED:
|
||||||
|
return YarnApplicationState.FAILED;
|
||||||
|
default:
|
||||||
|
throw new YarnRuntimeException("Unknown state passed!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeApplicationState(){
|
||||||
|
if (!isAppRemovalRequestSent) {
|
||||||
|
rmContext.getStateStore().removeApplication(this);
|
||||||
|
isAppRemovalRequestSent = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,36 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
|
||||||
|
public class RMAppRemovedEvent extends RMAppEvent {
|
||||||
|
|
||||||
|
private final Exception removedException;
|
||||||
|
|
||||||
|
public RMAppRemovedEvent(ApplicationId appId, Exception removedException) {
|
||||||
|
super(appId, RMAppEventType.APP_REMOVED);
|
||||||
|
this.removedException = removedException;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Exception getRemovedException() {
|
||||||
|
return removedException;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -24,6 +24,7 @@ public enum RMAppState {
|
|||||||
SUBMITTED,
|
SUBMITTED,
|
||||||
ACCEPTED,
|
ACCEPTED,
|
||||||
RUNNING,
|
RUNNING,
|
||||||
|
REMOVING,
|
||||||
FINISHING,
|
FINISHING,
|
||||||
FINISHED,
|
FINISHED,
|
||||||
FAILED,
|
FAILED,
|
||||||
|
@ -1147,7 +1147,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
|
|||||||
ApplicationId applicationId =
|
ApplicationId applicationId =
|
||||||
appAttempt.getAppAttemptId().getApplicationId();
|
appAttempt.getAppAttemptId().getApplicationId();
|
||||||
appAttempt.eventHandler.handle(
|
appAttempt.eventHandler.handle(
|
||||||
new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING));
|
new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED));
|
||||||
return RMAppAttemptState.FINISHING;
|
return RMAppAttemptState.FINISHING;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -334,10 +334,12 @@ public void testRMRestart() throws Exception {
|
|||||||
|
|
||||||
// finish the AM's
|
// finish the AM's
|
||||||
am1.unregisterAppAttempt();
|
am1.unregisterAppAttempt();
|
||||||
|
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.FINISHING);
|
||||||
am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
am1.waitForState(RMAppAttemptState.FINISHED);
|
am1.waitForState(RMAppAttemptState.FINISHED);
|
||||||
|
|
||||||
am2.unregisterAppAttempt();
|
am2.unregisterAppAttempt();
|
||||||
|
rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.FINISHING);
|
||||||
am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE);
|
||||||
am2.waitForState(RMAppAttemptState.FINISHED);
|
am2.waitForState(RMAppAttemptState.FINISHED);
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
@ -142,6 +143,16 @@ public String getApplicationType() {
|
|||||||
public void setQueue(String name) {
|
public void setQueue(String name) {
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAppSafeToUnregister() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public YarnApplicationState createApplicationState() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static RMApp newApplication(int i) {
|
public static RMApp newApplication(int i) {
|
||||||
|
@ -28,6 +28,7 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.MockApps;
|
import org.apache.hadoop.yarn.MockApps;
|
||||||
@ -215,6 +216,15 @@ public int pullRMNodeUpdates(Collection<RMNode> updatedNodes) {
|
|||||||
@Override
|
@Override
|
||||||
public String getApplicationType() {
|
public String getApplicationType() {
|
||||||
return YarnConfiguration.DEFAULT_APPLICATION_TYPE;
|
return YarnConfiguration.DEFAULT_APPLICATION_TYPE;
|
||||||
};
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAppSafeToUnregister() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public YarnApplicationState createApplicationState() {
|
||||||
|
return null;
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -59,8 +60,9 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
@ -78,6 +80,7 @@ public class TestRMAppTransitions {
|
|||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
|
||||||
private static int appId = 1;
|
private static int appId = 1;
|
||||||
private DrainDispatcher rmDispatcher;
|
private DrainDispatcher rmDispatcher;
|
||||||
|
private RMStateStore store;
|
||||||
|
|
||||||
// ignore all the RM application attempt events
|
// ignore all the RM application attempt events
|
||||||
private static final class TestApplicationAttemptEventDispatcher implements
|
private static final class TestApplicationAttemptEventDispatcher implements
|
||||||
@ -171,7 +174,7 @@ public void setUp() throws Exception {
|
|||||||
mock(ContainerAllocationExpirer.class);
|
mock(ContainerAllocationExpirer.class);
|
||||||
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
||||||
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
|
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
|
||||||
RMStateStore store = mock(RMStateStore.class);
|
store = mock(RMStateStore.class);
|
||||||
this.rmContext =
|
this.rmContext =
|
||||||
new RMContextImpl(rmDispatcher, store,
|
new RMContextImpl(rmDispatcher, store,
|
||||||
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
|
||||||
@ -278,6 +281,10 @@ private static void assertTimesAtFinish(RMApp application) {
|
|||||||
(application.getFinishTime() >= application.getStartTime()));
|
(application.getFinishTime() >= application.getStartTime()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertAppRemoved(RMApp application){
|
||||||
|
verify(store).removeApplication(application);
|
||||||
|
}
|
||||||
|
|
||||||
private static void assertKilled(RMApp application) {
|
private static void assertKilled(RMApp application) {
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
@ -366,15 +373,27 @@ protected RMApp testCreateAppRunning(
|
|||||||
return application;
|
return application;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected RMApp testCreateAppRemoving(
|
||||||
|
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||||
|
RMApp application = testCreateAppRunning(submissionContext);
|
||||||
|
RMAppEvent finishingEvent =
|
||||||
|
new RMAppEvent(application.getApplicationId(),
|
||||||
|
RMAppEventType.ATTEMPT_UNREGISTERED);
|
||||||
|
application.handle(finishingEvent);
|
||||||
|
assertAppState(RMAppState.REMOVING, application);
|
||||||
|
assertAppRemoved(application);
|
||||||
|
return application;
|
||||||
|
}
|
||||||
|
|
||||||
protected RMApp testCreateAppFinishing(
|
protected RMApp testCreateAppFinishing(
|
||||||
ApplicationSubmissionContext submissionContext) throws IOException {
|
ApplicationSubmissionContext submissionContext) throws IOException {
|
||||||
// unmanaged AMs don't use the FINISHING state
|
// unmanaged AMs don't use the FINISHING state
|
||||||
assert submissionContext == null || !submissionContext.getUnmanagedAM();
|
assert submissionContext == null || !submissionContext.getUnmanagedAM();
|
||||||
RMApp application = testCreateAppRunning(submissionContext);
|
RMApp application = testCreateAppRemoving(submissionContext);
|
||||||
// RUNNING => FINISHING event RMAppEventType.ATTEMPT_FINISHING
|
// REMOVING => FINISHING event RMAppEventType.APP_REMOVED
|
||||||
RMAppEvent finishingEvent =
|
RMAppEvent finishingEvent =
|
||||||
new RMAppEvent(application.getApplicationId(),
|
new RMAppEvent(application.getApplicationId(),
|
||||||
RMAppEventType.ATTEMPT_FINISHING);
|
RMAppEventType.APP_REMOVED);
|
||||||
application.handle(finishingEvent);
|
application.handle(finishingEvent);
|
||||||
assertAppState(RMAppState.FINISHING, application);
|
assertAppState(RMAppState.FINISHING, application);
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
@ -634,6 +653,30 @@ public void testAppRunningFailed() throws IOException {
|
|||||||
assertFailed(application, ".*Failing the application.*");
|
assertFailed(application, ".*Failing the application.*");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppRemovingFinished() throws IOException {
|
||||||
|
LOG.info("--- START: testAppRemovingFINISHED ---");
|
||||||
|
RMApp application = testCreateAppRemoving(null);
|
||||||
|
// APP_REMOVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
|
||||||
|
RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent(
|
||||||
|
application.getApplicationId(), null);
|
||||||
|
application.handle(finishedEvent);
|
||||||
|
rmDispatcher.await();
|
||||||
|
assertAppState(RMAppState.FINISHED, application);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppRemovingKilled() throws IOException {
|
||||||
|
LOG.info("--- START: testAppRemovingKilledD ---");
|
||||||
|
RMApp application = testCreateAppRemoving(null);
|
||||||
|
// APP_REMOVING => KILLED event RMAppEventType.KILL
|
||||||
|
RMAppEvent event =
|
||||||
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||||
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
|
assertAppState(RMAppState.KILLED, application);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppFinishingKill() throws IOException {
|
public void testAppFinishingKill() throws IOException {
|
||||||
LOG.info("--- START: testAppFinishedFinished ---");
|
LOG.info("--- START: testAppFinishedFinished ---");
|
||||||
|
Loading…
Reference in New Issue
Block a user