diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index afa468bca8..99deb1abf3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -179,11 +179,22 @@ private ContainerAllocation preCheckForNodeCandidateSet( // This is to make sure non-partitioned-resource-request will prefer // to be allocated to non-partitioned nodes int missedNonPartitionedRequestSchedulingOpportunity = 0; + AppPlacementAllocator appPlacementAllocator = + appInfo.getAppPlacementAllocator(schedulerKey); + if (null == appPlacementAllocator){ + // This is possible when #pending resource decreased by a different + // thread. + ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation( + activitiesManager, node, application, priority, + ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST); + return ContainerAllocation.PRIORITY_SKIPPED; + } + String requestPartition = + appPlacementAllocator.getPrimaryRequestedNodePartition(); + // Only do this when request associated with given scheduler key accepts // NO_LABEL under RESPECT_EXCLUSIVITY mode - if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, - appInfo.getAppPlacementAllocator(schedulerKey) - .getPrimaryRequestedNodePartition())) { + if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, requestPartition)) { missedNonPartitionedRequestSchedulingOpportunity = application.addMissedNonPartitionedRequestSchedulingOpportunity( schedulerKey); @@ -261,12 +272,9 @@ ContainerAllocation tryAllocateOnNode(Resource clusterResource, return result; } - public float getLocalityWaitFactor( - SchedulerRequestKey schedulerKey, int clusterNodes) { + public float getLocalityWaitFactor(int uniqAsks, int clusterNodes) { // Estimate: Required unique resources (i.e. hosts + racks) - int requiredResources = Math.max( - application.getAppPlacementAllocator(schedulerKey) - .getUniqueLocationAsks() - 1, 0); + int requiredResources = Math.max(uniqAsks - 1, 0); // waitFactor can't be more than '1' // i.e. no point skipping more than clustersize opportunities @@ -296,10 +304,16 @@ private boolean canAssign(SchedulerRequestKey schedulerKey, if (rmContext.getScheduler().getNumClusterNodes() == 0) { return false; } + + int uniqLocationAsks = 0; + AppPlacementAllocator appPlacementAllocator = + application.getAppPlacementAllocator(schedulerKey); + if (appPlacementAllocator != null) { + uniqLocationAsks = appPlacementAllocator.getUniqueLocationAsks(); + } // If we have only ANY requests for this schedulerKey, we should not // delay its scheduling. - if (application.getAppPlacementAllocator(schedulerKey) - .getUniqueLocationAsks() == 1) { + if (uniqLocationAsks == 1) { return true; } @@ -313,7 +327,7 @@ private boolean canAssign(SchedulerRequestKey schedulerKey, } else { long requiredContainers = application.getOutstandingAsksCount(schedulerKey); - float localityWaitFactor = getLocalityWaitFactor(schedulerKey, + float localityWaitFactor = getLocalityWaitFactor(uniqLocationAsks, rmContext.getScheduler().getNumClusterNodes()); // Cap the delay by the number of nodes in the cluster. return (Math.min(rmContext.getScheduler().getNumClusterNodes(), @@ -806,6 +820,12 @@ private ContainerAllocation allocate(Resource clusterResource, application.getAppSchedulingInfo().getAppPlacementAllocator( schedulerKey); + // This could be null when #pending request decreased by another thread. + if (schedulingPS == null) { + return new ContainerAllocation(reservedContainer, null, + AllocationState.QUEUE_SKIPPED); + } + result = ContainerAllocation.PRIORITY_SKIPPED; Iterator iter = schedulingPS.getPreferredNodeIterator(