YARN-11169. Support moveApplicationAcrossQueues, getQueueInfo API's for Federation. (#4464)

This commit is contained in:
slfan1989 2022-07-05 11:24:29 -07:00 committed by GitHub
parent ea46f49b04
commit 161b1fac2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 341 additions and 4 deletions

View File

@ -83,6 +83,10 @@ public final class RouterMetrics {
private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;
@Metric("# of signalToContainer failed to be retrieved")
private MutableGaugeInt numSignalToContainerFailedRetrieved;
@Metric("# of getQueueInfo failed to be retrieved")
private MutableGaugeInt numGetQueueInfoFailedRetrieved;
@Metric("# of moveApplicationAcrossQueues failed to be retrieved")
private MutableGaugeInt numMoveApplicationAcrossQueuesFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -130,6 +134,10 @@ public final class RouterMetrics {
private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;
@Metric("Total number of successful Retrieved signalToContainer and latency(ms)")
private MutableRate totalSucceededSignalToContainerRetrieved;
@Metric("Total number of successful Retrieved getQueueInfo and latency(ms)")
private MutableRate totalSucceededGetQueueInfoRetrieved;
@Metric("Total number of successful Retrieved moveApplicationAcrossQueues and latency(ms)")
private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved;
/**
* Provide quantile counters for all latencies.
@ -155,6 +163,8 @@ public final class RouterMetrics {
private MutableQuantiles updateAppPriorityLatency;
private MutableQuantiles updateAppTimeoutsLatency;
private MutableQuantiles signalToContainerLatency;
private MutableQuantiles getQueueInfoLatency;
private MutableQuantiles moveApplicationAcrossQueuesLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@ -237,6 +247,14 @@ private RouterMetrics() {
signalToContainerLatency =
registry.newQuantiles("signalToContainerLatency",
"latency of signal to container timeouts", "ops", "latency", 10);
getQueueInfoLatency =
registry.newQuantiles("getQueueInfoLatency",
"latency of get queue info timeouts", "ops", "latency", 10);
moveApplicationAcrossQueuesLatency =
registry.newQuantiles("moveApplicationAcrossQueuesLatency",
"latency of move application across queues timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -363,6 +381,16 @@ public long getNumSucceededSignalToContainerRetrieved() {
return totalSucceededSignalToContainerRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetQueueInfoRetrieved() {
return totalSucceededGetQueueInfoRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededMoveApplicationAcrossQueuesRetrieved() {
return totalSucceededMoveApplicationAcrossQueuesRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@ -468,6 +496,16 @@ public double getLatencySucceededSignalToContainerRetrieved() {
return totalSucceededSignalToContainerRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetQueueInfoRetrieved() {
return totalSucceededGetQueueInfoRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededMoveApplicationAcrossQueuesRetrieved() {
return totalSucceededMoveApplicationAcrossQueuesRetrieved.lastStat().mean();
}
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@ -573,6 +611,14 @@ public int getSignalToContainerFailedRetrieved() {
return numSignalToContainerFailedRetrieved.value();
}
public int getQueueInfoFailedRetrieved() {
return numGetQueueInfoFailedRetrieved.value();
}
public int getMoveApplicationAcrossQueuesFailedRetrieved() {
return numMoveApplicationAcrossQueuesFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -678,6 +724,16 @@ public void succeededSignalToContainerRetrieved(long duration) {
signalToContainerLatency.add(duration);
}
public void succeededGetQueueInfoRetrieved(long duration) {
totalSucceededGetQueueInfoRetrieved.add(duration);
getQueueInfoLatency.add(duration);
}
public void succeededMoveApplicationAcrossQueuesRetrieved(long duration) {
totalSucceededMoveApplicationAcrossQueuesRetrieved.add(duration);
moveApplicationAcrossQueuesLatency.add(duration);
}
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -761,4 +817,12 @@ public void incrUpdateApplicationTimeoutsRetrieved() {
public void incrSignalToContainerFailedRetrieved() {
numSignalToContainerFailedRetrieved.incr();
}
public void incrGetQueueInfoFailedRetrieved() {
numGetQueueInfoFailedRetrieved.incr();
}
public void incrMoveApplicationAcrossQueuesFailedRetrieved() {
numMoveApplicationAcrossQueuesFailedRetrieved.incr();
}
}

View File

@ -822,7 +822,27 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getQueueName() == null) {
routerMetrics.incrGetQueueInfoFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getQueueInfo request or queueName.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getQueueInfo",
new Class[]{GetQueueInfoRequest.class}, new Object[]{request});
Collection<GetQueueInfoResponse> queues = null;
try {
queues = invokeAppClientProtocolMethod(true, remoteMethod,
GetQueueInfoResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetQueueInfoFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get queue [" +
request.getQueueName() + "] to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetQueueInfoRetrieved(stopTime - startTime);
// Merge the GetQueueInfoResponse
return RouterYarnClientUtils.mergeQueues(queues);
}
@Override
@ -854,7 +874,44 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls(
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getApplicationId() == null || request.getTargetQueue() == null) {
routerMetrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing moveApplicationAcrossQueues request or " +
"applicationId or target queue.", null);
}
long startTime = clock.getTime();
SubClusterId subClusterId = null;
ApplicationId applicationId = request.getApplicationId();
try {
subClusterId = federationFacade
.getApplicationHomeSubCluster(applicationId);
} catch (YarnException e) {
routerMetrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " +
applicationId + " does not exist in FederationStateStore.", e);
}
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
MoveApplicationAcrossQueuesResponse response = null;
try {
response = clientRMProxy.moveApplicationAcrossQueues(request);
} catch (Exception e) {
routerMetrics.incrAppAttemptsFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to moveApplicationAcrossQueues for " +
applicationId + " to SubCluster " + subClusterId.getId(), e);
}
if (response == null) {
LOG.error("No response when moveApplicationAcrossQueues "
+ "the applicationId {} to Queue {} In SubCluster {}.",
request.getApplicationId(), request.getTargetQueue(), subClusterId.getId());
}
long stopTime = clock.getTime();
routerMetrics.succeededMoveApplicationAcrossQueuesRetrieved(stopTime - startTime);
return response;
}
@Override

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -44,6 +45,7 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -365,5 +367,55 @@ public static GetAllResourceTypeInfoResponse mergeResourceTypes(
new ArrayList<>(resourceTypeInfoSet));
return resourceTypeInfoResponse;
}
/**
* Merges a list of GetQueueInfoResponse.
*
* @param responses a list of GetQueueInfoResponse to merge.
* @return the merged GetQueueInfoResponse.
*/
public static GetQueueInfoResponse mergeQueues(
Collection<GetQueueInfoResponse> responses) {
GetQueueInfoResponse queueResponse = Records.newRecord(
GetQueueInfoResponse.class);
QueueInfo queueInfo = null;
for (GetQueueInfoResponse response : responses) {
if (response != null && response.getQueueInfo() != null) {
if (queueInfo == null) {
queueInfo = response.getQueueInfo();
} else {
// set Capacity\MaximumCapacity\CurrentCapacity
queueInfo.setCapacity(queueInfo.getCapacity() + response.getQueueInfo().getCapacity());
queueInfo.setMaximumCapacity(
queueInfo.getMaximumCapacity() + response.getQueueInfo().getMaximumCapacity());
queueInfo.setCurrentCapacity(
queueInfo.getCurrentCapacity() + response.getQueueInfo().getCurrentCapacity());
// set childQueues
List<QueueInfo> childQueues = new ArrayList<>(queueInfo.getChildQueues());
childQueues.addAll(response.getQueueInfo().getChildQueues());
queueInfo.setChildQueues(childQueues);
// set applications
List<ApplicationReport> applicationReports = new ArrayList<>(queueInfo.getApplications());
applicationReports.addAll(response.getQueueInfo().getApplications());
queueInfo.setApplications(applicationReports);
// set accessibleNodeLabels
Set<String> accessibleNodeLabels = new HashSet<>();
if (queueInfo.getAccessibleNodeLabels() != null) {
accessibleNodeLabels.addAll(queueInfo.getAccessibleNodeLabels());
}
if (response.getQueueInfo() != null) {
accessibleNodeLabels.addAll(response.getQueueInfo().getAccessibleNodeLabels());
}
queueInfo.setAccessibleNodeLabels(accessibleNodeLabels);
}
}
}
queueResponse.setQueueInfo(queueInfo);
return queueResponse;
}
}

View File

@ -418,6 +418,16 @@ public void getSignalContainer() {
LOG.info("Mocked: failed signalContainer call");
metrics.incrSignalToContainerFailedRetrieved();
}
public void getQueueInfo() {
LOG.info("Mocked: failed getQueueInfo call");
metrics.incrGetQueueInfoFailedRetrieved();
}
public void moveApplicationAcrossQueuesFailed() {
LOG.info("Mocked: failed moveApplicationAcrossQueuesFailed call");
metrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
}
}
// Records successes for all calls
@ -533,6 +543,16 @@ public void getSignalToContainerTimeouts(long duration) {
LOG.info("Mocked: successful signalToContainer call with duration {}", duration);
metrics.succeededSignalToContainerRetrieved(duration);
}
public void getQueueInfoRetrieved(long duration) {
LOG.info("Mocked: successful getQueueInfo call with duration {}", duration);
metrics.succeededGetQueueInfoRetrieved(duration);
}
public void moveApplicationAcrossQueuesRetrieved(long duration) {
LOG.info("Mocked: successful moveApplicationAcrossQueues call with duration {}", duration);
metrics.succeededMoveApplicationAcrossQueuesRetrieved(duration);
}
}
@Test
@ -839,4 +859,50 @@ public void testSignalToContainerFailed() {
metrics.getSignalToContainerFailedRetrieved());
}
@Test
public void testSucceededGetQueueInfoRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetQueueInfoRetrieved();
goodSubCluster.getQueueInfoRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetQueueInfoRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetQueueInfoRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getQueueInfoRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetQueueInfoRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetQueueInfoRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetQueueInfoFailed() {
long totalBadBefore = metrics.getQueueInfoFailedRetrieved();
badSubCluster.getQueueInfo();
Assert.assertEquals(totalBadBefore + 1,
metrics.getQueueInfoFailedRetrieved());
}
@Test
public void testSucceededMoveApplicationAcrossQueuesRetrieved() {
long totalGoodBefore = metrics.getNumSucceededMoveApplicationAcrossQueuesRetrieved();
goodSubCluster.moveApplicationAcrossQueuesRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededMoveApplicationAcrossQueuesRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededMoveApplicationAcrossQueuesRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.moveApplicationAcrossQueuesRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededMoveApplicationAcrossQueuesRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededMoveApplicationAcrossQueuesRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testMoveApplicationAcrossQueuesRetrievedFailed() {
long totalBadBefore = metrics.getMoveApplicationAcrossQueuesFailedRetrieved();
badSubCluster.moveApplicationAcrossQueuesFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getMoveApplicationAcrossQueuesFailedRetrieved());
}
}

