YARN-9819. Make TestOpportunisticContainerAllocatorAMService more resilient. Contribued by Abhishek Modi

This commit is contained in:
Abhishek Modi 2019-09-12 08:09:57 +05:30
parent f537410563
commit 3b06f0bf9e
2 changed files with 124 additions and 142 deletions

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
@ -233,6 +234,13 @@ public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
return nodeHeartbeat(conts, isHealthy, responseId);
}
/**
* Sends the heartbeat of the node.
* @param isHealthy whether node is healthy.
* @param resId response id.
* @return response of the heartbeat.
* @throws Exception
*/
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
@ -243,15 +251,62 @@ public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
isHealthy, resId);
}
/**
* Sends the heartbeat of the node.
* @param updatedStats containers with updated status.
* @param isHealthy whether node is healthy.
* @return response of the heartbeat.
* @throws Exception
*/
public NodeHeartbeatResponse nodeHeartbeat(
List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
isHealthy, responseId);
}
public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
List<Container> increasedConts, boolean isHealthy, int resId)
/**
* Sends the heartbeat of the node.
* @param oppContainersStatus opportunistic containers status.
* @param isHealthy whether node is healthy.
* @return response of the heartbeat.
* @throws Exception
*/
public NodeHeartbeatResponse nodeHeartbeat(
OpportunisticContainersStatus oppContainersStatus, boolean isHealthy)
throws Exception {
return nodeHeartbeat(Collections.emptyList(),
Collections.emptyList(), isHealthy, responseId, oppContainersStatus);
}
/**
* Sends the heartbeat of the node.
* @param updatedStats containers with updated status.
* @param increasedConts containers whose resource has been increased.
* @param isHealthy whether node is healthy.
* @param resId response id.
* @return response of the heartbeat.
* @throws Exception
*/
public NodeHeartbeatResponse nodeHeartbeat(
List<ContainerStatus> updatedStats, List<Container> increasedConts,
boolean isHealthy, int resId) throws Exception {
return nodeHeartbeat(updatedStats, increasedConts,
isHealthy, resId, null);
}
/**
* Sends the heartbeat of the node.
* @param updatedStats containers with updated status.
* @param increasedConts containers whose resource has been increased.
* @param isHealthy whether node is healthy.
* @param resId response id.
* @param oppContainersStatus opportunistic containers status.
* @return response of the heartbeat.
* @throws Exception
*/
public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
List<Container> increasedConts, boolean isHealthy, int resId,
OpportunisticContainersStatus oppContainersStatus) throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setResponseId(resId);
@ -269,6 +324,7 @@ public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
containerStats.remove(cid);
}
status.setIncreasedContainers(increasedConts);
status.setOpportunisticContainersStatus(oppContainersStatus);
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
healthStatus.setHealthReport("");
healthStatus.setIsNodeHealthy(isHealthy);

View File

