From f71fd885be48bfa1f5bd0686519bed90a2fda561 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 7 Dec 2022 00:17:05 +0800 Subject: [PATCH] YARN-11373. [Federation] Support refreshQueues refreshNodes API's for Federation. (#5146) --- .../protocolrecords/RefreshNodesRequest.java | 25 +++++++ .../protocolrecords/RefreshQueuesRequest.java | 17 +++++ ...erver_resourcemanager_service_protos.proto | 2 + .../filecontroller/ifile/package-info.java | 6 +- .../filecontroller/tfile/package-info.java | 6 +- .../yarn/security/admin/package-info.java | 6 +- .../impl/pb/RefreshNodesRequestPBImpl.java | 22 +++++- .../impl/pb/RefreshQueuesRequestPBImpl.java | 33 ++++++++- .../yarn/server/router/RouterMetrics.java | 38 +++++++++- .../rmadmin/FederationRMAdminInterceptor.java | 73 ++++++++++++++++++- .../router/rmadmin/RMAdminProtocolMethod.java | 62 +++++++++++++++- .../TestFederationRMAdminInterceptor.java | 70 +++++++++++++++++- 12 files changed, 330 insertions(+), 30 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java index fcbef039f2..1675e3ace4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java @@ -53,6 +53,17 @@ public static RefreshNodesRequest newInstance( return request; } + @Private + @Unstable + public static RefreshNodesRequest newInstance( + DecommissionType decommissionType, Integer timeout, String subClusterId) { + RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class); + request.setDecommissionType(decommissionType); + request.setDecommissionTimeout(timeout); + request.setSubClusterId(subClusterId); + return request; + } + /** * Set the DecommissionType. * @@ -80,4 +91,18 @@ public static RefreshNodesRequest newInstance( * @return decommissionTimeout */ public abstract Integer getDecommissionTimeout(); + + /** + * Get the subClusterId. + * + * @return subClusterId. + */ + public abstract String getSubClusterId(); + + /** + * Set the subClusterId. + * + * @param subClusterId subCluster Id. + */ + public abstract void setSubClusterId(String subClusterId); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java index eff4b7f4d2..ba332ad40c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.util.Records; @@ -33,4 +34,20 @@ public static RefreshQueuesRequest newInstance() { Records.newRecord(RefreshQueuesRequest.class); return request; } + + @Public + @Stable + public static RefreshQueuesRequest newInstance(String subClusterId) { + RefreshQueuesRequest request = Records.newRecord(RefreshQueuesRequest.class); + request.setSubClusterId(subClusterId); + return request; + } + + @Public + @Unstable + public abstract String getSubClusterId(); + + @Private + @Unstable + public abstract void setSubClusterId(String subClusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 3f9913b989..e1bf9edfcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -32,6 +32,7 @@ package hadoop.yarn; import "yarn_protos.proto"; message RefreshQueuesRequestProto { + optional string sub_cluster_id = 1; } message RefreshQueuesResponseProto { } @@ -39,6 +40,7 @@ message RefreshQueuesResponseProto { message RefreshNodesRequestProto { optional DecommissionTypeProto decommissionType = 1 [default = NORMAL]; optional int32 decommissionTimeout = 2; + optional string sub_cluster_id = 3; } message RefreshNodesResponseProto { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java index 08ddecef5d..9cbc99baad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@InterfaceAudience.Public +@Public package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; -import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Public; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java index b2e91ab48a..e014350ec2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@InterfaceAudience.Public +@Public package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile; -import org.apache.hadoop.classification.InterfaceAudience; \ No newline at end of file +import org.apache.hadoop.classification.InterfaceAudience.Public; \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java index c66be222ae..99b857ac2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -@InterfaceAudience.Public +@Public package org.apache.hadoop.yarn.security.admin; -import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Public; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java index 62a82912b5..a14aae74f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java @@ -31,9 +31,9 @@ @Private @Unstable public class RefreshNodesRequestPBImpl extends RefreshNodesRequest { - RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance(); - RefreshNodesRequestProto.Builder builder = null; - boolean viaProto = false; + private RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance(); + private RefreshNodesRequestProto.Builder builder = null; + private boolean viaProto = false; private DecommissionType decommissionType; public RefreshNodesRequestPBImpl() { @@ -123,6 +123,22 @@ public synchronized Integer getDecommissionTimeout() { return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null; } + @Override + public synchronized String getSubClusterId() { + RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasSubClusterId()) ? p.getSubClusterId() : null; + } + + @Override + public synchronized void setSubClusterId(String subClusterId) { + maybeInitBuilder(); + if (subClusterId == null) { + builder.clearSubClusterId(); + return; + } + builder.setSubClusterId(subClusterId); + } + private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) { return DecommissionType.valueOf(p.name()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java index c21ec6d362..2c174ad18f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; import org.apache.hadoop.thirdparty.protobuf.TextFormat; @@ -29,9 +30,9 @@ @Unstable public class RefreshQueuesRequestPBImpl extends RefreshQueuesRequest { - RefreshQueuesRequestProto proto = RefreshQueuesRequestProto.getDefaultInstance(); - RefreshQueuesRequestProto.Builder builder = null; - boolean viaProto = false; + private RefreshQueuesRequestProto proto = RefreshQueuesRequestProto.getDefaultInstance(); + private RefreshQueuesRequestProto.Builder builder = null; + private boolean viaProto = false; public RefreshQueuesRequestPBImpl() { builder = RefreshQueuesRequestProto.newBuilder(); @@ -55,8 +56,9 @@ public int hashCode() { @Override public boolean equals(Object other) { - if (other == null) + if (other == null) { return false; + } if (other.getClass().isAssignableFrom(this.getClass())) { return this.getProto().equals(this.getClass().cast(other).getProto()); } @@ -67,4 +69,27 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RefreshQueuesRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getSubClusterId() { + RefreshQueuesRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasSubClusterId()) ? p.getSubClusterId() : null; + } + + @Override + public void setSubClusterId(String clusterId) { + maybeInitBuilder(); + if (clusterId == null) { + builder.clearSubClusterId(); + return; + } + builder.setSubClusterId(clusterId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 31d838d1b3..3268889c95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -127,6 +127,8 @@ public final class RouterMetrics { private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved; @Metric("# of checkUserAccessToQueue failed to be retrieved") private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved; + @Metric("# of refreshNodes failed to be retrieved") + private MutableGaugeInt numRefreshNodesFailedRetrieved; @Metric("# of getDelegationToken failed to be retrieved") private MutableGaugeInt numGetDelegationTokenFailedRetrieved; @Metric("# of renewDelegationToken failed to be retrieved") @@ -221,6 +223,8 @@ public final class RouterMetrics { private MutableRate totalSucceededGetRMNodeLabelsRetrieved; @Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)") private MutableRate totalSucceededCheckUserAccessToQueueRetrieved; + @Metric("Total number of successful Retrieved RefreshNodes and latency(ms)") + private MutableRate totalSucceededRefreshNodesRetrieved; @Metric("Total number of successful Retrieved GetDelegationToken and latency(ms)") private MutableRate totalSucceededGetDelegationTokenRetrieved; @Metric("Total number of successful Retrieved RenewDelegationToken and latency(ms)") @@ -271,9 +275,10 @@ public final class RouterMetrics { private MutableQuantiles getUpdateQueueLatency; private MutableQuantiles getAppTimeoutLatency; private MutableQuantiles getAppTimeoutsLatency; - private MutableQuantiles getRefreshQueuesLatency; + private MutableQuantiles refreshQueuesLatency; private MutableQuantiles getRMNodeLabelsLatency; private MutableQuantiles checkUserAccessToQueueLatency; + private MutableQuantiles refreshNodesLatency; private MutableQuantiles getDelegationTokenLatency; private MutableQuantiles renewDelegationTokenLatency; private MutableQuantiles cancelDelegationTokenLatency; @@ -430,7 +435,7 @@ private RouterMetrics() { getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency", "latency of get apptimeouts timeouts", "ops", "latency", 10); - getRefreshQueuesLatency = registry.newQuantiles("getRefreshQueuesLatency", + refreshQueuesLatency = registry.newQuantiles("refreshQueuesLatency", "latency of get refresh queues timeouts", "ops", "latency", 10); getRMNodeLabelsLatency = registry.newQuantiles("getRMNodeLabelsLatency", @@ -439,6 +444,9 @@ private RouterMetrics() { checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency", "latency of get apptimeouts timeouts", "ops", "latency", 10); + refreshNodesLatency = registry.newQuantiles("refreshNodesLatency", + "latency of get refresh nodes timeouts", "ops", "latency", 10); + getDelegationTokenLatency = registry.newQuantiles("getDelegationTokenLatency", "latency of get delegation token timeouts", "ops", "latency", 10); @@ -447,6 +455,7 @@ private RouterMetrics() { cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency", "latency of cancel delegation token timeouts", "ops", "latency", 10); + } public static RouterMetrics getMetrics() { @@ -673,6 +682,11 @@ public long getNumSucceededRefreshQueuesRetrieved() { return totalSucceededRefreshQueuesRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededRefreshNodesRetrieved() { + return totalSucceededRefreshNodesRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededGetRMNodeLabelsRetrieved() { return totalSucceededGetRMNodeLabelsRetrieved.lastStat().numSamples(); @@ -903,6 +917,11 @@ public double getLatencySucceededRefreshQueuesRetrieved() { return totalSucceededRefreshQueuesRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededRefreshNodesRetrieved() { + return totalSucceededRefreshNodesRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededGetRMNodeLabelsRetrieved() { return totalSucceededGetRMNodeLabelsRetrieved.lastStat().mean(); @@ -1122,6 +1141,10 @@ public int getCheckUserAccessToQueueFailedRetrieved() { return numCheckUserAccessToQueueFailedRetrieved.value(); } + public int getNumRefreshNodesFailedRetrieved() { + return numRefreshNodesFailedRetrieved.value(); + } + public int getDelegationTokenFailedRetrieved() { return numGetDelegationTokenFailedRetrieved.value(); } @@ -1336,7 +1359,12 @@ public void succeededGetAppTimeoutsRetrieved(long duration) { public void succeededRefreshQueuesRetrieved(long duration) { totalSucceededRefreshQueuesRetrieved.add(duration); - getRefreshQueuesLatency.add(duration); + refreshQueuesLatency.add(duration); + } + + public void succeededRefreshNodesRetrieved(long duration) { + totalSucceededRefreshNodesRetrieved.add(duration); + refreshNodesLatency.add(duration); } public void succeededGetRMNodeLabelsRetrieved(long duration) { @@ -1536,6 +1564,10 @@ public void incrCheckUserAccessToQueueFailedRetrieved() { numCheckUserAccessToQueueFailedRetrieved.incr(); } + public void incrRefreshNodesFailedRetrieved() { + numRefreshNodesFailedRetrieved.incr(); + } + public void incrGetDelegationTokenFailedRetrieved() { numGetDelegationTokenFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index 4564f8d8b8..22ace295c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -145,6 +145,23 @@ public void setNextInterceptor(RMAdminRequestInterceptor next) { + "is correct"); } + /** + * Refresh queue requests. + * + * The Router supports refreshing all SubCluster queues at once, + * and also supports refreshing queues by SubCluster. + * + * @param request RefreshQueuesRequest, If subClusterId is not empty, + * it means that we want to refresh the queue of the specified subClusterId. + * If subClusterId is empty, it means we want to refresh all queues. + * + * @return RefreshQueuesResponse, There is no specific information in the response, + * as long as it is not empty, it means that the request is successful. + * + * @throws StandbyException exception thrown by non-active server. + * @throws YarnException indicates exceptions from yarn servers. + * @throws IOException io error occurs. + */ @Override public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) throws StandbyException, YarnException, IOException { @@ -161,8 +178,9 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod( new Class[] {RefreshQueuesRequest.class}, new Object[] {request}); + String subClusterId = request.getSubClusterId(); Collection refreshQueueResps = - remoteMethod.invokeConcurrent(this, RefreshQueuesResponse.class); + remoteMethod.invokeConcurrent(this, RefreshQueuesResponse.class, subClusterId); // If we get the return result from refreshQueueResps, // it means that the call has been successful, @@ -172,19 +190,66 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) routerMetrics.succeededRefreshQueuesRetrieved(stopTime - startTime); return RefreshQueuesResponse.newInstance(); } - } catch (Exception e) { + } catch (YarnException e) { routerMetrics.incrRefreshQueuesFailedRetrieved(); - RouterServerUtil.logAndThrowException("Unable to refreshQueue to exception.", e); + RouterServerUtil.logAndThrowException(e, "Unable to refreshQueue due to exception."); } routerMetrics.incrRefreshQueuesFailedRetrieved(); throw new YarnException("Unable to refreshQueue."); } + /** + * Refresh node requests. + * + * The Router supports refreshing all SubCluster nodes at once, + * and also supports refreshing node by SubCluster. + * + * @param request RefreshNodesRequest, If subClusterId is not empty, + * it means that we want to refresh the node of the specified subClusterId. + * If subClusterId is empty, it means we want to refresh all nodes. + * + * @return RefreshNodesResponse, There is no specific information in the response, + * as long as it is not empty, it means that the request is successful. + * + * @throws StandbyException exception thrown by non-active server. + * @throws YarnException indicates exceptions from yarn servers. + * @throws IOException io error occurs. + */ @Override public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws StandbyException, YarnException, IOException { - throw new NotImplementedException(); + + // parameter verification. + // We will not check whether the DecommissionType is empty, + // because this parameter has a default value at the proto level. + if (request == null) { + routerMetrics.incrRefreshNodesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing RefreshNodes request.", null); + } + + // call refreshNodes of activeSubClusters. + try { + long startTime = clock.getTime(); + RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod( + new Class[] {RefreshNodesRequest.class}, new Object[] {request}); + + String subClusterId = request.getSubClusterId(); + Collection refreshNodesResps = + remoteMethod.invokeConcurrent(this, RefreshNodesResponse.class, subClusterId); + + if (CollectionUtils.isNotEmpty(refreshNodesResps)) { + long stopTime = clock.getTime(); + routerMetrics.succeededRefreshNodesRetrieved(stopTime - startTime); + return RefreshNodesResponse.newInstance(); + } + } catch (YarnException e) { + routerMetrics.incrRefreshNodesFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, "Unable to refreshNodes due to exception."); + } + + routerMetrics.incrRefreshNodesFailedRetrieved(); + throw new YarnException("Unable to refreshNodes."); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java index e1aa806ff8..1a5b038f19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java @@ -37,12 +37,12 @@ import java.util.TreeMap; import java.util.List; import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; - /** * Class to define admin method, params and arguments. */ @@ -61,11 +61,15 @@ public RMAdminProtocolMethod(Class[] pTypes, Object... pParams) } public Collection invokeConcurrent(FederationRMAdminInterceptor interceptor, - Class clazz) throws YarnException { + Class clazz, String subClusterId) throws YarnException { this.rmAdminInterceptor = interceptor; this.federationFacade = FederationStateStoreFacade.getInstance(); this.configuration = interceptor.getConf(); - return invokeConcurrent(clazz); + if (StringUtils.isNotBlank(subClusterId)) { + return invoke(clazz, subClusterId); + } else { + return invokeConcurrent(clazz); + } } @Override @@ -107,7 +111,10 @@ protected Collection invokeConcurrent(Class clazz) throws YarnExceptio Pair pair = future.get(); subClusterId = pair.getKey(); Object result = pair.getValue(); - results.put(subClusterId, clazz.cast(result)); + if (result != null) { + R rResult = clazz.cast(result); + results.put(subClusterId, rResult); + } } catch (InterruptedException | ExecutionException e) { Throwable cause = e.getCause(); LOG.error("Cannot execute {} on {}: {}", methodName, subClusterId, cause.getMessage()); @@ -129,4 +136,51 @@ protected Collection invokeConcurrent(Class clazz) throws YarnExceptio // return result return results.values(); } + + /** + * Call the method in the protocol according to the subClusterId. + * + * @param clazz return type + * @param subClusterId subCluster Id + * @param Generic R + * @return response collection. + * @throws YarnException yarn exception. + */ + protected Collection invoke(Class clazz, String subClusterId) throws YarnException { + + // Get the method name to call + String methodName = Thread.currentThread().getStackTrace()[3].getMethodName(); + this.setMethodName(methodName); + + // Get Active SubClusters + Map subClusterInfoMap = + federationFacade.getSubClusters(true); + + // According to subCluster of string type, convert to SubClusterId type + SubClusterId subClusterIdKey = SubClusterId.newInstance(subClusterId); + + // If the provided subCluster is not Active or does not exist, + // an exception will be returned directly. + if (!subClusterInfoMap.containsKey(subClusterIdKey)) { + throw new YarnException("subClusterId = " + subClusterId + " is not an active subCluster."); + } + + // Call the method in the protocol and convert it according to clazz. + try { + ResourceManagerAdministrationProtocol protocol = + rmAdminInterceptor.getAdminRMProxyForSubCluster(subClusterIdKey); + Class[] types = this.getTypes(); + Object[] params = this.getParams(); + Method method = ResourceManagerAdministrationProtocol.class.getMethod(methodName, types); + Object result = method.invoke(protocol, params); + if (result != null) { + return Collections.singletonList(clazz.cast(result)); + } + } catch (Exception e) { + throw new YarnException("invoke Failed, An exception occurred in subClusterId = " + + subClusterId, e); + } + throw new YarnException("invoke Failed, An exception occurred in subClusterId = " + + subClusterId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java index 3aa61a68a3..e68e9dda3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -19,8 +19,11 @@ package org.apache.hadoop.yarn.server.router.rmadmin; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.yarn.api.records.DecommissionType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -31,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -77,7 +79,7 @@ public void setUp() { subClusters = new ArrayList<>(); try { for (int i = 0; i < NUM_SUBCLUSTER; i++) { - SubClusterId sc = SubClusterId.newInstance(Integer.toString(i)); + SubClusterId sc = SubClusterId.newInstance("SC-" + i); stateStoreUtil.registerSubCluster(sc); subClusters.add(sc); } @@ -114,8 +116,70 @@ public void tearDown() { } @Test - public void testRefreshQueues() throws IOException, YarnException { + public void testRefreshQueues() throws Exception { + // We will test 2 cases: + // case 1, request is null. + // case 2, normal request. + // If the request is null, a Missing RefreshQueues request exception will be thrown. + + // null request. + LambdaTestUtils.intercept(YarnException.class, "Missing RefreshQueues request.", + () -> interceptor.refreshQueues(null)); + + // normal request. RefreshQueuesRequest request = RefreshQueuesRequest.newInstance(); interceptor.refreshQueues(request); } + + @Test + public void testSC1RefreshQueues() throws Exception { + // We will test 2 cases: + // case 1, test the existing subCluster (SC-1). + // case 2, test the non-exist subCluster. + + String existSubCluster = "SC-1"; + RefreshQueuesRequest request = RefreshQueuesRequest.newInstance(existSubCluster); + interceptor.refreshQueues(request); + + String notExistsSubCluster = "SC-NON"; + RefreshQueuesRequest request1 = RefreshQueuesRequest.newInstance(notExistsSubCluster); + LambdaTestUtils.intercept(YarnException.class, + "subClusterId = SC-NON is not an active subCluster.", + () -> interceptor.refreshQueues(request1)); + } + + @Test + public void testRefreshNodes() throws Exception { + // We will test 2 cases: + // case 1, request is null. + // case 2, normal request. + // If the request is null, a Missing RefreshNodes request exception will be thrown. + + // null request. + LambdaTestUtils.intercept(YarnException.class, + "Missing RefreshNodes request.", () -> interceptor.refreshNodes(null)); + + // normal request. + RefreshNodesRequest request = RefreshNodesRequest.newInstance(DecommissionType.NORMAL); + interceptor.refreshNodes(request); + } + + @Test + public void testSC1RefreshNodes() throws Exception { + + // We will test 2 cases: + // case 1, test the existing subCluster (SC-1). + // case 2, test the non-exist subCluster. + + RefreshNodesRequest request = + RefreshNodesRequest.newInstance(DecommissionType.NORMAL, 10, "SC-1"); + interceptor.refreshNodes(request); + + String notExistsSubCluster = "SC-NON"; + RefreshNodesRequest request1 = RefreshNodesRequest.newInstance( + DecommissionType.NORMAL, 10, notExistsSubCluster); + LambdaTestUtils.intercept(YarnException.class, + "subClusterId = SC-NON is not an active subCluster.", + () -> interceptor.refreshNodes(request1)); + } }