YARN-1141. Updating resource requests should be decoupled with updating blacklist (Zhijie Shen via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1528632 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-10-02 21:31:47 +00:00
parent 3c7813bbad
commit 0a6e275ee3
10 changed files with 148 additions and 56 deletions

View File

@ -124,6 +124,9 @@ Release 2.1.2 - UNRELEASED
install with https enabled doesn't have broken link on NM UI. (Omkar Vinit
Joshi via vinodkv)
YARN-1141. Updating resource requests should be decoupled with updating
blacklist (Zhijie Shen via bikas)
Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES

View File

@ -116,14 +116,11 @@ public int getNewContainerId() {
* The ApplicationMaster is updating resource requirements for the
* application, by asking for more resources and releasing resources acquired
* by the application.
*
*
* @param requests resources to be acquired
* @param blacklistAdditions resources to be added to the blacklist
* @param blacklistRemovals resources to be removed from the blacklist
*/
synchronized public void updateResourceRequests(
List<ResourceRequest> requests,
List<String> blacklistAdditions, List<String> blacklistRemovals) {
List<ResourceRequest> requests) {
QueueMetrics metrics = queue.getMetrics();
// Update resource requests
@ -181,11 +178,16 @@ synchronized public void updateResourceRequests(
lastRequestContainers)));
}
}
}
//
// Update blacklist
//
/**
* The ApplicationMaster is updating the blacklist
*
* @param blacklistAdditions resources to be added to the blacklist
* @param blacklistRemovals resources to be removed from the blacklist
*/
synchronized public void updateBlacklist(
List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Add to blacklist
if (blacklistAdditions != null) {
blacklist.addAll(blacklistAdditions);

View File

@ -572,8 +572,7 @@ ask, getResourceCalculator(), getClusterResources(),
application.showRequests();
// Update application requests
application.updateResourceRequests(ask,
blacklistAdditions, blacklistRemovals);
application.updateResourceRequests(ask);
LOG.debug("allocate: post-update");
application.showRequests();
@ -585,6 +584,8 @@ ask, getResourceCalculator(), getClusterResources(),
" #ask=" + ask.size());
}
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
return application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
}

View File

@ -141,10 +141,16 @@ public String getUser() {
}
public synchronized void updateResourceRequests(
List<ResourceRequest> requests,
List<ResourceRequest> requests) {
if (!isStopped) {
this.appSchedulingInfo.updateResourceRequests(requests);
}
}
public synchronized void updateBlacklist(
List<String> blacklistAdditions, List<String> blacklistRemovals) {
if (!isStopped) {
this.appSchedulingInfo.updateResourceRequests(requests,
this.appSchedulingInfo.updateBlacklist(
blacklistAdditions, blacklistRemovals);
}
}

View File

@ -138,7 +138,7 @@ public String getUser() {
public synchronized void updateResourceRequests(
List<ResourceRequest> requests) {
this.appSchedulingInfo.updateResourceRequests(requests, null, null);
this.appSchedulingInfo.updateResourceRequests(requests);
}
public Map<String, ResourceRequest> getResourceRequests(Priority priority) {

View File

@ -304,7 +304,7 @@ public Allocation allocate(
application.showRequests();
// Update application requests
application.updateResourceRequests(ask, blacklistAdditions, blacklistRemovals);
application.updateResourceRequests(ask);
LOG.debug("allocate: post-update" +
" applicationId=" + applicationAttemptId +
@ -316,13 +316,16 @@ public Allocation allocate(
" #ask=" + ask.size());
}
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
return new Allocation(
application.pullNewlyAllocatedContainers(),
application.getHeadroom());
}
}
private FiCaSchedulerApp getApplication(
@VisibleForTesting
FiCaSchedulerApp getApplication(
ApplicationAttemptId applicationAttemptId) {
return applications.get(applicationAttemptId);
}

View File

@ -513,7 +513,7 @@ public void testHeadroom() throws Exception {
app_0_0_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
true, priority_1, recordFactory));
app_0_0.updateResourceRequests(app_0_0_requests, null, null);
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0);
@ -532,7 +532,7 @@ public void testHeadroom() throws Exception {
app_0_1_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
true, priority_1, recordFactory));
app_0_1.updateResourceRequests(app_0_1_requests, null, null);
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute
@ -551,7 +551,7 @@ public void testHeadroom() throws Exception {
app_1_0_requests.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
true, priority_1, recordFactory));
app_1_0.updateResourceRequests(app_1_0_requests, null, null);
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute

View File

