YARN-9894. CapacitySchedulerPerf test for measuring hundreds of apps in a large number of queues. Contributed by Eric Payne
This commit is contained in:
parent
fddc3d55c3
commit
7b93575b92
@ -34,6 +34,7 @@
|
|||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||||
@ -58,6 +59,9 @@
|
|||||||
import java.util.PriorityQueue;
|
import java.util.PriorityQueue;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES;
|
import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@ -68,9 +72,22 @@ private String getResourceName(int idx) {
|
|||||||
return "resource-" + idx;
|
return "resource-" + idx;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This test is run only when when -DRunCapacitySchedulerPerfTests=true is set
|
||||||
|
// on the command line. In addition, this test has tunables for the following:
|
||||||
|
// Number of queues: -DNumberOfQueues (default=100)
|
||||||
|
// Number of total apps: -DNumberOfApplications (default=200)
|
||||||
|
// Percentage of queues with apps: -DPercentActiveQueues (default=100)
|
||||||
|
// E.G.:
|
||||||
|
// mvn test -Dtest=TestCapacitySchedulerPerf -Dsurefire.fork.timeout=1800 \
|
||||||
|
// -DRunCapacitySchedulerPerfTests=true -DNumberOfQueues=50 \
|
||||||
|
// -DNumberOfApplications=200 -DPercentActiveQueues=100
|
||||||
|
// Note that the surefire.fork.timeout flag is added because these tests could
|
||||||
|
// take longer than the surefire timeout.
|
||||||
private void testUserLimitThroughputWithNumberOfResourceTypes(
|
private void testUserLimitThroughputWithNumberOfResourceTypes(
|
||||||
int numOfResourceTypes)
|
int numOfResourceTypes, int numQueues, int pctActiveQueues, int appCount)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
Assume.assumeTrue(Boolean.valueOf(
|
||||||
|
System.getProperty("RunCapacitySchedulerPerfTests")));
|
||||||
if (numOfResourceTypes > 2) {
|
if (numOfResourceTypes > 2) {
|
||||||
// Initialize resource map
|
// Initialize resource map
|
||||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||||
@ -89,22 +106,16 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
|
|||||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Since this is more of a performance unit test, only run if
|
final int activeQueues = (int) (numQueues * (pctActiveQueues/100f));
|
||||||
// RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
|
final int totalApps = appCount + activeQueues;
|
||||||
Assume.assumeTrue(Boolean.valueOf(
|
// extra apps to get started with user limit
|
||||||
System.getProperty("RunCapacitySchedulerPerfTests")));
|
|
||||||
|
|
||||||
CapacitySchedulerConfiguration csconf =
|
CapacitySchedulerConfiguration csconf =
|
||||||
new CapacitySchedulerConfiguration();
|
createCSConfWithManyQueues(numQueues);
|
||||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
|
|
||||||
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
|
|
||||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
|
|
||||||
100.0f);
|
|
||||||
csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
|
|
||||||
csconf.setResourceComparator(DominantResourceCalculator.class);
|
|
||||||
|
|
||||||
YarnConfiguration conf = new YarnConfiguration(csconf);
|
YarnConfiguration conf = new YarnConfiguration(csconf);
|
||||||
// Don't reset resource types since we have already configured resource types
|
// Don't reset resource types since we have already configured resource
|
||||||
|
// types
|
||||||
conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
|
conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
|
||||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
ResourceScheduler.class);
|
ResourceScheduler.class);
|
||||||
@ -113,11 +124,16 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
|
|||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
LeafQueue qb = (LeafQueue)cs.getQueue("default");
|
|
||||||
|
|
||||||
// For now make user limit large so we can activate all applications
|
LeafQueue[] lqs = new LeafQueue[numQueues];
|
||||||
qb.setUserLimitFactor((float)100.0);
|
for (int i = 0; i < numQueues; i++) {
|
||||||
qb.setupConfigurableCapacities();
|
String queueName = String.format("%03d", i);
|
||||||
|
LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
|
||||||
|
// For now make user limit large so we can activate all applications
|
||||||
|
qb.setUserLimitFactor((float)100.0);
|
||||||
|
qb.setupConfigurableCapacities();
|
||||||
|
lqs[i] = qb;
|
||||||
|
}
|
||||||
|
|
||||||
SchedulerEvent addAppEvent;
|
SchedulerEvent addAppEvent;
|
||||||
SchedulerEvent addAttemptEvent;
|
SchedulerEvent addAttemptEvent;
|
||||||
@ -125,13 +141,12 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
|
|||||||
ApplicationSubmissionContext submissionContext =
|
ApplicationSubmissionContext submissionContext =
|
||||||
mock(ApplicationSubmissionContext.class);
|
mock(ApplicationSubmissionContext.class);
|
||||||
|
|
||||||
final int appCount = 100;
|
ApplicationId[] appids = new ApplicationId[totalApps];
|
||||||
ApplicationId[] appids = new ApplicationId[appCount];
|
RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[totalApps];
|
||||||
RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
|
ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[totalApps];
|
||||||
ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
|
RMAppImpl[] apps = new RMAppImpl[totalApps];
|
||||||
RMAppImpl[] apps = new RMAppImpl[appCount];
|
RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[totalApps];
|
||||||
RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
|
for (int i=0; i<totalApps; i++) {
|
||||||
for (int i=0; i<appCount; i++) {
|
|
||||||
appids[i] = BuilderUtils.newApplicationId(100, i);
|
appids[i] = BuilderUtils.newApplicationId(100, i);
|
||||||
appAttemptIds[i] =
|
appAttemptIds[i] =
|
||||||
BuilderUtils.newApplicationAttemptId(appids[i], 1);
|
BuilderUtils.newApplicationAttemptId(appids[i], 1);
|
||||||
@ -148,34 +163,34 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
|
|||||||
when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
|
when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
|
||||||
|
|
||||||
rm.getRMContext().getRMApps().put(appids[i], apps[i]);
|
rm.getRMContext().getRMApps().put(appids[i], apps[i]);
|
||||||
|
String queueName = lqs[i % activeQueues].getQueueName();
|
||||||
addAppEvent =
|
addAppEvent =
|
||||||
new AppAddedSchedulerEvent(appids[i], "default", "user1");
|
new AppAddedSchedulerEvent(appids[i], queueName, "user1");
|
||||||
cs.handle(addAppEvent);
|
cs.handle(addAppEvent);
|
||||||
addAttemptEvent =
|
addAttemptEvent =
|
||||||
new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
|
new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
|
||||||
cs.handle(addAttemptEvent);
|
cs.handle(addAttemptEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
// add nodes to cluster, so cluster has 20GB and 20 vcores
|
// add nodes to cluster with enough resources to satisfy all apps
|
||||||
Resource nodeResource = Resource.newInstance(10 * GB, 10);
|
Resource newResource = Resource.newInstance(totalApps * GB, totalApps);
|
||||||
if (numOfResourceTypes > 2) {
|
if (numOfResourceTypes > 2) {
|
||||||
for (int i = 2; i < numOfResourceTypes; i++) {
|
for (int i = 2; i < numOfResourceTypes; i++) {
|
||||||
nodeResource.setResourceValue(getResourceName(i), 10);
|
newResource.setResourceValue(getResourceName(i), totalApps);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
|
||||||
RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1");
|
|
||||||
cs.handle(new NodeAddedSchedulerEvent(node));
|
cs.handle(new NodeAddedSchedulerEvent(node));
|
||||||
|
|
||||||
RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2");
|
RMNode node2 = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.2");
|
||||||
cs.handle(new NodeAddedSchedulerEvent(node2));
|
cs.handle(new NodeAddedSchedulerEvent(node2));
|
||||||
|
|
||||||
Priority u0Priority = TestUtils.createMockPriority(1);
|
Priority u0Priority = TestUtils.createMockPriority(1);
|
||||||
RecordFactory recordFactory =
|
RecordFactory recordFactory =
|
||||||
RecordFactoryProvider.getRecordFactory(null);
|
RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
|
||||||
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
|
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
|
||||||
for (int i=0;i<appCount;i++) {
|
for (int i=0;i<totalApps;i++) {
|
||||||
fiCaApps[i] =
|
fiCaApps[i] =
|
||||||
cs.getSchedulerApplications().get(apps[i].getApplicationId())
|
cs.getSchedulerApplications().get(apps[i].getApplicationId())
|
||||||
.getCurrentAppAttempt();
|
.getCurrentAppAttempt();
|
||||||
@ -193,8 +208,30 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
|
|||||||
fiCaApps[i].updateResourceRequests(
|
fiCaApps[i].updateResourceRequests(
|
||||||
Collections.singletonList(resourceRequest));
|
Collections.singletonList(resourceRequest));
|
||||||
}
|
}
|
||||||
// Now force everything to be over user limit
|
// Now force everything to be at user limit
|
||||||
qb.setUserLimitFactor((float)0.0);
|
for (int i = 0; i < numQueues; i++) {
|
||||||
|
lqs[i].setUserLimitFactor((float)0.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// allocate one container for each extra apps since
|
||||||
|
// LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(node));
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(node2));
|
||||||
|
|
||||||
|
// make sure only the extra apps have allocated containers
|
||||||
|
for (int i=0;i<totalApps;i++) {
|
||||||
|
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
|
||||||
|
if (i < activeQueues) {
|
||||||
|
assertFalse(pending);
|
||||||
|
assertEquals(0,
|
||||||
|
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
|
||||||
|
} else {
|
||||||
|
assertTrue(pending);
|
||||||
|
assertEquals(1*GB,
|
||||||
|
fiCaApps[i].getTotalPendingRequestsPerPartition()
|
||||||
|
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Quiet the loggers while measuring throughput
|
// Quiet the loggers while measuring throughput
|
||||||
GenericTestUtils.setRootLogLevel(Level.WARN);
|
GenericTestUtils.setRootLogLevel(Level.WARN);
|
||||||
@ -233,27 +270,86 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
|
|||||||
}
|
}
|
||||||
System.out.println(
|
System.out.println(
|
||||||
"#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
|
"#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
|
||||||
+ ": " + numerator / (timespent / entries));
|
+ ": " + numerator / (timespent / entries) + " ops/sec of "
|
||||||
|
+ appCount + " apps on " + pctActiveQueues + "% of " + numQueues
|
||||||
|
+ " queues.");
|
||||||
|
|
||||||
|
// make sure only the extra apps have allocated containers
|
||||||
|
for (int i=0;i<totalApps;i++) {
|
||||||
|
boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
|
||||||
|
if (i < activeQueues) {
|
||||||
|
assertFalse(pending);
|
||||||
|
assertEquals(0,
|
||||||
|
fiCaApps[i].getTotalPendingRequestsPerPartition().size());
|
||||||
|
} else {
|
||||||
|
assertTrue(pending);
|
||||||
|
assertEquals(1*GB,
|
||||||
|
fiCaApps[i].getTotalPendingRequestsPerPartition()
|
||||||
|
.get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rm.close();
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testUserLimitThroughputForTwoResources() throws Exception {
|
public void testUserLimitThroughputForTwoResources() throws Exception {
|
||||||
testUserLimitThroughputWithNumberOfResourceTypes(2);
|
testUserLimitThroughputWithNumberOfResourceTypes(2, 1, 100, 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testUserLimitThroughputForThreeResources() throws Exception {
|
public void testUserLimitThroughputForThreeResources() throws Exception {
|
||||||
testUserLimitThroughputWithNumberOfResourceTypes(3);
|
testUserLimitThroughputWithNumberOfResourceTypes(3, 1, 100, 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testUserLimitThroughputForFourResources() throws Exception {
|
public void testUserLimitThroughputForFourResources() throws Exception {
|
||||||
testUserLimitThroughputWithNumberOfResourceTypes(4);
|
testUserLimitThroughputWithNumberOfResourceTypes(4, 1, 100, 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 300000)
|
@Test(timeout = 300000)
|
||||||
public void testUserLimitThroughputForFiveResources() throws Exception {
|
public void testUserLimitThroughputForFiveResources() throws Exception {
|
||||||
testUserLimitThroughputWithNumberOfResourceTypes(5);
|
testUserLimitThroughputWithNumberOfResourceTypes(5, 1, 100, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 1800000)
|
||||||
|
public void testUserLimitThroughputWithManyQueues() throws Exception {
|
||||||
|
|
||||||
|
int numQueues = Integer.getInteger("NumberOfQueues", 40);
|
||||||
|
int pctActiveQueues = Integer.getInteger("PercentActiveQueues", 100);
|
||||||
|
int appCount = Integer.getInteger("NumberOfApplications", 100);
|
||||||
|
|
||||||
|
testUserLimitThroughputWithNumberOfResourceTypes(
|
||||||
|
2, numQueues, pctActiveQueues, appCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration createCSConfWithManyQueues(int numQueues)
|
||||||
|
throws Exception {
|
||||||
|
CapacitySchedulerConfiguration csconf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||||
|
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
|
||||||
|
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
|
||||||
|
csconf.setCapacity("root.default", 0.0f);
|
||||||
|
csconf.setOffSwitchPerHeartbeatLimit(numQueues);
|
||||||
|
|
||||||
|
float capacity = 100.0f / numQueues;
|
||||||
|
String[] subQueues = new String[numQueues];
|
||||||
|
for (int i = 0; i < numQueues; i++) {
|
||||||
|
String queueName = String.format("%03d", i);
|
||||||
|
String queuePath = "root." + queueName;
|
||||||
|
subQueues[i] = queueName;
|
||||||
|
csconf.setMaximumApplicationMasterResourcePerQueuePercent(
|
||||||
|
queuePath, 100.0f);
|
||||||
|
csconf.setMaximumAMResourcePercentPerPartition(queuePath, "", 100.0f);
|
||||||
|
csconf.setCapacity(queuePath, capacity);
|
||||||
|
csconf.setUserLimitFactor(queuePath, 100.0f);
|
||||||
|
csconf.setMaximumCapacity(queuePath, 100.0f);
|
||||||
|
}
|
||||||
|
|
||||||
|
csconf.setQueues("root", subQueues);
|
||||||
|
|
||||||
|
return csconf;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user