YARN-10963. Split TestCapacityScheduler by test categories. Contributed by Tamas Domok

This commit is contained in:
Szilard Nemeth 2021-12-16 23:39:18 +01:00
parent a9a5830f31
commit aec9cdb467
6 changed files with 3007 additions and 2779 deletions

View File

@ -18,7 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.util.HashMap;
import java.util.Map;
@ -50,4 +54,52 @@ public static Configuration createBasicCSConfiguration() {
return createConfiguration(conf);
}
public static void setMinAllocMb(Configuration conf, int minAllocMb) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
minAllocMb);
}
public static void setMaxAllocMb(Configuration conf, int maxAllocMb) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
maxAllocMb);
}
public static void setMaxAllocMb(CapacitySchedulerConfiguration conf,
String queueName, int maxAllocMb) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ MAXIMUM_ALLOCATION_MB;
conf.setInt(propName, maxAllocMb);
}
public static void setMinAllocVcores(Configuration conf, int minAllocVcores) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
minAllocVcores);
}
public static void setMaxAllocVcores(Configuration conf, int maxAllocVcores) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
maxAllocVcores);
}
public static void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
String queueName, int maxAllocVcores) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
conf.setInt(propName, maxAllocVcores);
}
public static void setMaxAllocation(CapacitySchedulerConfiguration conf,
String queueName, String maxAllocation) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ MAXIMUM_ALLOCATION;
conf.set(propName, maxAllocation);
}
public static void unsetMaxAllocation(CapacitySchedulerConfiguration conf,
String queueName) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ MAXIMUM_ALLOCATION;
conf.unset(propName);
}
}

View File

@ -18,14 +18,48 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
import java.io.IOException;
import java.util.Set;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public final class CapacitySchedulerTestUtilities {
public static final int GB = 1024;
@ -69,4 +103,119 @@ public static void waitforNMRegistered(ResourceScheduler scheduler, int nodecoun
}
}
}
public static ResourceManager createResourceManager() throws Exception {
ResourceUtils.resetResourceTypes(new Configuration());
DefaultMetricsSystem.setMiniClusterMode(true);
ResourceManager resourceManager = new ResourceManager() {
@Override
protected RMNodeLabelsManager createNodeLabelManager() {
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(getConfig());
return mgr;
}
};
CapacitySchedulerConfiguration csConf
= new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, ResourceScheduler.class);
resourceManager.init(conf);
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
return resourceManager;
}
public static RMContext createMockRMContext() {
RMContext mockContext = mock(RMContext.class);
when(mockContext.getConfigurationProvider()).thenReturn(
new LocalConfigurationProvider());
return mockContext;
}
public static void stopResourceManager(ResourceManager resourceManager) throws Exception {
if (resourceManager != null) {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.shutdown();
resourceManager.stop();
}
}
public static ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs,
int clusterTs, int appId, String queue,
String user) {
ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, appId);
RMAppAttemptMetrics attemptMetric1 =
new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
RMAppImpl app1 = mock(RMAppImpl.class);
when(app1.getApplicationId()).thenReturn(appId1);
RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt1.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
rm.getRMContext().getRMApps().put(appId1, app1);
SchedulerEvent addAppEvent1 =
new AppAddedSchedulerEvent(appId1, queue, user);
cs.handle(addAppEvent1);
SchedulerEvent addAttemptEvent1 =
new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
cs.handle(addAttemptEvent1);
return appAttemptId1;
}
public static MockRM setUpMove() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
return setUpMove(conf);
}
public static MockRM setUpMove(Configuration config) {
CapacitySchedulerConfiguration conf =
new CapacitySchedulerConfiguration(config);
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
return rm;
}
public static void nodeUpdate(ResourceManager rm, NodeManager nm) {
RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
rm.getResourceScheduler().handle(nodeUpdate);
}
public static NodeManager registerNode(ResourceManager rm, String hostName,
int containerManagerPort, int httpPort, String rackName,
Resource capability, NodeStatus nodeStatus)
throws IOException, YarnException {
NodeManager nm = new NodeManager(hostName,
containerManagerPort, httpPort, rackName, capability, rm, nodeStatus);
NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
.get(nm.getNodeId()));
rm.getResourceScheduler().handle(nodeAddEvent1);
return nm;
}
public static void checkApplicationResourceUsage(int expected, Application application) {
Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
}
public static void checkNodeResourceUsage(int expected, NodeManager node) {
Assert.assertEquals(expected, node.getUsed().getMemorySize());
node.checkResourceUsage();
}
}

