YARN-5197. RM leaks containers if running container disappears from node update. Contributed by Jason Lowe.

This commit is contained in:
Rohith Sharma K S 2016-06-11 10:22:27 +05:30
parent 8a1dccecce
commit e0f4620cc7
3 changed files with 121 additions and 17 deletions

View File

@ -24,6 +24,7 @@
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -72,6 +73,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@ -1311,6 +1313,7 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
new ArrayList<ContainerStatus>(); new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = List<ContainerStatus> completedContainers =
new ArrayList<ContainerStatus>(); new ArrayList<ContainerStatus>();
int numRemoteRunningContainers = 0;
for (ContainerStatus remoteContainer : containerStatuses) { for (ContainerStatus remoteContainer : containerStatuses) {
ContainerId containerId = remoteContainer.getContainerId(); ContainerId containerId = remoteContainer.getContainerId();
@ -1344,6 +1347,7 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
if (remoteContainer.getState() == ContainerState.RUNNING) { if (remoteContainer.getState() == ContainerState.RUNNING) {
// Process only GUARANTEED containers in the RM. // Process only GUARANTEED containers in the RM.
if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) { if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
++numRemoteRunningContainers;
if (!launchedContainers.contains(containerId)) { if (!launchedContainers.contains(containerId)) {
// Just launched container. RM knows about it the first time. // Just launched container. RM knows about it the first time.
launchedContainers.add(containerId); launchedContainers.add(containerId);
@ -1366,12 +1370,45 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
completedContainers.add(remoteContainer); completedContainers.add(remoteContainer);
} }
} }
completedContainers.addAll(findLostContainers(
numRemoteRunningContainers, containerStatuses));
if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers, nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
completedContainers)); completedContainers));
} }
} }
/**
 * Detects containers that are tracked in {@code launchedContainers} but are
 * no longer reported as RUNNING GUARANTEED containers by the node.
 *
 * <p>Each such container is logged at WARN level, dropped from
 * {@code launchedContainers}, and represented in the result by an abnormal
 * container status built via
 * {@link SchedulerUtils#createAbnormalContainerStatus}.
 *
 * @param numRemoteRunning number of RUNNING GUARANTEED containers in
 *        {@code containerStatuses}, as counted by the caller
 * @param containerStatuses the container statuses reported by the node
 * @return statuses for the containers that disappeared; an empty list when
 *         the reported count is at least the number of launched containers
 */
