YARN-4610. Reservations continue looking for one app causes other apps to starve. Contributed by Jason Lowe
This commit is contained in:
parent
1bb31fb22e
commit
468a53b22f
@ -1368,6 +1368,9 @@ Release 2.7.3 - UNRELEASED
|
|||||||
YARN-4581. AHS writer thread leak makes RM crash while RM is recovering.
|
YARN-4581. AHS writer thread leak makes RM crash while RM is recovering.
|
||||||
(sandflee via junping_du)
|
(sandflee via junping_du)
|
||||||
|
|
||||||
|
YARN-4610. Reservations continue looking for one app causes other apps to
|
||||||
|
starve (jlowe)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -1152,9 +1152,11 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
|
|||||||
@Private
|
@Private
|
||||||
protected synchronized boolean canAssignToUser(Resource clusterResource,
|
protected synchronized boolean canAssignToUser(Resource clusterResource,
|
||||||
String userName, Resource limit, FiCaSchedulerApp application,
|
String userName, Resource limit, FiCaSchedulerApp application,
|
||||||
String nodePartition, ResourceLimits currentResoureLimits) {
|
String nodePartition, ResourceLimits currentResourceLimits) {
|
||||||
User user = getUser(userName);
|
User user = getUser(userName);
|
||||||
|
|
||||||
|
currentResourceLimits.setAmountNeededUnreserve(Resources.none());
|
||||||
|
|
||||||
// Note: We aren't considering the current request since there is a fixed
|
// Note: We aren't considering the current request since there is a fixed
|
||||||
// overhead of the AM, but it's a > check, not a >= check, so...
|
// overhead of the AM, but it's a > check, not a >= check, so...
|
||||||
if (Resources
|
if (Resources
|
||||||
@ -1181,7 +1183,7 @@ protected synchronized boolean canAssignToUser(Resource clusterResource,
|
|||||||
Resources.subtract(user.getUsed(nodePartition), limit);
|
Resources.subtract(user.getUsed(nodePartition), limit);
|
||||||
// we can only acquire a new container if we unreserve first to
|
// we can only acquire a new container if we unreserve first to
|
||||||
// respect user-limit
|
// respect user-limit
|
||||||
currentResoureLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
|
currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -100,12 +100,17 @@ public void setUp() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
|
private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
|
||||||
|
setup(csConf, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setup(CapacitySchedulerConfiguration csConf,
|
||||||
|
boolean addUserLimits) throws Exception {
|
||||||
|
|
||||||
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
|
csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true);
|
||||||
final String newRoot = "root" + System.currentTimeMillis();
|
final String newRoot = "root" + System.currentTimeMillis();
|
||||||
// final String newRoot = "root";
|
// final String newRoot = "root";
|
||||||
|
|
||||||
setupQueueConfiguration(csConf, newRoot);
|
setupQueueConfiguration(csConf, newRoot, addUserLimits);
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
cs.setConf(conf);
|
cs.setConf(conf);
|
||||||
|
|
||||||
@ -146,7 +151,7 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
|
|||||||
private static final String A = "a";
|
private static final String A = "a";
|
||||||
|
|
||||||
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
|
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
|
||||||
final String newRoot) {
|
final String newRoot, boolean addUserLimits) {
|
||||||
|
|
||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
@ -167,6 +172,10 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf,
|
|||||||
conf.setMaximumCapacity(Q_A, 100);
|
conf.setMaximumCapacity(Q_A, 100);
|
||||||
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
|
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
|
||||||
|
|
||||||
|
if (addUserLimits) {
|
||||||
|
conf.setUserLimit(Q_A, 25);
|
||||||
|
conf.setUserLimitFactor(Q_A, 0.25f);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static LeafQueue stubLeafQueue(LeafQueue queue) {
|
static LeafQueue stubLeafQueue(LeafQueue queue) {
|
||||||
@ -334,6 +343,140 @@ public void testReservation() throws Exception {
|
|||||||
assertEquals(0, app_0.getTotalRequiredResources(priorityReduce));
|
assertEquals(0, app_0.getTotalRequiredResources(priorityReduce));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that hitting a reservation limit and needing to unreserve
|
||||||
|
// does not affect assigning containers for other users
|
||||||
|
@Test
|
||||||
|
public void testReservationLimitOtherUsers() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
|
||||||
|
setup(csConf, true);
|
||||||
|
|
||||||
|
// Manipulate queue 'a'
|
||||||
|
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
|
||||||
|
|
||||||
|
// Users
|
||||||
|
final String user_0 = "user_0";
|
||||||
|
final String user_1 = "user_1";
|
||||||
|
|
||||||
|
// Submit applications
|
||||||
|
final ApplicationAttemptId appAttemptId_0 = TestUtils
|
||||||
|
.getMockApplicationAttemptId(0, 0);
|
||||||
|
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_0 = spy(app_0);
|
||||||
|
Mockito.doNothing().when(app_0).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
|
rmContext.getRMApps().put(app_0.getApplicationId(), mock(RMApp.class));
|
||||||
|
|
||||||
|
a.submitApplicationAttempt(app_0, user_0);
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId_1 = TestUtils
|
||||||
|
.getMockApplicationAttemptId(1, 0);
|
||||||
|
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_1, a,
|
||||||
|
mock(ActiveUsersManager.class), spyRMContext);
|
||||||
|
app_1 = spy(app_1);
|
||||||
|
Mockito.doNothing().when(app_1).updateAMContainerDiagnostics(any(AMState.class),
|
||||||
|
any(String.class));
|
||||||
|
rmContext.getRMApps().put(app_1.getApplicationId(), mock(RMApp.class));
|
||||||
|
|
||||||
|
a.submitApplicationAttempt(app_1, user_1);
|
||||||
|
|
||||||
|
// Setup some nodes
|
||||||
|
String host_0 = "host_0";
|
||||||
|
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
|
||||||
|
8 * GB);
|
||||||
|
String host_1 = "host_1";
|
||||||
|
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
|
||||||
|
8 * GB);
|
||||||
|
String host_2 = "host_2";
|
||||||
|
FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0,
|
||||||
|
8 * GB);
|
||||||
|
|
||||||
|
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
|
||||||
|
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
|
||||||
|
when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2);
|
||||||
|
|
||||||
|
cs.getAllNodes().put(node_0.getNodeID(), node_0);
|
||||||
|
cs.getAllNodes().put(node_1.getNodeID(), node_1);
|
||||||
|
cs.getAllNodes().put(node_2.getNodeID(), node_2);
|
||||||
|
|
||||||
|
final int numNodes = 3;
|
||||||
|
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
|
||||||
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
|
||||||
|
// Setup resource-requests
|
||||||
|
Priority priorityAM = TestUtils.createMockPriority(1);
|
||||||
|
Priority priorityMap = TestUtils.createMockPriority(5);
|
||||||
|
Priority priorityReduce = TestUtils.createMockPriority(10);
|
||||||
|
|
||||||
|
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||||
|
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
|
||||||
|
priorityAM, recordFactory)));
|
||||||
|
app_1.updateResourceRequests(Collections.singletonList(TestUtils
|
||||||
|
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true,
|
||||||
|
priorityAM, recordFactory)));
|
||||||
|
|
||||||
|
// Start testing...
|
||||||
|
// Only AM
|
||||||
|
a.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
assertEquals(2 * GB, a.getUsedResources().getMemory());
|
||||||
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0 * GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
||||||
|
assertEquals(2 * GB, a.getMetrics().getAllocatedMB());
|
||||||
|
assertEquals(22 * GB, a.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(2 * GB, node_0.getUsedResource().getMemory());
|
||||||
|
assertEquals(0 * GB, node_1.getUsedResource().getMemory());
|
||||||
|
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||||
|
|
||||||
|
a.assignContainers(clusterResource, node_1,
|
||||||
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
assertEquals(4 * GB, a.getUsedResources().getMemory());
|
||||||
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(0 * GB, a.getMetrics().getReservedMB());
|
||||||
|
assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
|
||||||
|
assertEquals(20 * GB, a.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(2 * GB, node_0.getUsedResource().getMemory());
|
||||||
|
assertEquals(2 * GB, node_1.getUsedResource().getMemory());
|
||||||
|
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||||
|
|
||||||
|
// Add a few requests to each app
|
||||||
|
app_0.updateResourceRequests(Collections.singletonList(TestUtils
|
||||||
|
.createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true,
|
||||||
|
priorityMap, recordFactory)));
|
||||||
|
app_1.updateResourceRequests(Collections.singletonList(TestUtils
|
||||||
|
.createResourceRequest(ResourceRequest.ANY, 2 * GB, 2, true,
|
||||||
|
priorityMap, recordFactory)));
|
||||||
|
|
||||||
|
// add a reservation for app_0
|
||||||
|
a.assignContainers(clusterResource, node_0,
|
||||||
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
assertEquals(12 * GB, a.getUsedResources().getMemory());
|
||||||
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(8 * GB, a.getMetrics().getReservedMB());
|
||||||
|
assertEquals(4 * GB, a.getMetrics().getAllocatedMB());
|
||||||
|
assertEquals(12 * GB, a.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(2 * GB, node_0.getUsedResource().getMemory());
|
||||||
|
assertEquals(2 * GB, node_1.getUsedResource().getMemory());
|
||||||
|
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||||
|
|
||||||
|
// next assignment is beyond user limit for user_0 but it should assign to
|
||||||
|
// app_1 for user_1
|
||||||
|
a.assignContainers(clusterResource, node_1,
|
||||||
|
new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
assertEquals(14 * GB, a.getUsedResources().getMemory());
|
||||||
|
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(4 * GB, app_1.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(8 * GB, a.getMetrics().getReservedMB());
|
||||||
|
assertEquals(6 * GB, a.getMetrics().getAllocatedMB());
|
||||||
|
assertEquals(10 * GB, a.getMetrics().getAvailableMB());
|
||||||
|
assertEquals(2 * GB, node_0.getUsedResource().getMemory());
|
||||||
|
assertEquals(4 * GB, node_1.getUsedResource().getMemory());
|
||||||
|
assertEquals(0 * GB, node_2.getUsedResource().getMemory());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReservationNoContinueLook() throws Exception {
|
public void testReservationNoContinueLook() throws Exception {
|
||||||
// Test that with reservations-continue-look-all-nodes feature off
|
// Test that with reservations-continue-look-all-nodes feature off
|
||||||
|
Loading…
Reference in New Issue
Block a user