diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c1fa8c0eca..e8e0467840 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3920,6 +3920,13 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = "yarnfederation/"; + public static final String FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY = + FEDERATION_PREFIX + "state-store.heartbeat.initial-delay"; + + // 30 secs + public static final int + DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY = 30; + public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 68d8ed92f0..313ac8b714 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3624,6 +3624,16 @@ yarn.federation.enabled false + + + Initial delay for federation state-store heartbeat service. Value is followed by a unit + specifier: ns, us, ms, s, m, h, d for nanoseconds, microseconds, milliseconds, seconds, + minutes, hours, days respectively. Values should provide units, + but seconds are assumed + + yarn.federation.state-store.heartbeat.initial-delay + 30s + Machine list file to be loaded by the FederationSubCluster Resolver diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java index 6fb43cad40..a473186ed6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java @@ -96,6 +96,7 @@ public class FederationStateStoreService extends AbstractService private FederationStateStore stateStoreClient = null; private SubClusterId subClusterId; private long heartbeatInterval; + private long heartbeatInitialDelay; private RMContext rmContext; public FederationStateStoreService(RMContext rmContext) { @@ -126,10 +127,24 @@ protected void serviceInit(Configuration conf) throws Exception { heartbeatInterval = conf.getLong( YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS, YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS); + if (heartbeatInterval <= 0) { heartbeatInterval = YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS; } + + heartbeatInitialDelay = conf.getTimeDuration( + YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, + TimeUnit.SECONDS); + + if (heartbeatInitialDelay <= 0) { + LOG.warn("{} configured value is wrong, must be > 0; using default value of {}", + YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY); + heartbeatInitialDelay = + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY; + } LOG.info("Initialized federation membership service."); super.serviceInit(conf); @@ -206,9 +221,9 @@ private void registerAndInitializeHeartbeat() { scheduledExecutorService = HadoopExecutors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat, - heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS); - LOG.info("Started federation membership heartbeat with interval: {}", - heartbeatInterval); + heartbeatInitialDelay, heartbeatInterval, TimeUnit.SECONDS); + LOG.info("Started federation membership heartbeat with interval: {} and initial delay: {}", + heartbeatInterval, heartbeatInitialDelay); } @VisibleForTesting diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java index e5e156dcf7..e8ebdd5bed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; @@ -173,4 +174,37 @@ private String checkSubClusterInfo(SubClusterState state) return response.getCapability(); } + @Test + public void testFederationStateStoreServiceInitialHeartbeatDelay() throws Exception { + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10); + conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId()); + + GenericTestUtils.LogCapturer logCapture = + GenericTestUtils.LogCapturer.captureLogs(FederationStateStoreService.LOG); + + final MockRM rm = new MockRM(conf); + + // Initially there should be no entry for the sub-cluster + rm.init(conf); + stateStore = rm.getFederationStateStoreService().getStateStoreClient(); + GetSubClusterInfoResponse response = stateStore.getSubCluster(request); + Assert.assertNull(response); + + // Validate if sub-cluster is registered + rm.start(); + String capability = checkSubClusterInfo(SubClusterState.SC_NEW); + Assert.assertTrue(capability.isEmpty()); + + // Heartbeat to see if sub-cluster transitions to running + FederationStateStoreHeartbeat storeHeartbeat = + rm.getFederationStateStoreService().getStateStoreHeartbeatThread(); + storeHeartbeat.run(); + capability = checkSubClusterInfo(SubClusterState.SC_RUNNING); + checkClusterMetricsInfo(capability, 0); + + Assert.assertTrue(logCapture.getOutput().contains( + "Started federation membership heartbeat with interval: 300 and initial delay: 10")); + rm.stop(); + } }