private List<ContainerStatus> findLostContainers(int numRemoteRunning,
    List<ContainerStatus> containerStatuses) {
  final int numLaunched = launchedContainers.size();
  // Fast path: the node reports at least as many running containers as are
  // tracked here, so no per-container comparison is performed.
  if (numLaunched <= numRemoteRunning) {
    return Collections.emptyList();
  }

  // Collect the ids the node still reports as running and guaranteed.
  Set<ContainerId> reportedRunning =
      new HashSet<ContainerId>(numRemoteRunning);
  List<ContainerStatus> missing =
      new ArrayList<ContainerStatus>(numLaunched - numRemoteRunning);
  for (ContainerStatus reported : containerStatuses) {
    boolean isRunning = reported.getState() == ContainerState.RUNNING;
    if (isRunning
        && reported.getExecutionType() == ExecutionType.GUARANTEED) {
      reportedRunning.add(reported.getContainerId());
    }
  }

  // Walk with an explicit iterator so entries can be removed in place.
  for (Iterator<ContainerId> launchedIter = launchedContainers.iterator();
      launchedIter.hasNext();) {
    ContainerId launchedId = launchedIter.next();
    if (reportedRunning.contains(launchedId)) {
      continue;
    }
    String message = "Container " + launchedId
        + " was running but not reported from " + nodeId;
    LOG.warn(message);
    missing.add(
        SchedulerUtils.createAbnormalContainerStatus(launchedId, message));
    launchedIter.remove();
  }
  return missing;
}
private void handleLogAggregationStatus( private void handleLogAggregationStatus(
List<LogAggregationReport> logAggregationReportsForApps) { List<LogAggregationReport> logAggregationReportsForApps) {
for (LogAggregationReport report : logAggregationReportsForApps) { for (LogAggregationReport report : logAggregationReportsForApps) {

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
@ -57,6 +58,8 @@ public class MockNM {
private MasterKey currentContainerTokenMasterKey; private MasterKey currentContainerTokenMasterKey;
private MasterKey currentNMTokenMasterKey; private MasterKey currentNMTokenMasterKey;
private String version; private String version;
private Map<ContainerId, ContainerStatus> containerStats =
new HashMap<ContainerId, ContainerStatus>();
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
// scale vcores based on the requested memory // scale vcores based on the requested memory
@ -106,14 +109,12 @@ public void containerStatus(ContainerStatus containerStatus) throws Exception {
} }
public void containerIncreaseStatus(Container container) throws Exception { public void containerIncreaseStatus(Container container) throws Exception {
Map<ApplicationId, List<ContainerStatus>> conts = new HashMap<>();
ContainerStatus containerStatus = BuilderUtils.newContainerStatus( ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
container.getId(), ContainerState.RUNNING, "Success", 0, container.getId(), ContainerState.RUNNING, "Success", 0,
container.getResource()); container.getResource());
conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
Collections.singletonList(containerStatus));
List<Container> increasedConts = Collections.singletonList(container); List<Container> increasedConts = Collections.singletonList(container);
nodeHeartbeat(conts, increasedConts, true, ++responseId); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
true, ++responseId);
} }
public RegisterNodeManagerResponse registerNode() throws Exception { public RegisterNodeManagerResponse registerNode() throws Exception {
@ -147,18 +148,27 @@ public RegisterNodeManagerResponse registerNode(
memory = (int) newResource.getMemorySize(); memory = (int) newResource.getMemorySize();
vCores = newResource.getVirtualCores(); vCores = newResource.getVirtualCores();
} }
containerStats.clear();
if (containerReports != null) {
for (NMContainerStatus report : containerReports) {
if (report.getContainerState() != ContainerState.COMPLETE) {
containerStats.put(report.getContainerId(),
ContainerStatus.newInstance(report.getContainerId(),
report.getContainerState(), report.getDiagnostics(),
report.getContainerExitStatus()));
}
}
}
return registrationResponse; return registrationResponse;
} }
public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
isHealthy, ++responseId); Collections.<Container>emptyList(), isHealthy, ++responseId);
} }
public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
long containerId, ContainerState containerState) throws Exception { long containerId, ContainerState containerState) throws Exception {
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
new HashMap<ApplicationId, List<ContainerStatus>>(1);
ContainerStatus containerStatus = BuilderUtils.newContainerStatus( ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(attemptId, containerId), containerState, BuilderUtils.newContainerId(attemptId, containerId), containerState,
"Success", 0, BuilderUtils.newResource(memory, vCores)); "Success", 0, BuilderUtils.newResource(memory, vCores));
@ -166,8 +176,8 @@ public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
new ArrayList<ContainerStatus>(1); new ArrayList<ContainerStatus>(1);
containerStatusList.add(containerStatus); containerStatusList.add(containerStatus);
Log.info("ContainerStatus: " + containerStatus); Log.info("ContainerStatus: " + containerStatus);
nodeUpdate.put(attemptId.getApplicationId(), containerStatusList); return nodeHeartbeat(containerStatusList,
return nodeHeartbeat(nodeUpdate, true); Collections.<Container>emptyList(), true, ++responseId);
} }
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId, public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@ -177,19 +187,32 @@ public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId, public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception { List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
return nodeHeartbeat(conts, new ArrayList<Container>(), isHealthy, resId); ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
for (List<ContainerStatus> stats : conts.values()) {
updatedStats.addAll(stats);
}
return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
isHealthy, resId);
} }
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId, public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
List<ContainerStatus>> conts, List<Container> increasedConts, List<Container> increasedConts, boolean isHealthy, int resId)
boolean isHealthy, int resId) throws Exception { throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class); NodeStatus status = Records.newRecord(NodeStatus.class);
status.setResponseId(resId); status.setResponseId(resId);
status.setNodeId(nodeId); status.setNodeId(nodeId);
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) { ArrayList<ContainerId> completedContainers = new ArrayList<ContainerId>();
Log.info("entry.getValue() " + entry.getValue()); for (ContainerStatus stat : updatedStats) {
status.setContainersStatuses(entry.getValue()); if (stat.getState() == ContainerState.COMPLETE) {
completedContainers.add(stat.getContainerId());
}
containerStats.put(stat.getContainerId(), stat);
}
status.setContainersStatuses(
new ArrayList<ContainerStatus>(containerStats.values()));
for (ContainerId cid : completedContainers) {
containerStats.remove(cid);
} }
status.setIncreasedContainers(increasedConts); status.setIncreasedContainers(increasedConts);
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -1021,4 +1022,47 @@ public void testResourceUpdateOnRecommissioningNode() {
Resource originalCapacity = node.getOriginalTotalCapability(); Resource originalCapacity = node.getOriginalTotalCapability();
assertEquals("Original total capability not null after recommission", null, originalCapacity); assertEquals("Original total capability not null after recommission", null, originalCapacity);
} }
/**
 * Verifies that a container which was reported as running and then is
 * missing from a later node status update is surfaced as a completed,
 * aborted container and is removed from the node's launched containers.
 */
