diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1b719701dd..cfb0052a20 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -539,6 +539,9 @@ Release 2.4.0 - UNRELEASED YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write more log-data than the log-length that it records. (Mit Desai via vinodk) + YARN-1849. Fixed NPE in ResourceTrackerService#registerNodeManager for UAM + (Karthik Kambatla via jianhe ) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 8a2c53958c..1d4032048e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -187,12 +188,51 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + /** + * Helper method to handle received ContainerStatus. If this corresponds to + * the completion of a master-container of a managed AM, + * we call the handler for RMAppAttemptContainerFinishedEvent. + */ + @SuppressWarnings("unchecked") + @VisibleForTesting + void handleContainerStatus(ContainerStatus containerStatus) { + ApplicationAttemptId appAttemptId = + containerStatus.getContainerId().getApplicationAttemptId(); + RMApp rmApp = + rmContext.getRMApps().get(appAttemptId.getApplicationId()); + if (rmApp == null) { + LOG.error("Received finished container : " + + containerStatus.getContainerId() + + "for unknown application " + appAttemptId.getApplicationId() + + " Skipping."); + return; + } + + if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring container completion status for unmanaged AM" + + rmApp.getApplicationId()); + } + return; + } + + RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); + Container masterContainer = rmAppAttempt.getMasterContainer(); + if (masterContainer.getId().equals(containerStatus.getContainerId()) + && containerStatus.getState() == ContainerState.COMPLETE) { + // sending master container finished event. + RMAppAttemptContainerFinishedEvent evt = + new RMAppAttemptContainerFinishedEvent(appAttemptId, + containerStatus); + rmContext.getDispatcher().getEventHandler().handle(evt); + } + } + @SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { - NodeId nodeId = request.getNodeId(); String host = nodeId.getHost(); int cmPort = nodeId.getPort(); @@ -204,29 +244,7 @@ public RegisterNodeManagerResponse registerNodeManager( LOG.info("received container statuses on node manager register :" + request.getContainerStatuses()); for (ContainerStatus containerStatus : request.getContainerStatuses()) { - ApplicationAttemptId appAttemptId = - containerStatus.getContainerId().getApplicationAttemptId(); - RMApp rmApp = - rmContext.getRMApps().get(appAttemptId.getApplicationId()); - if (rmApp != null) { - RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); - if (rmAppAttempt != null) { - if (rmAppAttempt.getMasterContainer().getId() - .equals(containerStatus.getContainerId()) - && containerStatus.getState() == ContainerState.COMPLETE) { - // sending master container finished event. - RMAppAttemptContainerFinishedEvent evt = - new RMAppAttemptContainerFinishedEvent(appAttemptId, - containerStatus); - rmContext.getDispatcher().getEventHandler().handle(evt); - } - } - } else { - LOG.error("Received finished container :" - + containerStatus.getContainerId() - + " for non existing application :" - + appAttemptId.getApplicationId()); - } + handleContainerStatus(containerStatus); } } RegisterNodeManagerResponse response = recordFactory diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 697a18099b..3e90ec8ec1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -35,9 +35,11 @@ import javax.crypto.SecretKey; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -629,7 +631,9 @@ public Container getMasterContainer() { } } - private void setMasterContainer(Container container) { + @InterfaceAudience.Private + @VisibleForTesting + public void setMasterContainer(Container container) { masterContainer = container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 303e0fb56e..2f16b85699 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -26,8 +26,6 @@ import java.util.HashMap; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.metrics2.MetricsSystem; @@ -45,21 +43,29 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; + import org.junit.After; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; public class TestResourceTrackerService { @@ -468,26 +474,64 @@ private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health, ClusterMetrics.getMetrics().getUnhealthyNMs()); } + @SuppressWarnings("unchecked") @Test - public void testNodeRegistrationWithContainers() throws Exception { - rm = new MockRM(); - rm.init(new YarnConfiguration()); + public void testHandleContainerStatusInvalidCompletions() throws Exception { + rm = new MockRM(new YarnConfiguration()); rm.start(); - RMApp app = rm.submitApp(1024); - MockNM nm = rm.registerNode("host1:1234", 8192); - nm.nodeHeartbeat(true); + EventHandler handler = + spy(rm.getRMContext().getDispatcher().getEventHandler()); - // Register node with some container statuses + // Case 1: Unmanaged AM + RMApp app = rm.submitApp(1024, true); + + // Case 1.1: AppAttemptId is null ContainerStatus status = ContainerStatus.newInstance( ContainerId.newInstance(ApplicationAttemptId.newInstance( app.getApplicationId(), 2), 1), ContainerState.COMPLETE, "Dummy Completed", 0); + rm.getResourceTrackerService().handleContainerStatus(status); + verify(handler, never()).handle((Event) any()); - // The following shouldn't throw NPE - nm.registerNode(Collections.singletonList(status)); - assertEquals("Incorrect number of nodes", 1, - rm.getRMContext().getRMNodes().size()); + // Case 1.2: Master container is null + RMAppAttemptImpl currentAttempt = + (RMAppAttemptImpl) app.getCurrentAppAttempt(); + currentAttempt.setMasterContainer(null); + status = ContainerStatus.newInstance( + ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), + ContainerState.COMPLETE, "Dummy Completed", 0); + rm.getResourceTrackerService().handleContainerStatus(status); + verify(handler, never()).handle((Event)any()); + + // Case 2: Managed AM + app = rm.submitApp(1024); + + // Case 2.1: AppAttemptId is null + status = ContainerStatus.newInstance( + ContainerId.newInstance(ApplicationAttemptId.newInstance( + app.getApplicationId(), 2), 1), + ContainerState.COMPLETE, "Dummy Completed", 0); + try { + rm.getResourceTrackerService().handleContainerStatus(status); + } catch (Exception e) { + // expected - ignore + } + verify(handler, never()).handle((Event)any()); + + // Case 2.2: Master container is null + currentAttempt = + (RMAppAttemptImpl) app.getCurrentAppAttempt(); + currentAttempt.setMasterContainer(null); + status = ContainerStatus.newInstance( + ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), + ContainerState.COMPLETE, "Dummy Completed", 0); + try { + rm.getResourceTrackerService().handleContainerStatus(status); + } catch (Exception e) { + // expected - ignore + } + verify(handler, never()).handle((Event)any()); } @Test