diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 6de1999284..c17872f227 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -22,6 +22,9 @@ Release 2.0.4-beta - UNRELEASED IMPROVEMENTS + YARN-365. Change NM heartbeat handling to not generate a scheduler event + on each heartbeat. (Xuan Gong via sseth) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index aafa3dbdef..4b1f8f9e6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -106,4 +106,13 @@ public interface RMNode { public List getAppsToCleanup(); public HeartbeatResponse getLastHeartBeatResponse(); + + /** + * Get and clear the list of containerUpdates accumulated across NM + * heartbeats. + * + * @return containerUpdates accumulated across NM heartbeats. + */ + public List pullContainerUpdates(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 83833b9bdb..5db61e7b22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -60,6 +61,8 @@ import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.BuilderUtils.ContainerIdComparator; +import com.google.common.annotations.VisibleForTesting; + /** * This class is used to keep track of all the applications/containers * running on a node. @@ -78,6 +81,9 @@ public class RMNodeImpl implements RMNode, EventHandler { private final ReadLock readLock; private final WriteLock writeLock; + private final ConcurrentLinkedQueue nodeUpdateQueue; + private volatile boolean nextHeartBeat = true; + private final NodeId nodeId; private final RMContext context; private final String hostName; @@ -186,6 +192,7 @@ public class RMNodeImpl implements RMNode, EventHandler { this.stateMachine = stateMachineFactory.make(this); + this.nodeUpdateQueue = new ConcurrentLinkedQueue(); } @Override @@ -400,6 +407,7 @@ public class RMNodeImpl implements RMNode, EventHandler { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Kill containers since node is rejoining. + rmNode.nodeUpdateQueue.clear(); rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); @@ -458,6 +466,7 @@ public class RMNodeImpl implements RMNode, EventHandler { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler + rmNode.nodeUpdateQueue.clear(); rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); rmNode.context.getDispatcher().getEventHandler().handle( @@ -489,6 +498,7 @@ public class RMNodeImpl implements RMNode, EventHandler { statusEvent.getNodeHealthStatus(); rmNode.setNodeHealthStatus(remoteNodeHealthStatus); if (!remoteNodeHealthStatus.getIsNodeHealthy()) { + rmNode.nodeUpdateQueue.clear(); // Inform the scheduler rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); @@ -538,10 +548,16 @@ public class RMNodeImpl implements RMNode, EventHandler { completedContainers.add(remoteContainer); } } - - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers, - completedContainers)); + if(newlyLaunchedContainers.size() != 0 + || completedContainers.size() != 0) { + rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo + (newlyLaunchedContainers, completedContainers)); + } + if(rmNode.nextHeartBeat) { + rmNode.nextHeartBeat = false; + rmNode.context.getDispatcher().getEventHandler().handle( + new NodeUpdateSchedulerEvent(rmNode)); + } rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications( statusEvent.getKeepAliveAppIds()); @@ -584,4 +600,25 @@ public class RMNodeImpl implements RMNode, EventHandler { return NodeState.UNHEALTHY; } } + + @Override + public List pullContainerUpdates() { + List latestContainerInfoList = + new ArrayList(); + while(nodeUpdateQueue.peek() != null){ + latestContainerInfoList.add(nodeUpdateQueue.poll()); + } + this.nextHeartBeat = true; + return latestContainerInfoList; + } + + @VisibleForTesting + public void setNextHeartBeat(boolean nextHeartBeat) { + this.nextHeartBeat = nextHeartBeat; + } + + @VisibleForTesting + public int getQueueSize() { + return nodeUpdateQueue.size(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java new file mode 100644 index 0000000000..284b53665a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/UpdatedContainerInfo.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; + +public class UpdatedContainerInfo { + private List newlyLaunchedContainers; + private List completedContainers; + + public UpdatedContainerInfo() { + } + + public UpdatedContainerInfo(List newlyLaunchedContainers + , List completedContainers) { + this.newlyLaunchedContainers = newlyLaunchedContainers; + this.completedContainers = completedContainers; + } + + public List getNewlyLaunchedContainers() { + return this.newlyLaunchedContainers; + } + + public List getCompletedContainers() { + return this.completedContainers; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 2ce3a464a8..2fc754069f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -562,15 +563,20 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable { return root.getQueueUserAclInfo(user); } - private synchronized void nodeUpdate(RMNode nm, - List newlyLaunchedContainers, - List completedContainers) { + private synchronized void nodeUpdate(RMNode nm) { if (LOG.isDebugEnabled()) { LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource); } - - FiCaSchedulerNode node = getNode(nm.getNodeID()); + FiCaSchedulerNode node = getNode(nm.getNodeID()); + List containerInfoList = nm.pullContainerUpdates(); + List newlyLaunchedContainers = new ArrayList(); + List completedContainers = new ArrayList(); + for(UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } + // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { containerLaunchedOnNode(launchedContainer.getContainerId(), node); @@ -666,9 +672,7 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable { case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - nodeUpdate(nodeUpdatedEvent.getRMNode(), - nodeUpdatedEvent.getNewlyLaunchedContainers(), - nodeUpdatedEvent.getCompletedContainers()); + nodeUpdate(nodeUpdatedEvent.getRMNode()); } break; case APP_ADDED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java index ff51d62d91..7a8686c83f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java @@ -18,35 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; -import java.util.List; - -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class NodeUpdateSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; - private final List newlyLaunchedContainers; - private final List completedContainersStatuses; - public NodeUpdateSchedulerEvent(RMNode rmNode, - List newlyLaunchedContainers, - List completedContainers) { + public NodeUpdateSchedulerEvent(RMNode rmNode) { super(SchedulerEventType.NODE_UPDATE); this.rmNode = rmNode; - this.newlyLaunchedContainers = newlyLaunchedContainers; - this.completedContainersStatuses = completedContainers; } public RMNode getRMNode() { return rmNode; } - - public List getNewlyLaunchedContainers() { - return newlyLaunchedContainers; - } - - public List getCompletedContainers() { - return completedContainersStatuses; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index ab0f1a4f6f..27d25d75dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.SystemClock; @@ -61,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -750,15 +750,20 @@ public class FairScheduler implements ResourceScheduler { /** * Process a heartbeat update from a node. */ - private synchronized void nodeUpdate(RMNode nm, - List newlyLaunchedContainers, - List completedContainers) { + private synchronized void nodeUpdate(RMNode nm) { if (LOG.isDebugEnabled()) { LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); } eventLog.log("HEARTBEAT", nm.getHostName()); FSSchedulerNode node = nodes.get(nm.getNodeID()); + List containerInfoList = nm.pullContainerUpdates(); + List newlyLaunchedContainers = new ArrayList(); + List completedContainers = new ArrayList(); + for(UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { containerLaunchedOnNode(launchedContainer.getContainerId(), node); @@ -864,9 +869,7 @@ public class FairScheduler implements ResourceScheduler { throw new RuntimeException("Unexpected event type: " + event); } NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - nodeUpdate(nodeUpdatedEvent.getRMNode(), - nodeUpdatedEvent.getNewlyLaunchedContainers(), - nodeUpdatedEvent.getCompletedContainers()); + nodeUpdate(nodeUpdatedEvent.getRMNode()); break; case APP_ADDED: if (!(event instanceof AppAddedSchedulerEvent)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 880b98ae61..3f25537823 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -576,11 +577,16 @@ public class FifoScheduler implements ResourceScheduler, Configurable { return assignedContainers; } - private synchronized void nodeUpdate(RMNode rmNode, - List newlyLaunchedContainers, - List completedContainers) { + private synchronized void nodeUpdate(RMNode rmNode) { FiCaSchedulerNode node = getNode(rmNode.getNodeID()); + List containerInfoList = rmNode.pullContainerUpdates(); + List newlyLaunchedContainers = new ArrayList(); + List completedContainers = new ArrayList(); + for(UpdatedContainerInfo containerInfo : containerInfoList) { + newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers()); + completedContainers.addAll(containerInfo.getCompletedContainers()); + } // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { containerLaunchedOnNode(launchedContainer.getContainerId(), node); @@ -628,9 +634,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable { { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; - nodeUpdate(nodeUpdatedEvent.getRMNode(), - nodeUpdatedEvent.getNewlyLaunchedContainers(), - nodeUpdatedEvent.getCompletedContainers()); + nodeUpdate(nodeUpdatedEvent.getRMNode()); } break; case APP_ADDED: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 0c56a27ada..37e1017038 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -31,6 +33,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import com.google.common.collect.Lists; @@ -187,6 +190,11 @@ public class MockNodes { public HeartbeatResponse getLastHeartBeatResponse() { return null; } + + @Override + public List pullContainerUpdates() { + return new ArrayList(); + } }; private static RMNode buildRMNode(int rack, final Resource perNode, NodeState state, String httpAddr) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 2a708ed2d5..9190433ce4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -201,7 +201,7 @@ public class TestFifoScheduler { testMinimumAllocation(conf, allocMB / 2); } - @Test + @Test (timeout = 5000) public void testReconnectedNode() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); conf.setQueues("default", new String[] {"default"}); @@ -215,19 +215,19 @@ public class TestFifoScheduler { fs.handle(new NodeAddedSchedulerEvent(n1)); fs.handle(new NodeAddedSchedulerEvent(n2)); List emptyList = new ArrayList(); - fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList)); + fs.handle(new NodeUpdateSchedulerEvent(n1)); Assert.assertEquals(6 * GB, fs.getRootQueueMetrics().getAvailableMB()); // reconnect n1 with downgraded memory n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1); fs.handle(new NodeRemovedSchedulerEvent(n1)); fs.handle(new NodeAddedSchedulerEvent(n1)); - fs.handle(new NodeUpdateSchedulerEvent(n1, emptyList, emptyList)); + fs.handle(new NodeUpdateSchedulerEvent(n1)); Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); } - @Test + @Test (timeout = 5000) public void testHeadroom() throws Exception { Configuration conf = new Configuration(); @@ -275,7 +275,7 @@ public class TestFifoScheduler { fs.allocate(appAttemptId2, ask2, emptyId); // Trigger container assignment - fs.handle(new NodeUpdateSchedulerEvent(n1, emptyStatus, emptyStatus)); + fs.handle(new NodeUpdateSchedulerEvent(n1)); // Get the allocation for the applications and verify headroom Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 6c14008626..982d2af506 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.Collections; @@ -42,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -63,7 +65,7 @@ public class TestRMNodeTransitions { private YarnScheduler scheduler; private SchedulerEventType eventType; - private List completedContainers; + private List completedContainers = new ArrayList(); private final class TestSchedulerEventDispatcher implements EventHandler { @@ -89,10 +91,11 @@ public class TestRMNodeTransitions { final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]); eventType = event.getType(); if (eventType == SchedulerEventType.NODE_UPDATE) { - completedContainers = - ((NodeUpdateSchedulerEvent)event).getCompletedContainers(); - } else { - completedContainers = null; + List lastestContainersInfoList = + ((NodeUpdateSchedulerEvent)event).getRMNode().pullContainerUpdates(); + for(UpdatedContainerInfo lastestContainersInfo : lastestContainersInfoList) { + completedContainers.addAll(lastestContainersInfo.getCompletedContainers()); + } } return null; } @@ -125,16 +128,16 @@ public class TestRMNodeTransitions { return event; } - @Test + @Test (timeout = 5000) public void testExpiredContainer() { // Start the node node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container - ContainerId completedContainerId = BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId( - BuilderUtils.newApplicationId(0, 0), 0), 0); + ContainerId completedContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(0, 0), 0), 0); node.handle(new RMNodeCleanContainerEvent(null, completedContainerId)); Assert.assertEquals(1, node.getContainersToCleanUp().size()); @@ -146,9 +149,110 @@ public class TestRMNodeTransitions { doReturn(Collections.singletonList(containerStatus)). when(statusEvent).getContainers(); node.handle(statusEvent); - Assert.assertEquals(0, completedContainers.size()); + /* Expect the scheduler call handle function 2 times + * 1. RMNode status from new to Running, handle the add_node event + * 2. handle the node update event + */ + verify(scheduler,times(2)).handle(any(NodeUpdateSchedulerEvent.class)); } + + @Test (timeout = 5000) + public void testContainerUpdate() throws InterruptedException{ + //Start the node + node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + + NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); + RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null); + node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + + ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(0, 0), 0), 0); + ContainerId completedContainerIdFromNode2_1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(1, 1), 1), 1); + ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(1, 1), 1), 2); + + RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(); + RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(); + RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(); + + ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class); + ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class); + ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class); + doReturn(completedContainerIdFromNode1).when(containerStatusFromNode1) + .getContainerId(); + doReturn(Collections.singletonList(containerStatusFromNode1)) + .when(statusEventFromNode1).getContainers(); + node.handle(statusEventFromNode1); + Assert.assertEquals(1, completedContainers.size()); + Assert.assertEquals(completedContainerIdFromNode1, + completedContainers.get(0).getContainerId()); + + completedContainers.clear(); + + doReturn(completedContainerIdFromNode2_1).when(containerStatusFromNode2_1) + .getContainerId(); + doReturn(Collections.singletonList(containerStatusFromNode2_1)) + .when(statusEventFromNode2_1).getContainers(); + + doReturn(completedContainerIdFromNode2_2).when(containerStatusFromNode2_2) + .getContainerId(); + doReturn(Collections.singletonList(containerStatusFromNode2_2)) + .when(statusEventFromNode2_2).getContainers(); + + node2.setNextHeartBeat(false); + node2.handle(statusEventFromNode2_1); + node2.setNextHeartBeat(true); + node2.handle(statusEventFromNode2_2); + + Assert.assertEquals(2, completedContainers.size()); + Assert.assertEquals(completedContainerIdFromNode2_1,completedContainers.get(0) + .getContainerId()); + Assert.assertEquals(completedContainerIdFromNode2_2,completedContainers.get(1) + .getContainerId()); + } + + @Test (timeout = 5000) + public void testStatusChange(){ + //Start the node + node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + //Add info to the queue first + node.setNextHeartBeat(false); + + ContainerId completedContainerId1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(0, 0), 0), 0); + ContainerId completedContainerId2 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(1, 1), 1), 1); + + RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(); + RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(); + + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + ContainerStatus containerStatus2 = mock(ContainerStatus.class); + + doReturn(completedContainerId1).when(containerStatus1).getContainerId(); + doReturn(Collections.singletonList(containerStatus1)) + .when(statusEvent1).getContainers(); + + doReturn(completedContainerId2).when(containerStatus2).getContainerId(); + doReturn(Collections.singletonList(containerStatus2)) + .when(statusEvent2).getContainers(); + + verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + node.handle(statusEvent1); + node.handle(statusEvent2); + verify(scheduler,times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + Assert.assertEquals(2, node.getQueueSize()); + node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE)); + Assert.assertEquals(0, node.getQueueSize()); + } + @Test public void testRunningExpire() { RMNodeImpl node = getRunningNode(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index a0e17588e0..a5cfadd255 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -30,7 +30,6 @@ import java.io.PrintWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -276,7 +275,7 @@ public class TestFairScheduler { Assert.assertEquals(3, queueManager.getLeafQueues().size()); } - @Test + @Test (timeout = 5000) public void testSimpleContainerAllocation() { // Add a node RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); @@ -292,8 +291,7 @@ public class TestFairScheduler { scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); // Asked for less than min_allocation. @@ -301,15 +299,14 @@ public class TestFairScheduler { scheduler.getQueueManager().getQueue("queue1"). getResourceUsage().getMemory()); - NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). getResourceUsage().getMemory()); } - @Test + @Test (timeout = 5000) public void testSimpleContainerReservation() throws InterruptedException { // Add a node RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); @@ -319,8 +316,7 @@ public class TestFairScheduler { // Queue 1 requests full capacity of node createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); // Make sure queue 1 is allocated app capacity @@ -340,8 +336,7 @@ public class TestFairScheduler { // Now another node checks in with capacity RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); - NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(nodeEvent2); scheduler.handle(updateEvent2); @@ -738,7 +733,7 @@ public class TestFairScheduler { assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); } - @Test + @Test (timeout = 5000) public void testIsStarvedForMinShare() throws Exception { Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); @@ -767,8 +762,7 @@ public class TestFairScheduler { // Queue A wants 3 * 1024. Node update gives this all to A createSchedulingRequest(3 * 1024, "queueA", "user1"); scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1, - new LinkedList(), new LinkedList()); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(nodeEvent2); // Queue B arrives and wants 1 * 1024 @@ -797,7 +791,7 @@ public class TestFairScheduler { } } - @Test + @Test (timeout = 5000) public void testIsStarvedForFairShare() throws Exception { Configuration conf = createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); @@ -826,8 +820,7 @@ public class TestFairScheduler { // Queue A wants 3 * 1024. Node update gives this all to A createSchedulingRequest(3 * 1024, "queueA", "user1"); scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1, - new LinkedList(), new LinkedList()); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(nodeEvent2); // Queue B arrives and wants 1 * 1024 @@ -857,7 +850,7 @@ public class TestFairScheduler { } } - @Test + @Test (timeout = 5000) /** * Make sure containers are chosen to be preempted in the correct order. Right * now this means decreasing order of priority. @@ -921,16 +914,13 @@ public class TestFairScheduler { // Sufficient node check-ins to fully schedule containers for (int i = 0; i < 2; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, - new LinkedList(), new LinkedList()); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(nodeUpdate1); - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2, - new LinkedList(), new LinkedList()); + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(nodeUpdate2); - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3, - new LinkedList(), new LinkedList()); + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); scheduler.handle(nodeUpdate3); } @@ -991,7 +981,7 @@ public class TestFairScheduler { assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); } - @Test + @Test (timeout = 5000) /** * Tests the timing of decision to preempt tasks. */ @@ -1062,16 +1052,13 @@ public class TestFairScheduler { // Sufficient node check-ins to fully schedule containers for (int i = 0; i < 2; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, - new LinkedList(), new LinkedList()); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(nodeUpdate1); - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2, - new LinkedList(), new LinkedList()); + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(nodeUpdate2); - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3, - new LinkedList(), new LinkedList()); + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); scheduler.handle(nodeUpdate3); } @@ -1119,7 +1106,7 @@ public class TestFairScheduler { Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime()))); } - @Test + @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() { // Add a node RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); @@ -1129,8 +1116,7 @@ public class TestFairScheduler { // Request full capacity of node createSchedulingRequest(1024, "queue1", "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue2", "user2", 1); @@ -1146,7 +1132,7 @@ public class TestFairScheduler { scheduler.applications.get(attId2).getCurrentReservation().getMemory()); } - @Test + @Test (timeout = 5000) public void testUserMaxRunningApps() throws Exception { // Set max running apps Configuration conf = createConfiguration(); @@ -1175,8 +1161,7 @@ public class TestFairScheduler { "user1", 1); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); // App 1 should be running @@ -1201,7 +1186,7 @@ public class TestFairScheduler { assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size()); } - @Test + @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() { // Add a node RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); @@ -1211,8 +1196,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 1, 2); scheduler.update(); - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); FSSchedulerApp app = scheduler.applications.get(attId); @@ -1285,7 +1269,7 @@ public class TestFairScheduler { assertNull("The application was allowed", app2); } - @Test + @Test (timeout = 5000) public void testMultipleNodesSingleRackRequest() throws Exception { RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); @@ -1312,22 +1296,20 @@ public class TestFairScheduler { // node 1 checks in scheduler.update(); - NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent1); // should assign node local assertEquals(1, scheduler.applications.get(appId).getLiveContainers().size()); // node 2 checks in scheduler.update(); - NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); // should assign rack local assertEquals(2, scheduler.applications.get(appId).getLiveContainers().size()); } - @Test + @Test (timeout = 5000) public void testFifoWithinQueue() throws Exception { RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3072)); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); @@ -1351,8 +1333,7 @@ public class TestFairScheduler { // Because tests set assignmultiple to false, each heartbeat assigns a single // container. - NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, - new ArrayList(), new ArrayList()); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); assertEquals(1, app1.getLiveContainers().size());