YARN-2889. Limit the number of opportunistic container allocated per AM heartbeat. Contributed by Abhishek Modi.

This commit is contained in:
Inigo Goiri 2019-04-22 09:49:03 -07:00
parent 1ddb48872f
commit 96e3027e46
6 changed files with 283 additions and 8 deletions

View File

@ -402,6 +402,17 @@ public static boolean isAclEnabled(Configuration conf) {
public static final boolean public static final boolean
DEFAULT_OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = false; DEFAULT_OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = false;
/**
* Maximum number of opportunistic containers to be allocated in
* AM heartbeat.
*/
@Unstable
public static final String
OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT =
RM_PREFIX + "opportunistic.max.container-allocation.per.am.heartbeat";
public static final int
DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT = -1;
/** Number of nodes to be used by the Opportunistic Container allocator for /** Number of nodes to be used by the Opportunistic Container allocator for
* dispatching containers during container allocation. */ * dispatching containers during container allocation. */
@Unstable @Unstable

View File

@ -3340,6 +3340,15 @@
<value>false</value> <value>false</value>
</property> </property>
<property>
<description>
Maximum number of opportunistic containers to be allocated per
Application Master heartbeat.
</description>
<name>yarn.resourcemanager.opportunistic.max.container-allocation.per.am.heartbeat</name>
<value>-1</value>
</property>
<property> <property>
<description> <description>
Number of nodes to be used by the Opportunistic Container Allocator for Number of nodes to be used by the Opportunistic Container Allocator for

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.scheduler; package org.apache.hadoop.yarn.server.scheduler;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
@ -70,6 +71,8 @@ public class OpportunisticContainerAllocator {
private static final int RACK_LOCAL_LOOP = 1; private static final int RACK_LOCAL_LOOP = 1;
private static final int OFF_SWITCH_LOOP = 2; private static final int OFF_SWITCH_LOOP = 2;
private int maxAllocationsPerAMHeartbeat = -1;
/** /**
* This class encapsulates application specific parameters used to build a * This class encapsulates application specific parameters used to build a
* Container. * Container.
@ -291,6 +294,24 @@ public OpportunisticContainerAllocator(
this.tokenSecretManager = tokenSecretManager; this.tokenSecretManager = tokenSecretManager;
} }
/**
* Create a new Opportunistic Container Allocator.
* @param tokenSecretManager TokenSecretManager
* @param maxAllocationsPerAMHeartbeat max number of containers to be
* allocated in one AM heartbeat
*/
public OpportunisticContainerAllocator(
BaseContainerTokenSecretManager tokenSecretManager,
int maxAllocationsPerAMHeartbeat) {
this.tokenSecretManager = tokenSecretManager;
this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat;
}
@VisibleForTesting
void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) {
this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat;
}
/** /**
* Allocate OPPORTUNISTIC containers. * Allocate OPPORTUNISTIC containers.
* @param blackList Resource BlackList Request * @param blackList Resource BlackList Request
@ -316,7 +337,6 @@ public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
// Add OPPORTUNISTIC requests to the outstanding ones. // Add OPPORTUNISTIC requests to the outstanding ones.
opportContext.addToOutstandingReqs(oppResourceReqs); opportContext.addToOutstandingReqs(oppResourceReqs);
Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist()); Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
Set<String> allocatedNodes = new HashSet<>(); Set<String> allocatedNodes = new HashSet<>();
List<Container> allocatedContainers = new ArrayList<>(); List<Container> allocatedContainers = new ArrayList<>();
@ -334,9 +354,21 @@ public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
// might be different than what is requested, which is why // might be different than what is requested, which is why
// we need the requested capability (key) to match against // we need the requested capability (key) to match against
// the outstanding reqs) // the outstanding reqs)
int remAllocs = -1;
if (maxAllocationsPerAMHeartbeat > 0) {
remAllocs =
maxAllocationsPerAMHeartbeat - allocatedContainers.size()
- getTotalAllocations(allocations);
if (remAllocs <= 0) {
LOG.info("Not allocating more containers as we have reached max "
+ "allocations per AM heartbeat {}",
maxAllocationsPerAMHeartbeat);
break;
}
}
Map<Resource, List<Allocation>> allocation = allocate( Map<Resource, List<Allocation>> allocation = allocate(
rmIdentifier, opportContext, schedulerKey, applicationAttemptId, rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
appSubmitter, nodeBlackList, allocatedNodes); appSubmitter, nodeBlackList, allocatedNodes, remAllocs);
if (allocation.size() > 0) { if (allocation.size() > 0) {
allocations.add(allocation); allocations.add(allocation);
continueLoop = true; continueLoop = true;
@ -356,17 +388,42 @@ public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
return allocatedContainers; return allocatedContainers;
} }
private int getTotalAllocations(
List<Map<Resource, List<Allocation>>> allocations) {
int totalAllocs = 0;
for (Map<Resource, List<Allocation>> allocation : allocations) {
for (List<Allocation> allocs : allocation.values()) {
totalAllocs += allocs.size();
}
}
return totalAllocs;
}
private Map<Resource, List<Allocation>> allocate(long rmIdentifier, private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
ApplicationAttemptId appAttId, String userName, Set<String> blackList, ApplicationAttemptId appAttId, String userName, Set<String> blackList,
Set<String> allocatedNodes) Set<String> allocatedNodes, int maxAllocations)
throws YarnException { throws YarnException {
Map<Resource, List<Allocation>> containers = new HashMap<>(); Map<Resource, List<Allocation>> containers = new HashMap<>();
for (EnrichedResourceRequest enrichedAsk : for (EnrichedResourceRequest enrichedAsk :
appContext.getOutstandingOpReqs().get(schedKey).values()) { appContext.getOutstandingOpReqs().get(schedKey).values()) {
int remainingAllocs = -1;
if (maxAllocations > 0) {
int totalAllocated = 0;
for (List<Allocation> allocs : containers.values()) {
totalAllocated += allocs.size();
}
remainingAllocs = maxAllocations - totalAllocated;
if (remainingAllocs <= 0) {
LOG.info("Not allocating more containers as max allocations per AM "
+ "heartbeat {} has reached", maxAllocationsPerAMHeartbeat);
break;
}
}
allocateContainersInternal(rmIdentifier, appContext.getAppParams(), allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
appContext.getContainerIdGenerator(), blackList, allocatedNodes, appContext.getContainerIdGenerator(), blackList, allocatedNodes,
appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk); appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk,
remainingAllocs);
ResourceRequest anyAsk = enrichedAsk.getRequest(); ResourceRequest anyAsk = enrichedAsk.getRequest();
if (!containers.isEmpty()) { if (!containers.isEmpty()) {
LOG.info("Opportunistic allocation requested for [priority={}, " LOG.info("Opportunistic allocation requested for [priority={}, "
@ -384,7 +441,7 @@ private void allocateContainersInternal(long rmIdentifier,
Set<String> blacklist, Set<String> allocatedNodes, Set<String> blacklist, Set<String> allocatedNodes,
ApplicationAttemptId id, Map<String, RemoteNode> allNodes, ApplicationAttemptId id, Map<String, RemoteNode> allNodes,
String userName, Map<Resource, List<Allocation>> allocations, String userName, Map<Resource, List<Allocation>> allocations,
EnrichedResourceRequest enrichedAsk) EnrichedResourceRequest enrichedAsk, int maxAllocations)
throws YarnException { throws YarnException {
if (allNodes.size() == 0) { if (allNodes.size() == 0) {
LOG.info("No nodes currently available to " + LOG.info("No nodes currently available to " +
@ -397,6 +454,9 @@ private void allocateContainersInternal(long rmIdentifier,
allocations.get(anyAsk.getCapability()).size()); allocations.get(anyAsk.getCapability()).size());
toAllocate = Math.min(toAllocate, toAllocate = Math.min(toAllocate,
appParams.getMaxAllocationsPerSchedulerKeyPerRound()); appParams.getMaxAllocationsPerSchedulerKeyPerRound());
if (maxAllocations >= 0) {
toAllocate = Math.min(maxAllocations, toAllocate);
}
int numAllocated = 0; int numAllocated = 0;
// Node Candidates are selected as follows: // Node Candidates are selected as follows:
// * Node local candidates selected in loop == 0 // * Node local candidates selected in loop == 0

View File

@ -643,4 +643,190 @@ public void testAllocationWithNodeLabels() throws Exception {
Assert.assertEquals(1, containers.size()); Assert.assertEquals(1, containers.size());
Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size()); Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
} }
/**
* Tests maximum number of opportunistic containers that can be allocated in
* AM heartbeat.
* @throws Exception
*/
@Test
public void testMaxAllocationsPerAMHeartbeat() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
allocator.setMaxAllocationsPerAMHeartbeat(2);
final Priority priority = Priority.newInstance(1);
final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true);
final Resource resource = Resources.createResource(1 * GB);
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newInstance(priority, "*",
resource, 3, true, null, oppRequest),
ResourceRequest.newInstance(priority, "h6",
resource, 3, true, null, oppRequest),
ResourceRequest.newInstance(priority, "/r3",
resource, 3, true, null, oppRequest));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
LOG.info("Containers: {}", containers);
// Although capacity is present, but only 2 containers should be allocated
// as max allocation per AM heartbeat is set to 2.
Assert.assertEquals(2, containers.size());
containers = allocator.allocateContainers(
blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1");
LOG.info("Containers: {}", containers);
// Remaining 1 container should be allocated.
Assert.assertEquals(1, containers.size());
}
/**
* Tests maximum opportunistic container allocation per AM heartbeat for
* allocation requests with different scheduler key.
* @throws Exception
*/
@Test
public void testMaxAllocationsPerAMHeartbeatDifferentSchedKey()
throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
allocator.setMaxAllocationsPerAMHeartbeat(2);
final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true);
final Resource resource = Resources.createResource(1 * GB);
List<ResourceRequest> reqs =
Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*",
resource, 1, true, null, oppRequest),
ResourceRequest.newInstance(Priority.newInstance(2), "h6",
resource, 2, true, null, oppRequest),
ResourceRequest.newInstance(Priority.newInstance(3), "/r3",
resource, 2, true, null, oppRequest));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
LOG.info("Containers: {}", containers);
// Although capacity is present, but only 2 containers should be allocated
// as max allocation per AM heartbeat is set to 2.
Assert.assertEquals(2, containers.size());
containers = allocator.allocateContainers(
blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1");
LOG.info("Containers: {}", containers);
// 2 more containers should be allocated from pending allocation requests.
Assert.assertEquals(2, containers.size());
containers = allocator.allocateContainers(
blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1");
LOG.info("Containers: {}", containers);
// Remaining 1 container should be allocated.
Assert.assertEquals(1, containers.size());
}
/**
* Tests maximum opportunistic container allocation per AM heartbeat when
* limit is set to -1.
* @throws Exception
*/
@Test
public void testMaxAllocationsPerAMHeartbeatWithNoLimit() throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
allocator.setMaxAllocationsPerAMHeartbeat(-1);
Priority priority = Priority.newInstance(1);
Resource capability = Resources.createResource(1 * GB);
List<ResourceRequest> reqs = new ArrayList<>();
for (int i = 0; i < 20; i++) {
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(priority)
.resourceName("h1")
.capability(capability)
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build());
}
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
// all containers should be allocated in single heartbeat.
Assert.assertEquals(20, containers.size());
}
/**
* Tests maximum opportunistic container allocation per AM heartbeat when
* limit is set to higher value.
* @throws Exception
*/
@Test
public void testMaxAllocationsPerAMHeartbeatWithHighLimit()
throws Exception {
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>());
allocator.setMaxAllocationsPerAMHeartbeat(100);
Priority priority = Priority.newInstance(1);
Resource capability = Resources.createResource(1 * GB);
List<ResourceRequest> reqs = new ArrayList<>();
for (int i = 0; i < 20; i++) {
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(priority)
.resourceName("h1")
.capability(capability)
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build());
}
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
// all containers should be allocated in single heartbeat.
Assert.assertEquals(20, containers.size());
}
} }

