diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ca89b32d6a..f789bcb1b3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1368,6 +1368,9 @@ Release 2.7.3 - UNRELEASED
     YARN-4581. AHS writer thread leak makes RM crash while RM is recovering.
     (sandflee via junping_du)
 
+    YARN-4610. Reservations continue looking for one app causes other apps to
+    starve (jlowe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9c6d8ee4f6..9e64b42ff9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1152,9 +1152,11 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
-      String nodePartition, ResourceLimits currentResoureLimits) {
+      String nodePartition, ResourceLimits currentResourceLimits) {
     User user = getUser(userName);
 
+    currentResourceLimits.setAmountNeededUnreserve(Resources.none());
+
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, but it's a > check, not a >= check, so...
     if (Resources
@@ -1181,7 +1183,7 @@ protected synchronized boolean canAssignToUser(Resource clusterResource,
             Resources.subtract(user.getUsed(nodePartition), limit);
         // we can only acquire a new container if we unreserve first to
         // respect user-limit
-        currentResoureLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
+        currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
         return true;
       }
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 49de4787aa..9b920d08a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -100,12 +100,17 @@ public void setUp() throws Exception {
   }
 
   private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
+    setup(csConf, false);
+  }
+
+  private void setup(CapacitySchedulerConfiguration csConf,
+      boolean addUserLimits) throws Exception {
     csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
     final String newRoot = "root" + System.currentTimeMillis();
     // final String newRoot = "root";
-    setupQueueConfiguration(csConf, newRoot);
+    setupQueueConfiguration(csConf, newRoot, addUserLimits);
 
     YarnConfiguration conf = new YarnConfiguration();
     cs.setConf(conf);
@@ -146,7 +151,7 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
   private static final String A = "a";
 
   private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
-      final String newRoot) {
+      final String newRoot, boolean addUserLimits) {
 
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT,
@@ -167,6 +172,10 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
     conf.setMaximumCapacity(Q_A, 100);
     conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
 
+    if (addUserLimits) {
+      conf.setUserLimit(Q_A, 25);
+      conf.setUserLimitFactor(Q_A, 0.25f);
+    }
   }
 
   static LeafQueue stubLeafQueue(LeafQueue queue) {
@@ -334,6 +343,140 @@ public void testReservation() throws Exception {
     assertEquals(0, app_0.getTotalRequiredResources(priorityReduce));
   }
 
+  // Test that hitting a reservation limit and needing to unreserve
+  // does not affect assigning containers for other users
+  @Test
+  public void testReservationLimitOtherUsers() throws Exception {
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+    setup(csConf, true);
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = TestUtils
+        .getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+        mock(ActiveUsersManager.class), spyRMContext);
+    app_0 = spy(app_0);
+    Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
+        any(String.class));
+    rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
+
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 = TestUtils
+        .getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_1, a,
+        mock(ActiveUsersManager.class), spyRMContext);
+    app_1 = spy(app_1);
+    Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
+        any(String.class));
+    rmContext.getRMApps().put(app_1.getApplicationId(), mock(RMApp.class));
+
+    a.submitApplicationAttempt(app_1, user_1);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_1 = "host_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
+        8 * GB);
+    String host_2 = "host_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
+        8 * GB);
+
+    when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
+    when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
+
+    cs.getAllNodes().put(node_0.getNodeID(), node_0);
+    cs.getAllNodes().put(node_1.getNodeID(), node_1);
+    cs.getAllNodes().put(node_2.getNodeID(), node_2);
+
+    final int numNodes = 3;
+    Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    Priority priorityAM = TestUtils.createMockPriority(1);
+    Priority priorityMap = TestUtils.createMockPriority(5);
+    Priority priorityReduce = TestUtils.createMockPriority(10);
+
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+    app_1.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
+            priorityAM, recordFactory)));
+
+    // Start testing...
+    // Only AM
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(2 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(22 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(4 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0 * GB, a.getMetrics().getReservedMB());
+    assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(20 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(2 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // Add a few requests to each app
+    app_0.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
+            priorityMap, recordFactory)));
+    app_1.updateResourceRequests(Collections.singletonList(TestUtils
+        .createResourceRequest(ResourceRequest.ANY, 2 * GB, 2, true,
+            priorityMap, recordFactory)));
+
+    // add a reservation for app_0
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(12 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(8 * GB, a.getMetrics().getReservedMB());
+    assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(12 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(2 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+
+    // next assignment is beyond user limit for user_0 but it should assign to
+    // app_1 for user_1
+    a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(14 * GB, a.getUsedResources().getMemory());
+    assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(4 * GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(8 * GB, a.getMetrics().getReservedMB());
+    assertEquals(6 * GB, a.getMetrics().getAllocatedMB());
+    assertEquals(10 * GB, a.getMetrics().getAvailableMB());
+    assertEquals(2 * GB, node_0.getUsedResource().getMemory());
+    assertEquals(4 * GB, node_1.getUsedResource().getMemory());
+    assertEquals(0 * GB, node_2.getUsedResource().getMemory());
+  }
+
   @Test
   public void testReservationNoContinueLook() throws Exception {
     // Test that with reservations-continue-look-all-nodes feature off
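For readers reviewing the patch without the surrounding source, the sketch below models why resetting the unreserve amount at the top of canAssignToUser matters. It is a minimal, self-contained illustration with hypothetical stand-in types and numbers, not the real YARN classes; only the reset of the amount-needed-to-unreserve value (Resources.none() in the diff) comes from the patch itself. The ResourceLimits instance is shared across the per-user limit checks made while the scheduler continues looking at nodes, so a stale value recorded for one over-limit user could otherwise leak into the next user's allocation decision and starve that user's app, which is what the new testReservationLimitOtherUsers exercises.

// Minimal, hypothetical sketch of the bug the LeafQueue change fixes.
// All names here are illustrative stand-ins, not YARN's actual classes.
public class StaleUnreserveSketch {

  /** Simplified stand-in for ResourceLimits: memory only, in MB. */
  static class ResourceLimits {
    private int amountNeededUnreserve; // MB that must be unreserved first
    void setAmountNeededUnreserve(int mb) { amountNeededUnreserve = mb; }
    int getAmountNeededUnreserve() { return amountNeededUnreserve; }
  }

  /**
   * Simplified canAssignToUser: returns true if the user may be assigned a
   * container; when the user is over its limit and reservations-continue-
   * looking is enabled, records how much must be unreserved first.
   */
  static boolean canAssignToUser(int usedMb, int limitMb,
      boolean reservationsContinueLooking, ResourceLimits limits) {
    // The fix: clear any leftover value from a previous app's check so it
    // cannot leak into this user's decision.
    limits.setAmountNeededUnreserve(0);
    if (usedMb > limitMb) {
      if (!reservationsContinueLooking) {
        return false;
      }
      // Over the limit: may still assign, but only after unreserving this.
      limits.setAmountNeededUnreserve(usedMb - limitMb);
    }
    return true;
  }

  public static void main(String[] args) {
    ResourceLimits shared = new ResourceLimits(); // reused across checks
    // user_0 is over its 4GB limit with 12GB used: needs 8GB unreserved.
    canAssignToUser(12 * 1024, 4 * 1024, true, shared);
    // user_1 is under its limit. Without the reset, the 8GB requirement
    // recorded for user_0 would still be visible here, and user_1's app
    // could be forced to unreserve or skip needlessly, i.e. starve.
    canAssignToUser(2 * 1024, 4 * 1024, true, shared);
    System.out.println("leftover unreserve after user_1 check (MB): "
        + shared.getAmountNeededUnreserve()); // prints 0 with the fix
  }
}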