@ -79,7 +79,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@ -90,9 +89,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -103,7 +100,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -122,6 +118,9 @@ public class TestOpportunisticContainerAllocatorAMService {
private MockRM rm;
private DrainDispatcher dispatcher;
private OpportunisticContainersStatus oppContainersStatus =
getOpportunisticStatus();
@Before
public void createAndStartRM() {
CapacitySchedulerConfiguration csConf =
@ -184,38 +183,24 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
nm3.registerNode();
nm4.registerNode();
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
nm3.nodeHeartbeat(oppContainersStatus, true);
nm4.nodeHeartbeat(oppContainersStatus, true);
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
amservice.handle(new NodeAddedSchedulerEvent(rmNode3));
amservice.handle(new NodeAddedSchedulerEvent(rmNode4));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode3));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode4));
// All nodes 1 - 4 will be applicable for scheduling.
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
nm3.nodeHeartbeat(oppContainersStatus, true);
nm4.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 4, 10, 10 * 100);
@ -253,7 +238,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED)));
// Node on same host should not result in allocation
sameHostDiffNode.nodeHeartbeat(true);
sameHostDiffNode.nodeHeartbeat(oppContainersStatus, true);
rm.drainEvents();
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
@ -296,7 +281,7 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
.getUpdateContainerRequest().getContainerId());
// Ensure after correct node heartbeats, we should get the allocation
allocNode.nodeHeartbeat(true);
allocNode.nodeHeartbeat(oppContainersStatus, true);
rm.drainEvents();
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
@ -310,10 +295,10 @@ public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception
// Allocated cores+mem should have increased, available should decrease
verifyMetrics(metrics, 14336, 14, 2048, 2, 2);
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
nm3.nodeHeartbeat(oppContainersStatus, true);
nm4.nodeHeartbeat(oppContainersStatus, true);
rm.drainEvents();
// Verify that the container is still in ACQUIRED state wrt the RM.
@ -352,36 +337,20 @@ public void testContainerPromoteAfterContainerStart() throws Exception {
nm1.registerNode();
nm2.registerNode();
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// All nodes 1 to 2 will be applicable for scheduling.
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@ -478,36 +447,21 @@ public void testContainerPromoteAfterContainerComplete() throws Exception {
nm1.registerNode();
nm2.registerNode();
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// All nodes 1 to 2 will be applicable for scheduling.
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@ -591,30 +545,17 @@ public void testContainerAutoUpdateContainer() throws Exception {
createAndStartRMWithAutoUpdateContainer();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nm1.registerNode();
nm1.nodeHeartbeat(oppContainersStatus, true);
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
nm1.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt =
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
.getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
nm1.nodeHeartbeat(true);
nm1.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 1, 10, 10 * 100);
@ -713,7 +654,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
Assert.assertEquals(Resource.newInstance(1 * GB, 1),
response.getContainersToUpdate().get(0).getResource());
nm1.nodeHeartbeat(true);
nm1.nodeHeartbeat(oppContainersStatus, true);
// DEMOTE the container
allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
UpdateContainerRequest.newInstance(3, container.getId(),
@ -735,7 +676,7 @@ public void testContainerAutoUpdateContainer() throws Exception {
uc.getContainer().getExecutionType());
// Check that the container is updated in NM through NM heartbeat response
if (response.getContainersToUpdate().size() == 0) {
response = nm1.nodeHeartbeat(true);
response = nm1.nodeHeartbeat(oppContainersStatus, true);
}
Assert.assertEquals(1, response.getContainersToUpdate().size());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
@ -761,6 +702,10 @@ public void testOpportunisticSchedulerMetrics() throws Exception {
nodes.put(nm2.getNodeId(), nm2);
nm1.registerNode();
nm2.registerNode();
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
OpportunisticSchedulerMetrics metrics =
OpportunisticSchedulerMetrics.getMetrics();
@ -777,28 +722,10 @@ public void testOpportunisticSchedulerMetrics() throws Exception {
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// All nodes 1 to 2 will be applicable for scheduling.
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@ -890,6 +817,10 @@ public void testNodeRemovalDuringAllocate() throws Exception {
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
@ -900,20 +831,14 @@ public void testNodeRemovalDuringAllocate() throws Exception {
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
((RMNodeImpl) rmNode2)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// Both node 1 and node 2 will be applicable for scheduling.
nm1.nodeHeartbeat(oppContainersStatus, true);
nm2.nodeHeartbeat(oppContainersStatus, true);
for (int i = 0; i < 10; i++) {
am1.allocate(
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
@ -948,10 +873,10 @@ public void testNodeRemovalDuringAllocate() throws Exception {
@Test(timeout = 60000)
public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
nm.registerNode();
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
nm.nodeHeartbeat(oppContainersStatus, true);
RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app.getCurrentAppAttempt().getAppAttemptId();
@ -960,12 +885,8 @@ public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
SchedulerApplicationAttempt schedulerAttempt =
((CapacityScheduler)scheduler).getApplicationAttempt(attemptId);
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId());
nm.nodeHeartbeat(true);
((RMNodeImpl) rmNode1)
.setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
// Send add and update node events to AM Service.
amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
nm.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
scheduler.getNumClusterNodes() == 1, 10, 200 * 100);
@ -1000,13 +921,18 @@ public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
RMAppAttemptState.FAILED, false));
}
private OpportunisticContainersStatus getOpportunisticStatus() {
return getOppurtunisticStatus(-1, 100, 1000);
}
private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
int queueLength) {
OpportunisticContainersStatus status1 =
Mockito.mock(OpportunisticContainersStatus.class);
Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime);
Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength);
return status1;
int queueLength, int queueCapacity) {
OpportunisticContainersStatus status =
OpportunisticContainersStatus.newInstance();
status.setEstimatedQueueWaitTime(waitTime);
status.setOpportQueueCapacity(queueCapacity);
status.setWaitQueueLength(queueLength);
return status;
}
// Test if the OpportunisticContainerAllocatorAMService can handle both