YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen
This commit is contained in:
parent
eb84793af1
commit
bab5bf9743
@ -179,6 +179,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||
private long launchAMEndTime = 0;
|
||||
private long scheduledTime = 0;
|
||||
private long containerAllocatedTime = 0;
|
||||
private boolean nonWorkPreservingAMContainerFinished = false;
|
||||
|
||||
// Set to null initially. Will eventually get set
|
||||
// if an RMAppAttemptUnregistrationEvent occurs
|
||||
@ -853,7 +854,7 @@ public List<ContainerStatus> pullJustFinishedContainers() {
|
||||
|
||||
// A new allocate means the AM received the previously sent
|
||||
// finishedContainers. We can ack this to NM now
|
||||
sendFinishedContainersToNM();
|
||||
sendFinishedContainersToNM(finishedContainersSentToAM);
|
||||
|
||||
// Mark every containerStatus as being sent to AM though we may return
|
||||
// only the ones that belong to the current attempt
|
||||
@ -1980,12 +1981,13 @@ private void sendFinishedAMContainerToNM(NodeId nodeId,
|
||||
}
|
||||
|
||||
// Ack NM to remove finished containers from context.
|
||||
private void sendFinishedContainersToNM() {
|
||||
for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
|
||||
private void sendFinishedContainersToNM(
|
||||
Map<NodeId, List<ContainerStatus>> finishedContainers) {
|
||||
for (NodeId nodeId : finishedContainers.keySet()) {
|
||||
|
||||
// Clear and get current values
|
||||
List<ContainerStatus> currentSentContainers =
|
||||
finishedContainersSentToAM.put(nodeId, new ArrayList<>());
|
||||
finishedContainers.put(nodeId, new ArrayList<>());
|
||||
List<ContainerId> containerIdList =
|
||||
new ArrayList<>(currentSentContainers.size());
|
||||
for (ContainerStatus containerStatus : currentSentContainers) {
|
||||
@ -1994,7 +1996,7 @@ private void sendFinishedContainersToNM() {
|
||||
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
|
||||
containerIdList));
|
||||
}
|
||||
this.finishedContainersSentToAM.clear();
|
||||
finishedContainers.clear();
|
||||
}
|
||||
|
||||
// Add am container to the list so that am container instance will be
|
||||
@ -2020,7 +2022,16 @@ private static void amContainerFinished(RMAppAttemptImpl appAttempt,
|
||||
appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
|
||||
new ArrayList<>());
|
||||
appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
|
||||
appAttempt.sendFinishedContainersToNM();
|
||||
appAttempt.sendFinishedContainersToNM(
|
||||
appAttempt.finishedContainersSentToAM);
|
||||
// there might be some completed containers that have not been pulled
|
||||
// by the AM heartbeat, explicitly add them for cleanup.
|
||||
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
|
||||
|
||||
// mark the fact that AM container has finished so that future finished
|
||||
// containers will be cleaned up without the engagement of AM containers
|
||||
// (through heartbeat)
|
||||
appAttempt.nonWorkPreservingAMContainerFinished = true;
|
||||
} else {
|
||||
appAttempt.sendFinishedAMContainerToNM(nodeId,
|
||||
containerStatus.getContainerId());
|
||||
@ -2048,6 +2059,11 @@ private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
|
||||
.getNodeId(), new ArrayList<>());
|
||||
appAttempt.justFinishedContainers.get(containerFinishedEvent
|
||||
.getNodeId()).add(containerFinishedEvent.getContainerStatus());
|
||||
|
||||
if (appAttempt.nonWorkPreservingAMContainerFinished) {
|
||||
// AM container has finished, so no more AM heartbeats to do the cleanup.
|
||||
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class ContainerFinishedAtFinalStateTransition
|
||||
|
@ -643,6 +643,8 @@ private Container allocateApplicationAttempt() {
|
||||
RMContainer rmContainer = mock(RMContainerImpl.class);
|
||||
when(scheduler.getRMContainer(container.getId())).
|
||||
thenReturn(rmContainer);
|
||||
when(container.getNodeId()).thenReturn(
|
||||
BuilderUtils.newNodeId("localhost", 0));
|
||||
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
|
||||
@ -1530,6 +1532,119 @@ public void testFinishedContainer() {
|
||||
.handle(Mockito.any(RMNodeEvent.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check a completed container that is not yet pulled by AM heartbeat,
|
||||
* is ACKed to NM for cleanup when the AM container exits.
|
||||
*/
|
||||
@Test
|
||||
public void testFinishedContainerNotBeingPulledByAMHeartbeat() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
|
||||
application.handle(new RMAppRunningOnNodeEvent(application
|
||||
.getApplicationId(), amContainer.getNodeId()));
|
||||
|
||||
// Complete a non-AM container
|
||||
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
|
||||
.getAppAttemptId(), 2);
|
||||
Container container1 = mock(Container.class);
|
||||
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
|
||||
when(container1.getId()).thenReturn(
|
||||
containerId1);
|
||||
when(containerStatus1.getContainerId()).thenReturn(containerId1);
|
||||
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
|
||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(), containerStatus1,
|
||||
container1.getNodeId()));
|
||||
|
||||
// Verify justFinishedContainers
|
||||
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
|
||||
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
|
||||
Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
|
||||
.size());
|
||||
Assert.assertEquals(container1.getId(), applicationAttempt
|
||||
.getJustFinishedContainers().get(0).getContainerId());
|
||||
Assert.assertTrue(
|
||||
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
|
||||
|
||||
// finish AM container to emulate AM exit event
|
||||
containerStatus1 = mock(ContainerStatus.class);
|
||||
ContainerId amContainerId = amContainer.getId();
|
||||
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
|
||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(), containerStatus1,
|
||||
amContainer.getNodeId()));
|
||||
|
||||
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
|
||||
List<RMNodeFinishedContainersPulledByAMEvent> containerPulledEvents =
|
||||
captor.getAllValues();
|
||||
// Verify AM container is acked to NM via the RMNodeEvent immediately
|
||||
Assert.assertEquals(amContainer.getId(),
|
||||
containerPulledEvents.get(0).getContainers().get(0));
|
||||
// Verify the non-AM container is acked to NM via the RMNodeEvent
|
||||
Assert.assertEquals(container1.getId(),
|
||||
containerPulledEvents.get(1).getContainers().get(0));
|
||||
Assert.assertTrue("No container shall be added to justFinishedContainers" +
|
||||
" as soon as AM container exits",
|
||||
applicationAttempt.getJustFinishedContainers().isEmpty());
|
||||
Assert.assertTrue(
|
||||
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check a completed container is ACKed to NM for cleanup after the AM
|
||||
* container has exited.
|
||||
*/
|
||||
@Test
|
||||
public void testFinishedContainerAfterAMExit() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
|
||||
// finish AM container to emulate AM exit event
|
||||
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
|
||||
ContainerId amContainerId = amContainer.getId();
|
||||
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
|
||||
application.handle(new RMAppRunningOnNodeEvent(application
|
||||
.getApplicationId(),
|
||||
amContainer.getNodeId()));
|
||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(), containerStatus1,
|
||||
amContainer.getNodeId()));
|
||||
|
||||
// Verify AM container is acked to NM via the RMNodeEvent immediately
|
||||
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
|
||||
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
|
||||
Mockito.verify(rmnodeEventHandler).handle(captor.capture());
|
||||
Assert.assertEquals(amContainer.getId(),
|
||||
captor.getValue().getContainers().get(0));
|
||||
|
||||
// Complete a non-AM container
|
||||
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
|
||||
.getAppAttemptId(), 2);
|
||||
Container container1 = mock(Container.class);
|
||||
containerStatus1 = mock(ContainerStatus.class);
|
||||
when(container1.getId()).thenReturn(containerId1);
|
||||
when(containerStatus1.getContainerId()).thenReturn(containerId1);
|
||||
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
|
||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(), containerStatus1,
|
||||
container1.getNodeId()));
|
||||
|
||||
// Verify container is acked to NM via the RMNodeEvent immediately
|
||||
captor = ArgumentCaptor.forClass(
|
||||
RMNodeFinishedContainersPulledByAMEvent.class);
|
||||
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
|
||||
Assert.assertEquals(container1.getId(),
|
||||
captor.getAllValues().get(1).getContainers().get(0));
|
||||
Assert.assertTrue("No container shall be added to justFinishedContainers" +
|
||||
" after AM container exited",
|
||||
applicationAttempt.getJustFinishedContainers().isEmpty());
|
||||
Assert.assertTrue(
|
||||
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
|
||||
}
|
||||
|
||||
private static List<ContainerStatus> getFinishedContainersSentToAM(
|
||||
RMAppAttempt applicationAttempt) {
|
||||
List<ContainerStatus> containers = new ArrayList<ContainerStatus>();
|
||||
|
Loading…
Reference in New Issue
Block a user