View File

@ -473,10 +473,14 @@ protected void serviceInit(Configuration conf) throws Exception {
.getContainersMonitor(), this.aclsManager, dirsHandler); .getContainersMonitor(), this.aclsManager, dirsHandler);
addService(webServer); addService(webServer);
((NMContext) context).setWebServer(webServer); ((NMContext) context).setWebServer(webServer);
int maxAllocationsPerAMHeartbeat = conf.getInt(
YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
YarnConfiguration.
DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
((NMContext) context).setQueueableContainerAllocator( ((NMContext) context).setQueueableContainerAllocator(
new OpportunisticContainerAllocator( new OpportunisticContainerAllocator(
context.getContainerTokenSecretManager())); context.getContainerTokenSecretManager(),
maxAllocationsPerAMHeartbeat));
dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this); dispatcher.register(NodeManagerEventType.class, this);

View File

@ -229,8 +229,13 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext,
YarnScheduler scheduler) { YarnScheduler scheduler) {
super(OpportunisticContainerAllocatorAMService.class.getName(), super(OpportunisticContainerAllocatorAMService.class.getName(),
rmContext, scheduler); rmContext, scheduler);
int maxAllocationsPerAMHeartbeat = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
YarnConfiguration.
DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
this.oppContainerAllocator = new OpportunisticContainerAllocator( this.oppContainerAllocator = new OpportunisticContainerAllocator(
rmContext.getContainerTokenSecretManager()); rmContext.getContainerTokenSecretManager(),
maxAllocationsPerAMHeartbeat);
this.k = rmContext.getYarnConfiguration().getInt( this.k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED, YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED); YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);