YARN-2136. Changed RMStateStore to ignore store opearations when fenced. Contributed by Varun Saxena
This commit is contained in:
parent
26319ba0db
commit
52bcefca8b
@ -109,6 +109,9 @@ Release 2.7.0 - UNRELEASED
|
||||
YARN-2907. SchedulerNode#toString should print all resource detail instead
|
||||
of only memory. (Rohith via junping_du)
|
||||
|
||||
YARN-2136. Changed RMStateStore to ignore store opearations when fenced.
|
||||
(Varun Saxena via jianhe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
@ -26,6 +27,7 @@
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
@ -88,7 +90,8 @@ public abstract class RMStateStore extends AbstractService {
|
||||
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
||||
|
||||
private enum RMStateStoreState {
|
||||
DEFAULT
|
||||
ACTIVE,
|
||||
FENCED
|
||||
};
|
||||
|
||||
private static final StateMachineFactory<RMStateStore,
|
||||
@ -99,17 +102,27 @@ private enum RMStateStoreState {
|
||||
RMStateStoreState,
|
||||
RMStateStoreEventType,
|
||||
RMStateStoreEvent>(
|
||||
RMStateStoreState.DEFAULT)
|
||||
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||
RMStateStoreState.ACTIVE)
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
|
||||
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
|
||||
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
|
||||
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
|
||||
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
|
||||
RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
|
||||
RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition())
|
||||
.addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
|
||||
RMStateStoreEventType.FENCED)
|
||||
.addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
|
||||
EnumSet.of(
|
||||
RMStateStoreEventType.STORE_APP,
|
||||
RMStateStoreEventType.UPDATE_APP,
|
||||
RMStateStoreEventType.REMOVE_APP,
|
||||
RMStateStoreEventType.STORE_APP_ATTEMPT,
|
||||
RMStateStoreEventType.UPDATE_APP_ATTEMPT,
|
||||
RMStateStoreEventType.FENCED));
|
||||
|
||||
private final StateMachine<RMStateStoreState,
|
||||
RMStateStoreEventType,
|
||||
@ -432,6 +445,11 @@ public synchronized void updateApplicationState(
|
||||
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
|
||||
}
|
||||
|
||||
public synchronized void updateFencedState() {
|
||||
this.stateMachine.doTransition(RMStateStoreEventType.FENCED,
|
||||
new RMStateStoreEvent(RMStateStoreEventType.FENCED));
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocking API
|
||||
* Derived classes must implement this method to store the state of an
|
||||
@ -494,6 +512,10 @@ protected abstract void updateApplicationAttemptStateInternal(
|
||||
public synchronized void storeRMDelegationTokenAndSequenceNumber(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't store RM Delegation Token.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
|
||||
latestSequenceNumber);
|
||||
@ -516,6 +538,10 @@ protected abstract void storeRMDelegationTokenAndSequenceNumberState(
|
||||
*/
|
||||
public synchronized void removeRMDelegationToken(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't remove RM Delegation Token.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
removeRMDelegationTokenState(rmDTIdentifier);
|
||||
} catch (Exception e) {
|
||||
@ -537,6 +563,10 @@ protected abstract void removeRMDelegationTokenState(
|
||||
public synchronized void updateRMDelegationTokenAndSequenceNumber(
|
||||
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
|
||||
int latestSequenceNumber) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't update RM Delegation Token.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate,
|
||||
latestSequenceNumber);
|
||||
@ -558,6 +588,11 @@ protected abstract void updateRMDelegationTokenAndSequenceNumberInternal(
|
||||
* RMDTSecretManager call this to store the state of a master key
|
||||
*/
|
||||
public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't store RM Delegation " +
|
||||
"Token Master key.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
storeRMDTMasterKeyState(delegationKey);
|
||||
} catch (Exception e) {
|
||||
@ -577,6 +612,11 @@ protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey)
|
||||
* RMDTSecretManager call this to remove the state of a master key
|
||||
*/
|
||||
public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't remove RM Delegation " +
|
||||
"Token Master key.");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
removeRMDTMasterKeyState(delegationKey);
|
||||
} catch (Exception e) {
|
||||
@ -647,6 +687,11 @@ public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) {
|
||||
}
|
||||
return credentials;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized boolean isFencedState() {
|
||||
return (RMStateStoreState.FENCED == this.stateMachine.getCurrentState());
|
||||
}
|
||||
|
||||
// Dispatcher related code
|
||||
protected void handleStoreEvent(RMStateStoreEvent event) {
|
||||
@ -665,6 +710,7 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
|
||||
*/
|
||||
protected void notifyStoreOperationFailed(Exception failureCause) {
|
||||
if (failureCause instanceof StoreFencedException) {
|
||||
updateFencedState();
|
||||
Thread standByTransitionThread =
|
||||
new Thread(new StandByTransitionThread());
|
||||
standByTransitionThread.setName("StandByTransitionThread Handler");
|
||||
|
@ -23,5 +23,6 @@ public enum RMStateStoreEventType {
|
||||
STORE_APP,
|
||||
UPDATE_APP,
|
||||
UPDATE_APP_ATTEMPT,
|
||||
REMOVE_APP
|
||||
REMOVE_APP,
|
||||
FENCED
|
||||
}
|
||||
|
@ -1017,6 +1017,9 @@ private class VerifyActiveStatusThread extends Thread {
|
||||
public void run() {
|
||||
try {
|
||||
while (true) {
|
||||
if(isFencedState()) {
|
||||
break;
|
||||
}
|
||||
doMultiWithRetries(emptyOpList);
|
||||
Thread.sleep(zkSessionTimeout);
|
||||
}
|
||||
@ -1134,6 +1137,11 @@ protected synchronized ZooKeeper getNewZooKeeper()
|
||||
public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
|
||||
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
|
||||
boolean isUpdate) {
|
||||
if(isFencedState()) {
|
||||
LOG.info("State store is in Fenced state. Can't store/update " +
|
||||
"AMRMToken Secret Manager state.");
|
||||
return;
|
||||
}
|
||||
AMRMTokenSecretManagerState data =
|
||||
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
|
||||
byte[] stateData = data.getProto().toByteArray();
|
||||
|
@ -20,22 +20,40 @@
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.Test;
|
||||
@ -191,4 +209,80 @@ public void testFencing() throws Exception {
|
||||
HAServiceProtocol.HAServiceState.ACTIVE,
|
||||
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFencedState() throws Exception {
|
||||
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||
RMStateStore store = zkTester.getRMStateStore();
|
||||
|
||||
// Move state to FENCED from ACTIVE
|
||||
store.updateFencedState();
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
||||
long submitTime = System.currentTimeMillis();
|
||||
long startTime = submitTime + 1000;
|
||||
|
||||
// Add a new app
|
||||
RMApp mockApp = mock(RMApp.class);
|
||||
ApplicationSubmissionContext context =
|
||||
new ApplicationSubmissionContextPBImpl();
|
||||
when(mockApp.getSubmitTime()).thenReturn(submitTime);
|
||||
when(mockApp.getStartTime()).thenReturn(startTime);
|
||||
when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
|
||||
when(mockApp.getUser()).thenReturn("test");
|
||||
store.storeNewApplication(mockApp);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
||||
// Add a new attempt
|
||||
ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
|
||||
new ClientToAMTokenSecretManagerInRM();
|
||||
ApplicationAttemptId attemptId = ConverterUtils
|
||||
.toApplicationAttemptId("appattempt_1234567894321_0001_000001");
|
||||
SecretKey clientTokenMasterKey =
|
||||
clientToAMTokenMgr.createMasterKey(attemptId);
|
||||
RMAppAttemptMetrics mockRmAppAttemptMetrics =
|
||||
mock(RMAppAttemptMetrics.class);
|
||||
Container container = new ContainerPBImpl();
|
||||
container.setId(ConverterUtils.toContainerId("container_1234567891234_0001_01_000001"));
|
||||
RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
|
||||
when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
|
||||
when(mockAttempt.getMasterContainer()).thenReturn(container);
|
||||
when(mockAttempt.getClientTokenMasterKey())
|
||||
.thenReturn(clientTokenMasterKey);
|
||||
when(mockAttempt.getRMAppAttemptMetrics())
|
||||
.thenReturn(mockRmAppAttemptMetrics);
|
||||
when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
|
||||
.thenReturn(new AggregateAppResourceUsage(0,0));
|
||||
store.storeNewApplicationAttempt(mockAttempt);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
||||
long finishTime = submitTime + 1000;
|
||||
// Update attempt
|
||||
ApplicationAttemptStateData newAttemptState =
|
||||
ApplicationAttemptStateData.newInstance(attemptId, container,
|
||||
store.getCredentialsFromAppAttempt(mockAttempt),
|
||||
startTime, RMAppAttemptState.FINISHED, "testUrl",
|
||||
"test", FinalApplicationStatus.SUCCEEDED, 100,
|
||||
finishTime, 0, 0);
|
||||
store.updateApplicationAttemptState(newAttemptState);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
||||
// Update app
|
||||
ApplicationStateData appState = ApplicationStateData.newInstance(submitTime,
|
||||
startTime, context, "test");
|
||||
store.updateApplicationState(appState);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
||||
// Remove app
|
||||
store.removeApplication(mockApp);
|
||||
assertEquals("RMStateStore should have been in fenced state",
|
||||
true, store.isFencedState());
|
||||
|
||||
store.close();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user