@ -28,6 +28,7 @@
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@ -41,6 +42,7 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@ -52,6 +54,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -62,11 +65,14 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
@ -512,8 +518,41 @@ private int getQueueCount(List<QueueUserACLInfo> queueInformation, String queueN
}
return result;
}
@SuppressWarnings("resource")
@Test
public void testBlackListNodes() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
String host = "127.0.0.1";
RMNode node =
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
cs.handle(new NodeAddedSchedulerEvent(node));
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "default",
"user");
cs.handle(event);
// Verify the blacklist can be updated independent of requesting containers
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null);
Assert.assertTrue(cs.getApplication(appAttemptId).isBlacklisted(host));
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
Assert.assertFalse(cs.getApplication(appAttemptId).isBlacklisted(host));
rm.stop();
}
@Test (timeout = 5000)
public void testApplicationComparator()
{

View File

@ -294,7 +294,7 @@ public void testSingleQueueOneUserMetrics() throws Exception {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
// Start testing...
@ -416,11 +416,11 @@ public void testSingleQueueWithOneUser() throws Exception {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
// Start testing...
@ -549,11 +549,11 @@ public void testUserLimits() throws Exception {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
/**
* Start testing...
@ -574,7 +574,7 @@ public void testUserLimits() throws Exception {
// Pre MAPREDUCE-3732 this test should fail without this block too
// app_2.updateResourceRequests(Collections.singletonList(
// TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
// recordFactory)));
// recordFactory)));
// 1 container to user_0
a.assignContainers(clusterResource, node_0);
@ -642,11 +642,11 @@ public void testHeadroomWithMaxCap() throws Exception {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
/**
* Start testing...
@ -681,7 +681,7 @@ public void testHeadroomWithMaxCap() throws Exception {
a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
// No more to user_0 since he is already over user-limit
@ -698,7 +698,7 @@ public void testHeadroomWithMaxCap() throws Exception {
LOG.info("here");
app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
@ -759,11 +759,11 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
/**
* Start testing...
@ -793,11 +793,11 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
app_3.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
@ -921,11 +921,11 @@ public void testReservation() throws Exception {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
// Start testing...
@ -1025,7 +1025,7 @@ public void testStolenReservedContainer() throws Exception {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
// Setup app_1 to request a 4GB container on host_0 and
// another 4GB container anywhere.
@ -1037,7 +1037,7 @@ public void testStolenReservedContainer() throws Exception {
true, priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 2,
true, priority, recordFactory));
app_1.updateResourceRequests(appRequests_1, null, null);
app_1.updateResourceRequests(appRequests_1);
// Start testing...
@ -1132,11 +1132,11 @@ public void testReservationExchange() throws Exception {
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
priority, recordFactory)), null, null);
priority, recordFactory)));
// Start testing...
@ -1261,7 +1261,7 @@ public void testLocalityScheduling() throws Exception {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0, null, null);
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
CSAssignment assignment = null;
@ -1326,7 +1326,7 @@ public void testLocalityScheduling() throws Exception {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0, null, null);
app_0.updateResourceRequests(app_0_requests_0);
assertEquals(2, app_0.getTotalRequiredResources(priority));
String host_3 = "127.0.0.4"; // on rack_1
@ -1417,7 +1417,7 @@ public void testApplicationPriorityScheduling() throws Exception {
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
true, priority_2, recordFactory));
app_0.updateResourceRequests(app_0_requests_0, null, null);
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
@ -1532,7 +1532,7 @@ public void testSchedulingConstraints() throws Exception {
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0, null, null);
app_0.updateResourceRequests(app_0_requests_0);
// Start testing...
@ -1541,7 +1541,7 @@ public void testSchedulingConstraints() throws Exception {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0, null, null);
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
a.assignContainers(clusterResource, node_0_0);
@ -1564,7 +1564,7 @@ public void testSchedulingConstraints() throws Exception {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0, null, null);
app_0.updateResourceRequests(app_0_requests_0);
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
@ -1809,8 +1809,8 @@ public void testLocalityConstraints() throws Exception {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
false, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0,
Collections.singletonList(host_0_0), null);
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateBlacklist(Collections.singletonList(host_0_0), null);
app_0_requests_0.clear();
//
@ -1850,8 +1850,8 @@ public void testLocalityConstraints() throws Exception {
app_0_requests_0.add(
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
true, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0,
Collections.singletonList(host_1_1), null);
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateBlacklist(Collections.singletonList(host_1_1), null);
app_0_requests_0.clear();
// resourceName: <priority, memory, #containers, relaxLocality>
@ -1877,7 +1877,8 @@ public void testLocalityConstraints() throws Exception {
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Now, remove node_1_1 from blacklist, but add rack_1 to blacklist
app_0.updateResourceRequests(app_0_requests_0,
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateBlacklist(
Collections.singletonList(rack_1), Collections.singletonList(host_1_1));
app_0_requests_0.clear();
@ -1904,8 +1905,8 @@ public void testLocalityConstraints() throws Exception {
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
// Now remove rack_1 from blacklist
app_0.updateResourceRequests(app_0_requests_0,
null, Collections.singletonList(rack_1));
app_0.updateResourceRequests(app_0_requests_0);
app_0.updateBlacklist(null, Collections.singletonList(rack_1));
app_0_requests_0.clear();
// resourceName: <priority, memory, #containers, relaxLocality>
@ -1937,7 +1938,7 @@ public void testLocalityConstraints() throws Exception {
app_0_requests_0.add(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
false, priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0, null, null);
app_0.updateResourceRequests(app_0_requests_0);
app_0_requests_0.clear();
// resourceName: <priority, memory, #containers, relaxLocality>

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import junit.framework.Assert;
@ -43,6 +44,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@ -67,7 +69,8 @@
public class TestFifoScheduler {
private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
private final int GB = 1024;
private ResourceManager resourceManager = null;
private static final RecordFactory recordFactory =
@ -424,6 +427,40 @@ public void testConcurrentAccessOnApplications() throws Exception {
fs.applications, FiCaSchedulerApp.class);
}
@SuppressWarnings("resource")
@Test
public void testBlackListNodes() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler();
String host = "127.0.0.1";
RMNode node =
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
fs.handle(new NodeAddedSchedulerEvent(node));
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId, 1);
SchedulerEvent event = new AppAddedSchedulerEvent(appAttemptId, "default",
"user");
fs.handle(event);
// Verify the blacklist can be updated independent of requesting containers
fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null);
Assert.assertTrue(fs.getApplication(appAttemptId).isBlacklisted(host));
fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host));
Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host));
rm.stop();
}
private void checkApplicationResourceUsage(int expected,
Application application) {
Assert.assertEquals(expected, application.getUsedResources().getMemory());