YARN-9290. Invalid SchedulingRequest not rejected in Scheduler PlacementConstraintsHandler. Contributed by Prabhu Joseph

This commit is contained in:
Szilard Nemeth 2019-11-26 22:04:07 +01:00
parent 828ab400ee
commit ef950b0863
16 changed files with 125 additions and 12 deletions

View File

@ -357,6 +357,9 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
response.setContainersFromPreviousAttempts( response.setContainersFromPreviousAttempts(
allocation.getPreviousAttemptContainers()); allocation.getPreviousAttemptContainers());
response.setRejectedSchedulingRequests(allocation.getRejectedRequest());
} }
private void handleInvalidResourceException(InvalidResourceRequestException e, private void handleInvalidResourceException(InvalidResourceRequestException e,

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
public class Allocation { public class Allocation {
@ -40,7 +41,7 @@ public class Allocation {
final List<Container> demotedContainers; final List<Container> demotedContainers;
private final List<Container> previousAttemptContainers; private final List<Container> previousAttemptContainers;
private Resource resourceLimit; private Resource resourceLimit;
private List<RejectedSchedulingRequest> rejectedRequest;
public Allocation(List<Container> containers, Resource resourceLimit, public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
@ -52,17 +53,17 @@ public class Allocation {
public Allocation(List<Container> containers, Resource resourceLimit, public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) { List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
this(containers, resourceLimit,strictContainers, fungibleContainers, this(containers, resourceLimit, strictContainers, fungibleContainers,
fungibleResources, nmTokens, null, null, null, null, null); fungibleResources, nmTokens, null, null, null, null, null, null);
} }
public Allocation(List<Container> containers, Resource resourceLimit, public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers, Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens, List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
List<Container> increasedContainers, List<Container> decreasedContainer) { List<Container> increasedContainers, List<Container> decreasedContainer) {
this(containers, resourceLimit,strictContainers, fungibleContainers, this(containers, resourceLimit, strictContainers, fungibleContainers,
fungibleResources, nmTokens, increasedContainers, decreasedContainer, fungibleResources, nmTokens, increasedContainers, decreasedContainer,
null, null, null); null, null, null, null);
} }
public Allocation(List<Container> containers, Resource resourceLimit, public Allocation(List<Container> containers, Resource resourceLimit,
@ -70,7 +71,8 @@ public class Allocation {
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens, List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
List<Container> increasedContainers, List<Container> decreasedContainer, List<Container> increasedContainers, List<Container> decreasedContainer,
List<Container> promotedContainers, List<Container> demotedContainer, List<Container> promotedContainers, List<Container> demotedContainer,
List<Container> previousAttemptContainers) { List<Container> previousAttemptContainers, List<RejectedSchedulingRequest>
rejectedRequest) {
this.containers = containers; this.containers = containers;
this.resourceLimit = resourceLimit; this.resourceLimit = resourceLimit;
this.strictContainers = strictContainers; this.strictContainers = strictContainers;
@ -82,6 +84,7 @@ public class Allocation {
this.promotedContainers = promotedContainers; this.promotedContainers = promotedContainers;
this.demotedContainers = demotedContainer; this.demotedContainers = demotedContainer;
this.previousAttemptContainers = previousAttemptContainers; this.previousAttemptContainers = previousAttemptContainers;
this.rejectedRequest = rejectedRequest;
} }
public List<Container> getContainers() { public List<Container> getContainers() {
@ -128,6 +131,10 @@ public class Allocation {
return previousAttemptContainers; return previousAttemptContainers;
} }
public List<RejectedSchedulingRequest> getRejectedRequest() {
return rejectedRequest;
}
@VisibleForTesting @VisibleForTesting
public void setResourceLimit(Resource resource) { public void setResourceLimit(Resource resource) {
this.resourceLimit = resource; this.resourceLimit = resource;

View File

@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -42,7 +43,10 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.RejectionReason;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -98,6 +102,7 @@ public class AppSchedulingInfo {
public final ContainerUpdateContext updateContext; public final ContainerUpdateContext updateContext;
private final Map<String, String> applicationSchedulingEnvs = new HashMap<>(); private final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
private final RMContext rmContext; private final RMContext rmContext;
private final int retryAttempts;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
Queue queue, AbstractUsersManager abstractUsersManager, long epoch, Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
@ -113,6 +118,9 @@ public class AppSchedulingInfo {
this.appResourceUsage = appResourceUsage; this.appResourceUsage = appResourceUsage;
this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs); this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs);
this.rmContext = rmContext; this.rmContext = rmContext;
this.retryAttempts = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_PLACEMENT_CONSTRAINTS_RETRY_ATTEMPTS);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
updateContext = new ContainerUpdateContext(this); updateContext = new ContainerUpdateContext(this);
@ -496,6 +504,20 @@ public class AppSchedulingInfo {
return ret; return ret;
} }
public List<RejectedSchedulingRequest> getRejectedRequest() {
this.readLock.lock();
try {
return schedulerKeyToAppPlacementAllocator.values().stream()
.filter(ap -> ap.getPlacementAttempt() >= retryAttempts)
.map(ap -> RejectedSchedulingRequest.newInstance(
RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
ap.getSchedulingRequest()))
.collect(Collectors.toList());
} finally {
this.readLock.unlock();
}
}
public PendingAsk getNextPendingAsk() { public PendingAsk getNextPendingAsk() {
readLock.lock(); readLock.lock();
try { try {
@ -780,8 +802,8 @@ public class AppSchedulingInfo {
try { try {
AppPlacementAllocator ap = AppPlacementAllocator ap =
schedulerKeyToAppPlacementAllocator.get(schedulerKey); schedulerKeyToAppPlacementAllocator.get(schedulerKey);
return (ap != null) && ap.precheckNode(schedulerNode, return (ap != null) && (ap.getPlacementAttempt() < retryAttempts) &&
schedulingMode, dcOpt); ap.precheckNode(schedulerNode, schedulingMode, dcOpt);
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }

View File

@ -821,7 +821,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
currentContPreemption, Collections.singletonList(rr), updatedNMTokens, currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
newlyIncreasedContainers, newlyDecreasedContainers, newlyIncreasedContainers, newlyDecreasedContainers,
newlyPromotedContainers, newlyDemotedContainers, newlyPromotedContainers, newlyDemotedContainers,
previousAttemptContainers); previousAttemptContainers, appSchedulingInfo.getRejectedRequest());
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }

View File

@ -951,7 +951,7 @@ public class FairScheduler extends
updatedNMTokens, null, null, updatedNMTokens, null, null,
application.pullNewlyPromotedContainers(), application.pullNewlyPromotedContainers(),
application.pullNewlyDemotedContainers(), application.pullNewlyDemotedContainers(),
previousAttemptContainers); previousAttemptContainers, null);
} }
private List<MaxResourceValidationResult> validateResourceRequests( private List<MaxResourceValidationResult> validateResourceRequests(

View File

@ -34,6 +34,7 @@ import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* <p> * <p>
@ -57,6 +58,7 @@ public abstract class AppPlacementAllocator<N extends SchedulerNode> {
protected AppSchedulingInfo appSchedulingInfo; protected AppSchedulingInfo appSchedulingInfo;
protected SchedulerRequestKey schedulerRequestKey; protected SchedulerRequestKey schedulerRequestKey;
protected RMContext rmContext; protected RMContext rmContext;
private AtomicInteger placementAttempt = new AtomicInteger(0);
/** /**
* Get iterator of preferred node depends on requirement and/or availability. * Get iterator of preferred node depends on requirement and/or availability.
@ -205,4 +207,12 @@ public abstract class AppPlacementAllocator<N extends SchedulerNode> {
* @return SchedulingRequest * @return SchedulingRequest
*/ */
public abstract SchedulingRequest getSchedulingRequest(); public abstract SchedulingRequest getSchedulingRequest();
public int getPlacementAttempt() {
return placementAttempt.get();
}
public void incrementPlacementAttempt() {
placementAttempt.getAndIncrement();
}
} }

View File

@ -363,6 +363,7 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
placementConstraintManager, allocationTagsManager, dcOpt); placementConstraintManager, allocationTagsManager, dcOpt);
} catch (InvalidAllocationTagsQueryException e) { } catch (InvalidAllocationTagsQueryException e) {
LOG.warn("Failed to query node cardinality:", e); LOG.warn("Failed to query node cardinality:", e);
this.incrementPlacementAttempt();
return false; return false;
} }
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
@ -44,10 +45,12 @@ public class TestAppSchedulingInfo {
ApplicationAttemptId.newInstance(appIdImpl, 1); ApplicationAttemptId.newInstance(appIdImpl, 1);
FSLeafQueue queue = mock(FSLeafQueue.class); FSLeafQueue queue = mock(FSLeafQueue.class);
RMContext rmContext = mock(RMContext.class);
doReturn("test").when(queue).getQueueName(); doReturn("test").when(queue).getQueueName();
doReturn(new YarnConfiguration()).when(rmContext).getYarnConfiguration();
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId, AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId,
"test", queue, null, 0, new ResourceUsage(), "test", queue, null, 0, new ResourceUsage(),
new HashMap<String, String>(), null); new HashMap<String, String>(), rmContext);
appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(), appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
new ArrayList<String>()); new ArrayList<String>());
@ -117,9 +120,11 @@ public class TestAppSchedulingInfo {
Queue queue = mock(Queue.class); Queue queue = mock(Queue.class);
doReturn(mock(QueueMetrics.class)).when(queue).getMetrics(); doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
RMContext rmContext = mock(RMContext.class);
doReturn(new YarnConfiguration()).when(rmContext).getYarnConfiguration();
AppSchedulingInfo info = new AppSchedulingInfo( AppSchedulingInfo info = new AppSchedulingInfo(
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0, appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
new ResourceUsage(), new HashMap<>(), mock(RMContext.class)); new ResourceUsage(), new HashMap<>(), rmContext);
Assert.assertEquals(0, info.getSchedulerKeys().size()); Assert.assertEquals(0, info.getSchedulerKeys().size());
Priority pri1 = Priority.newInstance(1); Priority pri1 = Priority.newInstance(1);

View File

@ -75,6 +75,7 @@ public class TestSchedulerApplicationAttempt {
ApplicationAttemptId appAttId = createAppAttemptId(0, 0); ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L); when(rmContext.getEpoch()).thenReturn(3L);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
user, queue1, queue1.getAbstractUsersManager(), rmContext); user, queue1, queue1.getAbstractUsersManager(), rmContext);
@ -121,6 +122,7 @@ public class TestSchedulerApplicationAttempt {
ApplicationAttemptId appAttId = createAppAttemptId(0, 0); ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L); when(rmContext.getEpoch()).thenReturn(3L);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
user, oldQueue, oldQueue.getAbstractUsersManager(), rmContext); user, oldQueue, oldQueue.getAbstractUsersManager(), rmContext);
oldMetrics.submitApp(user); oldMetrics.submitApp(user);
@ -242,6 +244,7 @@ public class TestSchedulerApplicationAttempt {
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L); when(rmContext.getEpoch()).thenReturn(3L);
when(rmContext.getScheduler()).thenReturn(scheduler); when(rmContext.getScheduler()).thenReturn(scheduler);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
final String user = "user1"; final String user = "user1";
Queue queue = createQueue("test", null); Queue queue = createQueue("test", null);
@ -300,6 +303,7 @@ public class TestSchedulerApplicationAttempt {
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L); when(rmContext.getEpoch()).thenReturn(3L);
when(rmContext.getScheduler()).thenReturn(scheduler); when(rmContext.getScheduler()).thenReturn(scheduler);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
final String user = "user1"; final String user = "user1";
Queue queue = createQueue("test", null); Queue queue = createQueue("test", null);
@ -322,6 +326,7 @@ public class TestSchedulerApplicationAttempt {
Queue queue = createQueue("test", null); Queue queue = createQueue("test", null);
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L); when(rmContext.getEpoch()).thenReturn(3L);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt( SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(
attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext); attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext);
Priority priority = Priority.newInstance(1); Priority priority = Priority.newInstance(1);
@ -347,6 +352,7 @@ public class TestSchedulerApplicationAttempt {
Queue queue = createQueue("test", null); Queue queue = createQueue("test", null);
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L); when(rmContext.getEpoch()).thenReturn(3L);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt( SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(
attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext); attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext);

