From 5c02f21f2e878227521d3ae9e6101c580eea192d Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 21 Jun 2023 01:25:46 +0800 Subject: [PATCH] YARN-11510. [Federation] Fix NodeManager#TestFederationInterceptor Flaky Unit Test. (#5733) --- .../hadoop/yarn/conf/YarnConfiguration.java | 4 ++++ .../src/main/resources/yarn-default.xml | 10 ++++++++++ .../yarn/server/MockResourceManagerFacade.java | 8 +++++++- .../amrmproxy/FederationInterceptor.java | 16 ++++++++++++++++ .../amrmproxy/TestFederationInterceptor.java | 14 +++++++++++++- 5 files changed, 50 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c89ee2625d..52d6003a10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3075,6 +3075,10 @@ public static boolean isAclEnabled(Configuration conf) { + "amrmproxy.enabled"; public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false; + public static final String AMRM_PROXY_WAIT_UAM_REGISTER_DONE = + NM_PREFIX + "amrmproxy.wait.uam-register.done"; + public static final boolean DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE = false; + public static final String AMRM_PROXY_ADDRESS = NM_PREFIX + "amrmproxy.address"; public static final int DEFAULT_AMRM_PROXY_PORT = 8049; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 76ad98e5db..395984e530 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5354,6 +5354,16 @@ + + + Whether we wait for uam registration to complete. + The default value is false. If we set it to true, + the UAM needs to be registered before attempting to allocate a container. + + yarn.nodemanager.amrmproxy.wait.uam-register.done + false + + YARN Federation supports Non-HA mode. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 999e66a040..c0ca3b5d8a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server; +import java.io.Closeable; import java.io.IOException; import java.net.ConnectException; import java.util.ArrayList; @@ -183,7 +184,7 @@ * change the implementation with care. 
*/ public class MockResourceManagerFacade implements ApplicationClientProtocol, - ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { + ApplicationMasterProtocol, ResourceManagerAdministrationProtocol, Closeable { private static final Logger LOG = LoggerFactory.getLogger(MockResourceManagerFacade.class); @@ -967,4 +968,9 @@ public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterReq public HashMap> getApplicationContainerIdMap() { return applicationContainerIdMap; } + + @Override + public void close() throws IOException { + LOG.info("MockResourceManagerFacade Close."); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index ae6765cfb4..14a2d60c2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -251,6 +251,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // the maximum wait time for the first async heart beat response private long heartbeatMaxWaitTimeMs; + private boolean waitUamRegisterDone; + private MonotonicClock clock = new MonotonicClock(); /** @@ -353,6 +355,8 @@ public void init(AMRMProxyApplicationContext appContext) { this.subClusterTimeOut = YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT; } + this.waitUamRegisterDone = conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, + 
YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE); } @Override @@ -1332,6 +1336,18 @@ public void run() { }); this.uamRegisterFutures.put(scId, future); } + + if (this.waitUamRegisterDone) { + for (Map.Entry> entry : this.uamRegisterFutures.entrySet()) { + SubClusterId subClusterId = entry.getKey(); + Future future = entry.getValue(); + while (!future.isDone()) { + LOG.info("subClusterId {} Wait Uam Register done.", subClusterId); + } + } + } + + return newSubClusters; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index b33521932f..9e87543723 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -38,6 +38,7 @@ import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -178,6 +179,9 @@ protected YarnConfiguration createConfiguration() { conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, 500); + // Wait for UAM registration to be done + conf.setBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, true); + return conf; } @@ -593,6 +597,10 @@ public Object run() throws 
Exception { interceptor.recover(recoveredDataMap); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Waiting for SC-1 to time out. + GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000); + // SC1 should be initialized to be timed out Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); @@ -851,7 +859,7 @@ public Object run() throws Exception { List containers = getContainersAndAssert(numberOfContainers, numberOfContainers * 2); for (Container c : containers) { - LOG.info("Allocated container " + c.getId()); + LOG.info("Allocated container {}", c.getId()); } Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); @@ -885,6 +893,10 @@ public Object run() throws Exception { int numberOfContainers = 3; // Should re-attach secondaries and get the three running containers Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + + // Waiting for SC-1 to time out. + GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000); + // SC1 should be initialized to be timed out Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); Assert.assertEquals(numberOfContainers,