YARN-4761. NMs reconnecting with changed capabilities can lead to wrong cluster resource calculations on fair scheduler. Contributed by Sangjin Lee

This commit is contained in:
Zhihai Xu 2016-03-06 19:46:09 -08:00
parent 19ee185907
commit e1ccc9622b
3 changed files with 134 additions and 112 deletions

View File

@ -889,7 +889,7 @@ private synchronized void addNode(List<NMContainerStatus> containerReports,
} else {
nodesPerRack.put(rackName, 1);
}
Resources.addTo(clusterResource, node.getTotalCapability());
Resources.addTo(clusterResource, schedulerNode.getTotalResource());
updateMaximumAllocation(schedulerNode, true);
triggerUpdate();
@ -909,7 +909,7 @@ private synchronized void removeNode(RMNode rmNode) {
if (node == null) {
return;
}
Resources.subtractFrom(clusterResource, rmNode.getTotalCapability());
Resources.subtractFrom(clusterResource, node.getTotalResource());
updateRootQueueMetrics();
triggerUpdate();

View File

@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@ -38,11 +40,24 @@
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -51,12 +66,19 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@SuppressWarnings("unchecked")
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
@ -615,4 +637,114 @@ private void verifyMaximumResourceCapability(
Assert.assertEquals(expectedMaximumResource.getVirtualCores(),
schedulerMaximumResourceCapability.getVirtualCores());
}
private class SleepHandler implements EventHandler<SchedulerEvent> {
boolean sleepFlag = false;
int sleepTime = 20;
@Override
public void handle(SchedulerEvent event) {
try {
if (sleepFlag) {
Thread.sleep(sleepTime);
}
} catch(InterruptedException ie) {
}
}
}
private ResourceTrackerService getPrivateResourceTrackerService(
Dispatcher privateDispatcher, ResourceManager rm,
SleepHandler sleepHandler) {
Configuration conf = getConf();
RMContext privateContext =
new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
null, null, null);
privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
privateDispatcher.register(SchedulerEventType.class, sleepHandler);
privateDispatcher.register(SchedulerEventType.class,
rm.getResourceScheduler());
privateDispatcher.register(RMNodeEventType.class,
new ResourceManager.NodeEventDispatcher(privateContext));
((Service) privateDispatcher).init(conf);
((Service) privateDispatcher).start();
NMLivelinessMonitor nmLivelinessMonitor =
new NMLivelinessMonitor(privateDispatcher);
nmLivelinessMonitor.init(conf);
nmLivelinessMonitor.start();
NodesListManager nodesListManager = new NodesListManager(privateContext);
nodesListManager.init(conf);
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.start();
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.start();
ResourceTrackerService privateResourceTrackerService =
new ResourceTrackerService(privateContext, nodesListManager,
nmLivelinessMonitor, containerTokenSecretManager,
nmTokenSecretManager);
privateResourceTrackerService.init(conf);
privateResourceTrackerService.start();
rm.getResourceScheduler().setRMContext(privateContext);
return privateResourceTrackerService;
}
/**
* Test the behavior of the scheduler when a node reconnects
* with changed capabilities. This test is to catch any race conditions
* that might occur due to the use of the RMNode object.
* @throws Exception
*/
@Test(timeout = 60000)
public void testNodemanagerReconnect() throws Exception {
configureScheduler();
Configuration conf = getConf();
MockRM rm = new MockRM(conf);
try {
rm.start();
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
DrainDispatcher privateDispatcher = new DrainDispatcher();
SleepHandler sleepHandler = new SleepHandler();
ResourceTrackerService privateResourceTrackerService =
getPrivateResourceTrackerService(privateDispatcher, rm, sleepHandler);
// Register node1
String hostname1 = "localhost1";
Resource capability = BuilderUtils.newResource(4096, 4);
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
RegisterNodeManagerRequest request1 =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
request1.setNodeId(nodeId1);
request1.setHttpPort(0);
request1.setResource(capability);
privateResourceTrackerService.registerNodeManager(request1);
privateDispatcher.await();
Resource clusterResource =
rm.getResourceScheduler().getClusterResource();
Assert.assertEquals("Initial cluster resources don't match", capability,
clusterResource);
Resource newCapability = BuilderUtils.newResource(1024, 1);
RegisterNodeManagerRequest request2 =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
request2.setNodeId(nodeId1);
request2.setHttpPort(0);
request2.setResource(newCapability);
// hold up the disaptcher and register the same node with lower capability
sleepHandler.sleepFlag = true;
privateResourceTrackerService.registerNodeManager(request2);
privateDispatcher.await();
Assert.assertEquals("Cluster resources don't match", newCapability,
rm.getResourceScheduler().getClusterResource());
privateResourceTrackerService.stop();
} finally {
rm.stop();
}
}
}