View File

@ -4130,6 +4130,7 @@ public class TestLeafQueue {
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class); RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
when(nlm.getResourceByLabel(any(), any())).thenReturn(res); when(nlm.getResourceByLabel(any(), any())).thenReturn(res);
when(rmContext.getNodeLabelManager()).thenReturn(nlm); when(rmContext.getNodeLabelManager()).thenReturn(nlm);
when(rmContext.getYarnConfiguration()).thenReturn(csConf);
// Queue "test" consumes 100% of the cluster, so its capacity and absolute // Queue "test" consumes 100% of the cluster, so its capacity and absolute
// capacity are both 1.0f. // capacity are both 1.0f.

View File

@ -448,6 +448,59 @@ public class TestSchedulingRequestContainerAllocation {
rm1.close(); rm1.close();
} }
@Test(timeout = 30000L)
public void testInvalidSchedulingRequest() throws Exception {
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(conf);
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
// 4 NMs.
MockNM[] nms = new MockNM[4];
RMNode[] rmNodes = new RMNode[4];
for (int i = 0; i < 4; i++) {
nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
}
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
// Constraint with Invalid Allocation Tag Namespace
PlacementConstraint constraint = targetNotIn("node",
allocationTagWithNamespace("invalid", "t1")).build();
SchedulingRequest sc = SchedulingRequest
.newInstance(1, Priority.newInstance(1),
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
ImmutableSet.of("t1"),
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)),
constraint);
AllocateRequest request = AllocateRequest.newBuilder()
.schedulingRequests(ImmutableList.of(sc)).build();
am1.allocate(request);
try {
GenericTestUtils.waitFor(() -> {
try {
doNodeHeartbeat(nms);
AllocateResponse response = am1.schedule();
return response.getRejectedSchedulingRequests().size() == 1;
} catch (Exception e) {
return false;
}
}, 500, 20000);
} catch (Exception e) {
Assert.fail("Failed to reject invalid scheduling request");
}
}
private static void doNodeHeartbeat(MockNM... nms) throws Exception { private static void doNodeHeartbeat(MockNM... nms) throws Exception {
for (MockNM nm : nms) { for (MockNM nm : nms) {
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);

View File

@ -121,6 +121,7 @@ public class TestUtils {
} }
}); });
rmContext.setYarnConfiguration(conf);
rmContext.setNodeLabelManager(nlm); rmContext.setNodeLabelManager(nlm);
rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class)); rmContext.setSystemMetricsPublisher(mock(SystemMetricsPublisher.class));
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class)); rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));

