YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and FatalEventDispatcher try to transition RM to StandBy at the same time. Contributed by Rohith Sharmaks
This commit is contained in:
parent
8549fa5dc9
commit
395275af86
@ -868,6 +868,10 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2805. Fixed ResourceManager to load HA configs correctly before kerberos
|
||||
login. (Wangda Tan via vinodkv)
|
||||
|
||||
YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and
|
||||
FatalEventDispatcher try to transition RM to StandBy at the same time.
|
||||
(Rohith Sharmaks via jianhe)
|
||||
|
||||
Release 2.5.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -43,8 +43,6 @@
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
|
||||
import org.junit.After;
|
||||
@ -173,7 +171,6 @@ public void testExplicitFailover()
|
||||
verifyConnections();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testAutomaticFailover()
|
||||
throws YarnException, InterruptedException, IOException {
|
||||
@ -196,10 +193,7 @@ public void testAutomaticFailover()
|
||||
// so it transitions to standby.
|
||||
ResourceManager rm = cluster.getResourceManager(
|
||||
cluster.getActiveRMIndex());
|
||||
RMFatalEvent event =
|
||||
new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
|
||||
"Fake RMFatalEvent");
|
||||
rm.getRMContext().getDispatcher().getEventHandler().handle(event);
|
||||
rm.handleTransitionToStandBy();
|
||||
int maxWaitingAttempts = 2000;
|
||||
while (maxWaitingAttempts-- > 0 ) {
|
||||
if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {
|
||||
|
@ -23,7 +23,6 @@
|
||||
@InterfaceAudience.Private
|
||||
public enum RMFatalEventType {
|
||||
// Source <- Store
|
||||
STATE_STORE_FENCED,
|
||||
STATE_STORE_OP_FAILED,
|
||||
|
||||
// Source <- Embedded Elector
|
||||
|
@ -269,6 +269,7 @@ protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler,
|
||||
@VisibleForTesting
|
||||
protected void setRMStateStore(RMStateStore rmStore) {
|
||||
rmStore.setRMDispatcher(rmDispatcher);
|
||||
rmStore.setResourceManager(this);
|
||||
rmContext.setStateStore(rmStore);
|
||||
}
|
||||
|
||||
@ -397,11 +398,12 @@ public class RMActiveServices extends CompositeService {
|
||||
private EventHandler<SchedulerEvent> schedulerDispatcher;
|
||||
private ApplicationMasterLauncher applicationMasterLauncher;
|
||||
private ContainerAllocationExpirer containerAllocationExpirer;
|
||||
|
||||
private ResourceManager rm;
|
||||
private boolean recoveryEnabled;
|
||||
|
||||
RMActiveServices() {
|
||||
RMActiveServices(ResourceManager rm) {
|
||||
super("RMActiveServices");
|
||||
this.rm = rm;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -449,6 +451,7 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
||||
try {
|
||||
rmStore.init(conf);
|
||||
rmStore.setRMDispatcher(rmDispatcher);
|
||||
rmStore.setResourceManager(rm);
|
||||
} catch (Exception e) {
|
||||
// the Exception from stateStore.init() needs to be handled for
|
||||
// HA and we need to give up master status if we got fenced
|
||||
@ -729,39 +732,31 @@ public void handle(SchedulerEvent event) {
|
||||
@Private
|
||||
public static class RMFatalEventDispatcher
|
||||
implements EventHandler<RMFatalEvent> {
|
||||
private final RMContext rmContext;
|
||||
private final ResourceManager rm;
|
||||
|
||||
public RMFatalEventDispatcher(
|
||||
RMContext rmContext, ResourceManager resourceManager) {
|
||||
this.rmContext = rmContext;
|
||||
this.rm = resourceManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(RMFatalEvent event) {
|
||||
LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
|
||||
event.getType().name() + ". Cause:\n" + event.getCause());
|
||||
|
||||
if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) {
|
||||
LOG.info("RMStateStore has been fenced");
|
||||
if (rmContext.isHAEnabled()) {
|
||||
try {
|
||||
// Transition to standby and reinit active services
|
||||
LOG.info("Transitioning RM to Standby mode");
|
||||
rm.transitionToStandby(true);
|
||||
rm.adminService.resetLeaderElection();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.fatal("Failed to transition RM to Standby mode.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ExitUtil.terminate(1, event.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
public void handleTransitionToStandBy() {
|
||||
if (rmContext.isHAEnabled()) {
|
||||
try {
|
||||
// Transition to standby and reinit active services
|
||||
LOG.info("Transitioning RM to Standby mode");
|
||||
transitionToStandby(true);
|
||||
adminService.resetLeaderElection();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.fatal("Failed to transition RM to Standby mode.");
|
||||
ExitUtil.terminate(1, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
public static final class ApplicationEventDispatcher implements
|
||||
EventHandler<RMAppEvent> {
|
||||
@ -990,7 +985,7 @@ protected void startWepApp() {
|
||||
* @throws Exception
|
||||
*/
|
||||
protected void createAndInitActiveServices() throws Exception {
|
||||
activeServices = new RMActiveServices();
|
||||
activeServices = new RMActiveServices(this);
|
||||
activeServices.init(conf);
|
||||
}
|
||||
|
||||
@ -1227,7 +1222,7 @@ public static void main(String argv[]) {
|
||||
private Dispatcher setupDispatcher() {
|
||||
Dispatcher dispatcher = createDispatcher();
|
||||
dispatcher.register(RMFatalEventType.class,
|
||||
new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
|
||||
new ResourceManager.RMFatalEventDispatcher());
|
||||
return dispatcher;
|
||||
}
|
||||
|
||||
|
@ -49,6 +49,7 @@
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
@ -87,6 +88,7 @@ public abstract class RMStateStore extends AbstractService {
|
||||
"AMRMTokenSecretManagerRoot";
|
||||
protected static final String VERSION_NODE = "RMVersionNode";
|
||||
protected static final String EPOCH_NODE = "EpochNode";
|
||||
private ResourceManager resourceManager;
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
|
||||
|
||||
@ -818,13 +820,15 @@ protected void handleStoreEvent(RMStateStoreEvent event) {
|
||||
* @param failureCause the exception due to which the operation failed
|
||||
*/
|
||||
protected void notifyStoreOperationFailed(Exception failureCause) {
|
||||
RMFatalEventType type;
|
||||
if (failureCause instanceof StoreFencedException) {
|
||||
type = RMFatalEventType.STATE_STORE_FENCED;
|
||||
Thread standByTransitionThread =
|
||||
new Thread(new StandByTransitionThread());
|
||||
standByTransitionThread.setName("StandByTransitionThread Handler");
|
||||
standByTransitionThread.start();
|
||||
} else {
|
||||
type = RMFatalEventType.STATE_STORE_OP_FAILED;
|
||||
rmDispatcher.getEventHandler().handle(
|
||||
new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
|
||||
}
|
||||
rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -866,4 +870,16 @@ public void handle(RMStateStoreEvent event) {
|
||||
* @throws Exception
|
||||
*/
|
||||
public abstract void deleteStore() throws Exception;
|
||||
|
||||
public void setResourceManager(ResourceManager rm) {
|
||||
this.resourceManager = rm;
|
||||
}
|
||||
|
||||
private class StandByTransitionThread implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
LOG.info("RMStateStore has been fenced");
|
||||
resourceManager.handleTransitionToStandBy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -46,6 +46,7 @@
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
@ -451,6 +452,67 @@ public synchronized void startInternal() throws Exception {
|
||||
checkActiveRMFunctionality();
|
||||
}
|
||||
|
||||
@Test(timeout = 90000)
|
||||
public void testTransitionedToStandbyShouldNotHang() throws Exception {
|
||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||
@Override
|
||||
public synchronized void updateApplicationState(ApplicationState appState) {
|
||||
notifyStoreOperationFailed(new StoreFencedException());
|
||||
}
|
||||
};
|
||||
memStore.init(conf);
|
||||
rm = new MockRM(conf, memStore) {
|
||||
@Override
|
||||
void stopActiveServices() throws Exception {
|
||||
Thread.sleep(10000);
|
||||
super.stopActiveServices();
|
||||
}
|
||||
};
|
||||
rm.init(conf);
|
||||
final StateChangeRequestInfo requestInfo =
|
||||
new StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||
|
||||
assertEquals(STATE_ERR, HAServiceState.INITIALIZING, rm.adminService
|
||||
.getServiceStatus().getState());
|
||||
assertFalse("RM is ready to become active before being started",
|
||||
rm.adminService.getServiceStatus().isReadyToBecomeActive());
|
||||
checkMonitorHealth();
|
||||
|
||||
rm.start();
|
||||
checkMonitorHealth();
|
||||
checkStandbyRMFunctionality();
|
||||
|
||||
// 2. Transition to Active.
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
|
||||
// 3. Try Transition to standby
|
||||
Thread t = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
rm.transitionToStandby(true);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} catch (Exception e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
|
||||
rm.getRMContext().getStateStore().updateApplicationState(null);
|
||||
t.join(); // wait for thread to finish
|
||||
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
checkStandbyRMFunctionality();
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
public void innerTestHAWithRMHostName(boolean includeBindHost) {
|
||||
//this is run two times, with and without a bind host configured
|
||||
if (includeBindHost) {
|
||||
|
Loading…
Reference in New Issue
Block a user