YARN-1849. Fixed NPE in ResourceTrackerService#registerNodeManager for UAM. Contributed by Karthik Kambatla
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1580077 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a5c08eed16
commit
f67218809c
@ -539,6 +539,9 @@ Release 2.4.0 - UNRELEASED
|
||||
YARN-1670. Fixed a bug in log-aggregation that can cause the writer to write
|
||||
more log-data than the log-length that it records. (Mit Desai via vinodk)
|
||||
|
||||
YARN-1849. Fixed NPE in ResourceTrackerService#registerNodeManager for UAM
|
||||
(Karthik Kambatla via jianhe )
|
||||
|
||||
Release 2.3.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -31,6 +31,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.VersionUtil;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
@ -187,12 +188,51 @@ public class ResourceTrackerService extends AbstractService implements
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to handle received ContainerStatus. If this corresponds to
|
||||
* the completion of a master-container of a managed AM,
|
||||
* we call the handler for RMAppAttemptContainerFinishedEvent.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
@VisibleForTesting
|
||||
void handleContainerStatus(ContainerStatus containerStatus) {
|
||||
ApplicationAttemptId appAttemptId =
|
||||
containerStatus.getContainerId().getApplicationAttemptId();
|
||||
RMApp rmApp =
|
||||
rmContext.getRMApps().get(appAttemptId.getApplicationId());
|
||||
if (rmApp == null) {
|
||||
LOG.error("Received finished container : "
|
||||
+ containerStatus.getContainerId()
|
||||
+ "for unknown application " + appAttemptId.getApplicationId()
|
||||
+ " Skipping.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Ignoring container completion status for unmanaged AM"
|
||||
+ rmApp.getApplicationId());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
|
||||
Container masterContainer = rmAppAttempt.getMasterContainer();
|
||||
if (masterContainer.getId().equals(containerStatus.getContainerId())
|
||||
&& containerStatus.getState() == ContainerState.COMPLETE) {
|
||||
// sending master container finished event.
|
||||
RMAppAttemptContainerFinishedEvent evt =
|
||||
new RMAppAttemptContainerFinishedEvent(appAttemptId,
|
||||
containerStatus);
|
||||
rmContext.getDispatcher().getEventHandler().handle(evt);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public RegisterNodeManagerResponse registerNodeManager(
|
||||
RegisterNodeManagerRequest request) throws YarnException,
|
||||
IOException {
|
||||
|
||||
NodeId nodeId = request.getNodeId();
|
||||
String host = nodeId.getHost();
|
||||
int cmPort = nodeId.getPort();
|
||||
@ -204,29 +244,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||
LOG.info("received container statuses on node manager register :"
|
||||
+ request.getContainerStatuses());
|
||||
for (ContainerStatus containerStatus : request.getContainerStatuses()) {
|
||||
ApplicationAttemptId appAttemptId =
|
||||
containerStatus.getContainerId().getApplicationAttemptId();
|
||||
RMApp rmApp =
|
||||
rmContext.getRMApps().get(appAttemptId.getApplicationId());
|
||||
if (rmApp != null) {
|
||||
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
|
||||
if (rmAppAttempt != null) {
|
||||
if (rmAppAttempt.getMasterContainer().getId()
|
||||
.equals(containerStatus.getContainerId())
|
||||
&& containerStatus.getState() == ContainerState.COMPLETE) {
|
||||
// sending master container finished event.
|
||||
RMAppAttemptContainerFinishedEvent evt =
|
||||
new RMAppAttemptContainerFinishedEvent(appAttemptId,
|
||||
containerStatus);
|
||||
rmContext.getDispatcher().getEventHandler().handle(evt);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG.error("Received finished container :"
|
||||
+ containerStatus.getContainerId()
|
||||
+ " for non existing application :"
|
||||
+ appAttemptId.getApplicationId());
|
||||
}
|
||||
handleContainerStatus(containerStatus);
|
||||
}
|
||||
}
|
||||
RegisterNodeManagerResponse response = recordFactory
|
||||
|
@ -35,9 +35,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
@ -629,7 +631,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||
}
|
||||
}
|
||||
|
||||
private void setMasterContainer(Container container) {
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
public void setMasterContainer(Container container) {
|
||||
masterContainer = container;
|
||||
}
|
||||
|
||||
|
@ -26,8 +26,6 @@ import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
@ -45,21 +43,29 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class TestResourceTrackerService {
|
||||
|
||||
@ -468,26 +474,64 @@ public class TestResourceTrackerService {
|
||||
ClusterMetrics.getMetrics().getUnhealthyNMs());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testNodeRegistrationWithContainers() throws Exception {
|
||||
rm = new MockRM();
|
||||
rm.init(new YarnConfiguration());
|
||||
public void testHandleContainerStatusInvalidCompletions() throws Exception {
|
||||
rm = new MockRM(new YarnConfiguration());
|
||||
rm.start();
|
||||
RMApp app = rm.submitApp(1024);
|
||||
|
||||
MockNM nm = rm.registerNode("host1:1234", 8192);
|
||||
nm.nodeHeartbeat(true);
|
||||
EventHandler handler =
|
||||
spy(rm.getRMContext().getDispatcher().getEventHandler());
|
||||
|
||||
// Register node with some container statuses
|
||||
// Case 1: Unmanaged AM
|
||||
RMApp app = rm.submitApp(1024, true);
|
||||
|
||||
// Case 1.1: AppAttemptId is null
|
||||
ContainerStatus status = ContainerStatus.newInstance(
|
||||
ContainerId.newInstance(ApplicationAttemptId.newInstance(
|
||||
app.getApplicationId(), 2), 1),
|
||||
ContainerState.COMPLETE, "Dummy Completed", 0);
|
||||
rm.getResourceTrackerService().handleContainerStatus(status);
|
||||
verify(handler, never()).handle((Event) any());
|
||||
|
||||
// The following shouldn't throw NPE
|
||||
nm.registerNode(Collections.singletonList(status));
|
||||
assertEquals("Incorrect number of nodes", 1,
|
||||
rm.getRMContext().getRMNodes().size());
|
||||
// Case 1.2: Master container is null
|
||||
RMAppAttemptImpl currentAttempt =
|
||||
(RMAppAttemptImpl) app.getCurrentAppAttempt();
|
||||
currentAttempt.setMasterContainer(null);
|
||||
status = ContainerStatus.newInstance(
|
||||
ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
|
||||
ContainerState.COMPLETE, "Dummy Completed", 0);
|
||||
rm.getResourceTrackerService().handleContainerStatus(status);
|
||||
verify(handler, never()).handle((Event)any());
|
||||
|
||||
// Case 2: Managed AM
|
||||
app = rm.submitApp(1024);
|
||||
|
||||
// Case 2.1: AppAttemptId is null
|
||||
status = ContainerStatus.newInstance(
|
||||
ContainerId.newInstance(ApplicationAttemptId.newInstance(
|
||||
app.getApplicationId(), 2), 1),
|
||||
ContainerState.COMPLETE, "Dummy Completed", 0);
|
||||
try {
|
||||
rm.getResourceTrackerService().handleContainerStatus(status);
|
||||
} catch (Exception e) {
|
||||
// expected - ignore
|
||||
}
|
||||
verify(handler, never()).handle((Event)any());
|
||||
|
||||
// Case 2.2: Master container is null
|
||||
currentAttempt =
|
||||
(RMAppAttemptImpl) app.getCurrentAppAttempt();
|
||||
currentAttempt.setMasterContainer(null);
|
||||
status = ContainerStatus.newInstance(
|
||||
ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
|
||||
ContainerState.COMPLETE, "Dummy Completed", 0);
|
||||
try {
|
||||
rm.getResourceTrackerService().handleContainerStatus(status);
|
||||
} catch (Exception e) {
|
||||
// expected - ignore
|
||||
}
|
||||
verify(handler, never()).handle((Event)any());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
x
Reference in New Issue
Block a user