View File

@ -375,6 +375,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
Mockito.when(rmApp.getApplicationSubmissionContext()) Mockito.when(rmApp.getApplicationSubmissionContext())
.thenReturn(appContext); .thenReturn(appContext);
Mockito.when(rmContext.getRMApps()).thenReturn(rmApps); Mockito.when(rmContext.getRMApps()).thenReturn(rmApps);
Mockito.when(rmContext.getYarnConfiguration()).thenReturn(conf);
FSAppAttempt schedulerApp = FSAppAttempt schedulerApp =
new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue, new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue,
null, rmContext); null, rmContext);

View File

@ -52,6 +52,7 @@ public class TestMaxRunningAppsEnforcer {
rmContext = mock(RMContext.class); rmContext = mock(RMContext.class);
when(rmContext.getQueuePlacementManager()).thenReturn(placementManager); when(rmContext.getQueuePlacementManager()).thenReturn(placementManager);
when(rmContext.getEpoch()).thenReturn(0L); when(rmContext.getEpoch()).thenReturn(0L);
when(rmContext.getYarnConfiguration()).thenReturn(conf);
clock = new ControlledClock(); clock = new ControlledClock();
scheduler = mock(FairScheduler.class); scheduler = mock(FairScheduler.class);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);

View File

@ -571,6 +571,7 @@ public class TestQueueManager {
ActiveUsersManager activeUsersManager = ActiveUsersManager activeUsersManager =
Mockito.mock(ActiveUsersManager.class); Mockito.mock(ActiveUsersManager.class);
RMContext rmContext = Mockito.mock(RMContext.class); RMContext rmContext = Mockito.mock(RMContext.class);
doReturn(scheduler.getConfig()).when(rmContext).getYarnConfiguration();
// the appAttempt is created // the appAttempt is created
// removeEmptyDynamicQueues() should not remove the queue // removeEmptyDynamicQueues() should not remove the queue

View File

@ -193,6 +193,7 @@ public class TestFifoScheduler {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
((RMContextImpl) rmContext).setScheduler(scheduler); ((RMContextImpl) rmContext).setScheduler(scheduler);
((RMContextImpl) rmContext).setYarnConfiguration(conf);
scheduler.setRMContext(rmContext); scheduler.setRMContext(rmContext);
scheduler.init(conf); scheduler.init(conf);
scheduler.start(); scheduler.start();