View File

@ -74,6 +74,10 @@
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -86,6 +90,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@ -1101,4 +1106,67 @@ public void testSignalContainer() throws Exception {
Assert.assertNotNull(signalContainerResponse);
}
@Test
public void testMoveApplicationAcrossQueues() throws Exception {
LOG.info("Test FederationClientInterceptor : MoveApplication AcrossQueues request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing moveApplicationAcrossQueues request " +
"or applicationId or target queue.", () -> interceptor.moveApplicationAcrossQueues(null));
// normal request
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
Assert.assertNotNull(subClusterId);
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
MockNM nm = interceptor.getMockNMs().get(subClusterId);
nm.nodeHeartbeat(true);
mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
MoveApplicationAcrossQueuesRequest acrossQueuesRequest =
MoveApplicationAcrossQueuesRequest.newInstance(appId, "root.target");
MoveApplicationAcrossQueuesResponse acrossQueuesResponse =
interceptor.moveApplicationAcrossQueues(acrossQueuesRequest);
Assert.assertNotNull(acrossQueuesResponse);
}
@Test
public void testGetQueueInfo() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Queue Info request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getQueueInfo request or queueName.",
() -> interceptor.getQueueInfo(null));
// normal request
GetQueueInfoResponse response = interceptor.getQueueInfo(
GetQueueInfoRequest.newInstance("root", true, true, true));
Assert.assertNotNull(response);
QueueInfo queueInfo = response.getQueueInfo();
Assert.assertNotNull(queueInfo);
Assert.assertEquals(queueInfo.getQueueName(), "root");
Assert.assertEquals(queueInfo.getCapacity(), 4.0, 0);
Assert.assertEquals(queueInfo.getCurrentCapacity(), 0.0, 0);
Assert.assertEquals(queueInfo.getChildQueues().size(), 12, 0);
Assert.assertEquals(queueInfo.getAccessibleNodeLabels().size(), 1);
}
}

View File

@ -45,7 +45,7 @@
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,decided</value>
<value>default,decided,target</value>
<description>
The queues at the this level (root is the root queue).
</description>
@ -53,10 +53,16 @@
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>100</value>
<value>90</value>
<description>Default queue target capacity.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.target.capacity</name>
<value>10</value>
<description>target queue capacity.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
<value>1</value>
@ -81,6 +87,14 @@
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.target.state</name>
<value>RUNNING</value>
<description>
The state of the target queue. State can be one of RUNNING or STOPPED.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
<value>*</value>
@ -89,6 +103,14 @@
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.target.acl_submit_applications</name>
<value>*</value>
<description>
The ACL of who can submit jobs to the target queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
<value>*</value>
@ -97,6 +119,14 @@
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.target.acl_administer_queue</name>
<value>*</value>
<description>
The ACL of who can administer jobs on the target queue.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.decided.reservable</name>
<value>true</value>