diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 0a01c60e53..d126f0980b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -832,4 +832,8 @@ public long getAggegatedReleasedContainers() { public long getAggregatePreemptedContainers() { return aggregateContainersPreempted.value(); } + + public QueueMetricsForCustomResources getQueueMetricsForCustomResources() { + return queueMetricsForCustomResources; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java index e8c8897920..3470858027 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java @@ -101,4 +101,8 @@ Map getReservedValues() { QueueMetricsCustomResource getAggregatePreemptedSeconds() { return aggregatePreemptedSeconds; } + + public QueueMetricsCustomResource getAvailable() { + return available; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 3deddee5d6..044254ddad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -42,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsForCustomResources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.util.resource.Resources; @@ -517,6 +520,29 @@ private Resource computeMaxAMResource() { getMaxShare().getVirtualCores())); } + QueueMetricsForCustomResources metricsForCustomResources = + scheduler.getRootQueueMetrics().getQueueMetricsForCustomResources(); + + if (metricsForCustomResources != null) { + QueueMetricsCustomResource availableResources = + metricsForCustomResources.getAvailable(); + + // We expect all custom resources contained in availableResources, + // so we will loop through all of them. + for (Map.Entry availableEntry : availableResources + .getValues().entrySet()) { + String resourceName = availableEntry.getKey(); + + // We only update the value if fairshare is 0 for that resource. + if (maxResource.getResourceValue(resourceName) == 0) { + Long availableValue = availableEntry.getValue(); + long value = Math.min(availableValue, + getMaxShare().getResourceValue(resourceName)); + maxResource.setResourceValue(resourceName, value); + } + } + } + // Round up to allow AM to run when there is only one vcore on the cluster return Resources.multiplyAndRoundUp(maxResource, maxAMShare); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index cfea49291b..d0ddd4293d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -310,4 +310,8 @@ static FSQueueMetrics forQueue(MetricsSystem ms, String queueName, return (FSQueueMetrics)metrics; } + + FSQueueMetricsForCustomResources getCustomResources() { + return customResources; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 4a738ca07f..0cf1a7b416 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; @@ -42,19 +43,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.util.Map; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; public class TestFSLeafQueue extends FairSchedulerTestBase { private final static String ALLOC_FILE = new File(TEST_DIR, TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath(); private Resource maxResource = Resources.createResource(1024 * 8); + private static final float MAX_AM_SHARE = 0.5f; + private static final String CUSTOM_RESOURCE = "test1"; @Before public void setup() throws IOException { @@ -105,6 +113,8 @@ public void test() throws Exception { PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); out.println(""); + out.println("" + MAX_AM_SHARE + + ""); out.println(""); out.println(""); out.println(""); @@ -221,4 +231,128 @@ public void run() { assertTrue("Test failed with exception(s)" + exceptions, exceptions.isEmpty()); } + + @Test + public void testCanRunAppAMReturnsTrue() { + conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE); + ResourceUtils.resetResourceTypes(conf); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + Resource maxShare = Resource.newInstance(1024 * 8, 4, + ImmutableMap.of(CUSTOM_RESOURCE, 10L)); + + // Add a node to increase available memory and vcores in scheduler's + // root queue metrics + addNodeToScheduler(Resource.newInstance(4096, 10, + ImmutableMap.of(CUSTOM_RESOURCE, 25L))); + + FSLeafQueue queue = setupQueue(maxShare); + + //Min(availableMemory, maxShareMemory (maxResourceOverridden)) + // --> Min(4096, 8192) = 4096 + //Min(availableVCores, maxShareVCores (maxResourceOverridden)) + // --> Min(10, 4) = 4 + //Min(available test1, maxShare test1 (maxResourceOverridden)) + // --> Min(25, 10) = 10 + //MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE + // --> 2048 MB memory, 2 vcores, 5 test1 + Resource expectedAMShare = Resource.newInstance(2048, 2, + ImmutableMap.of(CUSTOM_RESOURCE, 5L)); + + Resource appAMResource = Resource.newInstance(2048, 2, + ImmutableMap.of(CUSTOM_RESOURCE, 3L)); + + Map customResourceValues = + verifyQueueMetricsForCustomResources(queue); + + boolean result = queue.canRunAppAM(appAMResource); + assertTrue("AM should have been allocated!", result); + + verifyAMShare(queue, expectedAMShare, customResourceValues); + } + + private FSLeafQueue setupQueue(Resource maxShare) { + String queueName = "root.queue1"; + FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null); + schedulable.setMaxShare(new ConfigurableResource(maxShare)); + schedulable.setMaxAMShare(MAX_AM_SHARE); + return schedulable; + } + + @Test + public void testCanRunAppAMReturnsFalse() { + conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE); + ResourceUtils.resetResourceTypes(conf); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + Resource maxShare = Resource.newInstance(1024 * 8, 4, + ImmutableMap.of(CUSTOM_RESOURCE, 10L)); + + // Add a node to increase available memory and vcores in scheduler's + // root queue metrics + addNodeToScheduler(Resource.newInstance(4096, 10, + ImmutableMap.of(CUSTOM_RESOURCE, 25L))); + + FSLeafQueue queue = setupQueue(maxShare); + + //Min(availableMemory, maxShareMemory (maxResourceOverridden)) + // --> Min(4096, 8192) = 4096 + //Min(availableVCores, maxShareVCores (maxResourceOverridden)) + // --> Min(10, 4) = 4 + //Min(available test1, maxShare test1 (maxResourceOverridden)) + // --> Min(25, 10) = 10 + //MaxAMResource: (4096 MB memory, 4 vcores, 10 test1) * MAX_AM_SHARE + // --> 2048 MB memory, 2 vcores, 5 test1 + Resource expectedAMShare = Resource.newInstance(2048, 2, + ImmutableMap.of(CUSTOM_RESOURCE, 5L)); + + Resource appAMResource = Resource.newInstance(2048, 2, + ImmutableMap.of(CUSTOM_RESOURCE, 6L)); + + Map customResourceValues = + verifyQueueMetricsForCustomResources(queue); + + boolean result = queue.canRunAppAM(appAMResource); + assertFalse("AM should not have been allocated!", result); + + verifyAMShare(queue, expectedAMShare, customResourceValues); + } + + private void addNodeToScheduler(Resource node1Resource) { + RMNode node1 = MockNodes.newNodeInfo(0, node1Resource, 1, "127.0.0.2"); + scheduler.handle(new NodeAddedSchedulerEvent(node1)); + } + + private void verifyAMShare(FSLeafQueue schedulable, + Resource expectedAMShare, Map customResourceValues) { + Resource actualAMShare = Resource.newInstance( + schedulable.getMetrics().getMaxAMShareMB(), + schedulable.getMetrics().getMaxAMShareVCores(), customResourceValues); + long customResourceValue = + actualAMShare.getResourceValue(CUSTOM_RESOURCE); + + //make sure to verify custom resource value explicitly! + assertEquals(5L, customResourceValue); + assertEquals("AM share is not the expected!", expectedAMShare, + actualAMShare); + } + + private Map verifyQueueMetricsForCustomResources( + FSLeafQueue schedulable) { + QueueMetricsCustomResource maxAMShareCustomResources = + schedulable.getMetrics().getCustomResources().getMaxAMShare(); + Map customResourceValues = maxAMShareCustomResources + .getValues(); + assertNotNull("Queue metrics for custom resources should not be null!", + maxAMShareCustomResources); + assertNotNull("Queue metrics for custom resources resource values " + + "should not be null!", customResourceValues); + return customResourceValues; + } }