From 0921b706f7f80c40e061d2c0f8c8b2e4910071e5 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Tue, 8 Jan 2019 14:30:53 +0800 Subject: [PATCH] YARN-9037. [CSI] Ignore volume resource in resource calculators based on tags. Contributed by Sunil Govindan. --- .../yarn/util/resource/ResourceUtils.java | 26 ++++++++ .../resource/DominantResourceCalculator.java | 32 +++++----- .../hadoop/yarn/util/resource/Resources.java | 16 ++--- .../yarn/util/resource/TestResourceUtils.java | 53 ++++++++++++++++ .../resource-types/resource-types-6.xml | 58 ++++++++++++++++++ .../volume/csi/ContainerVolumePublisher.java | 4 +- ...AbstractPreemptableResourceCalculator.java | 2 +- .../capacity/TempQueuePerPartition.java | 2 +- .../scheduler/ClusterNodeTracker.java | 2 +- .../scheduler/SchedulerUtils.java | 6 +- .../scheduler/capacity/ParentQueue.java | 4 +- .../scheduler/fair/ConfigurableResource.java | 2 +- .../fair/FairSchedulerConfiguration.java | 2 +- .../DominantResourceFairnessPolicy.java | 2 +- .../volume/csi/TestVolumeProcessor.java | 61 +++++++++++++++++++ 15 files changed, 236 insertions(+), 36 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-6.xml diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 20b64bd87e..26d75922ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -60,6 +60,7 @@ public class ResourceUtils { public static final String TAGS = ".tags"; public static final String MINIMUM_ALLOCATION = ".minimum-allocation"; public static final String MAXIMUM_ALLOCATION = ".maximum-allocation"; + public static final String EXTERNAL_VOLUME_RESOURCE_TAG = "system:csi-volume"; private static final String MEMORY = ResourceInformation.MEMORY_MB.getName(); private static final String VCORES = ResourceInformation.VCORES.getName(); @@ -74,10 +75,12 @@ public class ResourceUtils { private static final Map RESOURCE_NAME_TO_INDEX = new ConcurrentHashMap(); private static volatile Map resourceTypes; + private static volatile Map nonCountableResourceTypes; private static volatile ResourceInformation[] resourceTypesArray; private static volatile boolean initializedNodeResources = false; private static volatile Map readOnlyNodeResources; private static volatile int numKnownResourceTypes = -1; + private static volatile int numNonCountableResourceTypes = -1; static final Logger LOG = LoggerFactory.getLogger(ResourceUtils.class); @@ -290,15 +293,18 @@ static void initializeResourcesMap(Configuration conf) { public static void initializeResourcesFromResourceInformationMap( Map resourceInformationMap) { resourceTypes = Collections.unmodifiableMap(resourceInformationMap); + nonCountableResourceTypes = new HashMap<>(); updateKnownResources(); updateResourceTypeIndex(); initializedResources = true; numKnownResourceTypes = resourceTypes.size(); + numNonCountableResourceTypes = nonCountableResourceTypes.size(); } private static void updateKnownResources() { // Update resource names. resourceTypesArray = new ResourceInformation[resourceTypes.size()]; + List nonCountableResources = new ArrayList<>(); int index = 2; for (ResourceInformation resInfo : resourceTypes.values()) { @@ -309,10 +315,22 @@ private static void updateKnownResources() { resourceTypesArray[1] = ResourceInformation .newInstance(resourceTypes.get(VCORES)); } else { + if (resInfo.getTags() != null && resInfo.getTags() + .contains(EXTERNAL_VOLUME_RESOURCE_TAG)) { + nonCountableResources.add(resInfo); + continue; + } resourceTypesArray[index] = ResourceInformation.newInstance(resInfo); index++; } } + + // Add all non-countable resource types to the end of the resource array. + for(ResourceInformation resInfo: nonCountableResources) { + resourceTypesArray[index] = ResourceInformation.newInstance(resInfo); + nonCountableResourceTypes.put(resInfo.getName(), resInfo); + index++; + } } private static void updateResourceTypeIndex() { @@ -355,6 +373,13 @@ public static int getNumberOfKnownResourceTypes() { return numKnownResourceTypes; } + public static int getNumberOfCountableResourceTypes() { + if (numKnownResourceTypes < 0) { + initializeResourceTypesIfNeeded(); + } + return numKnownResourceTypes - numNonCountableResourceTypes; + } + private static Map getResourceTypes( Configuration conf) { return getResourceTypes(conf, @@ -383,6 +408,7 @@ private static void initializeResourceTypesIfNeeded(Configuration conf, } } numKnownResourceTypes = resourceTypes.size(); + numNonCountableResourceTypes = nonCountableResourceTypes.size(); } private static Map getResourceTypes( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 29d7e7ed3b..17244e9eef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -72,7 +72,7 @@ private int compare(Resource lhs, Resource rhs) { boolean rhsGreater = false; int ret = 0; - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation lhsResourceInformation = lhs .getResourceInformation(i); @@ -110,7 +110,7 @@ public int compare(Resource clusterResource, Resource lhs, Resource rhs, // resources and then look for which resource has the biggest // share overall. ResourceInformation[] clusterRes = clusterResource.getResources(); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); // If array creation shows up as a time sink, these arrays could be cached // because they're always the same length. @@ -183,7 +183,7 @@ private void calculateShares(ResourceInformation[] clusterRes, Resource first, ResourceInformation[] firstRes = first.getResources(); ResourceInformation[] secondRes = second.getResources(); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { firstShares[i] = calculateShare(clusterRes[i], firstRes[i]); secondShares[i] = calculateShare(clusterRes[i], secondRes[i]); @@ -274,7 +274,7 @@ private void calculateShares(ResourceInformation[] clusterRes, Resource first, max[0] = 0.0; max[1] = 0.0; - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { firstShares[i] = calculateShare(clusterRes[i], firstRes[i]); secondShares[i] = calculateShare(clusterRes[i], secondRes[i]); @@ -330,7 +330,7 @@ private double compareShares(double[] lhsShares, double[] rhsShares) { public long computeAvailableContainers(Resource available, Resource required) { long min = Long.MAX_VALUE; - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation availableResource = available .getResourceInformation(i); @@ -346,7 +346,7 @@ public long computeAvailableContainers(Resource available, @Override public float divide(Resource clusterResource, Resource numerator, Resource denominator) { - int nKnownResourceTypes = ResourceUtils.getNumberOfKnownResourceTypes(); + int nKnownResourceTypes = ResourceUtils.getNumberOfCountableResourceTypes(); ResourceInformation[] clusterRes = clusterResource.getResources(); // We have to provide the calculateShares() method with somewhere to store // the shares. We don't actually need these shares afterwards. @@ -375,7 +375,7 @@ public boolean isInvalidDivisor(Resource r) { @Override public float ratio(Resource a, Resource b) { float ratio = 0.0f; - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation aResourceInformation = a.getResourceInformation(i); ResourceInformation bResourceInformation = b.getResourceInformation(i); @@ -393,7 +393,7 @@ public Resource divideAndCeil(Resource numerator, int denominator) { public Resource divideAndCeil(Resource numerator, long denominator) { Resource ret = Resource.newInstance(numerator); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation resourceInformation = ret.getResourceInformation(i); resourceInformation @@ -414,7 +414,7 @@ public Resource divideAndCeil(Resource numerator, float denominator) { public Resource normalize(Resource r, Resource minimumResource, Resource maximumResource, Resource stepFactor) { Resource ret = Resource.newInstance(r); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation minimumResourceInformation = minimumResource @@ -448,7 +448,7 @@ public Resource roundDown(Resource r, Resource stepFactor) { private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) { Resource ret = Resource.newInstance(r); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation stepFactorResourceInformation = stepFactor @@ -473,7 +473,7 @@ private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) { public Resource multiplyAndNormalizeUp(Resource r, double[] by, Resource stepFactor) { Resource ret = Resource.newInstance(r); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation stepFactorResourceInformation = stepFactor @@ -502,7 +502,7 @@ public Resource multiplyAndNormalizeDown(Resource r, double by, private Resource multiplyAndNormalize(Resource r, double by, Resource stepFactor, boolean roundUp) { Resource ret = Resource.newInstance(r); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation stepFactorResourceInformation = stepFactor @@ -528,7 +528,7 @@ private Resource multiplyAndNormalize(Resource r, double by, @Override public boolean fitsIn(Resource smaller, Resource bigger) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation sResourceInformation = smaller .getResourceInformation(i); @@ -544,7 +544,7 @@ public boolean fitsIn(Resource smaller, Resource bigger) { @Override public Resource normalizeDown(Resource r, Resource stepFactor) { Resource ret = Resource.newInstance(r); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation rResourceInformation = r.getResourceInformation(i); ResourceInformation stepFactorResourceInformation = stepFactor @@ -564,7 +564,7 @@ public Resource normalizeDown(Resource r, Resource stepFactor) { @Override public boolean isAnyMajorResourceZeroOrNegative(Resource resource) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation resourceInformation = resource.getResourceInformation( i); @@ -577,7 +577,7 @@ public boolean isAnyMajorResourceZeroOrNegative(Resource resource) { @Override public boolean isAnyMajorResourceAboveZero(Resource resource) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation resourceInformation = resource.getResourceInformation( i); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 48c2c364ae..bf1df8dbfa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -251,7 +251,7 @@ public static Resource clone(Resource res) { } public static Resource addTo(Resource lhs, Resource rhs) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { try { ResourceInformation rhsValue = rhs.getResourceInformation(i); @@ -270,7 +270,7 @@ public static Resource add(Resource lhs, Resource rhs) { } public static Resource subtractFrom(Resource lhs, Resource rhs) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { try { ResourceInformation rhsValue = rhs.getResourceInformation(i); @@ -325,7 +325,7 @@ public static Resource negate(Resource resource) { } public static Resource multiplyTo(Resource lhs, double by) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { try { ResourceInformation lhsValue = lhs.getResourceInformation(i); @@ -348,7 +348,7 @@ public static Resource multiply(Resource lhs, double by) { */ public static Resource multiplyAndAddTo( Resource lhs, Resource rhs, double by) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { try { ResourceInformation rhsValue = rhs.getResourceInformation(i); @@ -381,7 +381,7 @@ public static Resource multiplyAndNormalizeDown( public static Resource multiplyAndRoundDown(Resource lhs, double by) { Resource out = clone(lhs); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { try { ResourceInformation lhsValue = lhs.getResourceInformation(i); @@ -490,7 +490,7 @@ public static Resource max( } public static boolean fitsIn(Resource smaller, Resource bigger) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { try { ResourceInformation rhsValue = bigger.getResourceInformation(i); @@ -513,7 +513,7 @@ public static boolean fitsIn(ResourceCalculator rc, public static Resource componentwiseMin(Resource lhs, Resource rhs) { Resource ret = createResource(0); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { try { ResourceInformation rhsValue = rhs.getResourceInformation(i); @@ -532,7 +532,7 @@ public static Resource componentwiseMin(Resource lhs, Resource rhs) { public static Resource componentwiseMax(Resource lhs, Resource rhs) { Resource ret = createResource(0); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { try { ResourceInformation rhsValue = rhs.getResourceInformation(i); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java index a9c98bd77d..d6e0565776 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceUtils.java @@ -400,4 +400,57 @@ public static String setupResourceTypes(Configuration conf, String filename) ResourceUtils.getResourceTypes(); return dest.getAbsolutePath(); } + + @Test + public void testMultipleOpsForResourcesWithTags() throws Exception { + + Configuration conf = new YarnConfiguration(); + setupResourceTypes(conf, "resource-types-6.xml"); + Resource resourceA = Resource.newInstance(2, 4); + Resource resourceB = Resource.newInstance(3, 6); + + resourceA.setResourceInformation("resource1", + ResourceInformation.newInstance("resource1", "T", 5L)); + + resourceA.setResourceInformation("resource2", + ResourceInformation.newInstance("resource2", "M", 2L)); + resourceA.setResourceInformation("yarn.io/gpu", + ResourceInformation.newInstance("yarn.io/gpu", "", 1)); + resourceA.setResourceInformation("yarn.io/test-volume", + ResourceInformation.newInstance("yarn.io/test-volume", "", 2)); + + resourceB.setResourceInformation("resource1", + ResourceInformation.newInstance("resource1", "T", 3L)); + + resourceB.setResourceInformation("resource2", + ResourceInformation.newInstance("resource2", "M", 4L)); + resourceB.setResourceInformation("yarn.io/gpu", + ResourceInformation.newInstance("yarn.io/gpu", "", 2)); + resourceB.setResourceInformation("yarn.io/test-volume", + ResourceInformation.newInstance("yarn.io/test-volume", "", 3)); + + Resource addedResource = Resources.add(resourceA, resourceB); + Assert.assertEquals(addedResource.getMemorySize(), 5); + Assert.assertEquals(addedResource.getVirtualCores(), 10); + Assert.assertEquals( + addedResource.getResourceInformation("resource1").getValue(), 8); + + // Verify that value of resourceA and resourceB is not added up for + // "yarn.io/test-volume". + Assert.assertEquals( + addedResource.getResourceInformation("yarn.io/test-volume").getValue(), + 2); + + Resource mulResource = Resources.multiplyAndRoundDown(resourceA, 3); + Assert.assertEquals(mulResource.getMemorySize(), 6); + Assert.assertEquals(mulResource.getVirtualCores(), 12); + Assert.assertEquals( + mulResource.getResourceInformation("resource1").getValue(), 15); + + // Verify that value of resourceA is not multiplied up for + // "yarn.io/test-volume". + Assert.assertEquals( + mulResource.getResourceInformation("yarn.io/test-volume").getValue(), + 2); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-6.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-6.xml new file mode 100644 index 0000000000..5987ccebb7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/resource-types/resource-types-6.xml @@ -0,0 +1,58 @@ + + + + + + + + yarn.resource-types + resource1,resource2,resource3,yarn.io/gpu,yarn.io/test-volume + + + + yarn.resource-types.resource1.units + G + + + + yarn.resource-types.resource2.units + m + + + + yarn.resource-types.resource3.units + G + + + + yarn.resource-types.resource3.tags + resource3_tag_1,resource3_tag_2 + + + + yarn.resource-types.yarn.io/gpu.units + + + + + yarn.resource-types.yarn.io/test-volume.units + G + + + + yarn.resource-types.yarn.io/test-volume.tags + system:csi-volume + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java index 78f2d2db02..3fec9596e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/volume/csi/ContainerVolumePublisher.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.DockerLinuxContainerRuntime; +import org.apache.hadoop.yarn.server.volume.csi.CsiConstants; import org.apache.hadoop.yarn.server.volume.csi.VolumeMetaData; import org.apache.hadoop.yarn.server.volume.csi.exception.InvalidVolumeException; import org.slf4j.Logger; @@ -120,7 +121,8 @@ private List getVolumes() throws InvalidVolumeException { if (containerResource != null) { for (ResourceInformation resourceInformation : containerResource.getAllResourcesListCopy()) { - if (resourceInformation.getTags().contains("system:csi-volume")) { + if (resourceInformation.getTags() + .contains(CsiConstants.CSI_VOLUME_RESOURCE_TAG)) { volumes.addAll(VolumeMetaData.fromResource(resourceInformation)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java index 5b8360a1c9..c8f68a26a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -262,7 +262,7 @@ protected void initIdealAssignment(Resource totGuarant, private void resetCapacity(Resource clusterResource, Collection queues, boolean ignoreGuar) { Resource activeCap = Resource.newInstance(0, 0); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); if (ignoreGuar) { for (TempQueuePerPartition q : queues) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 4fb1862b88..57dc639570 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -98,7 +98,7 @@ public TempQueuePerPartition(String queueName, Resource current, } this.normalizedGuarantee = new double[ResourceUtils - .getNumberOfKnownResourceTypes()]; + .getNumberOfCountableResourceTypes()]; this.children = new ArrayList<>(); this.apps = new ArrayList<>(); this.untouchableExtra = Resource.newInstance(0, 0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java index 0f72c761e8..7e12aae5be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java @@ -73,7 +73,7 @@ public class ClusterNodeTracker { private boolean reportedMaxAllocation = false; public ClusterNodeTracker() { - maxAllocation = new long[ResourceUtils.getNumberOfKnownResourceTypes()]; + maxAllocation = new long[ResourceUtils.getNumberOfCountableResourceTypes()]; Arrays.fill(maxAllocation, -1); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index a048dacb68..fe6682c96d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -355,7 +355,7 @@ private static void validateResourceRequest(ResourceRequest resReq, private static Map getZeroResources( Resource resource) { Map resourceInformations = Maps.newHashMap(); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation resourceInformation = @@ -372,7 +372,7 @@ private static Map getZeroResources( @VisibleForTesting static void checkResourceRequestAgainstAvailableResource(Resource reqResource, Resource availableResource) throws InvalidResourceRequestException { - for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { + for (int i = 0; i < ResourceUtils.getNumberOfCountableResourceTypes(); i++) { final ResourceInformation requestedRI = reqResource.getResourceInformation(i); final String reqResourceName = requestedRI.getName(); @@ -404,7 +404,7 @@ static void checkResourceRequestAgainstAvailableResource(Resource reqResource, } List invalidResources = Lists.newArrayList(); - for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { + for (int i = 0; i < ResourceUtils.getNumberOfCountableResourceTypes(); i++) { final ResourceInformation requestedRI = reqResource.getResourceInformation(i); final String reqResourceName = requestedRI.getName(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 522c10eca0..cd7518f26d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -1031,7 +1031,7 @@ private void calculateEffectiveResourcesAndCapacity(String label, private Resource getMinResourceNormalized(String name, Map effectiveMinRatio, Resource minResource) { Resource ret = Resource.newInstance(minResource); - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation nResourceInformation = minResource .getResourceInformation(i); @@ -1055,7 +1055,7 @@ private Map getEffectiveMinRatioPerResource( Resource configuredMinResources, Resource numeratorForMinRatio) { Map effectiveMinRatioPerResource = new HashMap<>(); if (numeratorForMinRatio != null) { - int maxLength = ResourceUtils.getNumberOfKnownResourceTypes(); + int maxLength = ResourceUtils.getNumberOfCountableResourceTypes(); for (int i = 0; i < maxLength; i++) { ResourceInformation nResourceInformation = numeratorForMinRatio .getResourceInformation(i); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java index f772c4d1c0..62bad4414b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ConfigurableResource.java @@ -63,7 +63,7 @@ public ConfigurableResource(Resource resource) { private static double[] getOneHundredPercentArray() { double[] resourcePercentages = - new double[ResourceUtils.getNumberOfKnownResourceTypes()]; + new double[ResourceUtils.getNumberOfCountableResourceTypes()]; Arrays.fill(resourcePercentages, 1.0); return resourcePercentages; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 3116ad669f..3fa6d85fb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -599,7 +599,7 @@ private static int parseOldStyleResourceVcores(String lCaseValue) private static double[] getResourcePercentage( String val) throws AllocationConfigurationException { int numberOfKnownResourceTypes = ResourceUtils - .getNumberOfKnownResourceTypes(); + .getNumberOfCountableResourceTypes(); double[] resourcePercentage = new double[numberOfKnownResourceTypes]; String[] strings = val.split(","); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 59635d987a..e820341408 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -48,7 +48,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy { public static final String NAME = "DRF"; private static final int NUM_RESOURCES = - ResourceUtils.getNumberOfKnownResourceTypes(); + ResourceUtils.getNumberOfCountableResourceTypes(); private static final DominantResourceFairnessComparator COMPARATORN = new DominantResourceFairnessComparatorN(); private static final DominantResourceFairnessComparator COMPARATOR2 = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java index cee8fdf083..a31c6206a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/volume/csi/TestVolumeProcessor.java @@ -17,14 +17,17 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.volume.csi; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.CsiAdaptorProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesRequest; import org.apache.hadoop.yarn.api.protocolrecords.ValidateVolumeCapabilitiesResponse; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceSizing; @@ -57,7 +60,9 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doReturn; @@ -132,6 +137,9 @@ private void writeTmpResourceTypesFile(File tmpFile) throws IOException { yarnConf.set(YarnConfiguration.RESOURCE_TYPES, VOLUME_RESOURCE_NAME); yarnConf.set("yarn.resource-types." + VOLUME_RESOURCE_NAME + ".units", "Mi"); + yarnConf.set("yarn.resource-types." + + VOLUME_RESOURCE_NAME + ".tags", + CsiConstants.CSI_VOLUME_RESOURCE_TAG); yarnConf.writeXml(fw); } finally { fw.close(); @@ -267,4 +275,57 @@ public void testProvisioningFailures() throws Exception { } rm.stop(); } + + @Test (timeout = 10000L) + public void testVolumeResourceAllocate() throws Exception { + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, mockNMS[0]); + Resource resource = Resource.newInstance(1024, 1); + ResourceInformation volumeResource = ResourceInformation + .newInstance(VOLUME_RESOURCE_NAME, "Mi", 1024, + ResourceTypes.COUNTABLE, 0, Long.MAX_VALUE, + ImmutableSet.of(CsiConstants.CSI_VOLUME_RESOURCE_TAG), + ImmutableMap.of( + CsiConstants.CSI_VOLUME_ID, "test-vol-000001", + CsiConstants.CSI_DRIVER_NAME, "hostpath", + CsiConstants.CSI_VOLUME_MOUNT, "/mnt/data" + ) + ); + resource.setResourceInformation(VOLUME_RESOURCE_NAME, volumeResource); + SchedulingRequest sc = SchedulingRequest + .newBuilder().allocationRequestId(0L) + .resourceSizing(ResourceSizing.newInstance(1, resource)) + .build(); + + // inject adaptor client for testing + CsiAdaptorProtocol mockedClient = Mockito + .mock(CsiAdaptorProtocol.class); + rm.getRMContext().getVolumeManager() + .registerCsiDriverAdaptor("hostpath", mockedClient); + + // simulate validation succeed + doReturn(ValidateVolumeCapabilitiesResponse.newInstance(true, "")) + .when(mockedClient) + .validateVolumeCapacity(any(ValidateVolumeCapabilitiesRequest.class)); + + am1.addSchedulingRequest(ImmutableList.of(sc)); + List allocated = new ArrayList<>(); + while (allocated.size() != 1) { + AllocateResponse response = am1.schedule(); + mockNMS[0].nodeHeartbeat(true); + allocated.addAll(response.getAllocatedContainers()); + Thread.sleep(500); + } + + Assert.assertEquals(1, allocated.size()); + Container alloc = allocated.get(0); + Assert.assertEquals(alloc.getResource().getMemorySize(), 1024); + Assert.assertEquals(alloc.getResource().getVirtualCores(), 1); + ResourceInformation allocatedVolume = + alloc.getResource().getResourceInformation(VOLUME_RESOURCE_NAME); + Assert.assertNotNull(allocatedVolume); + Assert.assertEquals(allocatedVolume.getValue(), 1024); + Assert.assertEquals(allocatedVolume.getUnits(), "Mi"); + rm.stop(); + } } \ No newline at end of file