YARN-7739. DefaultAMSProcessor should properly check customized resource types against minimum/maximum allocation. (wangda)
Change-Id: I10cc9341237d9a2fc0f8c855efb98a36b91389e2
This commit is contained in:
parent
d4c98579e3
commit
d02e42cee4
@ -35,6 +35,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
|
||||
@ -51,6 +52,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
@ -276,23 +278,23 @@ private static void validateResourceRequest(ResourceRequest resReq,
|
||||
throw new InvalidResourceRequestException(ye);
|
||||
}
|
||||
|
||||
if (resReq.getCapability().getMemorySize() < 0 ||
|
||||
resReq.getCapability().getMemorySize() > maximumResource.getMemorySize()) {
|
||||
throw new InvalidResourceRequestException("Invalid resource request"
|
||||
+ ", requested memory < 0"
|
||||
+ ", or requested memory > max configured"
|
||||
+ ", requestedMemory=" + resReq.getCapability().getMemorySize()
|
||||
+ ", maxMemory=" + maximumResource.getMemorySize());
|
||||
}
|
||||
if (resReq.getCapability().getVirtualCores() < 0 ||
|
||||
resReq.getCapability().getVirtualCores() >
|
||||
maximumResource.getVirtualCores()) {
|
||||
throw new InvalidResourceRequestException("Invalid resource request"
|
||||
+ ", requested virtual cores < 0"
|
||||
+ ", or requested virtual cores > max configured"
|
||||
+ ", requestedVirtualCores="
|
||||
+ resReq.getCapability().getVirtualCores()
|
||||
+ ", maxVirtualCores=" + maximumResource.getVirtualCores());
|
||||
Resource requestedResource = resReq.getCapability();
|
||||
for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
|
||||
ResourceInformation reqRI = requestedResource.getResourceInformation(i);
|
||||
ResourceInformation maxRI = maximumResource.getResourceInformation(i);
|
||||
if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) {
|
||||
throw new InvalidResourceRequestException(
|
||||
"Invalid resource request, requested resource type=[" + reqRI
|
||||
.getName()
|
||||
+ "] < 0 or greater than maximum allowed allocation. Requested "
|
||||
+ "resource=" + requestedResource
|
||||
+ ", maximum allowed allocation=" + maximumResource
|
||||
+ ", please note that maximum allowed allocation is calculated "
|
||||
+ "by scheduler based on maximum resource of registered "
|
||||
+ "NodeManagers, which might be less than configured "
|
||||
+ "maximum allocation=" + ResourceUtils
|
||||
.getResourceTypesMaximumAllocation());
|
||||
}
|
||||
}
|
||||
String labelExp = resReq.getNodeLabelExpression();
|
||||
// we don't allow specify label expression other than resourceName=ANY now
|
||||
|
@ -19,6 +19,8 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static java.lang.Thread.sleep;
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
|
||||
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@ -29,6 +31,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
@ -41,6 +44,7 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords
|
||||
.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
@ -48,27 +52,39 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.MockResourceProfileManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -665,6 +681,180 @@ public void testPriorityInAllocatedResponse() throws Exception {
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testCSValidateRequestCapacityAgainstMinMaxAllocation()
|
||||
throws Exception {
|
||||
testValidateRequestCapacityAgainstMinMaxAllocation(CapacityScheduler.class);
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testFSValidateRequestCapacityAgainstMinMaxAllocation()
|
||||
throws Exception {
|
||||
testValidateRequestCapacityAgainstMinMaxAllocation(FairScheduler.class);
|
||||
}
|
||||
|
||||
private void testValidateRequestCapacityAgainstMinMaxAllocation(Class<?> schedulerCls)
|
||||
throws Exception {
|
||||
|
||||
// Initialize resource map for 2 types.
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
ResourceInformation memory = ResourceInformation.newInstance(
|
||||
ResourceInformation.MEMORY_MB.getName(),
|
||||
ResourceInformation.MEMORY_MB.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
ResourceInformation vcores = ResourceInformation.newInstance(
|
||||
ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.VCORES.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
|
||||
CapacitySchedulerConfiguration csconf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration(csconf);
|
||||
// Don't reset resource types since we have already configured resource
|
||||
// types
|
||||
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerCls,
|
||||
ResourceScheduler.class);
|
||||
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false);
|
||||
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils
|
||||
.createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null));
|
||||
|
||||
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
// Now request resource, memory > allowed
|
||||
boolean exception = false;
|
||||
try {
|
||||
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
|
||||
Resource.newInstance(9 * GB, 1)).numContainers(1).resourceName("*")
|
||||
.build()), null);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
exception = true;
|
||||
}
|
||||
Assert.assertTrue(exception);
|
||||
|
||||
exception = false;
|
||||
try {
|
||||
// Now request resource, vcore > allowed
|
||||
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
|
||||
Resource.newInstance(8 * GB, 18)).numContainers(1).resourceName("*")
|
||||
.build()), null);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
exception = true;
|
||||
}
|
||||
Assert.assertTrue(exception);
|
||||
|
||||
rm.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes()
|
||||
throws Exception {
|
||||
|
||||
// Initialize resource map for 2 types.
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
ResourceInformation memory = ResourceInformation.newInstance(
|
||||
ResourceInformation.MEMORY_MB.getName(),
|
||||
ResourceInformation.MEMORY_MB.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
ResourceInformation vcores = ResourceInformation.newInstance(
|
||||
ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.VCORES.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||
ResourceInformation res_1 = ResourceInformation.newInstance("res_1",
|
||||
ResourceInformation.VCORES.getUnits(), 0, 4);
|
||||
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||
riMap.put("res_1", res_1);
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
|
||||
CapacitySchedulerConfiguration csconf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration(csconf);
|
||||
// Don't reset resource types since we have already configured resource
|
||||
// types
|
||||
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false);
|
||||
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
|
||||
|
||||
MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils
|
||||
.createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
|
||||
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
|
||||
ImmutableMap.of("res_1", 4)));
|
||||
|
||||
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
Assert.assertEquals(Resource.newInstance(1 * GB, 1),
|
||||
leafQueue.getUsedResources());
|
||||
|
||||
// Now request resource, memory > allowed
|
||||
boolean exception = false;
|
||||
try {
|
||||
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
|
||||
TestUtils.createResource(9 * GB, 1, ImmutableMap.of("res_1", 1)))
|
||||
.numContainers(1).resourceName("*").build()), null);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
exception = true;
|
||||
}
|
||||
Assert.assertTrue(exception);
|
||||
|
||||
exception = false;
|
||||
try {
|
||||
// Now request resource, vcore > allowed
|
||||
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
|
||||
TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1)))
|
||||
.numContainers(1).resourceName("*")
|
||||
.build()), null);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
exception = true;
|
||||
}
|
||||
Assert.assertTrue(exception);
|
||||
|
||||
exception = false;
|
||||
try {
|
||||
// Now request resource, res_1 > allowed
|
||||
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
|
||||
TestUtils.createResource(8 * GB, 1, ImmutableMap.of("res_1", 100)))
|
||||
.numContainers(1).resourceName("*")
|
||||
.build()), null);
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
exception = true;
|
||||
}
|
||||
Assert.assertTrue(exception);
|
||||
|
||||
rm.close();
|
||||
}
|
||||
|
||||
private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
RMContainer rmContainer = cs.getRMContainer(containerId);
|
||||
|
@ -478,9 +478,11 @@ public FiCaSchedulerApp getApplicationAttempt(
|
||||
public static Resource createResource(long memory, int vcores,
|
||||
Map<String, Integer> nameToValues) {
|
||||
Resource res = Resource.newInstance(memory, vcores);
|
||||
for (Map.Entry<String, Integer> entry : nameToValues.entrySet()) {
|
||||
res.setResourceInformation(entry.getKey(), ResourceInformation
|
||||
.newInstance(entry.getKey(), "", entry.getValue()));
|
||||
if (nameToValues != null) {
|
||||
for (Map.Entry<String, Integer> entry : nameToValues.entrySet()) {
|
||||
res.setResourceInformation(entry.getKey(), ResourceInformation
|
||||
.newInstance(entry.getKey(), "", entry.getValue()));
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user