diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 1af930f7fc..efa76bcf92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -243,13 +243,13 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception null, ExecutionType.GUARANTEED))); // Node on same host should not result in allocation sameHostDiffNode.nodeHeartbeat(true); - Thread.sleep(200); + rm.drainEvents(); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size()); // Wait for scheduler to process all events dispatcher.waitForEventThreadToWait(); - Thread.sleep(1000); + rm.drainEvents(); // Verify Metrics After OPP allocation (Nothing should change again) verifyMetrics(metrics, 15360, 15, 1024, 1, 1); @@ -286,7 +286,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception // Ensure after correct node heartbeats, we should get the allocation allocNode.nodeHeartbeat(true); - Thread.sleep(200); + rm.drainEvents(); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); Container uc = @@ -303,7 +303,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception nm2.nodeHeartbeat(true); nm3.nodeHeartbeat(true); nm4.nodeHeartbeat(true); - Thread.sleep(200); + rm.drainEvents(); // Verify that the container is still in ACQUIRED state wrt the RM. RMContainer rmContainer = ((CapacityScheduler) scheduler) @@ -325,6 +325,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception // Wait for scheduler to finish processing events dispatcher.waitForEventThreadToWait(); + rm.drainEvents(); // Verify Metrics After OPP allocation : // Everything should have reverted to what it was verifyMetrics(metrics, 15360, 15, 1024, 1, 1); @@ -396,7 +397,7 @@ public void testContainerPromoteAfterContainerStart() throws Exception { ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true); - Thread.sleep(200); + rm.drainEvents(); // Verify that container is actually running wrt the RM.. RMContainer rmContainer = ((CapacityScheduler) scheduler) @@ -434,7 +435,7 @@ public void testContainerPromoteAfterContainerStart() throws Exception { ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true); - Thread.sleep(200); + rm.drainEvents(); allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); @@ -521,7 +522,7 @@ public void testContainerPromoteAfterContainerComplete() throws Exception { ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true); - Thread.sleep(200); + rm.drainEvents(); // Verify that container is actually running wrt the RM.. RMContainer rmContainer = ((CapacityScheduler) scheduler) @@ -535,7 +536,7 @@ public void testContainerPromoteAfterContainerComplete() throws Exception { ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)), true); - Thread.sleep(200); + rm.drainEvents(); // Verify that container has been removed.. rmContainer = ((CapacityScheduler) scheduler) @@ -618,7 +619,7 @@ public void testContainerAutoUpdateContainer() throws Exception { nm1.nodeHeartbeat(Arrays.asList(ContainerStatus .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true); - Thread.sleep(200); + rm.drainEvents(); // Verify that container is actually running wrt the RM.. RMContainer rmContainer = ((CapacityScheduler) scheduler) @@ -636,7 +637,7 @@ public void testContainerAutoUpdateContainer() throws Exception { nm1.nodeHeartbeat(Arrays.asList(ContainerStatus .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true); - Thread.sleep(200); + rm.drainEvents(); // Get the update response on next allocate allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); // Check the update response from YARNRM @@ -662,7 +663,7 @@ public void testContainerAutoUpdateContainer() throws Exception { .newInstance(container.getId(), ExecutionType.GUARANTEED, ContainerState.RUNNING, "", 0)), true); - Thread.sleep(200); + rm.drainEvents(); if (allocateResponse.getUpdatedContainers().size() == 0) { allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>()); } @@ -671,7 +672,7 @@ public void testContainerAutoUpdateContainer() throws Exception { Assert.assertEquals(container.getId(), uc.getContainer().getId()); Assert.assertEquals(Resource.newInstance(2 * GB, 1), uc.getContainer().getResource()); - Thread.sleep(1000); + rm.drainEvents(); // Check that the container resources are increased in // NM through NM heartbeat response @@ -688,7 +689,7 @@ public void testContainerAutoUpdateContainer() throws Exception { ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(1 * GB, 1), null))); Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size()); - Thread.sleep(1000); + rm.drainEvents(); // Check that the container resources are decreased // in NM through NM heartbeat response @@ -707,7 +708,7 @@ public void testContainerAutoUpdateContainer() throws Exception { response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus .newInstance(container.getId(), ExecutionType.GUARANTEED, ContainerState.RUNNING, "", 0)), true); - Thread.sleep(200); + rm.drainEvents(); if (allocateResponse.getUpdatedContainers().size() == 0) { // Get the update response on next allocate allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());