diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c73b06f044..6748d60a7f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -299,6 +299,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode. (Siddharth Seth via vinodkv) + MAPREDUCE-3530. Fixed an NPE occuring during scheduling in the + ResourceManager. (Arun C Murthy via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 24fc80d442..2cadd89071 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -262,6 +262,16 @@ public List pullAppsToCleanup() { } + @Private + public List getContainersToCleanUp() { + this.readLock.lock(); + try { + return new ArrayList(containersToClean); + } finally { + this.readLock.unlock(); + } + } + @Override public List pullContainersToCleanUp() { @@ -342,7 +352,6 @@ public static class CleanUpContainerTransition implements @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - rmNode.containersToClean.add((( RMNodeCleanContainerEvent) event).getContainerId()); } @@ -396,8 +405,17 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { List completedContainers = new ArrayList(); for (ContainerStatus remoteContainer : statusEvent.getContainers()) { - // Process running containers ContainerId containerId = remoteContainer.getContainerId(); + + // Don't bother with containers already scheduled for cleanup, + // the scheduler doens't need to know any more about this container + if (rmNode.containersToClean.contains(containerId)) { + LOG.info("Container " + containerId + " already scheduled for " + + "cleanup, no further processing"); + continue; + } + + // Process running containers if (remoteContainer.getState() == ContainerState.RUNNING) { if (!rmNode.justLaunchedContainers.containsKey(containerId)) { // Just launched container. RM knows about it the first time. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java new file mode 100644 index 0000000000..6a717a4daf --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.util.Collections; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore; +import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestRMNodeTransitions { + + RMNodeImpl node; + + private RMContext rmContext; + private YarnScheduler scheduler; + + private SchedulerEventType eventType; + private List completedContainers; + + private final class TestSchedulerEventDispatcher implements + EventHandler { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + } + + @Before + public void setUp() throws Exception { + InlineDispatcher rmDispatcher = new InlineDispatcher(); + + rmContext = + new RMContextImpl(new MemStore(), rmDispatcher, null, null, null); + scheduler = mock(YarnScheduler.class); + doAnswer( + new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]); + eventType = event.getType(); + if (eventType == SchedulerEventType.NODE_UPDATE) { + completedContainers = + ((NodeUpdateSchedulerEvent)event).getCompletedContainers(); + } else { + completedContainers = null; + } + return null; + } + } + ).when(scheduler).handle(any(SchedulerEvent.class)); + + rmDispatcher.register(SchedulerEventType.class, + new TestSchedulerEventDispatcher()); + + + node = new RMNodeImpl(null, rmContext, null, 0, 0, null, null); + + } + + @After + public void tearDown() throws Exception { + } + + private RMNodeStatusEvent getMockRMNodeStatusEvent() { + HeartbeatResponse response = mock(HeartbeatResponse.class); + + NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); + Boolean yes = new Boolean(true); + doReturn(yes).when(healthStatus).getIsNodeHealthy(); + + RMNodeStatusEvent event = mock(RMNodeStatusEvent.class); + doReturn(healthStatus).when(event).getNodeHealthStatus(); + doReturn(response).when(event).getLatestResponse(); + doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType(); + return event; + } + + @Test + public void testExpiredContainer() { + // Start the node + node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED)); + verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); + + // Expire a container + ContainerId completedContainerId = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + BuilderUtils.newApplicationId(0, 0), 0), 0); + node.handle(new RMNodeCleanContainerEvent(null, completedContainerId)); + Assert.assertEquals(1, node.getContainersToCleanUp().size()); + + // Now verify that scheduler isn't notified of an expired container + // by checking number of 'completedContainers' it got in the previous event + RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(); + ContainerStatus containerStatus = mock(ContainerStatus.class); + doReturn(completedContainerId).when(containerStatus).getContainerId(); + doReturn(Collections.singletonList(containerStatus)). + when(statusEvent).getContainers(); + node.handle(statusEvent); + Assert.assertEquals(0, completedContainers.size()); + } + +}