View File

@ -46,7 +46,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -74,7 +73,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -82,7 +80,6 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
@ -90,13 +87,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
@ -116,7 +110,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -136,7 +129,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@ -3285,108 +3277,6 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
}
}
private class SleepHandler implements EventHandler<SchedulerEvent> {
boolean sleepFlag = false;
int sleepTime = 20;
@Override
public void handle(SchedulerEvent event) {
try {
if(sleepFlag) {
Thread.sleep(sleepTime);
}
}
catch(InterruptedException ie) {
}
}
}
private ResourceTrackerService getPrivateResourceTrackerService(
Dispatcher privateDispatcher, SleepHandler sleepHandler) {
Configuration conf = new Configuration();
ResourceTrackerService privateResourceTrackerService;
RMContext privateContext =
new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
null, null, null);
privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
privateDispatcher.register(SchedulerEventType.class, sleepHandler);
privateDispatcher.register(SchedulerEventType.class,
resourceManager.getResourceScheduler());
privateDispatcher.register(RMNodeEventType.class,
new ResourceManager.NodeEventDispatcher(privateContext));
((Service) privateDispatcher).init(conf);
((Service) privateDispatcher).start();
NMLivelinessMonitor nmLivelinessMonitor =
new NMLivelinessMonitor(privateDispatcher);
nmLivelinessMonitor.init(conf);
nmLivelinessMonitor.start();
NodesListManager nodesListManager = new NodesListManager(privateContext);
nodesListManager.init(conf);
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
containerTokenSecretManager.start();
NMTokenSecretManagerInRM nmTokenSecretManager =
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.start();
privateResourceTrackerService =
new ResourceTrackerService(privateContext, nodesListManager,
nmLivelinessMonitor, containerTokenSecretManager,
nmTokenSecretManager);
privateResourceTrackerService.init(conf);
privateResourceTrackerService.start();
resourceManager.getResourceScheduler().setRMContext(privateContext);
return privateResourceTrackerService;
}
/**
* Test the behaviour of the capacity scheduler when a node reconnects
* with changed capabilities. This test is to catch any race conditions
* that might occur due to the use of the RMNode object.
* @throws Exception
*/
@Test
public void testNodemanagerReconnect() throws Exception {
DrainDispatcher privateDispatcher = new DrainDispatcher();
SleepHandler sleepHandler = new SleepHandler();
ResourceTrackerService privateResourceTrackerService =
getPrivateResourceTrackerService(privateDispatcher,
sleepHandler);
// Register node1
String hostname1 = "localhost1";
Resource capability = BuilderUtils.newResource(4096, 4);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
RegisterNodeManagerRequest request1 =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
request1.setNodeId(nodeId1);
request1.setHttpPort(0);
request1.setResource(capability);
privateResourceTrackerService.registerNodeManager(request1);
privateDispatcher.await();
Resource clusterResource = resourceManager.getResourceScheduler().getClusterResource();
Assert.assertEquals("Initial cluster resources don't match", capability,
clusterResource);
Resource newCapability = BuilderUtils.newResource(1024, 1);
RegisterNodeManagerRequest request2 =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
request2.setNodeId(nodeId1);
request2.setHttpPort(0);
request2.setResource(newCapability);
// hold up the disaptcher and register the same node with lower capability
sleepHandler.sleepFlag = true;
privateResourceTrackerService.registerNodeManager(request2);
privateDispatcher.await();
Assert.assertEquals("Cluster resources don't match", newCapability,
resourceManager.getResourceScheduler().getClusterResource());
privateResourceTrackerService.stop();
}
@Test
public void testResourceUpdateDecommissioningNode() throws Exception {
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode