YARN-6597. Add RMContainer recovery test to verify tag population in the AllocationTagsManager. (Panagiotis Garefalakis via asuresh)
This commit is contained in:
parent
f8c5f5b237
commit
add993e26a
@ -20,7 +20,6 @@
|
|||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
@ -40,7 +39,6 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
@ -533,7 +531,7 @@ public RMContainerState transition(RMContainerImpl container,
|
|||||||
RMContainerEvent event) {
|
RMContainerEvent event) {
|
||||||
NMContainerStatus report =
|
NMContainerStatus report =
|
||||||
((RMContainerRecoverEvent) event).getContainerReport();
|
((RMContainerRecoverEvent) event).getContainerReport();
|
||||||
// Set the allocation tags from the
|
// Set the allocation tags from the NMContainerStatus
|
||||||
container.setAllocationTags(report.getAllocationTags());
|
container.setAllocationTags(report.getAllocationTags());
|
||||||
// Notify AllocationTagsManager
|
// Notify AllocationTagsManager
|
||||||
container.rmContext.getAllocationTagsManager().addContainer(
|
container.rmContext.getAllocationTagsManager().addContainer(
|
||||||
@ -689,7 +687,7 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
|
|||||||
// Something wrong happened, kill the container
|
// Something wrong happened, kill the container
|
||||||
LOG.warn("Something wrong happened, container size reported by NM"
|
LOG.warn("Something wrong happened, container size reported by NM"
|
||||||
+ " is not expected, ContainerID=" + container.getContainerId()
|
+ " is not expected, ContainerID=" + container.getContainerId()
|
||||||
+ " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:"
|
+ " rm-size-resource:" + rmContainerResource + " nm-size-resource:"
|
||||||
+ nmContainerResource);
|
+ nmContainerResource);
|
||||||
container.eventHandler.handle(new RMNodeCleanContainerEvent(
|
container.eventHandler.handle(new RMNodeCleanContainerEvent(
|
||||||
container.nodeId, container.getContainerId()));
|
container.nodeId, container.getContainerId()));
|
||||||
@ -702,7 +700,7 @@ private static class FinishedTransition extends BaseTransition {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
||||||
// Notify placementManager
|
// Notify AllocationTagsManager
|
||||||
container.rmContext.getAllocationTagsManager().removeContainer(
|
container.rmContext.getAllocationTagsManager().removeContainer(
|
||||||
container.getNodeId(), container.getContainerId(),
|
container.getNodeId(), container.getContainerId(),
|
||||||
container.getAllocationTags());
|
container.getAllocationTags());
|
||||||
|
@ -49,6 +49,7 @@
|
|||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
@ -171,7 +172,7 @@ public void testReleaseWhileRunning() {
|
|||||||
assertEquals(containerStatus, cfEvent.getContainerStatus());
|
assertEquals(containerStatus, cfEvent.getContainerStatus());
|
||||||
assertEquals(RMAppAttemptEventType.CONTAINER_FINISHED, cfEvent.getType());
|
assertEquals(RMAppAttemptEventType.CONTAINER_FINISHED, cfEvent.getType());
|
||||||
|
|
||||||
// In RELEASED state. A FINIHSED event may come in.
|
// In RELEASED state. A FINISHED event may come in.
|
||||||
rmContainer.handle(new RMContainerFinishedEvent(containerId, SchedulerUtils
|
rmContainer.handle(new RMContainerFinishedEvent(containerId, SchedulerUtils
|
||||||
.createAbnormalContainerStatus(containerId, "FinishedContainer"),
|
.createAbnormalContainerStatus(containerId, "FinishedContainer"),
|
||||||
RMContainerEventType.FINISHED));
|
RMContainerEventType.FINISHED));
|
||||||
@ -375,7 +376,7 @@ public void testStoreOnlyAMContainerMetrics() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContainerTransitionNotifyPlacementTagsManager()
|
public void testContainerTransitionNotifyAllocationTagsManager()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
DrainDispatcher drainDispatcher = new DrainDispatcher();
|
DrainDispatcher drainDispatcher = new DrainDispatcher();
|
||||||
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(
|
EventHandler<RMAppAttemptEvent> appAttemptEventHandler = mock(
|
||||||
@ -494,5 +495,25 @@ public void testContainerTransitionNotifyPlacementTagsManager()
|
|||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
||||||
|
|
||||||
|
/* Fourth container: NEW -> RECOVERED */
|
||||||
|
rmContainer = new RMContainerImpl(container,
|
||||||
|
SchedulerRequestKey.extractFrom(container), appAttemptId, nodeId,
|
||||||
|
"user", rmContext);
|
||||||
|
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
|
||||||
|
|
||||||
|
Assert.assertEquals(0,
|
||||||
|
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
||||||
|
|
||||||
|
NMContainerStatus containerStatus = NMContainerStatus
|
||||||
|
.newInstance(containerId, 0, ContainerState.NEW,
|
||||||
|
Resource.newInstance(1024, 1), "recover container", 0,
|
||||||
|
Priority.newInstance(0), 0);
|
||||||
|
containerStatus.setAllocationTags(ImmutableSet.of("mapper"));
|
||||||
|
rmContainer
|
||||||
|
.handle(new RMContainerRecoverEvent(containerId, containerStatus));
|
||||||
|
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user