diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7ed46109b3..28c96de829 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -284,6 +284,10 @@ private static void addDeprecatedKeys() { public static final boolean DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME = false; + /** Configure default application placement allocator. */ + public static final String APPLICATION_PLACEMENT_TYPE_CLASS = + YARN_PREFIX + "scheduler.app-placement-allocator.class"; + /** Configured scheduler queue placement rules. */ public static final String QUEUE_PLACEMENT_RULES = YARN_PREFIX + "scheduler.queue-placement-rules"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 2b2f832465..203709b4cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,6 +106,8 @@ public class AppSchedulingInfo { private final int retryAttempts; private boolean unmanagedAM; + private final String defaultResourceRequestAppPlacementType; + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, AbstractUsersManager abstractUsersManager, long epoch, ResourceUsage appResourceUsage, @@ -129,6 +132,31 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, updateContext = new ContainerUpdateContext(this); readLock = lock.readLock(); writeLock = lock.writeLock(); + + this.defaultResourceRequestAppPlacementType = + getDefaultResourceRequestAppPlacementType(); + } + + /** + * Set default App Placement Allocator. + * + * @return app placement class. + */ + public String getDefaultResourceRequestAppPlacementType() { + if (this.rmContext != null + && this.rmContext.getYarnConfiguration() != null) { + + String appPlacementClass = applicationSchedulingEnvs.get( + ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS); + if (null != appPlacementClass) { + return appPlacementClass; + } else { + Configuration conf = rmContext.getYarnConfiguration(); + return conf.get( + YarnConfiguration.APPLICATION_PLACEMENT_TYPE_CLASS); + } + } + return null; } public ApplicationId getApplicationId() { @@ -331,8 +359,7 @@ private boolean internalAddResourceRequests( SchedulerRequestKey schedulerRequestKey = entry.getKey(); AppPlacementAllocator appPlacementAllocator = getAndAddAppPlacementAllocatorIfNotExist(schedulerRequestKey, - applicationSchedulingEnvs.get( - ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS)); + defaultResourceRequestAppPlacementType); // Update AppPlacementAllocator PendingAskUpdateResult pendingAmountChanges = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java index 391649d409..a7c7ce8590 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java @@ -20,9 +20,11 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.*; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; @@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.junit.Assert; import org.junit.Test; @@ -175,4 +178,52 @@ public void testSchedulerKeyAccounting() { info.updateResourceRequests(reqs, false); Assert.assertEquals(0, info.getSchedulerKeys().size()); } + + @Test + public void testApplicationPlacementType() { + String DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = + LocalityAppPlacementAllocator.class.getName(); + Configuration conf = new Configuration(); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getYarnConfiguration()).thenReturn(conf); + ApplicationId appIdImpl = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appIdImpl, 1); + Queue queue = mock(Queue.class); + AppSchedulingInfo info = new AppSchedulingInfo(appAttemptId, "test", queue, + mock(ActiveUsersManager.class), 0, new ResourceUsage(), new HashMap<>(), + rmContext, false); + Assert.assertEquals(info.getApplicationSchedulingEnvs(), new HashMap<>()); + // This should return null as nothing is set in the conf. + Assert.assertNull(info.getDefaultResourceRequestAppPlacementType()); + conf = new Configuration(); + conf.set(YarnConfiguration.APPLICATION_PLACEMENT_TYPE_CLASS, + DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS); + when(rmContext.getYarnConfiguration()).thenReturn(conf); + info = new AppSchedulingInfo(appAttemptId, "test", queue, + mock(ActiveUsersManager.class), 0, new ResourceUsage(), new HashMap<>(), + rmContext, false); + Assert.assertEquals(info.getDefaultResourceRequestAppPlacementType(), + DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS); + } + + @Test + public void testApplicationPlacementTypeNotConfigured() { + Configuration conf = new Configuration(); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getYarnConfiguration()).thenReturn(conf); + ApplicationId appIdImpl = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appIdImpl, 1); + Queue queue = mock(Queue.class); + HashMap applicationSchedulingEnvs = new HashMap<>(); + applicationSchedulingEnvs.put("APPLICATION_PLACEMENT_TYPE_CLASS", + LocalityAppPlacementAllocator.class.getName()); + AppSchedulingInfo info = new AppSchedulingInfo(appAttemptId, "test", queue, + mock(ActiveUsersManager.class), 0, new ResourceUsage(), + applicationSchedulingEnvs, rmContext, false); + // This should be set from applicationSchedulingEnvs + Assert.assertEquals(info.getDefaultResourceRequestAppPlacementType(), + LocalityAppPlacementAllocator.class.getName()); + } }