From edeb99548ab92430ae4f390d18d20848ee448542 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Tue, 26 Jul 2022 01:05:45 +0800 Subject: [PATCH] YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes API's for Federation (#4610) --- .../yarn/server/router/RouterMetrics.java | 107 ++++++++++++- .../clientrm/FederationClientInterceptor.java | 98 +++++++++--- .../clientrm/RouterYarnClientUtils.java | 59 ++++++- .../yarn/server/router/TestRouterMetrics.java | 99 ++++++++++++ .../TestFederationClientInterceptor.java | 88 ++++++++++ .../clientrm/TestRouterYarnClientUtils.java | 151 ++++++++++++++++++ .../TestableFederationClientInterceptor.java | 34 ++++ 7 files changed, 604 insertions(+), 32 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 42a22600d2..d6ce729b16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -91,6 +91,12 @@ public final class RouterMetrics { private MutableGaugeInt numGetResourceProfilesFailedRetrieved; @Metric("# of getResourceProfile failed to be retrieved") private MutableGaugeInt numGetResourceProfileFailedRetrieved; + @Metric("# of getAttributesToNodes failed to be retrieved") + private MutableGaugeInt numGetAttributesToNodesFailedRetrieved; + @Metric("# of getClusterNodeAttributes failed to be retrieved") + private MutableGaugeInt numGetClusterNodeAttributesFailedRetrieved; + @Metric("# of getNodesToAttributes failed to be retrieved") + private MutableGaugeInt numGetNodesToAttributesFailedRetrieved; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -101,14 +107,11 @@ public final class RouterMetrics { private MutableRate totalSucceededAppsCreated; @Metric("Total number of successful Retrieved app reports and latency(ms)") private MutableRate totalSucceededAppsRetrieved; - @Metric("Total number of successful Retrieved multiple apps reports and " - + "latency(ms)") + @Metric("Total number of successful Retrieved multiple apps reports and latency(ms)") private MutableRate totalSucceededMultipleAppsRetrieved; - @Metric("Total number of successful Retrieved " + - "appAttempt reports and latency(ms)") + @Metric("Total number of successful Retrieved appAttempt reports and latency(ms)") private MutableRate totalSucceededAppAttemptsRetrieved; - @Metric("Total number of successful Retrieved getClusterMetrics and " - + "latency(ms)") + @Metric("Total number of successful Retrieved getClusterMetrics and latency(ms)") private MutableRate totalSucceededGetClusterMetricsRetrieved; @Metric("Total number of successful Retrieved getClusterNodes and latency(ms)") private MutableRate totalSucceededGetClusterNodesRetrieved; @@ -144,9 +147,14 @@ public final class RouterMetrics { private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved; @Metric("Total number of successful Retrieved getResourceProfiles and latency(ms)") private MutableRate totalSucceededGetResourceProfilesRetrieved; - @Metric("Total number of successful Retrieved getResourceProfile and latency(ms)") private MutableRate totalSucceededGetResourceProfileRetrieved; + @Metric("Total number of successful Retrieved getAttributesToNodes and latency(ms)") + private MutableRate totalSucceededGetAttributesToNodesRetrieved; + @Metric("Total number of successful Retrieved getClusterNodeAttributes and latency(ms)") + private MutableRate totalSucceededGetClusterNodeAttributesRetrieved; + @Metric("Total number of successful Retrieved getNodesToAttributes and latency(ms)") + private MutableRate totalSucceededGetNodesToAttributesRetrieved; /** * Provide quantile counters for all latencies. @@ -176,6 +184,10 @@ public final class RouterMetrics { private MutableQuantiles moveApplicationAcrossQueuesLatency; private MutableQuantiles getResourceProfilesLatency; private MutableQuantiles getResourceProfileLatency; + private MutableQuantiles getAttributesToNodesLatency; + private MutableQuantiles getClusterNodeAttributesLatency; + + private MutableQuantiles getNodesToAttributesLatency; private static volatile RouterMetrics instance = null; private static MetricsRegistry registry; @@ -274,6 +286,18 @@ private RouterMetrics() { getResourceProfileLatency = registry.newQuantiles("getResourceProfileLatency", "latency of get resource profile timeouts", "ops", "latency", 10); + + getAttributesToNodesLatency = + registry.newQuantiles("getAttributesToNodesLatency", + "latency of get attributes to nodes timeouts", "ops", "latency", 10); + + getClusterNodeAttributesLatency = + registry.newQuantiles("getClusterNodeAttributesLatency", + "latency of get cluster node attributes timeouts", "ops", "latency", 10); + + getNodesToAttributesLatency = + registry.newQuantiles("getNodesToAttributesLatency", + "latency of get nodes to attributes timeouts", "ops", "latency", 10); } public static RouterMetrics getMetrics() { @@ -420,6 +444,21 @@ public long getNumSucceededGetResourceProfileRetrieved() { return totalSucceededGetResourceProfileRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededGetAttributesToNodesRetrieved() { + return totalSucceededGetAttributesToNodesRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetClusterNodeAttributesRetrieved() { + return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededGetNodesToAttributesRetrieved() { + return totalSucceededGetNodesToAttributesRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public double getLatencySucceededAppsCreated() { return totalSucceededAppsCreated.lastStat().mean(); @@ -545,6 +584,21 @@ public double getLatencySucceededGetResourceProfileRetrieved() { return totalSucceededGetResourceProfileRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededGetAttributesToNodesRetrieved() { + return totalSucceededGetAttributesToNodesRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededGetClusterNodeAttributesRetrieved() { + return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededGetNodesToAttributesRetrieved() { + return totalSucceededGetNodesToAttributesRetrieved.lastStat().mean(); + } + @VisibleForTesting public int getAppsFailedCreated() { return numAppsFailedCreated.value(); @@ -666,6 +720,18 @@ public int getResourceProfileFailedRetrieved() { return numGetResourceProfileFailedRetrieved.value(); } + public int getAttributesToNodesFailedRetrieved() { + return numGetAttributesToNodesFailedRetrieved.value(); + } + + public int getClusterNodeAttributesFailedRetrieved() { + return numGetClusterNodeAttributesFailedRetrieved.value(); + } + + public int getNodesToAttributesFailedRetrieved() { + return numGetNodesToAttributesFailedRetrieved.value(); + } + public void succeededAppsCreated(long duration) { totalSucceededAppsCreated.add(duration); getNewApplicationLatency.add(duration); @@ -791,6 +857,21 @@ public void succeededGetResourceProfileRetrieved(long duration) { getResourceProfileLatency.add(duration); } + public void succeededGetAttributesToNodesRetrieved(long duration) { + totalSucceededGetAttributesToNodesRetrieved.add(duration); + getAttributesToNodesLatency.add(duration); + } + + public void succeededGetClusterNodeAttributesRetrieved(long duration) { + totalSucceededGetClusterNodeAttributesRetrieved.add(duration); + getClusterNodeAttributesLatency.add(duration); + } + + public void succeededGetNodesToAttributesRetrieved(long duration) { + totalSucceededGetNodesToAttributesRetrieved.add(duration); + getNodesToAttributesLatency.add(duration); + } + public void incrAppsFailedCreated() { numAppsFailedCreated.incr(); } @@ -890,4 +971,16 @@ public void incrGetResourceProfilesFailedRetrieved() { public void incrGetResourceProfileFailedRetrieved() { numGetResourceProfileFailedRetrieved.incr(); } + + public void incrGetAttributesToNodesFailedRetrieved() { + numGetAttributesToNodesFailedRetrieved.incr(); + } + + public void incrGetClusterNodeAttributesFailedRetrieved() { + numGetClusterNodeAttributesFailedRetrieved.incr(); + } + + public void incrGetNodesToAttributesFailedRetrieved() { + numGetNodesToAttributesFailedRetrieved.incr(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 45cec64150..7fd1003552 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -175,7 +175,6 @@ public void init(String userName) { federationFacade = FederationStateStoreFacade.getInstance(); rand = new Random(System.currentTimeMillis()); - int numThreads = getConf().getInt( YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE, YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE); @@ -195,12 +194,11 @@ public void init(String userName) { LOG.error(e.getMessage()); } - numSubmitRetries = - conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, - YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); + numSubmitRetries = conf.getInt( + YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY); - clientRMProxies = - new ConcurrentHashMap(); + clientRMProxies = new ConcurrentHashMap<>(); routerMetrics = RouterMetrics.getMetrics(); returnPartialReport = conf.getBoolean( @@ -227,19 +225,17 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( ApplicationClientProtocol clientRMProxy = null; try { boolean serviceAuthEnabled = getConf().getBoolean( - CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false); UserGroupInformation realUser = user; if (serviceAuthEnabled) { - realUser = UserGroupInformation.createProxyUser( - user.getShortUserName(), UserGroupInformation.getLoginUser()); + realUser = UserGroupInformation.createProxyUser(user.getShortUserName(), + UserGroupInformation.getLoginUser()); } clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(), ApplicationClientProtocol.class, subClusterId, realUser); } catch (Exception e) { RouterServerUtil.logAndThrowException( - "Unable to create the interface to reach the SubCluster " - + subClusterId, - e); + "Unable to create the interface to reach the SubCluster " + subClusterId, e); } clientRMProxies.put(subClusterId, clientRMProxy); @@ -287,8 +283,7 @@ public GetNewApplicationResponse getNewApplication( for (int i = 0; i < numSubmitRetries; ++i) { SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive); - LOG.debug( - "getNewApplication try #{} on SubCluster {}", i, subClusterId); + LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId); ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId); GetNewApplicationResponse response = null; @@ -410,7 +405,7 @@ public SubmitApplicationResponse submitApplication( ApplicationId applicationId = request.getApplicationSubmissionContext().getApplicationId(); - List blacklist = new ArrayList(); + List blacklist = new ArrayList<>(); for (int i = 0; i < numSubmitRetries; ++i) { @@ -561,8 +556,8 @@ public KillApplicationResponse forceKillApplication( } if (response == null) { - LOG.error("No response when attempting to kill the application " - + applicationId + " to SubCluster " + subClusterId.getId()); + LOG.error("No response when attempting to kill the application {} to SubCluster {}.", + applicationId, subClusterId.getId()); } long stopTime = clock.getTime(); @@ -1015,7 +1010,7 @@ public GetLabelsToNodesResponse getLabelsToNodes( } long startTime = clock.getTime(); ClientMethod remoteMethod = new ClientMethod("getLabelsToNodes", - new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request}); + new Class[] {GetLabelsToNodesRequest.class}, new Object[] {request}); Collection labelNodes; try { labelNodes = invokeAppClientProtocolMethod(true, remoteMethod, @@ -1040,7 +1035,7 @@ public GetClusterNodeLabelsResponse getClusterNodeLabels( } long startTime = clock.getTime(); ClientMethod remoteMethod = new ClientMethod("getClusterNodeLabels", - new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request}); + new Class[] {GetClusterNodeLabelsRequest.class}, new Object[] {request}); Collection nodeLabels; try { nodeLabels = invokeAppClientProtocolMethod(true, remoteMethod, @@ -1528,20 +1523,75 @@ public void shutdown() { @Override public GetAttributesToNodesResponse getAttributesToNodes( GetAttributesToNodesRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getNodeAttributes() == null) { + routerMetrics.incrGetAttributesToNodesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getAttributesToNodes request " + + "or nodeAttributes.", null); + } + long startTime = clock.getTime(); + ClientMethod remoteMethod = new ClientMethod("getAttributesToNodes", + new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request}); + Collection attributesToNodesResponses = null; + try { + attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod, + GetAttributesToNodesResponse.class); + } catch (Exception ex) { + routerMetrics.incrGetAttributesToNodesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get attributes to nodes due to exception.", + ex); + } + long stopTime = clock.getTime(); + routerMetrics.succeededGetAttributesToNodesRetrieved(stopTime - startTime); + return RouterYarnClientUtils.mergeAttributesToNodesResponse(attributesToNodesResponses); } @Override public GetClusterNodeAttributesResponse getClusterNodeAttributes( - GetClusterNodeAttributesRequest request) - throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + GetClusterNodeAttributesRequest request) throws YarnException, IOException { + if (request == null) { + routerMetrics.incrGetClusterNodeAttributesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getClusterNodeAttributes request.", null); + } + long startTime = clock.getTime(); + ClientMethod remoteMethod = new ClientMethod("getClusterNodeAttributes", + new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request}); + Collection clusterNodeAttributesResponses = null; + try { + clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod, + GetClusterNodeAttributesResponse.class); + } catch (Exception ex) { + routerMetrics.incrGetClusterNodeAttributesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get cluster node attributes due " + + " to exception.", ex); + } + long stopTime = clock.getTime(); + routerMetrics.succeededGetClusterNodeAttributesRetrieved(stopTime - startTime); + return RouterYarnClientUtils.mergeClusterNodeAttributesResponse(clusterNodeAttributesResponses); } @Override public GetNodesToAttributesResponse getNodesToAttributes( GetNodesToAttributesRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + if (request == null || request.getHostNames() == null) { + routerMetrics.incrGetNodesToAttributesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing getNodesToAttributes request or " + + "hostNames.", null); + } + long startTime = clock.getTime(); + ClientMethod remoteMethod = new ClientMethod("getNodesToAttributes", + new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request}); + Collection nodesToAttributesResponses = null; + try { + nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod, + GetNodesToAttributesResponse.class); + } catch (Exception ex) { + routerMetrics.incrGetNodesToAttributesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Unable to get nodes to attributes due " + + " to exception.", ex); + } + long stopTime = clock.getTime(); + routerMetrics.succeededGetNodesToAttributesRetrieved(stopTime - startTime); + return RouterYarnClientUtils.mergeNodesToAttributesResponse(nodesToAttributesResponses); } protected SubClusterId getApplicationHomeSubCluster( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java index d72e72a6cf..e70d5521ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterYarnClientUtils.java @@ -37,6 +37,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; @@ -49,9 +52,12 @@ import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttributeKey; +import org.apache.hadoop.yarn.api.records.NodeToAttributeValue; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeInfo; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -468,5 +474,56 @@ public static GetResourceProfileResponse mergeClusterResourceProfileResponse( profileResponse.setResource(resource); return profileResponse; } + + /** + * Merges a list of GetAttributesToNodesResponse. + * + * @param responses a list of GetAttributesToNodesResponse to merge. + * @return the merged GetAttributesToNodesResponse. + */ + public static GetAttributesToNodesResponse mergeAttributesToNodesResponse( + Collection responses) { + Map> nodeAttributeMap = new HashMap<>(); + for (GetAttributesToNodesResponse response : responses) { + if (response != null && response.getAttributesToNodes() != null) { + nodeAttributeMap.putAll(response.getAttributesToNodes()); + } + } + return GetAttributesToNodesResponse.newInstance(nodeAttributeMap); + } + + /** + * Merges a list of GetClusterNodeAttributesResponse. + * + * @param responses a list of GetClusterNodeAttributesResponse to merge. + * @return the merged GetClusterNodeAttributesResponse. + */ + public static GetClusterNodeAttributesResponse mergeClusterNodeAttributesResponse( + Collection responses) { + Set nodeAttributeInfo = new HashSet<>(); + for (GetClusterNodeAttributesResponse response : responses) { + if (response != null && response.getNodeAttributes() != null) { + nodeAttributeInfo.addAll(response.getNodeAttributes()); + } + } + return GetClusterNodeAttributesResponse.newInstance(nodeAttributeInfo); + } + + /** + * Merges a list of GetNodesToAttributesResponse. + * + * @param responses a list of GetNodesToAttributesResponse to merge. + * @return the merged GetNodesToAttributesResponse. + */ + public static GetNodesToAttributesResponse mergeNodesToAttributesResponse( + Collection responses) { + Map> attributesMap = new HashMap<>(); + for (GetNodesToAttributesResponse response : responses) { + if (response != null && response.getNodeToAttributes() != null) { + attributesMap.putAll(response.getNodeToAttributes()); + } + } + return GetNodesToAttributesResponse.newInstance(attributesMap); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 61fcd5385a..455cb229e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -438,6 +438,21 @@ public void getResourceProfileFailed() { LOG.info("Mocked: failed getResourceProfileFailed call"); metrics.incrGetResourceProfileFailedRetrieved(); } + + public void getAttributesToNodesFailed() { + LOG.info("Mocked: failed getAttributesToNodesFailed call"); + metrics.incrGetAttributesToNodesFailedRetrieved(); + } + + public void getClusterNodeAttributesFailed() { + LOG.info("Mocked: failed getClusterNodeAttributesFailed call"); + metrics.incrGetClusterNodeAttributesFailedRetrieved(); + } + + public void getNodesToAttributesFailed() { + LOG.info("Mocked: failed getNodesToAttributesFailed call"); + metrics.incrGetNodesToAttributesFailedRetrieved(); + } } // Records successes for all calls @@ -573,6 +588,21 @@ public void getResourceProfileRetrieved(long duration) { LOG.info("Mocked: successful getResourceProfile call with duration {}", duration); metrics.succeededGetResourceProfileRetrieved(duration); } + + public void getAttributesToNodesRetrieved(long duration) { + LOG.info("Mocked: successful getAttributesToNodes call with duration {}", duration); + metrics.succeededGetAttributesToNodesRetrieved(duration); + } + + public void getClusterNodeAttributesRetrieved(long duration) { + LOG.info("Mocked: successful getClusterNodeAttributes call with duration {}", duration); + metrics.succeededGetClusterNodeAttributesRetrieved(duration); + } + + public void getNodesToAttributesRetrieved(long duration) { + LOG.info("Mocked: successful getNodesToAttributes call with duration {}", duration); + metrics.succeededGetNodesToAttributesRetrieved(duration); + } } @Test @@ -970,4 +1000,73 @@ public void testGetResourceProfileRetrievedFailed() { Assert.assertEquals(totalBadBefore + 1, metrics.getResourceProfileFailedRetrieved()); } + + @Test + public void testSucceededGetAttributesToNodesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetAttributesToNodesRetrieved(); + goodSubCluster.getAttributesToNodesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetAttributesToNodesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getAttributesToNodesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetAttributesToNodesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetAttributesToNodesRetrievedFailed() { + long totalBadBefore = metrics.getAttributesToNodesFailedRetrieved(); + badSubCluster.getAttributesToNodesFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getAttributesToNodesFailedRetrieved()); + } + + @Test + public void testGetClusterNodeAttributesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetClusterNodeAttributesRetrieved(); + goodSubCluster.getClusterNodeAttributesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetClusterNodeAttributesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getClusterNodeAttributesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetClusterNodeAttributesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetClusterNodeAttributesRetrievedFailed() { + long totalBadBefore = metrics.getClusterNodeAttributesFailedRetrieved(); + badSubCluster.getClusterNodeAttributesFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getClusterNodeAttributesFailedRetrieved()); + } + + @Test + public void testGetNodesToAttributesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetNodesToAttributesRetrieved(); + goodSubCluster.getNodesToAttributesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetNodesToAttributesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getNodesToAttributesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetNodesToAttributesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetNodesToAttributesRetrievedFailed() { + long totalBadBefore = metrics.getNodesToAttributesFailedRetrieved(); + badSubCluster.getNodesToAttributesFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getNodesToAttributesFailedRetrieved()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 49872e5a41..f0aa48082b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -82,6 +82,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -96,6 +102,11 @@ import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.NodeAttributeKey; +import org.apache.hadoop.yarn.api.records.NodeToAttributeValue; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeInfo; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; @@ -1233,4 +1244,81 @@ public void testGetResourceProfile() throws Exception { Assert.assertEquals(4096, response3.getResource().getMemorySize()); Assert.assertEquals(4, response3.getResource().getVirtualCores()); } + + @Test + public void testGetAttributesToNodes() throws Exception { + LOG.info("Test FederationClientInterceptor : Get AttributesToNodes request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getAttributesToNodes request " + + "or nodeAttributes.", () -> interceptor.getAttributesToNodes(null)); + + // normal request + GetAttributesToNodesResponse response = + interceptor.getAttributesToNodes(GetAttributesToNodesRequest.newInstance()); + + Assert.assertNotNull(response); + Map> attrs = response.getAttributesToNodes(); + Assert.assertNotNull(attrs); + Assert.assertEquals(4, attrs.size()); + + NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvidia"); + NodeToAttributeValue attributeValue1 = + NodeToAttributeValue.newInstance("0-host1", gpu.getAttributeValue()); + NodeAttributeKey gpuKey = gpu.getAttributeKey(); + Assert.assertTrue(attrs.get(gpuKey).contains(attributeValue1)); + } + + @Test + public void testClusterNodeAttributes() throws Exception { + LOG.info("Test FederationClientInterceptor : Get ClusterNodeAttributes request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeAttributes request.", + () -> interceptor.getClusterNodeAttributes(null)); + + // normal request + GetClusterNodeAttributesResponse response = + interceptor.getClusterNodeAttributes(GetClusterNodeAttributesRequest.newInstance()); + + Assert.assertNotNull(response); + Set nodeAttributeInfos = response.getNodeAttributes(); + Assert.assertNotNull(nodeAttributeInfos); + Assert.assertEquals(4, nodeAttributeInfos.size()); + + NodeAttributeInfo nodeAttributeInfo1 = + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"), + NodeAttributeType.STRING); + Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1)); + + NodeAttributeInfo nodeAttributeInfo2 = + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("OS"), + NodeAttributeType.STRING); + Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2)); + } + + @Test + public void testNodesToAttributes() throws Exception { + LOG.info("Test FederationClientInterceptor : Get NodesToAttributes request."); + + // null request + LambdaTestUtils.intercept(YarnException.class, + "Missing getNodesToAttributes request or hostNames.", + () -> interceptor.getNodesToAttributes(null)); + + // normal request + Set hostNames = Collections.singleton("0-host1"); + GetNodesToAttributesResponse response = + interceptor.getNodesToAttributes(GetNodesToAttributesRequest.newInstance(hostNames)); + Assert.assertNotNull(response); + + Map> nodeAttributeMap = response.getNodeToAttributes(); + Assert.assertNotNull(nodeAttributeMap); + Assert.assertEquals(1, nodeAttributeMap.size()); + + NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvida"); + Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java index d586c4827e..33cae61230 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestRouterYarnClientUtils.java @@ -37,6 +37,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; @@ -53,6 +56,11 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.api.records.NodeAttributeKey; +import org.apache.hadoop.yarn.api.records.NodeToAttributeValue; +import org.apache.hadoop.yarn.api.records.NodeAttributeInfo; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager; import org.junit.Assert; @@ -610,4 +618,147 @@ public void testMergeResourceProfile() { Assert.assertEquals(3, resource.getVirtualCores()); Assert.assertEquals(3072, resource.getMemorySize()); } + + @Test + public void testMergeAttributesToNodesResponse() { + // normal response1 + NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvidia"); + Map> map1 = new HashMap<>(); + List lists1 = new ArrayList<>(); + NodeToAttributeValue attributeValue1 = + NodeToAttributeValue.newInstance("node1", gpu.getAttributeValue()); + lists1.add(attributeValue1); + map1.put(gpu.getAttributeKey(), lists1); + GetAttributesToNodesResponse response1 = GetAttributesToNodesResponse.newInstance(map1); + + // normal response2 + NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER", + NodeAttributeType.STRING, "docker0"); + Map> map2 = new HashMap<>(); + List lists2 = new ArrayList<>(); + NodeToAttributeValue attributeValue2 = + NodeToAttributeValue.newInstance("node2", docker.getAttributeValue()); + lists2.add(attributeValue2); + map2.put(docker.getAttributeKey(), lists2); + GetAttributesToNodesResponse response2 = GetAttributesToNodesResponse.newInstance(map2); + + // empty response3 + GetAttributesToNodesResponse response3 = + GetAttributesToNodesResponse.newInstance(new HashMap<>()); + + // null response4 + GetAttributesToNodesResponse response4 = null; + + List responses = new ArrayList<>(); + responses.add(response1); + responses.add(response2); + responses.add(response3); + responses.add(response4); + + GetAttributesToNodesResponse response = + RouterYarnClientUtils.mergeAttributesToNodesResponse(responses); + + Assert.assertNotNull(response); + Assert.assertEquals(2, response.getAttributesToNodes().size()); + + Map> attrs = response.getAttributesToNodes(); + + NodeAttributeKey gpuKey = gpu.getAttributeKey(); + Assert.assertEquals(attributeValue1.toString(), attrs.get(gpuKey).get(0).toString()); + + NodeAttributeKey dockerKey = docker.getAttributeKey(); + Assert.assertEquals(attributeValue2.toString(), attrs.get(dockerKey).get(0).toString()); + } + + @Test + public void testMergeClusterNodeAttributesResponse() { + // normal response1 + NodeAttributeInfo nodeAttributeInfo1 = + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"), + NodeAttributeType.STRING); + Set attributes1 = new HashSet<>(); + attributes1.add(nodeAttributeInfo1); + GetClusterNodeAttributesResponse response1 = + GetClusterNodeAttributesResponse.newInstance(attributes1); + + // normal response2 + NodeAttributeInfo nodeAttributeInfo2 = + NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("CPU"), + NodeAttributeType.STRING); + Set attributes2 = new HashSet<>(); + attributes2.add(nodeAttributeInfo2); + GetClusterNodeAttributesResponse response2 = + GetClusterNodeAttributesResponse.newInstance(attributes2); + + // empty response3 + GetClusterNodeAttributesResponse response3 = + GetClusterNodeAttributesResponse.newInstance(new HashSet<>()); + + // null response4 + GetClusterNodeAttributesResponse response4 = null; + + List responses = new ArrayList<>(); + responses.add(response1); + responses.add(response2); + responses.add(response3); + responses.add(response4); + + GetClusterNodeAttributesResponse response = + RouterYarnClientUtils.mergeClusterNodeAttributesResponse(responses); + + Assert.assertNotNull(response); + + Set nodeAttributeInfos = response.getNodeAttributes(); + Assert.assertEquals(2, nodeAttributeInfos.size()); + Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1)); + Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2)); + } + + @Test + public void testMergeNodesToAttributesResponse() { + // normal response1 + NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvida"); + NodeAttribute os = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS", + NodeAttributeType.STRING, "windows64"); + NodeAttribute dist = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION", + NodeAttributeType.STRING, "3_0_2"); + Map> node1Map = new HashMap<>(); + node1Map.put("node1", ImmutableSet.of(gpu, os, dist)); + GetNodesToAttributesResponse response1 = GetNodesToAttributesResponse.newInstance(node1Map); + + // normal response2 + NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER", + NodeAttributeType.STRING, "docker0"); + Map> node2Map = new HashMap<>(); + node2Map.put("node2", ImmutableSet.of(docker)); + GetNodesToAttributesResponse response2 = GetNodesToAttributesResponse.newInstance(node2Map); + + // empty response3 + GetNodesToAttributesResponse response3 = + GetNodesToAttributesResponse.newInstance(new HashMap<>()); + + // null response4 + GetNodesToAttributesResponse response4 = null; + + List responses = new ArrayList<>(); + responses.add(response1); + responses.add(response2); + responses.add(response3); + responses.add(response4); + + GetNodesToAttributesResponse response = + RouterYarnClientUtils.mergeNodesToAttributesResponse(responses); + + Assert.assertNotNull(response); + + Map> hostToAttrs = response.getNodeToAttributes(); + Assert.assertNotNull(hostToAttrs); + Assert.assertEquals(2, hostToAttrs.size()); + Assert.assertTrue(hostToAttrs.get("node1").contains(dist)); + Assert.assertTrue(hostToAttrs.get("node1").contains(gpu)); + Assert.assertTrue(hostToAttrs.get("node1").contains(os)); + Assert.assertTrue(hostToAttrs.get("node2").contains(docker)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index af1f45924c..7c82476ec4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -24,12 +24,19 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.NodeAttribute; +import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -82,6 +89,7 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( } mockRMs.put(subClusterId, mockRM); } + initNodeAttributes(subClusterId, mockRM); return mockRM.getClientRMService(); } } @@ -127,4 +135,30 @@ public ConcurrentHashMap getMockRMs() { public ConcurrentHashMap getMockNMs() { return mockNMs; } + + private void initNodeAttributes(SubClusterId subClusterId, MockRM mockRM) { + String node1 = subClusterId.getId() +"-host1"; + String node2 = subClusterId.getId() +"-host2"; + NodeAttributesManager mgr = mockRM.getRMContext().getNodeAttributesManager(); + NodeAttribute gpu = + NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU", + NodeAttributeType.STRING, "nvidia"); + NodeAttribute os = + NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS", + NodeAttributeType.STRING, "windows64"); + NodeAttribute docker = + NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER", + NodeAttributeType.STRING, "docker0"); + NodeAttribute dist = + NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION", + NodeAttributeType.STRING, "3_0_2"); + Map> nodes = new HashMap<>(); + nodes.put(node1, ImmutableSet.of(gpu, os, dist)); + nodes.put(node2, ImmutableSet.of(docker, dist)); + try { + mgr.addNodeAttributes(nodes); + } catch (IOException e) { + throw new RuntimeException(e); + } + } }