YARN-8011. TestOpportunisticContainerAllocatorAMService#testContainerPromoteAndDemoteBeforeContainerStart fails intermittently. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2018-03-08 18:13:36 +08:00
parent 4cc9a6d9bb
commit b451889e8e

View File

@ -243,13 +243,13 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
null, ExecutionType.GUARANTEED))); null, ExecutionType.GUARANTEED)));
// Node on same host should not result in allocation // Node on same host should not result in allocation
sameHostDiffNode.nodeHeartbeat(true); sameHostDiffNode.nodeHeartbeat(true);
Thread.sleep(200); rm.drainEvents();
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
// Wait for scheduler to process all events // Wait for scheduler to process all events
dispatcher.waitForEventThreadToWait(); dispatcher.waitForEventThreadToWait();
Thread.sleep(1000); rm.drainEvents();
// Verify Metrics After OPP allocation (Nothing should change again) // Verify Metrics After OPP allocation (Nothing should change again)
verifyMetrics(metrics, 15360, 15, 1024, 1, 1); verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
@ -286,7 +286,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
// Ensure after correct node heartbeats, we should get the allocation // Ensure after correct node heartbeats, we should get the allocation
allocNode.nodeHeartbeat(true); allocNode.nodeHeartbeat(true);
Thread.sleep(200); rm.drainEvents();
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
Container uc = Container uc =
@ -303,7 +303,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
nm2.nodeHeartbeat(true); nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true); nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true); nm4.nodeHeartbeat(true);
Thread.sleep(200); rm.drainEvents();
// Verify that the container is still in ACQUIRED state wrt the RM. // Verify that the container is still in ACQUIRED state wrt the RM.
RMContainer rmContainer = ((CapacityScheduler) scheduler) RMContainer rmContainer = ((CapacityScheduler) scheduler)
@ -325,6 +325,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
// Wait for scheduler to finish processing events // Wait for scheduler to finish processing events
dispatcher.waitForEventThreadToWait(); dispatcher.waitForEventThreadToWait();
rm.drainEvents();
// Verify Metrics After OPP allocation : // Verify Metrics After OPP allocation :
// Everything should have reverted to what it was // Everything should have reverted to what it was
verifyMetrics(metrics, 15360, 15, 1024, 1, 1); verifyMetrics(metrics, 15360, 15, 1024, 1, 1);
@ -396,7 +397,7 @@ public void testContainerPromoteAfterContainerStart() throws Exception {
ContainerStatus.newInstance(container.getId(), ContainerStatus.newInstance(container.getId(),
ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
true); true);
Thread.sleep(200); rm.drainEvents();
// Verify that container is actually running wrt the RM.. // Verify that container is actually running wrt the RM..
RMContainer rmContainer = ((CapacityScheduler) scheduler) RMContainer rmContainer = ((CapacityScheduler) scheduler)
@ -434,7 +435,7 @@ public void testContainerPromoteAfterContainerStart() throws Exception {
ContainerStatus.newInstance(container.getId(), ContainerStatus.newInstance(container.getId(),
ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
true); true);
Thread.sleep(200); rm.drainEvents();
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
@ -521,7 +522,7 @@ public void testContainerPromoteAfterContainerComplete() throws Exception {
ContainerStatus.newInstance(container.getId(), ContainerStatus.newInstance(container.getId(),
ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)),
true); true);
Thread.sleep(200); rm.drainEvents();
// Verify that container is actually running wrt the RM.. // Verify that container is actually running wrt the RM..
RMContainer rmContainer = ((CapacityScheduler) scheduler) RMContainer rmContainer = ((CapacityScheduler) scheduler)
@ -535,7 +536,7 @@ public void testContainerPromoteAfterContainerComplete() throws Exception {
ContainerStatus.newInstance(container.getId(), ContainerStatus.newInstance(container.getId(),
ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)), ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)),
true); true);
Thread.sleep(200); rm.drainEvents();
// Verify that container has been removed.. // Verify that container has been removed..
rmContainer = ((CapacityScheduler) scheduler) rmContainer = ((CapacityScheduler) scheduler)
@ -618,7 +619,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
nm1.nodeHeartbeat(Arrays.asList(ContainerStatus nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
ContainerState.RUNNING, "", 0)), true); ContainerState.RUNNING, "", 0)), true);
Thread.sleep(200); rm.drainEvents();
// Verify that container is actually running wrt the RM.. // Verify that container is actually running wrt the RM..
RMContainer rmContainer = ((CapacityScheduler) scheduler) RMContainer rmContainer = ((CapacityScheduler) scheduler)
@ -636,7 +637,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
nm1.nodeHeartbeat(Arrays.asList(ContainerStatus nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
ContainerState.RUNNING, "", 0)), true); ContainerState.RUNNING, "", 0)), true);
Thread.sleep(200); rm.drainEvents();
// Get the update response on next allocate // Get the update response on next allocate
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
// Check the update response from YARNRM // Check the update response from YARNRM
@ -662,7 +663,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
.newInstance(container.getId(), ExecutionType.GUARANTEED, .newInstance(container.getId(), ExecutionType.GUARANTEED,
ContainerState.RUNNING, "", 0)), true); ContainerState.RUNNING, "", 0)), true);
Thread.sleep(200); rm.drainEvents();
if (allocateResponse.getUpdatedContainers().size() == 0) { if (allocateResponse.getUpdatedContainers().size() == 0) {
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
} }
@ -671,7 +672,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
Assert.assertEquals(container.getId(), uc.getContainer().getId()); Assert.assertEquals(container.getId(), uc.getContainer().getId());
Assert.assertEquals(Resource.newInstance(2 * GB, 1), Assert.assertEquals(Resource.newInstance(2 * GB, 1),
uc.getContainer().getResource()); uc.getContainer().getResource());
Thread.sleep(1000); rm.drainEvents();
// Check that the container resources are increased in // Check that the container resources are increased in
// NM through NM heartbeat response // NM through NM heartbeat response
@ -688,7 +689,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
ContainerUpdateType.DECREASE_RESOURCE, ContainerUpdateType.DECREASE_RESOURCE,
Resources.createResource(1 * GB, 1), null))); Resources.createResource(1 * GB, 1), null)));
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
Thread.sleep(1000); rm.drainEvents();
// Check that the container resources are decreased // Check that the container resources are decreased
// in NM through NM heartbeat response // in NM through NM heartbeat response
@ -707,7 +708,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
.newInstance(container.getId(), ExecutionType.GUARANTEED, .newInstance(container.getId(), ExecutionType.GUARANTEED,
ContainerState.RUNNING, "", 0)), true); ContainerState.RUNNING, "", 0)), true);
Thread.sleep(200); rm.drainEvents();
if (allocateResponse.getUpdatedContainers().size() == 0) { if (allocateResponse.getUpdatedContainers().size() == 0) {
// Get the update response on next allocate // Get the update response on next allocate
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());