View File

@ -0,0 +1,387 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.GB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.appHelper;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.NULL_UPDATE_REQUESTS;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestCapacitySchedulerNodes {
private ResourceManager resourceManager = null;
@Before
public void setUp() throws Exception {
resourceManager = createResourceManager();
}
@After
public void tearDown() throws Exception {
stopResourceManager(resourceManager);
}
@Test
public void testReconnectedNode() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(csConf);
cs.start();
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null,
null, null, new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null));
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
cs.handle(new NodeAddedSchedulerEvent(n1));
cs.handle(new NodeAddedSchedulerEvent(n2));
Assert.assertEquals(6 * GB, cs.getClusterResource().getMemorySize());
// reconnect n1 with downgraded memory
n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1);
cs.handle(new NodeRemovedSchedulerEvent(n1));
cs.handle(new NodeAddedSchedulerEvent(n1));
Assert.assertEquals(4 * GB, cs.getClusterResource().getMemorySize());
cs.stop();
}
@Test
public void testBlackListNodes() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
String host = "127.0.0.1";
RMNode node =
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
cs.handle(new NodeAddedSchedulerEvent(node));
ApplicationAttemptId appAttemptId = appHelper(rm, cs, 100, 1, "default", "user");
// Verify the blacklist can be updated independent of requesting containers
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
Collections.<ContainerId>emptyList(),
Collections.singletonList(host), null, NULL_UPDATE_REQUESTS);
Assert.assertTrue(cs.getApplicationAttempt(appAttemptId)
.isPlaceBlacklisted(host));
cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), null,
Collections.<ContainerId>emptyList(), null,
Collections.singletonList(host), NULL_UPDATE_REQUESTS);
Assert.assertFalse(cs.getApplicationAttempt(appAttemptId)
.isPlaceBlacklisted(host));
rm.stop();
}
@Test
public void testNumClusterNodes() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(conf);
RMContext rmContext = TestUtils.getMockRMContext();
cs.setRMContext(rmContext);
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
cs.init(csConf);
cs.start();
assertEquals(0, cs.getNumClusterNodes());
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2);
cs.handle(new NodeAddedSchedulerEvent(n1));
cs.handle(new NodeAddedSchedulerEvent(n2));
assertEquals(2, cs.getNumClusterNodes());
cs.handle(new NodeRemovedSchedulerEvent(n1));
assertEquals(1, cs.getNumClusterNodes());
cs.handle(new NodeAddedSchedulerEvent(n1));
assertEquals(2, cs.getNumClusterNodes());
cs.handle(new NodeRemovedSchedulerEvent(n2));
cs.handle(new NodeRemovedSchedulerEvent(n1));
assertEquals(0, cs.getNumClusterNodes());
cs.stop();
}
@Test
public void testDefaultNodeLabelExpressionQueueConfig() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
conf.setDefaultNodeLabelExpression("root.a", " x");
conf.setDefaultNodeLabelExpression("root.b", " y ");
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
QueueInfo queueInfoA = cs.getQueueInfo("a", true, false);
Assert.assertEquals("Queue Name should be a", "a",
queueInfoA.getQueueName());
Assert.assertEquals("Queue Path should be root.a", "root.a",
queueInfoA.getQueuePath());
Assert.assertEquals("Default Node Label Expression should be x", "x",
queueInfoA.getDefaultNodeLabelExpression());
QueueInfo queueInfoB = cs.getQueueInfo("b", true, false);
Assert.assertEquals("Queue Name should be b", "b",
queueInfoB.getQueueName());
Assert.assertEquals("Queue Path should be root.b", "root.b",
queueInfoB.getQueuePath());
Assert.assertEquals("Default Node Label Expression should be y", "y",
queueInfoB.getDefaultNodeLabelExpression());
}
@Test
public void testRemovedNodeDecommissioningNode() throws Exception {
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register nodemanager
NodeManager nm = registerNode(resourceManager, "host_decom", 1234, 2345,
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
mockNodeStatus);
RMNode node =
resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
resourceManager.getResourceScheduler().handle(nodeUpdate);
// force remove the node to simulate race condition
((CapacityScheduler) resourceManager.getResourceScheduler()).getNodeTracker().
removeNode(nm.getNodeId());
// Kick off another heartbeat with the node state mocked to decommissioning
RMNode spyNode =
Mockito.spy(resourceManager.getRMContext().getRMNodes()
.get(nm.getNodeId()));
when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
resourceManager.getResourceScheduler().handle(
new NodeUpdateSchedulerEvent(spyNode));
}
@Test
public void testResourceUpdateDecommissioningNode() throws Exception {
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode
// to have 0 available resource
RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler<Event>() {
@Override
public void handle(Event event) {
if (event instanceof RMNodeResourceUpdateEvent) {
RMNodeResourceUpdateEvent resourceEvent =
(RMNodeResourceUpdateEvent) event;
resourceManager
.getResourceScheduler()
.getSchedulerNode(resourceEvent.getNodeId())
.updateTotalResource(resourceEvent.getResourceOption().getResource());
}
}
});
Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
((CapacityScheduler) resourceManager.getResourceScheduler())
.setRMContext(spyContext);
((AsyncDispatcher) mockDispatcher).start();
NodeStatus mockNodeStatus = createMockNodeStatus();
// Register node
String host0 = "host_0";
NodeManager nm0 = registerNode(resourceManager, host0, 1234, 2345,
NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4),
mockNodeStatus);
// ResourceRequest priorities
Priority priority0 = Priority.newInstance(0);
// Submit an application
Application application0 =
new Application("user_0", "a1", resourceManager);
application0.submit();
application0.addNodeManager(host0, 1234, nm0);
Resource capability00 = Resources.createResource(1 * GB, 1);
application0.addResourceRequestSpec(priority0, capability00);
Task task00 =
new Task(application0, priority0, new String[]{host0});
application0.addTask(task00);
// Send resource requests to the scheduler
application0.schedule();
nodeUpdate(resourceManager, nm0);
// Kick off another heartbeat with the node state mocked to decommissioning
// This should update the schedulernodes to have 0 available resource
RMNode spyNode =
Mockito.spy(resourceManager.getRMContext().getRMNodes()
.get(nm0.getNodeId()));
when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
resourceManager.getResourceScheduler().handle(
new NodeUpdateSchedulerEvent(spyNode));
// Get allocations from the scheduler
application0.schedule();
// Check the used resource is 1 GB 1 core
Assert.assertEquals(1 * GB, nm0.getUsed().getMemorySize());
Resource usedResource =
resourceManager.getResourceScheduler()
.getSchedulerNode(nm0.getNodeId()).getAllocatedResource();
Assert.assertEquals("Used Resource Memory Size should be 1GB", 1 * GB,
usedResource.getMemorySize());
Assert.assertEquals("Used Resource Virtual Cores should be 1", 1,
usedResource.getVirtualCores());
// Check total resource of scheduler node is also changed to 1 GB 1 core
Resource totalResource =
resourceManager.getResourceScheduler()
.getSchedulerNode(nm0.getNodeId()).getTotalResource();
Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB,
totalResource.getMemorySize());
Assert.assertEquals("Total Resource Virtual Cores should be 1", 1,
totalResource.getVirtualCores());
// Check the available resource is 0/0
Resource availableResource =
resourceManager.getResourceScheduler()
.getSchedulerNode(nm0.getNodeId()).getUnallocatedResource();
Assert.assertEquals("Available Resource Memory Size should be 0", 0,
availableResource.getMemorySize());
Assert.assertEquals("Available Resource Memory Size should be 0", 0,
availableResource.getVirtualCores());
// Kick off another heartbeat where the RMNodeResourceUpdateEvent would
// be skipped for DECOMMISSIONING state since the total resource is
// already equal to used resource from the previous heartbeat.
when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
resourceManager.getResourceScheduler().handle(
new NodeUpdateSchedulerEvent(spyNode));
verify(mockDispatcher, times(4)).getEventHandler();
}
@Test
public void testSchedulingOnRemovedNode() throws Exception {
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
false);
MockRM rm = new MockRM(conf);
rm.start();
RMApp app = MockRMAppSubmitter.submitWithMemory(100, rm);
rm.drainEvents();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10240, 10);
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
//remove nm2 to keep am alive
MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10240, 10);
am.allocate(ResourceRequest.ANY, 2048, 1, null);
CapacityScheduler scheduler =
(CapacityScheduler) rm.getRMContext().getScheduler();
FiCaSchedulerNode node =
(FiCaSchedulerNode)
scheduler.getNodeTracker().getNode(nm2.getNodeId());
scheduler.handle(new NodeRemovedSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
// schedulerNode is removed, try allocate a container
scheduler.allocateContainersToNode(new SimpleCandidateNodeSet<>(node),
true);
AppAttemptRemovedSchedulerEvent appRemovedEvent1 =
new AppAttemptRemovedSchedulerEvent(
am.getApplicationAttemptId(),
RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent1);
rm.stop();
}
}

