YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks)

This commit is contained in:
Rohith Sharma K S 2015-10-07 09:34:59 +05:30
parent 1bca1bb517
commit 9156fc60c6
3 changed files with 243 additions and 86 deletions

View File

@ -1020,6 +1020,9 @@ Release 2.7.2 - UNRELEASED
YARN-3619. ContainerMetrics unregisters during getMetrics and leads to YARN-3619. ContainerMetrics unregisters during getMetrics and leads to
ConcurrentModificationException (Zhihai Xu via jlowe) ConcurrentModificationException (Zhihai Xu via jlowe)
YARN-4209. RMStateStore FENCED state doesnt work due to updateFencedState called
by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -67,7 +67,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
@ -100,7 +100,10 @@ public abstract class RMStateStore extends AbstractService {
public static final Log LOG = LogFactory.getLog(RMStateStore.class); public static final Log LOG = LogFactory.getLog(RMStateStore.class);
private enum RMStateStoreState { /**
* The enum defines state of RMStateStore.
*/
public enum RMStateStoreState {
ACTIVE, ACTIVE,
FENCED FENCED
}; };
@ -114,41 +117,57 @@ private enum RMStateStoreState {
RMStateStoreEventType, RMStateStoreEventType,
RMStateStoreEvent>( RMStateStoreEvent>(
RMStateStoreState.ACTIVE) RMStateStoreState.ACTIVE)
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_APP, new StoreAppTransition()) RMStateStoreEventType.STORE_APP, new StoreAppTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition()) RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition()) EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.STORE_APP_ATTEMPT,
RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()) new StoreAppAttemptTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.UPDATE_APP_ATTEMPT,
new UpdateAppAttemptTransition())
.addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_MASTERKEY, RMStateStoreEventType.STORE_MASTERKEY,
new StoreRMDTMasterKeyTransition()) new StoreRMDTMasterKeyTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.REMOVE_MASTERKEY, RMStateStoreEventType.REMOVE_MASTERKEY,
new RemoveRMDTMasterKeyTransition()) new RemoveRMDTMasterKeyTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_DELEGATION_TOKEN, RMStateStoreEventType.STORE_DELEGATION_TOKEN,
new StoreRMDTTransition()) new StoreRMDTTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.REMOVE_DELEGATION_TOKEN, RMStateStoreEventType.REMOVE_DELEGATION_TOKEN,
new RemoveRMDTTransition()) new RemoveRMDTTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.UPDATE_DELEGATION_TOKEN, RMStateStoreEventType.UPDATE_DELEGATION_TOKEN,
new UpdateRMDTTransition()) new UpdateRMDTTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.UPDATE_AMRM_TOKEN, RMStateStoreEventType.UPDATE_AMRM_TOKEN,
new StoreOrUpdateAMRMTokenTransition()) new StoreOrUpdateAMRMTokenTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.STORE_RESERVATION, RMStateStoreEventType.STORE_RESERVATION,
new StoreReservationAllocationTransition()) new StoreReservationAllocationTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.UPDATE_RESERVATION, RMStateStoreEventType.UPDATE_RESERVATION,
new UpdateReservationAllocationTransition()) new UpdateReservationAllocationTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, .addTransition(RMStateStoreState.ACTIVE,
EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
RMStateStoreEventType.REMOVE_RESERVATION, RMStateStoreEventType.REMOVE_RESERVATION,
new RemoveReservationAllocationTransition()) new RemoveReservationAllocationTransition())
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED, .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
@ -176,14 +195,17 @@ RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition())
RMStateStoreEvent> stateMachine; RMStateStoreEvent> stateMachine;
private static class StoreAppTransition private static class StoreAppTransition
implements SingleArcTransition<RMStateStore, RMStateStoreEvent> { implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppEvent)) { if (!(event instanceof RMStateStoreAppEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
ApplicationStateData appState = ApplicationStateData appState =
((RMStateStoreAppEvent) event).getAppState(); ((RMStateStoreAppEvent) event).getAppState();
ApplicationId appId = ApplicationId appId =
@ -195,20 +217,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
RMAppEventType.APP_NEW_SAVED)); RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error storing app: " + appId, e); LOG.error("Error storing app: " + appId, e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
}; };
} }
private static class UpdateAppTransition implements private static class UpdateAppTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateUpdateAppEvent)) { if (!(event instanceof RMStateUpdateAppEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
ApplicationStateData appState = ApplicationStateData appState =
((RMStateUpdateAppEvent) event).getAppState(); ((RMStateUpdateAppEvent) event).getAppState();
ApplicationId appId = ApplicationId appId =
@ -222,20 +248,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error updating app: " + appId, e); LOG.error("Error updating app: " + appId, e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
}; };
} }
private static class RemoveAppTransition implements private static class RemoveAppTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRemoveAppEvent)) { if (!(event instanceof RMStateStoreRemoveAppEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
ApplicationStateData appState = ApplicationStateData appState =
((RMStateStoreRemoveAppEvent) event).getAppState(); ((RMStateStoreRemoveAppEvent) event).getAppState();
ApplicationId appId = ApplicationId appId =
@ -245,20 +275,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
store.removeApplicationStateInternal(appState); store.removeApplicationStateInternal(appState);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error removing app: " + appId, e); LOG.error("Error removing app: " + appId, e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
}; };
} }
private static class StoreAppAttemptTransition implements private static class StoreAppAttemptTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppAttemptEvent)) { if (!(event instanceof RMStateStoreAppAttemptEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
ApplicationAttemptStateData attemptState = ApplicationAttemptStateData attemptState =
((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
try { try {
@ -272,20 +306,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
}; };
} }
private static class UpdateAppAttemptTransition implements private static class UpdateAppAttemptTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateUpdateAppAttemptEvent)) { if (!(event instanceof RMStateUpdateAppAttemptEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
ApplicationAttemptStateData attemptState = ApplicationAttemptStateData attemptState =
((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState();
try { try {
@ -299,20 +337,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED));
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
}; };
} }
private static class StoreRMDTTransition implements private static class StoreRMDTTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTEvent)) { if (!(event instanceof RMStateStoreRMDTEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
try { try {
LOG.info("Storing RMDelegationToken and SequenceNumber"); LOG.info("Storing RMDelegationToken and SequenceNumber");
@ -321,20 +363,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error While Storing RMDelegationToken and SequenceNumber ", LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
e); e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
} }
} }
private static class RemoveRMDTTransition implements private static class RemoveRMDTTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTEvent)) { if (!(event instanceof RMStateStoreRMDTEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
try { try {
LOG.info("Removing RMDelegationToken and SequenceNumber"); LOG.info("Removing RMDelegationToken and SequenceNumber");
@ -342,21 +388,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error While Removing RMDelegationToken and SequenceNumber ", LOG.error("Error While Removing RMDelegationToken and SequenceNumber ",
e); e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
} }
} }
private static class UpdateRMDTTransition implements private static class UpdateRMDTTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTEvent)) { if (!(event instanceof RMStateStoreRMDTEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event;
try { try {
LOG.info("Updating RMDelegationToken and SequenceNumber"); LOG.info("Updating RMDelegationToken and SequenceNumber");
@ -365,20 +414,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error While Updating RMDelegationToken and SequenceNumber ", LOG.error("Error While Updating RMDelegationToken and SequenceNumber ",
e); e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
} }
} }
private static class StoreRMDTMasterKeyTransition implements private static class StoreRMDTMasterKeyTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) { if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
RMStateStoreRMDTMasterKeyEvent dtEvent = RMStateStoreRMDTMasterKeyEvent dtEvent =
(RMStateStoreRMDTMasterKeyEvent) event; (RMStateStoreRMDTMasterKeyEvent) event;
try { try {
@ -386,20 +439,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
store.storeRMDTMasterKeyState(dtEvent.getDelegationKey()); store.storeRMDTMasterKeyState(dtEvent.getDelegationKey());
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error While Storing RMDTMasterKey.", e); LOG.error("Error While Storing RMDTMasterKey.", e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
} }
} }
private static class RemoveRMDTMasterKeyTransition implements private static class RemoveRMDTMasterKeyTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) { if (!(event instanceof RMStateStoreRMDTMasterKeyEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
RMStateStoreRMDTMasterKeyEvent dtEvent = RMStateStoreRMDTMasterKeyEvent dtEvent =
(RMStateStoreRMDTMasterKeyEvent) event; (RMStateStoreRMDTMasterKeyEvent) event;
try { try {
@ -407,42 +464,49 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
store.removeRMDTMasterKeyState(dtEvent.getDelegationKey()); store.removeRMDTMasterKeyState(dtEvent.getDelegationKey());
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error While Removing RMDTMasterKey.", e); LOG.error("Error While Removing RMDTMasterKey.", e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
} }
} }
private static class StoreOrUpdateAMRMTokenTransition implements private static class StoreOrUpdateAMRMTokenTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAMRMTokenEvent)) { if (!(event instanceof RMStateStoreAMRMTokenEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event; RMStateStoreAMRMTokenEvent amrmEvent = (RMStateStoreAMRMTokenEvent) event;
boolean isFenced = false;
try { try {
LOG.info("Updating AMRMToken"); LOG.info("Updating AMRMToken");
store.storeOrUpdateAMRMTokenSecretManagerState( store.storeOrUpdateAMRMTokenSecretManagerState(
amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate()); amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate());
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error storing info for AMRMTokenSecretManager", e); LOG.error("Error storing info for AMRMTokenSecretManager", e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
} }
} }
private static class StoreReservationAllocationTransition implements private static class StoreReservationAllocationTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreStoreReservationEvent)) { if (!(event instanceof RMStateStoreStoreReservationEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
RMStateStoreStoreReservationEvent reservationEvent = RMStateStoreStoreReservationEvent reservationEvent =
(RMStateStoreStoreReservationEvent) event; (RMStateStoreStoreReservationEvent) event;
try { try {
@ -454,20 +518,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
reservationEvent.getReservationIdName()); reservationEvent.getReservationIdName());
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error while storing reservation allocation.", e); LOG.error("Error while storing reservation allocation.", e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
} }
} }
private static class UpdateReservationAllocationTransition implements private static class UpdateReservationAllocationTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreStoreReservationEvent)) { if (!(event instanceof RMStateStoreStoreReservationEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
RMStateStoreStoreReservationEvent reservationEvent = RMStateStoreStoreReservationEvent reservationEvent =
(RMStateStoreStoreReservationEvent) event; (RMStateStoreStoreReservationEvent) event;
try { try {
@ -479,20 +547,24 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
reservationEvent.getReservationIdName()); reservationEvent.getReservationIdName());
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error while updating reservation allocation.", e); LOG.error("Error while updating reservation allocation.", e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
} }
return finalState(isFenced);
} }
} }
private static class RemoveReservationAllocationTransition implements private static class RemoveReservationAllocationTransition implements
SingleArcTransition<RMStateStore, RMStateStoreEvent> { MultipleArcTransition<RMStateStore, RMStateStoreEvent,
RMStateStoreState> {
@Override @Override
public void transition(RMStateStore store, RMStateStoreEvent event) { public RMStateStoreState transition(RMStateStore store,
RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreStoreReservationEvent)) { if (!(event instanceof RMStateStoreStoreReservationEvent)) {
// should never happen // should never happen
LOG.error("Illegal event type: " + event.getClass()); LOG.error("Illegal event type: " + event.getClass());
return; return RMStateStoreState.ACTIVE;
} }
boolean isFenced = false;
RMStateStoreStoreReservationEvent reservationEvent = RMStateStoreStoreReservationEvent reservationEvent =
(RMStateStoreStoreReservationEvent) event; (RMStateStoreStoreReservationEvent) event;
try { try {
@ -503,9 +575,14 @@ public void transition(RMStateStore store, RMStateStoreEvent event) {
reservationEvent.getReservationIdName()); reservationEvent.getReservationIdName());
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error while removing reservation allocation.", e); LOG.error("Error while removing reservation allocation.", e);
store.notifyStoreOperationFailed(e); isFenced = store.notifyStoreOperationFailedInternal(e);
}
return finalState(isFenced);
} }
} }
private static RMStateStoreState finalState(boolean isFenced) {
return isFenced ? RMStateStoreState.FENCED : RMStateStoreState.ACTIVE;
} }
public RMStateStore() { public RMStateStore() {
@ -1006,17 +1083,28 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
} }
} }
@SuppressWarnings("unchecked")
/** /**
* This method is called to notify the ResourceManager that the store * This method is called to notify the ResourceManager that the store
* operation has failed. * operation has failed.
* @param failureCause the exception due to which the operation failed * @param failureCause the exception due to which the operation failed
*/ */
protected void notifyStoreOperationFailed(Exception failureCause) { protected void notifyStoreOperationFailed(Exception failureCause) {
if (isFencedState()) {
return;
}
if (notifyStoreOperationFailedInternal(failureCause)) {
updateFencedState();
}
}
@SuppressWarnings("unchecked")
private boolean notifyStoreOperationFailedInternal(
Exception failureCause) {
boolean isFenced = false;
LOG.error("State store operation failed ", failureCause); LOG.error("State store operation failed ", failureCause);
if (HAUtil.isHAEnabled(getConfig())) { if (HAUtil.isHAEnabled(getConfig())) {
LOG.warn("State-store fenced ! Transitioning RM to standby"); LOG.warn("State-store fenced ! Transitioning RM to standby");
updateFencedState(); isFenced = true;
Thread standByTransitionThread = Thread standByTransitionThread =
new Thread(new StandByTransitionThread()); new Thread(new StandByTransitionThread());
standByTransitionThread.setName("StandByTransitionThread Handler"); standByTransitionThread.setName("StandByTransitionThread Handler");
@ -1029,6 +1117,7 @@ protected void notifyStoreOperationFailed(Exception failureCause) {
} else { } else {
LOG.warn("Skip the state-store error."); LOG.warn("Skip the state-store error.");
} }
return isFenced;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class TestMemoryRMStateStore {
@Test
public void testNotifyStoreOperationFailed() throws Exception {
RMStateStore store = new MemoryRMStateStore() {
@Override
public synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
throw new Exception("testNotifyStoreOperationFailed");
}
};
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
store.init(conf);
ResourceManager mockRM = mock(ResourceManager.class);
store.setResourceManager(mockRM);
RMDelegationTokenIdentifier mockTokenId =
mock(RMDelegationTokenIdentifier.class);
store.removeRMDelegationToken(mockTokenId);
assertTrue("RMStateStore should have been in fenced state",
store.isFencedState());
store = new MemoryRMStateStore() {
@Override
public synchronized void removeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier) {
notifyStoreOperationFailed(new Exception(
"testNotifyStoreOperationFailed"));
}
};
store.init(conf);
store.setResourceManager(mockRM);
store.removeRMDelegationToken(mockTokenId);
assertTrue("RMStateStore should have been in fenced state",
store.isFencedState());
}
}