@Test
public void testDisappearingContainer() {
  // Two containers belonging to two different application attempts.
  ApplicationAttemptId vanishingAttempt =
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(1, 1), 1);
  ContainerId vanishingId = BuilderUtils.newContainerId(vanishingAttempt, 1);
  ApplicationAttemptId survivingAttempt =
      BuilderUtils.newApplicationAttemptId(
          BuilderUtils.newApplicationId(2, 2), 2);
  ContainerId survivingId = BuilderUtils.newContainerId(survivingAttempt, 2);

  ArrayList<ContainerStatus> reportedStatuses =
      new ArrayList<ContainerStatus>();
  ContainerStatus vanishingStatus = ContainerStatus.newInstance(
      vanishingId, ContainerState.RUNNING, "", -1);
  reportedStatuses.add(vanishingStatus);
  ContainerStatus survivingStatus = ContainerStatus.newInstance(
      survivingId, ContainerState.RUNNING, "", -1);
  reportedStatuses.add(survivingStatus);

  // First update: both containers are reported as running.
  node = getRunningNode();
  node.handle(getMockRMNodeStatusEvent(reportedStatuses));
  assertEquals("unexpected number of running containers",
      2, node.getLaunchedContainers().size());
  Assert.assertTrue("first container not running",
      node.getLaunchedContainers().contains(vanishingId));
  Assert.assertTrue("second container not running",
      node.getLaunchedContainers().contains(survivingId));
  assertEquals("already completed containers",
      0, completedContainers.size());

  // Second update: the first container is silently absent from the report.
  reportedStatuses.remove(0);
  node.handle(getMockRMNodeStatusEvent(reportedStatuses));
  assertEquals("expected one container to be completed",
      1, completedContainers.size());

  ContainerStatus lostStatus = completedContainers.get(0);
  assertEquals("first container not the one that completed",
      vanishingId, lostStatus.getContainerId());
  assertEquals("completed container not marked complete",
      ContainerState.COMPLETE, lostStatus.getState());
  assertEquals("completed container not marked aborted",
      ContainerExitStatus.ABORTED, lostStatus.getExitStatus());
  Assert.assertTrue("completed container not marked missing",
      lostStatus.getDiagnostics().contains("not reported"));

  // Only the still-reported container remains tracked as launched.
  assertEquals("unexpected number of running containers",
      1, node.getLaunchedContainers().size());
  Assert.assertTrue("second container not running",
      node.getLaunchedContainers().contains(survivingId));
}
} }