View File

@ -0,0 +1,888 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocation;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.unsetMaxAllocation;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.checkQueueStructureCapacities;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ExpectedCapacities;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.getDefaultCapacities;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfWithoutChildrenOfB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithB1AsParentQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB1;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createMockRMContext;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.createResourceManager;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
public class TestCapacitySchedulerQueues {
private static final Logger LOG =
LoggerFactory.getLogger(TestCapacitySchedulerQueues.class);
private ResourceManager resourceManager = null;
private RMContext mockContext;
@Before
public void setUp() throws Exception {
resourceManager = createResourceManager();
mockContext = createMockRMContext();
}
@After
public void tearDown() throws Exception {
stopResourceManager(resourceManager);
}
/**
* Test that parseQueue throws an exception when two leaf queues have the
* same name.
*
* @throws IOException
*/
@Test(expected = IOException.class)
public void testParseQueue() throws IOException {
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.init(conf);
cs.start();
conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[]{"b1"});
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
}
@Test
public void testRefreshQueues() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
cs.reinitialize(conf, rmContext);
checkQueueStructureCapacities(cs);
conf.setCapacity(A, 80f);
conf.setCapacity(B, 20f);
cs.reinitialize(conf, mockContext);
checkQueueStructureCapacities(cs, getDefaultCapacities(80f / 100.0f, 20f / 100.0f));
cs.stop();
}
@Test
public void testRefreshQueuesWithNewQueue() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
checkQueueStructureCapacities(cs);
// Add a new queue b4
final String b4 = B + ".b4";
final float b4Capacity = 10;
final float modifiedB3Capacity = B3_CAPACITY - b4Capacity;
try {
conf.setCapacity(A, 80f);
conf.setCapacity(B, 20f);
conf.setQueues(B, new String[]{"b1", "b2", "b3", "b4"});
conf.setCapacity(B1, B1_CAPACITY);
conf.setCapacity(B2, B2_CAPACITY);
conf.setCapacity(B3, modifiedB3Capacity);
conf.setCapacity(b4, b4Capacity);
cs.reinitialize(conf, mockContext);
final float capA = 80f / 100.0f;
final float capB = 20f / 100.0f;
Map<String, ExpectedCapacities> expectedCapacities =
getDefaultCapacities(capA, capB);
expectedCapacities.put(B3,
new ExpectedCapacities(modifiedB3Capacity / 100.0f, capB));
expectedCapacities.put(b4, new ExpectedCapacities(b4Capacity / 100.0f, capB));
checkQueueStructureCapacities(cs, expectedCapacities);
// Verify parent for B4
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueB = findQueue(rootQueue, B);
CSQueue queueB4 = findQueue(queueB, b4);
assertEquals(queueB, queueB4.getParent());
} finally {
cs.stop();
}
}
@Test
public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
// queue refresh should not allow changing the maximum allocation setting
// per queue to be smaller than previous setting
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
cs.reinitialize(conf, mockContext);
checkQueueStructureCapacities(cs);
assertEquals("max allocation in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max allocation for A1",
Resources.none(),
conf.getQueueMaximumAllocation(A1));
assertEquals("max allocation",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueA = findQueue(rootQueue, A);
CSQueue queueA1 = findQueue(queueA, A1);
assertEquals("queue max allocation", ((LeafQueue) queueA1)
.getMaximumAllocation().getMemorySize(), 8192);
setMaxAllocMb(conf, A1, 4096);
try {
cs.reinitialize(conf, mockContext);
fail("should have thrown exception");
} catch (IOException e) {
assertTrue("max allocation exception",
e.getCause().toString().contains("not be decreased"));
}
setMaxAllocMb(conf, A1, 8192);
cs.reinitialize(conf, mockContext);
setMaxAllocVcores(conf, A1,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES - 1);
try {
cs.reinitialize(conf, mockContext);
fail("should have thrown exception");
} catch (IOException e) {
assertTrue("max allocation exception",
e.getCause().toString().contains("not be decreased"));
}
}
@Test
public void testRefreshQueuesMaxAllocationPerQueueLarge() throws Exception {
// verify we can't set the allocation per queue larger then cluster setting
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.init(conf);
cs.start();
// change max allocation for B3 queue to be larger then cluster max
setMaxAllocMb(conf, B3,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 2048);
try {
cs.reinitialize(conf, mockContext);
fail("should have thrown exception");
} catch (IOException e) {
assertTrue("maximum allocation exception",
e.getCause().getMessage().contains("maximum allocation"));
}
setMaxAllocMb(conf, B3,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
cs.reinitialize(conf, mockContext);
setMaxAllocVcores(conf, B3,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
try {
cs.reinitialize(conf, mockContext);
fail("should have thrown exception");
} catch (IOException e) {
assertTrue("maximum allocation exception",
e.getCause().getMessage().contains("maximum allocation"));
}
}
@Test
public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
// queue refresh should allow max allocation per queue to go larger
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
setMaxAllocMb(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
setMaxAllocVcores(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
setMaxAllocMb(conf, A1, 4096);
setMaxAllocVcores(conf, A1, 2);
cs.init(conf);
cs.start();
cs.reinitialize(conf, mockContext);
checkQueueStructureCapacities(cs);
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueA = findQueue(rootQueue, A);
CSQueue queueA1 = findQueue(queueA, A1);
assertEquals("max capability MB in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max capability vcores in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("max allocation MB A1",
4096,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1",
2,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("cluster max allocation MB",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
assertEquals("cluster max allocation vcores",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
assertEquals("queue max allocation", 4096,
queueA1.getMaximumAllocation().getMemorySize());
setMaxAllocMb(conf, A1, 6144);
setMaxAllocVcores(conf, A1, 3);
cs.reinitialize(conf, null);
// conf will have changed but we shouldn't be able to change max allocation
// for the actual queue
assertEquals("max allocation MB A1", 6144,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1", 3,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB cluster",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
assertEquals("max allocation vcores cluster",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
assertEquals("queue max allocation MB", 6144,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("queue max allocation vcores", 3,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max capability MB cluster",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("cluster max capability vcores",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability().getVirtualCores());
}
@Test
public void testRefreshQueuesMaxAllocationCSError() throws Exception {
// Try to refresh the cluster level max allocation size to be smaller
// and it should error out
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
setMaxAllocMb(conf, 10240);
setMaxAllocVcores(conf, 10);
setMaxAllocMb(conf, A1, 4096);
setMaxAllocVcores(conf, A1, 4);
cs.init(conf);
cs.start();
cs.reinitialize(conf, mockContext);
checkQueueStructureCapacities(cs);
assertEquals("max allocation MB in CS", 10240,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max allocation vcores in CS", 10,
cs.getMaximumResourceCapability().getVirtualCores());
setMaxAllocMb(conf, 6144);
try {
cs.reinitialize(conf, mockContext);
fail("should have thrown exception");
} catch (IOException e) {
assertTrue("max allocation exception",
e.getCause().toString().contains("not be decreased"));
}
setMaxAllocMb(conf, 10240);
cs.reinitialize(conf, mockContext);
setMaxAllocVcores(conf, 8);
try {
cs.reinitialize(conf, mockContext);
fail("should have thrown exception");
} catch (IOException e) {
assertTrue("max allocation exception",
e.getCause().toString().contains("not be decreased"));
}
}
@Test
public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
// Try to refresh the cluster level max allocation size to be larger
// and verify that if there is no setting per queue it uses the
// cluster level setting.
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
setMaxAllocMb(conf, 10240);
setMaxAllocVcores(conf, 10);
setMaxAllocMb(conf, A1, 4096);
setMaxAllocVcores(conf, A1, 4);
cs.init(conf);
cs.start();
cs.reinitialize(conf, mockContext);
checkQueueStructureCapacities(cs);
assertEquals("max allocation MB in CS", 10240,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max allocation vcores in CS", 10,
cs.getMaximumResourceCapability().getVirtualCores());
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueA = findQueue(rootQueue, A);
CSQueue queueB = findQueue(rootQueue, B);
CSQueue queueA1 = findQueue(queueA, A1);
CSQueue queueA2 = findQueue(queueA, A2);
CSQueue queueB2 = findQueue(queueB, B2);
assertEquals("queue A1 max allocation MB", 4096,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("queue A1 max allocation vcores", 4,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("queue A2 max allocation MB", 10240,
queueA2.getMaximumAllocation().getMemorySize());
assertEquals("queue A2 max allocation vcores", 10,
queueA2.getMaximumAllocation().getVirtualCores());
assertEquals("queue B2 max allocation MB", 10240,
queueB2.getMaximumAllocation().getMemorySize());
assertEquals("queue B2 max allocation vcores", 10,
queueB2.getMaximumAllocation().getVirtualCores());
setMaxAllocMb(conf, 12288);
setMaxAllocVcores(conf, 12);
cs.reinitialize(conf, null);
// cluster level setting should change and any queues without
// per queue setting
assertEquals("max allocation MB in CS", 12288,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max allocation vcores in CS", 12,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("queue A1 max MB allocation", 4096,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("queue A1 max vcores allocation", 4,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("queue A2 max MB allocation", 12288,
queueA2.getMaximumAllocation().getMemorySize());
assertEquals("queue A2 max vcores allocation", 12,
queueA2.getMaximumAllocation().getVirtualCores());
assertEquals("queue B2 max MB allocation", 12288,
queueB2.getMaximumAllocation().getMemorySize());
assertEquals("queue B2 max vcores allocation", 12,
queueB2.getMaximumAllocation().getVirtualCores());
}
/**
* Test for queue deletion.
*
* @throws Exception
*/
@Test
public void testRefreshQueuesWithQueueDelete() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
cs.reinitialize(conf, rmContext);
checkQueueStructureCapacities(cs);
// test delete leaf queue when there is application running.
Map<String, CSQueue> queues =
cs.getCapacitySchedulerQueueManager().getShortNameQueues();
String b1QTobeDeleted = "b1";
LeafQueue csB1Queue = Mockito.spy((LeafQueue) queues.get(b1QTobeDeleted));
when(csB1Queue.getState()).thenReturn(QueueState.DRAINING)
.thenReturn(QueueState.STOPPED);
cs.getCapacitySchedulerQueueManager().addQueue(b1QTobeDeleted, csB1Queue);
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithoutB1(conf);
try {
cs.reinitialize(conf, mockContext);
fail("Expected to throw exception when refresh queue tries to delete a"
+ " queue with running apps");
} catch (IOException e) {
// ignore
}
// test delete leaf queue(root.b.b1) when there is no application running.
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithoutB1(conf);
try {
cs.reinitialize(conf, mockContext);
} catch (IOException e) {
LOG.error(
"Expected to NOT throw exception when refresh queue tries to delete"
+ " a queue WITHOUT running apps",
e);
fail("Expected to NOT throw exception when refresh queue tries to delete"
+ " a queue WITHOUT running apps");
}
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueB = findQueue(rootQueue, B);
CSQueue queueB3 = findQueue(queueB, B1);
assertNull("Refresh needs to support delete of leaf queue ", queueB3);
// reset back to default configuration for testing parent queue delete
conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.reinitialize(conf, rmContext);
checkQueueStructureCapacities(cs);
// set the configurations such that it fails once but should be successfull
// next time
queues = cs.getCapacitySchedulerQueueManager().getShortNameQueues();
CSQueue bQueue = Mockito.spy((ParentQueue) queues.get("b"));
when(bQueue.getState()).thenReturn(QueueState.DRAINING)
.thenReturn(QueueState.STOPPED);
cs.getCapacitySchedulerQueueManager().addQueue("b", bQueue);
bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
// test delete Parent queue when there is application running.
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithoutB(conf);
try {
cs.reinitialize(conf, mockContext);
fail("Expected to throw exception when refresh queue tries to delete a"
+ " parent queue with running apps in children queue");
} catch (IOException e) {
// ignore
}
// test delete Parent queue when there is no application running.
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithoutB(conf);
try {
cs.reinitialize(conf, mockContext);
} catch (IOException e) {
fail("Expected to not throw exception when refresh queue tries to delete"
+ " a queue without running apps");
}
rootQueue = cs.getRootQueue();
queueB = findQueue(rootQueue, B);
String message =
"Refresh needs to support delete of Parent queue and its children.";
assertNull(message, queueB);
assertNull(message,
cs.getCapacitySchedulerQueueManager().getQueues().get("b"));
assertNull(message,
cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
assertNull(message,
cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
cs.stop();
}
/**
* Test for all child queue deletion and thus making parent queue a child.
*
* @throws Exception
*/
@Test
public void testRefreshQueuesWithAllChildQueuesDeleted() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
cs.reinitialize(conf, rmContext);
checkQueueStructureCapacities(cs);
// test delete all leaf queues when there is no application running.
Map<String, CSQueue> queues =
cs.getCapacitySchedulerQueueManager().getShortNameQueues();
CSQueue bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
when(bQueue.getState()).thenReturn(QueueState.RUNNING)
.thenReturn(QueueState.STOPPED);
cs.getCapacitySchedulerQueueManager().addQueue("b1", bQueue);
bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
cs.getCapacitySchedulerQueueManager().addQueue("b2", bQueue);
bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue);
conf = new CapacitySchedulerConfiguration();
setupQueueConfWithoutChildrenOfB(conf);
// test convert parent queue to leaf queue(root.b) when there is no
// application running.
try {
cs.reinitialize(conf, mockContext);
fail("Expected to throw exception when refresh queue tries to make parent"
+ " queue a child queue when one of its children is still running.");
} catch (IOException e) {
//do not do anything, expected exception
}
// test delete leaf queues(root.b.b1,b2,b3) when there is no application
// running.
try {
cs.reinitialize(conf, mockContext);
} catch (IOException e) {
e.printStackTrace();
fail("Expected to NOT throw exception when refresh queue tries to delete"
+ " all children of a parent queue(without running apps).");
}
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueB = findQueue(rootQueue, B);
assertNotNull("Parent Queue B should not be deleted", queueB);
Assert.assertTrue("As Queue'B children are not deleted",
queueB instanceof LeafQueue);
String message =
"Refresh needs to support delete of all children of Parent queue.";
assertNull(message,
cs.getCapacitySchedulerQueueManager().getQueues().get("b3"));
assertNull(message,
cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
assertNull(message,
cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
cs.stop();
}
/**
* Test if we can convert a leaf queue to a parent queue.
*
* @throws Exception
*/
@Test(timeout = 10000)
public void testConvertLeafQueueToParentQueue() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
cs.reinitialize(conf, rmContext);
checkQueueStructureCapacities(cs);
String targetQueue = "b1";
CSQueue b1 = cs.getQueue(targetQueue);
Assert.assertEquals(QueueState.RUNNING, b1.getState());
// test if we can convert a leaf queue which is in RUNNING state
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithB1AsParentQueue(conf);
try {
cs.reinitialize(conf, mockContext);
fail("Expected to throw exception when refresh queue tries to convert"
+ " a child queue to a parent queue.");
} catch (IOException e) {
// ignore
}
// now set queue state for b1 to STOPPED
conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
conf.set("yarn.scheduler.capacity.root.b.b1.state", "STOPPED");
cs.reinitialize(conf, mockContext);
Assert.assertEquals(QueueState.STOPPED, b1.getState());
// test if we can convert a leaf queue which is in STOPPED state
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithB1AsParentQueue(conf);
try {
cs.reinitialize(conf, mockContext);
} catch (IOException e) {
fail("Expected to NOT throw exception when refresh queue tries"
+ " to convert a leaf queue WITHOUT running apps");
}
b1 = cs.getQueue(targetQueue);
Assert.assertTrue(b1 instanceof ParentQueue);
Assert.assertEquals(QueueState.RUNNING, b1.getState());
Assert.assertTrue(!b1.getChildQueues().isEmpty());
}
@Test
public void testQueuesMaxAllocationInheritance() throws Exception {
// queue level max allocation is set by the queue configuration explicitly
// or inherits from the parent.
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
setMaxAllocMb(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
setMaxAllocVcores(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
// Test the child queue overrides
setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
"memory-mb=4096,vcores=2");
setMaxAllocation(conf, A1, "memory-mb=6144,vcores=2");
setMaxAllocation(conf, B, "memory-mb=5120, vcores=2");
setMaxAllocation(conf, B2, "memory-mb=1024, vcores=2");
cs.init(conf);
cs.start();
cs.reinitialize(conf, mockContext);
checkQueueStructureCapacities(cs);
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueA = findQueue(rootQueue, A);
CSQueue queueB = findQueue(rootQueue, B);
CSQueue queueA1 = findQueue(queueA, A1);
CSQueue queueA2 = findQueue(queueA, A2);
CSQueue queueB1 = findQueue(queueB, B1);
CSQueue queueB2 = findQueue(queueB, B2);
assertEquals("max capability MB in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max capability vcores in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("max allocation MB A1",
6144,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1",
2,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB A2", 4096,
queueA2.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A2",
2,
queueA2.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB B", 5120,
queueB.getMaximumAllocation().getMemorySize());
assertEquals("max allocation MB B1", 5120,
queueB1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation MB B2", 1024,
queueB2.getMaximumAllocation().getMemorySize());
// Test get the max-allocation from different parent
unsetMaxAllocation(conf, A1);
unsetMaxAllocation(conf, B);
unsetMaxAllocation(conf, B1);
setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
"memory-mb=6144,vcores=2");
setMaxAllocation(conf, A, "memory-mb=8192,vcores=2");
cs.reinitialize(conf, mockContext);
assertEquals("max capability MB in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max capability vcores in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("max allocation MB A1",
8192,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1",
2,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB B1",
6144,
queueB1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores B1",
2,
queueB1.getMaximumAllocation().getVirtualCores());
// Test the default
unsetMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT);
unsetMaxAllocation(conf, A);
unsetMaxAllocation(conf, A1);
cs.reinitialize(conf, mockContext);
assertEquals("max capability MB in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max capability vcores in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("max allocation MB A1",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB A2",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
queueA2.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A2",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
queueA2.getMaximumAllocation().getVirtualCores());
}
@Test
public void testVerifyQueuesMaxAllocationConf() throws Exception {
// queue level max allocation can't exceed the cluster setting
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
setMaxAllocMb(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
setMaxAllocVcores(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
long largerMem =
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1024;
long largerVcores =
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 10;
cs.init(conf);
cs.start();
cs.reinitialize(conf, mockContext);
checkQueueStructureCapacities(cs);
setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
"memory-mb=" + largerMem + ",vcores=2");
try {
cs.reinitialize(conf, mockContext);
fail("Queue Root maximum allocation can't exceed the cluster setting");
} catch (Exception e) {
assertTrue("maximum allocation exception",
e.getCause().getMessage().contains("maximum allocation"));
}
setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
"memory-mb=4096,vcores=2");
setMaxAllocation(conf, A, "memory-mb=6144,vcores=2");
setMaxAllocation(conf, A1, "memory-mb=" + largerMem + ",vcores=2");
try {
cs.reinitialize(conf, mockContext);
fail("Queue A1 maximum allocation can't exceed the cluster setting");
} catch (Exception e) {
assertTrue("maximum allocation exception",
e.getCause().getMessage().contains("maximum allocation"));
}
setMaxAllocation(conf, A1, "memory-mb=8192" + ",vcores=" + largerVcores);
try {
cs.reinitialize(conf, mockContext);
fail("Queue A1 maximum allocation can't exceed the cluster setting");
} catch (Exception e) {
assertTrue("maximum allocation exception",
e.getCause().getMessage().contains("maximum allocation"));
}
}
}