From 22bd5e3b5353c3941ab3e1566d9b39fc1582d0ee Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 5 Oct 2022 00:17:00 +0800 Subject: [PATCH] YARN-11238. Optimizing FederationClientInterceptor Call with Parallelism. (#4904) --- .../impl/MemoryFederationStateStore.java | 10 + .../utils/FederationStateStoreFacade.java | 2 +- .../clientrm/FederationClientInterceptor.java | 193 +++++++----------- .../TestFederationClientInterceptor.java | 14 +- 4 files changed, 99 insertions(+), 120 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 4d545fb808..fe02afcd83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -497,4 +497,14 @@ private static DelegationKey getDelegationKeyByMasterKey(RouterMasterKey masterK public RouterRMDTSecretManagerState getRouterRMSecretManagerState() { return routerRMSecretManagerState; } + + @VisibleForTesting + public Map getMembership() { + return membership; + } + + @VisibleForTesting + public void setMembership(Map membership) { + this.membership = membership; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index b94e85bab4..2044f29099 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -727,7 +727,7 @@ public FederationStateStore getStateStore() { return stateStore; } - /** + /* * The Router Supports Store NewMasterKey (RouterMasterKey{@link RouterMasterKey}). * * @param newKey Key used for generating and verifying delegation tokens diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index ec6f5fbb0d..a027977e13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -18,10 +18,10 @@ package org.apache.hadoop.yarn.server.router.clientrm; -import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Random; import java.util.TreeMap; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -39,7 +40,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -661,14 +661,11 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request) RouterServerUtil.logAndThrowException("Missing getApplications request.", null); } long startTime = clock.getTime(); - Map subclusters = - federationFacade.getSubClusters(true); ClientMethod remoteMethod = new ClientMethod("getApplications", new Class[] {GetApplicationsRequest.class}, new Object[] {request}); - Map applications = null; + Collection applications = null; try { - applications = invokeConcurrent(subclusters.keySet(), remoteMethod, - GetApplicationsResponse.class); + applications = invokeConcurrent(remoteMethod, GetApplicationsResponse.class); } catch (Exception ex) { routerMetrics.incrMultipleAppsFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get applications due to exception.", ex); @@ -676,7 +673,7 @@ public GetApplicationsResponse getApplications(GetApplicationsRequest request) long stopTime = clock.getTime(); routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime); // Merge the Application Reports - return RouterYarnClientUtils.mergeApplications(applications.values(), returnPartialReport); + return RouterYarnClientUtils.mergeApplications(applications, returnPartialReport); } @Override @@ -691,8 +688,7 @@ public GetClusterMetricsResponse getClusterMetrics( new Class[] {GetClusterMetricsRequest.class}, new Object[] {request}); Collection clusterMetrics = null; try { - clusterMetrics = invokeAppClientProtocolMethod( - true, remoteMethod, GetClusterMetricsResponse.class); + clusterMetrics = invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class); } catch (Exception ex) { routerMetrics.incrGetClusterMetricsFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get cluster metrics due to exception.", ex); @@ -702,67 +698,62 @@ public GetClusterMetricsResponse getClusterMetrics( return RouterYarnClientUtils.merge(clusterMetrics); } - Map invokeConcurrent(ArrayList clusterIds, - ClientMethod request, Class clazz) throws YarnException, IOException { - List> callables = new ArrayList<>(); - List> futures = new ArrayList<>(); - Map exceptions = new TreeMap<>(); - for (SubClusterId subClusterId : clusterIds) { - callables.add(new Callable() { - @Override - public Object call() throws Exception { - ApplicationClientProtocol protocol = - getClientRMProxyForSubCluster(subClusterId); - Method method = ApplicationClientProtocol.class - .getMethod(request.getMethodName(), request.getTypes()); - return method.invoke(protocol, request.getParams()); - } + Collection invokeConcurrent(ClientMethod request, Class clazz) + throws YarnException { + + // Get Active SubClusters + Map subClusterInfo = federationFacade.getSubClusters(true); + Collection subClusterIds = subClusterInfo.keySet(); + + List>> callables = new ArrayList<>(); + List>> futures = new ArrayList<>(); + Map exceptions = new TreeMap<>(); + + // Generate parallel Callable tasks + for (SubClusterId subClusterId : subClusterIds) { + callables.add(() -> { + ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId); + String methodName = request.getMethodName(); + Class[] types = request.getTypes(); + Object[] params = request.getParams(); + Method method = ApplicationClientProtocol.class.getMethod(methodName, types); + Object result = method.invoke(protocol, params); + return Pair.of(subClusterId, result); }); } + + // Get results from multiple threads Map results = new TreeMap<>(); try { futures.addAll(executorService.invokeAll(callables)); - for (int i = 0; i < futures.size(); i++) { - SubClusterId subClusterId = clusterIds.get(i); + futures.stream().forEach(future -> { + SubClusterId subClusterId = null; try { - Future future = futures.get(i); - Object result = future.get(); + Pair pair = future.get(); + subClusterId = pair.getKey(); + Object result = pair.getValue(); results.put(subClusterId, clazz.cast(result)); - } catch (ExecutionException ex) { - Throwable cause = ex.getCause(); - LOG.debug("Cannot execute {} on {}: {}", request.getMethodName(), + } catch (InterruptedException | ExecutionException e) { + Throwable cause = e.getCause(); + LOG.error("Cannot execute {} on {}: {}", request.getMethodName(), subClusterId.getId(), cause.getMessage()); - IOException ioe; - if (cause instanceof IOException) { - ioe = (IOException) cause; - } else if (cause instanceof YarnException) { - throw (YarnException) cause; - } else { - ioe = new IOException( - "Unhandled exception while calling " + request.getMethodName() - + ": " + cause.getMessage(), cause); - } - // Store the exceptions - exceptions.put(subClusterId, ioe); + exceptions.put(subClusterId, e); } - } - if (results.isEmpty() && !clusterIds.isEmpty()) { - SubClusterId subClusterId = clusterIds.get(0); - IOException ioe = exceptions.get(subClusterId); - if (ioe != null) { - throw ioe; - } - } + }); } catch (InterruptedException e) { - throw new YarnException(e); + throw new YarnException("invokeConcurrent Failed.", e); } - return results; - } - Map invokeConcurrent(Collection clusterIds, - ClientMethod request, Class clazz) throws YarnException, IOException { - ArrayList clusterIdList = new ArrayList<>(clusterIds); - return invokeConcurrent(clusterIdList, request, clazz); + // All sub-clusters return results to be considered successful, + // otherwise an exception will be thrown. + if (exceptions != null && !exceptions.isEmpty()) { + Set subClusterIdSets = exceptions.keySet(); + throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " + + StringUtils.join(subClusterIdSets, ",")); + } + + // return result + return results.values(); } @Override @@ -773,24 +764,19 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) RouterServerUtil.logAndThrowException("Missing getClusterNodes request.", null); } long startTime = clock.getTime(); - Map subClusters = - federationFacade.getSubClusters(true); - Map clusterNodes = Maps.newHashMap(); - for (SubClusterId subClusterId : subClusters.keySet()) { - ApplicationClientProtocol client; - try { - client = getClientRMProxyForSubCluster(subClusterId); - GetClusterNodesResponse response = client.getClusterNodes(request); - clusterNodes.put(subClusterId, response); - } catch (Exception ex) { - routerMetrics.incrClusterNodesFailedRetrieved(); - RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex); - } + ClientMethod remoteMethod = new ClientMethod("getClusterNodes", + new Class[]{GetClusterNodesRequest.class}, new Object[]{request}); + try { + Collection clusterNodes = + invokeConcurrent(remoteMethod, GetClusterNodesResponse.class); + long stopTime = clock.getTime(); + routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime); + return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes); + } catch (Exception ex) { + routerMetrics.incrClusterNodesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex); } - long stopTime = clock.getTime(); - routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime); - // Merge the NodesResponse - return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes.values()); + throw new YarnException("Unable to get cluster nodes."); } @Override @@ -806,8 +792,7 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) new Class[]{GetQueueInfoRequest.class}, new Object[]{request}); Collection queues = null; try { - queues = invokeAppClientProtocolMethod(true, remoteMethod, - GetQueueInfoResponse.class); + queues = invokeConcurrent(remoteMethod, GetQueueInfoResponse.class); } catch (Exception ex) { routerMetrics.incrGetQueueInfoFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get queue [" + @@ -831,8 +816,7 @@ public GetQueueUserAclsInfoResponse getQueueUserAcls( new Class[] {GetQueueUserAclsInfoRequest.class}, new Object[] {request}); Collection queueUserAcls = null; try { - queueUserAcls = invokeAppClientProtocolMethod(true, remoteMethod, - GetQueueUserAclsInfoResponse.class); + queueUserAcls = invokeConcurrent(remoteMethod, GetQueueUserAclsInfoResponse.class); } catch (Exception ex) { routerMetrics.incrQueueUserAclsFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get queue user Acls due to exception.", ex); @@ -992,8 +976,7 @@ public ReservationListResponse listReservations( new Class[] {ReservationListRequest.class}, new Object[] {request}); Collection listResponses = null; try { - listResponses = invokeAppClientProtocolMethod(true, remoteMethod, - ReservationListResponse.class); + listResponses = invokeConcurrent(remoteMethod, ReservationListResponse.class); } catch (Exception ex) { routerMetrics.incrListReservationsFailedRetrieved(); RouterServerUtil.logAndThrowException( @@ -1072,24 +1055,6 @@ public ReservationDeleteResponse deleteReservation( throw new YarnException(msg); } - private Collection invokeAppClientProtocolMethod( - Boolean filterInactiveSubClusters, ClientMethod request, Class clazz) - throws YarnException, RuntimeException { - Map subClusters = - federationFacade.getSubClusters(filterInactiveSubClusters); - return subClusters.keySet().stream().map(subClusterId -> { - try { - ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId); - Method method = ApplicationClientProtocol.class. - getMethod(request.getMethodName(), request.getTypes()); - return clazz.cast(method.invoke(protocol, request.getParams())); - } catch (YarnException | NoSuchMethodException | - IllegalAccessException | InvocationTargetException ex) { - throw new RuntimeException(ex); - } - }).collect(Collectors.toList()); - } - @Override public GetNodesToLabelsResponse getNodeToLabels( GetNodesToLabelsRequest request) throws YarnException, IOException { @@ -1102,8 +1067,7 @@ public GetNodesToLabelsResponse getNodeToLabels( new Class[] {GetNodesToLabelsRequest.class}, new Object[] {request}); Collection clusterNodes = null; try { - clusterNodes = invokeAppClientProtocolMethod(true, remoteMethod, - GetNodesToLabelsResponse.class); + clusterNodes = invokeConcurrent(remoteMethod, GetNodesToLabelsResponse.class); } catch (Exception ex) { routerMetrics.incrNodeToLabelsFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get node label due to exception.", ex); @@ -1126,8 +1090,7 @@ public GetLabelsToNodesResponse getLabelsToNodes( new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request}); Collection labelNodes = null; try { - labelNodes = invokeAppClientProtocolMethod(true, remoteMethod, - GetLabelsToNodesResponse.class); + labelNodes = invokeConcurrent(remoteMethod, GetLabelsToNodesResponse.class); } catch (Exception ex) { routerMetrics.incrLabelsToNodesFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get label node due to exception.", ex); @@ -1150,8 +1113,7 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels( new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request}); Collection nodeLabels = null; try { - nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod, - GetClusterNodeLabelsResponse.class); + nodeLabels = invokeConcurrent(remoteMethod, GetClusterNodeLabelsResponse.class); } catch (Exception ex) { routerMetrics.incrClusterNodeLabelsFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get cluster nodeLabels due to exception.", @@ -1563,8 +1525,7 @@ public GetAllResourceProfilesResponse getResourceProfiles( new Class[] {GetAllResourceProfilesRequest.class}, new Object[] {request}); Collection resourceProfiles = null; try { - resourceProfiles = invokeAppClientProtocolMethod(true, remoteMethod, - GetAllResourceProfilesResponse.class); + resourceProfiles = invokeConcurrent(remoteMethod, GetAllResourceProfilesResponse.class); } catch (Exception ex) { routerMetrics.incrGetResourceProfilesFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get resource profiles due to exception.", @@ -1588,8 +1549,7 @@ public GetResourceProfileResponse getResourceProfile( new Class[] {GetResourceProfileRequest.class}, new Object[] {request}); Collection resourceProfile = null; try { - resourceProfile = invokeAppClientProtocolMethod(true, remoteMethod, - GetResourceProfileResponse.class); + resourceProfile = invokeConcurrent(remoteMethod, GetResourceProfileResponse.class); } catch (Exception ex) { routerMetrics.incrGetResourceProfileFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get resource profile due to exception.", @@ -1612,8 +1572,7 @@ public GetAllResourceTypeInfoResponse getResourceTypeInfo( new Class[] {GetAllResourceTypeInfoRequest.class}, new Object[] {request}); Collection listResourceTypeInfo; try { - listResourceTypeInfo = invokeAppClientProtocolMethod(true, remoteMethod, - GetAllResourceTypeInfoResponse.class); + listResourceTypeInfo = invokeConcurrent(remoteMethod, GetAllResourceTypeInfoResponse.class); } catch (Exception ex) { routerMetrics.incrResourceTypeInfoFailedRetrieved(); LOG.error("Unable to get all resource type info node due to exception.", ex); @@ -1644,8 +1603,8 @@ public GetAttributesToNodesResponse getAttributesToNodes( new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request}); Collection attributesToNodesResponses = null; try { - attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod, - GetAttributesToNodesResponse.class); + attributesToNodesResponses = + invokeConcurrent(remoteMethod, GetAttributesToNodesResponse.class); } catch (Exception ex) { routerMetrics.incrGetAttributesToNodesFailedRetrieved(); RouterServerUtil.logAndThrowException("Unable to get attributes to nodes due to exception.", @@ -1668,7 +1627,7 @@ public GetClusterNodeAttributesResponse getClusterNodeAttributes( new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request}); Collection clusterNodeAttributesResponses = null; try { - clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod, + clusterNodeAttributesResponses = invokeConcurrent(remoteMethod, GetClusterNodeAttributesResponse.class); } catch (Exception ex) { routerMetrics.incrGetClusterNodeAttributesFailedRetrieved(); @@ -1693,7 +1652,7 @@ public GetNodesToAttributesResponse getNodesToAttributes( new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request}); Collection nodesToAttributesResponses = null; try { - nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod, + nodesToAttributesResponses = invokeConcurrent(remoteMethod, GetNodesToAttributesResponse.class); } catch (Exception ex) { routerMetrics.incrGetNodesToAttributesFailedRetrieved(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index ac980b4858..1fc1e92033 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.test.LambdaTestUtils; @@ -127,6 +128,7 @@ import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -579,12 +581,20 @@ public void testGetClusterMetricsRequest() throws Exception { Assert.assertEquals(subClusters.size(), response.getClusterMetrics().getNumNodeManagers()); + // Clear Membership + Map membership = new HashMap<>(); + membership.putAll(stateStore.getMembership()); + stateStore.getMembership().clear(); + ClientMethod remoteMethod = new ClientMethod("getClusterMetrics", new Class[] {GetClusterMetricsRequest.class}, new Object[] {GetClusterMetricsRequest.newInstance()}); - Map clusterMetrics = interceptor. - invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class); + Collection clusterMetrics = interceptor.invokeConcurrent( + remoteMethod, GetClusterMetricsResponse.class); Assert.assertTrue(clusterMetrics.isEmpty()); + + // Restore membership + stateStore.setMembership(membership); } /**