YARN-11620. [Federation] Improve FederationClientInterceptor To Return Partial Results of subClusters. (#6289) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
slfan1989 2023-11-29 07:11:35 +08:00 committed by GitHub
parent d72cdf7205
commit 478c4ced5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 2 deletions

View File

@ -208,6 +208,7 @@ public class FederationClientInterceptor
private final Clock clock = new MonotonicClock(); private final Clock clock = new MonotonicClock();
private boolean returnPartialReport; private boolean returnPartialReport;
private long submitIntervalTime; private long submitIntervalTime;
private boolean allowPartialResult;
@Override @Override
public void init(String userName) { public void init(String userName) {
@ -263,6 +264,10 @@ public void init(String userName) {
returnPartialReport = conf.getBoolean( returnPartialReport = conf.getBoolean(
YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED, YarnConfiguration.ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED); YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PARTIAL_RESULTS_ENABLED);
allowPartialResult = conf.getBoolean(
YarnConfiguration.ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED,
YarnConfiguration.DEFAULT_ROUTER_INTERCEPTOR_ALLOW_PARTIAL_RESULT_ENABLED);
} }
@Override @Override
@ -895,8 +900,10 @@ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
// All sub-clusters return results to be considered successful, // All sub-clusters return results to be considered successful,
// otherwise an exception will be thrown. // otherwise an exception will be thrown.
if (exceptions != null && !exceptions.isEmpty()) { if (exceptions != null && !exceptions.isEmpty()) {
throw new YarnException("invokeConcurrent Failed = " + if (!allowPartialResult || exceptions.keySet().size() == subClusterIds.size()) {
StringUtils.join(exceptions.values(), ",")); throw new YarnException("invokeConcurrent Failed = " +
StringUtils.join(exceptions.values(), ","));
}
} }
// return result // return result
@ -2350,4 +2357,9 @@ protected int getNumMaxThreads(Configuration conf) {
public void setNumSubmitRetries(int numSubmitRetries) { public void setNumSubmitRetries(int numSubmitRetries) {
this.numSubmitRetries = numSubmitRetries; this.numSubmitRetries = numSubmitRetries;
} }
@VisibleForTesting
public void setAllowPartialResult(boolean allowPartialResult) {
this.allowPartialResult = allowPartialResult;
}
} }

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@ -410,4 +411,21 @@ public void testGetClusterMetricsOneBadNodeWithRealError() throws Exception {
"subClusterId 1 exec getClusterMetrics error RM is stopped.", "subClusterId 1 exec getClusterMetrics error RM is stopped.",
() -> interceptor.getClusterMetrics(request)); () -> interceptor.getClusterMetrics(request));
} }
@Test
public void testGetClusterMetricsOneBadOneGoodNodeWithRealError() throws Exception {
LOG.info("Test getClusterMetrics with one bad and one good SubCluster.");
setupCluster(Arrays.asList(bad1, good));
GetClusterMetricsRequest request = GetClusterMetricsRequest.newInstance();
GetClusterMetricsResponse clusterMetrics = interceptor.getClusterMetrics(request);
Assert.assertNotNull(clusterMetrics);
// If partial results are not allowed to be returned, an exception will be thrown.
interceptor.setAllowPartialResult(false);
LambdaTestUtils.intercept(YarnException.class,
"subClusterId 1 exec getClusterMetrics error RM is stopped.",
() -> interceptor.getClusterMetrics(request));
interceptor.setAllowPartialResult(true);
}
} }