YARN-4677. RMNodeResourceUpdateEvent update from scheduler can lead to race condition (wilfreds and gphillips via rkanter)
commit 0cd145a443 (parent 04cf699dd5)
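The race being fixed: a NODE_UPDATE heartbeat can be processed while the same node is concurrently decommissioned and removed from the scheduler's node tracker, so looking the node up by NodeID inside nodeUpdate() may return null, and the follow-up work (notifyNodeUpdate, the resource update for a decommissioning node, utilization tracking) would dereference a null SchedulerNode. The hunks below resolve the SchedulerNode once per heartbeat and skip the node-local updates when the node has already been removed; each of the Capacity, Fair, and Fifo schedulers also gains a regression test that removes the node between two heartbeats.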
AbstractYarnScheduler.java
@@ -1106,12 +1106,16 @@ protected void nodeUpdate(RMNode nm) {
     }
 
     // Process new container information
+    // NOTICE: it is possible to not find the NodeID as a node can be
+    // decommissioned at the same time. Skip updates if node is null.
+    SchedulerNode schedulerNode = getNode(nm.getNodeID());
     List<ContainerStatus> completedContainers = updateNewContainerInfo(nm,
         schedulerNode);
 
     // Notify Scheduler Node updated.
+    if (schedulerNode != null) {
       schedulerNode.notifyNodeUpdate();
+    }
 
     // Process completed containers
     Resource releasedResources = Resource.newInstance(0, 0);
@@ -1121,9 +1125,7 @@ protected void nodeUpdate(RMNode nm) {
     // If the node is decommissioning, send an update to have the total
     // resource equal to the used resource, so no available resource to
     // schedule.
     // TODO YARN-5128: Fix possible race-condition when request comes in before
     // update is propagated
-    if (nm.getState() == NodeState.DECOMMISSIONING) {
+    if (nm.getState() == NodeState.DECOMMISSIONING && schedulerNode != null) {
       this.rmContext
           .getDispatcher()
           .getEventHandler()
@@ -1133,13 +1135,16 @@ protected void nodeUpdate(RMNode nm) {
     }
 
     updateSchedulerHealthInformation(releasedResources, releasedContainers);
+    if (schedulerNode != null) {
       updateNodeResourceUtilization(nm, schedulerNode);
+    }
 
     // Now node data structures are up-to-date and ready for scheduling.
     if(LOG.isDebugEnabled()) {
       LOG.debug(
-          "Node being looked for scheduling " + nm + " availableResource: "
-              + schedulerNode.getUnallocatedResource());
+          "Node being looked for scheduling " + nm + " availableResource: " +
+              (schedulerNode == null ? "unknown (decommissioned)" :
+                  schedulerNode.getUnallocatedResource()));
     }
   }
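A minimal standalone sketch of the guard pattern applied above (the class below is an invented stand-in, not the YARN sources): resolve the node once per event, then null-check every use, because a concurrent remove can win the race between lookup and use.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HeartbeatRaceSketch {
  static class Node {
    int unallocatedMb = 4096;
  }

  // Tracker shared by the heartbeat thread and the decommission thread.
  static final Map<String, Node> tracker = new ConcurrentHashMap<>();

  static void nodeUpdate(String nodeId) {
    // Look the node up exactly once per heartbeat ...
    Node node = tracker.get(nodeId);
    // ... and guard every use: a concurrent removeNode() may have won.
    if (node != null) {
      System.out.println("scheduling on " + nodeId + ", free=" + node.unallocatedMb);
    } else {
      System.out.println("skipping " + nodeId + " (removed while decommissioning)");
    }
  }

  public static void main(String[] args) {
    tracker.put("host_decom", new Node());
    nodeUpdate("host_decom");      // normal heartbeat
    tracker.remove("host_decom");  // decommission races in
    nodeUpdate("host_decom");      // would NPE here without the null guard
  }
}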
FairScheduler.java
@@ -1096,7 +1096,7 @@ void attemptScheduling(FSSchedulerNode node) {
       return;
     }
 
-    final NodeId nodeID = node.getNodeID();
+    final NodeId nodeID = (node != null ? node.getNodeID() : null);
     if (!nodeTracker.exists(nodeID)) {
       // The node might have just been removed while this thread was waiting
       // on the synchronized lock before it entered this synchronized method
FifoScheduler.java
@@ -966,7 +966,9 @@ protected synchronized void nodeUpdate(RMNode nm) {
       return;
     }
 
-    if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
+    // A decommissioned node might be removed before we get here
+    if (node != null &&
+        Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
         node.getUnallocatedResource(), minimumAllocation)) {
       LOG.debug("Node heartbeat " + nm.getNodeID() +
           " available resource = " + node.getUnallocatedResource());
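FairScheduler and FifoScheduler get the same defensive treatment: attemptScheduling() computes the NodeId null-safely so a removed node falls through to the existing nodeTracker.exists() early return, and FifoScheduler's nodeUpdate() only attempts allocation while the node still exists in the tracker.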
TestCapacityScheduler.java
@@ -258,14 +258,12 @@ public void testConfValidation() throws Exception {
     }
   }
 
-  private NodeManager
-      registerNode(String hostName, int containerManagerPort, int httpPort,
-          String rackName, Resource capability)
+  private NodeManager registerNode(String hostName, int containerManagerPort,
+                                   int httpPort, String rackName,
+                                   Resource capability)
       throws IOException, YarnException {
-    NodeManager nm =
-        new NodeManager(
-            hostName, containerManagerPort, httpPort, rackName, capability,
-            resourceManager);
+    NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
+        rackName, capability, resourceManager);
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext()
             .getRMNodes().get(nm.getNodeId()));
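The test-side changes are largely mechanical: with NodeManager imported directly (see the import hunks further down), the registerNode helpers and their call sites drop the fully qualified org.apache.hadoop.yarn.server.resourcemanager.NodeManager references and reflow the signatures.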
@@ -280,13 +278,13 @@ public void testCapacityScheduler() throws Exception {
 
     // Register node1
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+    NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
             Resources.createResource(4 * GB, 1));
 
     // Register node2
     String host_1 = "host_1";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
+    NodeManager nm_1 =
         registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
             Resources.createResource(2 * GB, 1));
@@ -4038,6 +4036,29 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
       Assert.fail("Cannot find RMContainer");
     }
   }
 
+  @Test
+  public void testRemovedNodeDecomissioningNode() throws Exception {
+    // Register nodemanager
+    NodeManager nm = registerNode("host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // force remove the node to simulate race condition
+    ((CapacityScheduler) resourceManager.getResourceScheduler()).getNodeTracker().
+        removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
+
   @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
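The new regression tests pin the race deterministically: the node is yanked from the scheduler's tracker between two heartbeats, and the second heartbeat carries a Mockito spy of the live RMNode with only getState() stubbed to DECOMMISSIONING, so every other call still hits the real node. A standalone sketch of that spy/stub technique, using a java.util.List as a stand-in for the RMNode:

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;

public class SpyStubSketch {
  public static void main(String[] args) {
    List<String> real = new ArrayList<>();
    real.add("container_1");

    // A spy wraps the real object; unstubbed calls delegate to it.
    List<String> spyList = spy(real);
    // Stub a single accessor, the way the tests stub RMNode#getState().
    when(spyList.size()).thenReturn(99);

    System.out.println(spyList.size());  // 99 (stubbed)
    System.out.println(spyList.get(0));  // "container_1" (real behavior)
  }
}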
@@ -4064,9 +4085,8 @@ public void handle(Event event) {
     ((AsyncDispatcher) mockDispatcher).start();
     // Register node
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(8 * GB, 4));
+    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
TestFairScheduler.java
@@ -82,6 +82,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -4973,6 +4974,30 @@ public void testUserAsDefaultQueueWithLeadingTrailingSpaceUserName()
         .get(attId3.getApplicationId()).getQueue());
   }
 
+  @Test
+  public void testRemovedNodeDecomissioningNode() throws Exception {
+    // Register nodemanager
+    NodeManager nm = registerNode("host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // Force remove the node to simulate race condition
+    ((FairScheduler) resourceManager.getResourceScheduler())
+        .getNodeTracker().removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
+
   @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
     // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
@@ -4998,9 +5023,8 @@ public void handle(Event event) {
     ((AsyncDispatcher) mockDispatcher).start();
     // Register node
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(8 * GB, 4));
+    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
 
     RMNode node =
         resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
@@ -5038,13 +5062,12 @@ public void handle(Event event) {
     Assert.assertEquals(availableResource.getVirtualCores(), 0);
   }
 
-  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(
-      String hostName, int containerManagerPort, int httpPort, String rackName,
-      Resource capability) throws IOException, YarnException {
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
-        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
-            containerManagerPort, httpPort, rackName, capability,
-            resourceManager);
+  private NodeManager registerNode(String hostName, int containerManagerPort,
+                                   int httpPort, String rackName,
+                                   Resource capability)
+      throws IOException, YarnException {
+    NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
+        rackName, capability, resourceManager);
 
     // after YARN-5375, scheduler event is processed in rm main dispatcher,
     // wait it processed, or may lead dead lock
TestFifoScheduler.java
@@ -66,6 +66,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -138,14 +139,12 @@ public void tearDown() throws Exception {
     resourceManager.stop();
   }
 
-  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
-      registerNode(String hostName, int containerManagerPort, int nmHttpPort,
-          String rackName, Resource capability) throws IOException,
-          YarnException {
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
-        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
-            containerManagerPort, nmHttpPort, rackName, capability,
-            resourceManager);
+  private NodeManager registerNode(String hostName, int containerManagerPort,
+                                   int nmHttpPort, String rackName,
+                                   Resource capability)
+      throws IOException, YarnException {
+    NodeManager nm = new NodeManager(hostName, containerManagerPort,
+        nmHttpPort, rackName, capability, resourceManager);
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
             .get(nm.getNodeId()));
@@ -1195,6 +1194,30 @@ public void testResourceOverCommit() throws Exception {
     rm.stop();
   }
 
+  @Test
+  public void testRemovedNodeDecomissioningNode() throws Exception {
+    // Register nodemanager
+    NodeManager nm = registerNode("host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // Force remove the node to simulate race condition
+    ((FifoScheduler) resourceManager.getResourceScheduler())
+        .getNodeTracker().removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
+
   @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
     // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
@@ -1220,9 +1243,8 @@ public void handle(Event event) {
     ((AsyncDispatcher) mockDispatcher).start();
     // Register node
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(8 * GB, 4));
+    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);