From 926993cb73f957eb191c0a830c6b5560585f95d8 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Tue, 28 Mar 2023 00:27:21 +0800 Subject: [PATCH] =?UTF-8?q?YARN-11376.=20[Federation]=20Support=20updateNo?= =?UTF-8?q?deResource=E3=80=81refreshNodesResources=20API's=20for=20Federa?= =?UTF-8?q?tion.=20(#5496)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../RefreshNodesResourcesRequest.java | 27 ++++++++ .../UpdateNodeResourceRequest.java | 30 +++++++- ...erver_resourcemanager_service_protos.proto | 2 + .../RefreshNodesResourcesRequestPBImpl.java | 24 +++++++ .../pb/UpdateNodeResourceRequestPBImpl.java | 16 +++++ .../yarn/server/router/RouterMetrics.java | 62 +++++++++++++++++ .../rmadmin/FederationRMAdminInterceptor.java | 69 ++++++++++++++++++- .../yarn/server/router/TestRouterMetrics.java | 64 +++++++++++++++++ .../TestFederationRMAdminInterceptor.java | 68 ++++++++++++++++++ .../TestableFederationRMAdminInterceptor.java | 3 +- 10 files changed, 360 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesResourcesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesResourcesRequest.java index f8c91f6437..bcffeb74ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesResourcesRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesResourcesRequest.java @@ -36,4 +36,31 @@ public static RefreshNodesResourcesRequest newInstance() { Records.newRecord(RefreshNodesResourcesRequest.class); return request; } + + @Public + @Evolving + public static RefreshNodesResourcesRequest newInstance(String subClusterId) { + RefreshNodesResourcesRequest request = + Records.newRecord(RefreshNodesResourcesRequest.class); + request.setSubClusterId(subClusterId); + return request; + } + + /** + * Get the subClusterId. + * + * @return subClusterId. + */ + @Public + @Evolving + public abstract String getSubClusterId(); + + /** + * Set the subClusterId. + * + * @param subClusterId subCluster Id. + */ + @Public + @Evolving + public abstract void setSubClusterId(String subClusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceRequest.java index d540ccebb4..cfe9313549 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceRequest.java @@ -51,7 +51,18 @@ public static UpdateNodeResourceRequest newInstance( request.setNodeResourceMap(nodeResourceMap); return request; } - + + @Public + @Evolving + public static UpdateNodeResourceRequest newInstance( + Map nodeResourceMap, String subClusterId) { + UpdateNodeResourceRequest request = + Records.newRecord(UpdateNodeResourceRequest.class); + request.setNodeResourceMap(nodeResourceMap); + request.setSubClusterId(subClusterId); + return request; + } + /** * Get the map from NodeId to ResourceOption. * @return the map of {@code } @@ -68,4 +79,21 @@ public static UpdateNodeResourceRequest newInstance( @Evolving public abstract void setNodeResourceMap(Map nodeResourceMap); + /** + * Get the subClusterId. + * + * @return subClusterId. + */ + @Public + @Evolving + public abstract String getSubClusterId(); + + /** + * Set the subClusterId. + * + * @param subClusterId subCluster Id. + */ + @Public + @Evolving + public abstract void setSubClusterId(String subClusterId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 4050a5b356..132f937e15 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -79,12 +79,14 @@ message GetGroupsForUserResponseProto { message UpdateNodeResourceRequestProto { repeated NodeResourceMapProto node_resource_map = 1; + optional string sub_cluster_id = 2; } message UpdateNodeResourceResponseProto { } message RefreshNodesResourcesRequestProto { + optional string sub_cluster_id = 1; } message RefreshNodesResourcesResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesResourcesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesResourcesRequestPBImpl.java index 203fca19ee..1e866e608d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesResourcesRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesResourcesRequestPBImpl.java @@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest; import org.apache.hadoop.thirdparty.protobuf.TextFormat; @@ -69,4 +70,27 @@ public boolean equals(Object other) { public String toString() { return TextFormat.shortDebugString(getProto()); } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = RefreshNodesResourcesRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getSubClusterId() { + RefreshNodesResourcesRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasSubClusterId()) ? p.getSubClusterId() : null; + } + + @Override + public void setSubClusterId(String subClusterId) { + maybeInitBuilder(); + if (subClusterId == null) { + builder.clearSubClusterId(); + return; + } + builder.setSubClusterId(subClusterId); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceRequestPBImpl.java index 0e05e731ad..512462cd19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceRequestPBImpl.java @@ -66,6 +66,22 @@ public void setNodeResourceMap(Map nodeResourceMap) { this.nodeResourceMap.putAll(nodeResourceMap); } + @Override + public String getSubClusterId() { + UpdateNodeResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasSubClusterId()) ? p.getSubClusterId() : null; + } + + @Override + public void setSubClusterId(String subClusterId) { + maybeInitBuilder(); + if (subClusterId == null) { + builder.clearSubClusterId(); + return; + } + builder.setSubClusterId(subClusterId); + } + public UpdateNodeResourceRequestProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 3338013eba..3a581dfbd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -163,6 +163,10 @@ public final class RouterMetrics { private MutableGaugeInt numGetClusterInfoFailedRetrieved; @Metric("# of getClusterUserInfo failed to be retrieved") private MutableGaugeInt numGetClusterUserInfoFailedRetrieved; + @Metric("# of updateNodeResource failed to be retrieved") + private MutableGaugeInt numUpdateNodeResourceFailedRetrieved; + @Metric("# of refreshNodesResources failed to be retrieved") + private MutableGaugeInt numRefreshNodesResourcesFailedRetrieved; // Aggregate metrics are shared, and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -287,6 +291,10 @@ public final class RouterMetrics { private MutableRate totalSucceededGetClusterInfoRetrieved; @Metric("Total number of successful Retrieved GetClusterUserInfoRetrieved and latency(ms)") private MutableRate totalSucceededGetClusterUserInfoRetrieved; + @Metric("Total number of successful Retrieved UpdateNodeResource and latency(ms)") + private MutableRate totalSucceededUpdateNodeResourceRetrieved; + @Metric("Total number of successful Retrieved RefreshNodesResources and latency(ms)") + private MutableRate totalSucceededRefreshNodesResourcesRetrieved; /** * Provide quantile counters for all latencies. @@ -352,6 +360,8 @@ public final class RouterMetrics { private MutableQuantiles removeFromClusterNodeLabelsLatency; private MutableQuantiles getClusterInfoLatency; private MutableQuantiles getClusterUserInfoLatency; + private MutableQuantiles updateNodeResourceLatency; + private MutableQuantiles refreshNodesResourcesLatency; private static volatile RouterMetrics instance = null; private static MetricsRegistry registry; @@ -567,6 +577,12 @@ private RouterMetrics() { getClusterUserInfoLatency = registry.newQuantiles("getClusterUserInfoLatency", "latency of get cluster user info timeouts", "ops", "latency", 10); + + updateNodeResourceLatency = registry.newQuantiles("updateNodeResourceLatency", + "latency of update node resource timeouts", "ops", "latency", 10); + + refreshNodesResourcesLatency = registry.newQuantiles("refreshNodesResourcesLatency", + "latency of refresh nodes resources timeouts", "ops", "latency", 10); } public static RouterMetrics getMetrics() { @@ -873,6 +889,16 @@ public long getNumSucceededGetClusterUserInfoRetrieved() { return totalSucceededGetClusterUserInfoRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededUpdateNodeResourceRetrieved() { + return totalSucceededUpdateNodeResourceRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededRefreshNodesResourcesRetrieved() { + return totalSucceededRefreshNodesResourcesRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() { return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples(); @@ -1173,6 +1199,16 @@ public double getLatencySucceededGetClusterUserInfoRetrieved() { return totalSucceededGetClusterUserInfoRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededUpdateNodeResourceRetrieved() { + return totalSucceededUpdateNodeResourceRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededRefreshNodesResourcesRetrieved() { + return totalSucceededRefreshNodesResourcesRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() { return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean(); @@ -1426,6 +1462,14 @@ public int getClusterUserInfoFailedRetrieved() { return numGetClusterUserInfoFailedRetrieved.value(); } + public int getUpdateNodeResourceFailedRetrieved() { + return numUpdateNodeResourceFailedRetrieved.value(); + } + + public int getRefreshNodesResourcesFailedRetrieved() { + return numRefreshNodesResourcesFailedRetrieved.value(); + } + public int getDelegationTokenFailedRetrieved() { return numGetDelegationTokenFailedRetrieved.value(); } @@ -1739,6 +1783,16 @@ public void succeededGetClusterUserInfoRetrieved(long duration) { getClusterUserInfoLatency.add(duration); } + public void succeededUpdateNodeResourceRetrieved(long duration) { + totalSucceededUpdateNodeResourceRetrieved.add(duration); + updateNodeResourceLatency.add(duration); + } + + public void succeededRefreshNodesResourcesRetrieved(long duration) { + totalSucceededRefreshNodesResourcesRetrieved.add(duration); + refreshNodesResourcesLatency.add(duration); + } + public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) { totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration); refreshSuperUserGroupsConfLatency.add(duration); @@ -1967,6 +2021,14 @@ public void incrGetClusterUserInfoFailedRetrieved() { numGetClusterUserInfoFailedRetrieved.incr(); } + public void incrUpdateNodeResourceFailedRetrieved() { + numUpdateNodeResourceFailedRetrieved.incr(); + } + + public void incrRefreshNodesResourcesFailedRetrieved() { + numRefreshNodesResourcesFailedRetrieved.incr(); + } + public void incrGetDelegationTokenFailedRetrieved() { numGetDelegationTokenFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index 93e864bb98..c930459559 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -20,6 +20,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -112,7 +113,7 @@ public void init(String userName) { @VisibleForTesting protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster( - SubClusterId subClusterId) throws YarnException { + SubClusterId subClusterId) throws Exception { if (adminRMProxies.containsKey(subClusterId)) { return adminRMProxies.get(subClusterId); @@ -438,13 +439,75 @@ public RefreshServiceAclsResponse refreshServiceAcls(RefreshServiceAclsRequest r @Override public UpdateNodeResourceResponse updateNodeResource(UpdateNodeResourceRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + + // parameter verification. + if (request == null) { + routerMetrics.incrUpdateNodeResourceFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing UpdateNodeResource request.", null); + } + + String subClusterId = request.getSubClusterId(); + if (StringUtils.isBlank(subClusterId)) { + routerMetrics.incrUpdateNodeResourceFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing UpdateNodeResource SubClusterId.", null); + } + + try { + long startTime = clock.getTime(); + RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod( + new Class[]{UpdateNodeResourceRequest.class}, new Object[]{request}); + Collection updateNodeResourceResps = + remoteMethod.invokeConcurrent(this, UpdateNodeResourceResponse.class, subClusterId); + if (CollectionUtils.isNotEmpty(updateNodeResourceResps)) { + long stopTime = clock.getTime(); + routerMetrics.succeededUpdateNodeResourceRetrieved(stopTime - startTime); + return UpdateNodeResourceResponse.newInstance(); + } + } catch (YarnException e) { + routerMetrics.incrUpdateNodeResourceFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to updateNodeResource due to exception. " + e.getMessage()); + } + + routerMetrics.incrUpdateNodeResourceFailedRetrieved(); + throw new YarnException("Unable to updateNodeResource."); } @Override public RefreshNodesResourcesResponse refreshNodesResources(RefreshNodesResourcesRequest request) throws YarnException, IOException { - throw new NotImplementedException(); + + // parameter verification. + if (request == null) { + routerMetrics.incrRefreshNodesResourcesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing RefreshNodesResources request.", null); + } + + String subClusterId = request.getSubClusterId(); + if (StringUtils.isBlank(subClusterId)) { + routerMetrics.incrRefreshNodesResourcesFailedRetrieved(); + RouterServerUtil.logAndThrowException("Missing RefreshNodesResources SubClusterId.", null); + } + + try { + long startTime = clock.getTime(); + RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod( + new Class[]{RefreshNodesResourcesRequest.class}, new Object[]{request}); + Collection refreshNodesResourcesResps = + remoteMethod.invokeConcurrent(this, RefreshNodesResourcesResponse.class, subClusterId); + if (CollectionUtils.isNotEmpty(refreshNodesResourcesResps)) { + long stopTime = clock.getTime(); + routerMetrics.succeededRefreshNodesResourcesRetrieved(stopTime - startTime); + return RefreshNodesResourcesResponse.newInstance(); + } + } catch (YarnException e) { + routerMetrics.incrRefreshNodesResourcesFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to refreshNodesResources due to exception. " + e.getMessage()); + } + + routerMetrics.incrRefreshNodesResourcesFailedRetrieved(); + throw new YarnException("Unable to refreshNodesResources."); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 955948c91c..4af7e8c7f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -578,6 +578,16 @@ public void getClusterUserInfoFailed() { LOG.info("Mocked: failed getClusterUserInfo call"); metrics.incrGetClusterUserInfoFailedRetrieved(); } + + public void getUpdateNodeResourceFailed() { + LOG.info("Mocked: failed getClusterUserInfo call"); + metrics.incrUpdateNodeResourceFailedRetrieved(); + } + + public void getRefreshNodesResourcesFailed() { + LOG.info("Mocked: failed refreshNodesResources call"); + metrics.incrRefreshNodesResourcesFailedRetrieved(); + } } // Records successes for all calls @@ -858,6 +868,16 @@ public void getClusterUserInfoRetrieved(long duration) { LOG.info("Mocked: successful GetClusterUserInfoRetrieved call with duration {}", duration); metrics.succeededGetClusterUserInfoRetrieved(duration); } + + public void getUpdateNodeResourceRetrieved(long duration) { + LOG.info("Mocked: successful UpdateNodeResourceRetrieved call with duration {}", duration); + metrics.succeededUpdateNodeResourceRetrieved(duration); + } + + public void getRefreshNodesResourcesRetrieved(long duration) { + LOG.info("Mocked: successful RefreshNodesResourcesRetrieved call with duration {}", duration); + metrics.succeededRefreshNodesResourcesRetrieved(duration); + } } @Test @@ -1912,4 +1932,48 @@ public void testGetClusterUserInfoRetrieved() { Assert.assertEquals(225, metrics.getLatencySucceededGetClusterUserInfoRetrieved(), ASSERT_DOUBLE_DELTA); } + + @Test + public void testUpdateNodeResourceRetrievedFailed() { + long totalBadBefore = metrics.getUpdateNodeResourceFailedRetrieved(); + badSubCluster.getUpdateNodeResourceFailed(); + Assert.assertEquals(totalBadBefore + 1, metrics.getUpdateNodeResourceFailedRetrieved()); + } + + @Test + public void testUpdateNodeResourceRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetClusterUserInfoRetrieved(); + goodSubCluster.getUpdateNodeResourceRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededUpdateNodeResourceRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededUpdateNodeResourceRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getUpdateNodeResourceRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededUpdateNodeResourceRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededUpdateNodeResourceRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testRefreshNodesResourcesRetrievedFailed() { + long totalBadBefore = metrics.getRefreshNodesResourcesFailedRetrieved(); + badSubCluster.getRefreshNodesResourcesFailed(); + Assert.assertEquals(totalBadBefore + 1, metrics.getRefreshNodesResourcesFailedRetrieved()); + } + + @Test + public void testRefreshNodesResourcesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededRefreshNodesResourcesRetrieved(); + goodSubCluster.getRefreshNodesResourcesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededRefreshNodesResourcesRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededRefreshNodesResourcesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getRefreshNodesResourcesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededRefreshNodesResourcesRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededRefreshNodesResourcesRetrieved(), ASSERT_DOUBLE_DELTA); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java index 60a782bd8a..7449c8474d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -22,6 +22,9 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.yarn.api.records.DecommissionType; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -35,6 +38,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -45,7 +52,9 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.Assert.assertNotNull; @@ -63,6 +72,7 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { //////////////////////////////// private final static String USER_NAME = "test-user"; private final static int NUM_SUBCLUSTER = 4; + private final static int GB = 1024; private TestableFederationRMAdminInterceptor interceptor; private FederationStateStoreFacade facade; @@ -320,4 +330,62 @@ public void testSC1RefreshServiceAcls() throws Exception { LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.", () -> interceptor.refreshServiceAcls(request1)); } + + @Test + public void testUpdateNodeResourceEmptyRequest() throws Exception { + // null request1. + LambdaTestUtils.intercept(YarnException.class, "Missing UpdateNodeResource request.", + () -> interceptor.updateNodeResource(null)); + + // null request2. + Map nodeResourceMap = new HashMap<>(); + UpdateNodeResourceRequest request = UpdateNodeResourceRequest.newInstance(nodeResourceMap); + LambdaTestUtils.intercept(YarnException.class, "Missing UpdateNodeResource SubClusterId.", + () -> interceptor.updateNodeResource(request)); + } + + @Test + public void testUpdateNodeResourceNormalRequest() throws Exception { + // case 1, test the existing subCluster (SC-1). + Map nodeResourceMap = new HashMap<>(); + NodeId nodeId = NodeId.newInstance("127.0.0.1", 1); + ResourceOption resourceOption = + ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1); + nodeResourceMap.put(nodeId, resourceOption); + UpdateNodeResourceRequest request = + UpdateNodeResourceRequest.newInstance(nodeResourceMap, "SC-1"); + UpdateNodeResourceResponse response = interceptor.updateNodeResource(request); + assertNotNull(response); + + // case 2, test the non-exist subCluster. + UpdateNodeResourceRequest request1 = + UpdateNodeResourceRequest.newInstance(nodeResourceMap, "SC-NON"); + LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.", + () -> interceptor.updateNodeResource(request1)); + } + + @Test + public void testRefreshNodesResourcesEmptyRequest() throws Exception { + // null request1. + LambdaTestUtils.intercept(YarnException.class, "Missing RefreshNodesResources request.", + () -> interceptor.refreshNodesResources(null)); + + // null request2. + RefreshNodesResourcesRequest request = RefreshNodesResourcesRequest.newInstance(); + LambdaTestUtils.intercept(YarnException.class, "Missing RefreshNodesResources SubClusterId.", + () -> interceptor.refreshNodesResources(request)); + } + + @Test + public void testRefreshNodesResourcesNormalRequest() throws Exception { + // case 1, test the existing subCluster (SC-1). + RefreshNodesResourcesRequest request = RefreshNodesResourcesRequest.newInstance("SC-1"); + RefreshNodesResourcesResponse response = interceptor.refreshNodesResources(request); + assertNotNull(response); + + // case 2, test the non-exist subCluster. + RefreshNodesResourcesRequest request1 = RefreshNodesResourcesRequest.newInstance("SC-NON"); + LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.", + () -> interceptor.refreshNodesResources(request1)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java index b95bcd4a62..29d06385e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestableFederationRMAdminInterceptor.java @@ -52,7 +52,7 @@ public class TestableFederationRMAdminInterceptor extends FederationRMAdminInter @Override protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster( - SubClusterId subClusterId) throws YarnException { + SubClusterId subClusterId) throws Exception { MockRM mockRM; synchronized (this) { if (mockRMs.containsKey(subClusterId)) { @@ -66,6 +66,7 @@ protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster( } mockRM.init(config); mockRM.start(); + mockRM.registerNode("127.0.0.1:1", 102400, 100); mockRMs.put(subClusterId, mockRM); } return mockRM.getAdminService();