From 3f767a61b1da42dc5a23c6465a7fb47a51340fa3 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 20 Jan 2023 09:13:55 +0800 Subject: [PATCH] YARN-8900. [Follow Up] Fix FederationInterceptorREST#invokeConcurrent Inaccurate Order of Subclusters. (#5260) --- .../webapp/FederationInterceptorREST.java | 45 +++++++++----- .../router/webapp/dao/SubClusterResult.java | 59 +++++++++++++++++++ .../webapp/TestFederationInterceptorREST.java | 28 +++++++++ 3 files changed, 118 insertions(+), 14 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/SubClusterResult.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 808aba156a..73b0c5f2af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -45,7 +45,6 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; @@ -123,6 +122,7 @@ import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; +import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; @@ -2532,7 +2532,7 @@ private Map invokeConcurrent(Collection c // If there is a sub-cluster access error, // we should choose whether to throw exception information according to user configuration. // Send the requests in parallel. - CompletionService> compSvc = new ExecutorCompletionService<>(threadpool); + CompletionService> compSvc = new ExecutorCompletionService<>(threadpool); // This part of the code should be able to expose the accessed Exception information. // We use Pair to store related information. The left value of the Pair is the response, @@ -2548,36 +2548,41 @@ private Map invokeConcurrent(Collection c getMethod(request.getMethodName(), request.getTypes()); Object retObj = method.invoke(interceptor, request.getParams()); R ret = clazz.cast(retObj); - return Pair.of(ret, null); + return new SubClusterResult<>(info, ret, null); } catch (Exception e) { LOG.error("SubCluster {} failed to call {} method.", info.getSubClusterId(), request.getMethodName(), e); - return Pair.of(null, e); + return new SubClusterResult<>(info, null, e); } }); } - clusterIds.stream().forEach(clusterId -> { + for (int i = 0; i < clusterIds.size(); i++) { + SubClusterInfo subClusterInfo = null; try { - Future> future = compSvc.take(); - Pair pair = future.get(); - R response = pair.getKey(); + Future> future = compSvc.take(); + SubClusterResult result = future.get(); + subClusterInfo = result.getSubClusterInfo(); + + R response = result.getResponse(); if (response != null) { - results.put(clusterId, response); + results.put(subClusterInfo, response); } - Exception exception = pair.getValue(); + + Exception exception = result.getException(); + // If allowPartialResult=false, it means that if an exception occurs in a subCluster, // an exception will be thrown directly. if (!allowPartialResult && exception != null) { throw exception; } } catch (Throwable e) { - String msg = String.format("SubCluster %s failed to %s report.", - clusterId.getSubClusterId(), request.getMethodName()); - LOG.error(msg, e); + String subClusterId = subClusterInfo != null ? + subClusterInfo.getSubClusterId().getId() : "UNKNOWN"; + LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e); throw new YarnRuntimeException(e.getCause().getMessage(), e); } - }); + } return results; } @@ -2648,4 +2653,16 @@ public Map getInterceptors() { public void setAllowPartialResult(boolean allowPartialResult) { this.allowPartialResult = allowPartialResult; } + + @VisibleForTesting + public Map invokeConcurrentGetNodeLabel() + throws IOException, YarnException { + Map subClustersActive = getActiveSubclusters(); + Class[] argsClasses = new Class[]{String.class}; + Object[] args = new Object[]{null}; + ClientMethod remoteMethod = new ClientMethod("getNodes", argsClasses, args); + Map nodesMap = + invokeConcurrent(subClustersActive.values(), remoteMethod, NodesInfo.class); + return nodesMap; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/SubClusterResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/SubClusterResult.java new file mode 100644 index 0000000000..2a527c28d1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/SubClusterResult.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.router.webapp.dao; + +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +public class SubClusterResult { + private SubClusterInfo subClusterInfo; + private R response; + private Exception exception; + + public SubClusterResult() { + } + + public SubClusterResult(SubClusterInfo subCluster, R res, Exception ex) { + this.subClusterInfo = subCluster; + this.response = res; + this.exception = ex; + } + + public SubClusterInfo getSubClusterInfo() { + return subClusterInfo; + } + + public void setSubClusterInfo(SubClusterInfo subClusterInfo) { + this.subClusterInfo = subClusterInfo; + } + + public Exception getException() { + return exception; + } + + public void setException(Exception exception) { + this.exception = exception; + } + + public R getResponse() { + return response; + } + + public void setResponse(R response) { + this.response = response; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 225edaa896..070c883615 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -1534,6 +1534,34 @@ public void testCheckFederationInterceptorRESTClient() { Assert.assertEquals(webAppAddress, interceptorREST.getWebAppAddress()); } + @Test + public void testInvokeConcurrent() throws IOException, YarnException { + + // We design such a test case, we call the interceptor's getNodes interface, + // this interface will generate the following test data + // subCluster0 Node 0 + // subCluster1 Node 1 + // subCluster2 Node 2 + // subCluster3 Node 3 + // We use the returned data to verify whether the subClusterId + // of the multi-thread call can match the node data + Map subClusterInfoNodesInfoMap = + interceptor.invokeConcurrentGetNodeLabel(); + Assert.assertNotNull(subClusterInfoNodesInfoMap); + Assert.assertEquals(4, subClusterInfoNodesInfoMap.size()); + + subClusterInfoNodesInfoMap.forEach((subClusterInfo, nodesInfo) -> { + String subClusterId = subClusterInfo.getSubClusterId().getId(); + List nodeInfos = nodesInfo.getNodes(); + Assert.assertNotNull(nodeInfos); + Assert.assertEquals(1, nodeInfos.size()); + + String expectNodeId = "Node " + subClusterId; + String nodeId = nodeInfos.get(0).getNodeId(); + Assert.assertEquals(expectNodeId, nodeId); + }); + } + @Test public void testGetSchedulerInfo() { // In this test case, we will get the return results of 4 sub-clusters.