From 825108d08a9b49d7c19915624f9167101528cb95 Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Thu, 13 Jul 2017 18:51:06 -0700 Subject: [PATCH] YARN-6815. FederationStateStoreFacade return behavior should be consistent irrespective of whether caching is enabled or not. (cherry picked from commit 8820693cd769065eed83193b673ec5d919d69500) --- .../policies/RouterPolicyFacade.java | 53 +++++++++++-------- .../store/FederationMembershipStateStore.java | 7 +-- .../store/FederationPolicyStore.java | 3 +- .../impl/MemoryFederationStateStore.java | 9 ++-- .../store/impl/SQLFederationStateStore.java | 11 ++-- .../utils/FederationStateStoreFacade.java | 20 ++++--- .../impl/FederationStateStoreBaseTest.java | 21 +++----- .../utils/FederationStateStoreTestUtil.java | 1 + .../utils/TestFederationStateStoreFacade.java | 16 +++++- 9 files changed, 84 insertions(+), 57 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java index 52c290553c..bbf08e0961 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -23,13 +23,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; @@ -38,6 +35,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -47,8 +46,8 @@ */ public class RouterPolicyFacade { - private static final Log LOG = - LogFactory.getLog(LocalityMulticastAMRMProxyPolicy.class); + private static final Logger LOG = + LoggerFactory.getLogger(RouterPolicyFacade.class); private final SubClusterResolver subClusterResolver; private final FederationStateStoreFacade federationFacade; @@ -68,10 +67,10 @@ public RouterPolicyFacade(Configuration conf, this.globalPolicyMap = new ConcurrentHashMap<>(); // load default behavior from store if possible - String defaulKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + String defaultKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; SubClusterPolicyConfiguration configuration = null; try { - configuration = federationFacade.getPolicyConfiguration(defaulKey); + configuration = federationFacade.getPolicyConfiguration(defaultKey); } catch (YarnException e) { LOG.warn("No fallback behavior defined in store, defaulting to XML " + "configuration fallback behavior."); @@ -88,7 +87,7 @@ public RouterPolicyFacade(Configuration conf, ByteBuffer defaultPolicyParam = ByteBuffer .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8)); - configuration = SubClusterPolicyConfiguration.newInstance(defaulKey, + configuration = SubClusterPolicyConfiguration.newInstance(defaultKey, defaultFederationPolicyManager, defaultPolicyParam); } @@ -98,12 +97,12 @@ public RouterPolicyFacade(Configuration conf, subClusterResolver, federationFacade, homeSubcluster); FederationPolicyManager fallbackPolicyManager = FederationPolicyUtils.instantiatePolicyManager(configuration.getType()); - fallbackPolicyManager.setQueue(defaulKey); + fallbackPolicyManager.setQueue(defaultKey); // add to the cache the fallback behavior - globalConfMap.put(defaulKey, + globalConfMap.put(defaultKey, fallbackContext.getSubClusterPolicyConfiguration()); - globalPolicyMap.put(defaulKey, + globalPolicyMap.put(defaultKey, fallbackPolicyManager.getRouterPolicy(fallbackContext, null)); } @@ -155,29 +154,37 @@ public SubClusterId getHomeSubcluster( try { configuration = federationFacade.getPolicyConfiguration(queue); } catch (YarnException e) { - LOG.debug(e); + String errMsg = "There is no policy configured for the queue: " + queue + + ", falling back to defaults."; + LOG.warn(errMsg, e); } // If there is no policy configured for this queue, fallback to the baseline // policy that is configured either in the store or via XML config (and // cached) if (configuration == null) { - try { - LOG.warn("There is no policies configured for queue: " + queue + " we" - + " fallback to default policy for: " - + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + LOG.warn("There is no policies configured for queue: " + queue + " we" + + " fallback to default policy for: " + + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); - queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; - configuration = federationFacade.getPolicyConfiguration( - YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; + try { + configuration = federationFacade.getPolicyConfiguration(queue); } catch (YarnException e) { - // the fallback is not configure via store, but via XML, using - // previously loaded configuration. - configuration = - cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + String errMsg = "Cannot retrieve policy configured for the queue: " + + queue + ", falling back to defaults."; + LOG.warn(errMsg, e); + } } + // the fallback is not configure via store, but via XML, using + // previously loaded configuration. + if (configuration == null) { + configuration = + cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + } + // if the configuration has changed since last loaded, reinit the policy // based on current configuration if (!cachedConfs.containsKey(queue) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationMembershipStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationMembershipStateStore.java index 7778d5f3c1..49ec3bf31d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationMembershipStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationMembershipStateStore.java @@ -70,7 +70,7 @@ SubClusterRegisterResponse registerSubCluster( */ SubClusterDeregisterResponse deregisterSubCluster( SubClusterDeregisterRequest subClusterDeregisterRequest) - throws YarnException; + throws YarnException; /** * Periodic heartbeat from a ResourceManager participating in @@ -86,7 +86,7 @@ SubClusterDeregisterResponse deregisterSubCluster( */ SubClusterHeartbeatResponse subClusterHeartbeat( SubClusterHeartbeatRequest subClusterHeartbeatRequest) - throws YarnException; + throws YarnException; /** * Get the membership information of subcluster as identified by @@ -94,7 +94,8 @@ SubClusterHeartbeatResponse subClusterHeartbeat( * endpoint and current capabilities as represented by {@code SubClusterInfo}. * * @param subClusterRequest the subcluster whose information is required - * @return the {@code SubClusterInfo} + * @return the {@code SubClusterInfo}, or {@code null} if there is no mapping + * for the subcluster * @throws YarnException if the request is invalid/fails */ GetSubClusterInfoResponse getSubCluster( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationPolicyStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationPolicyStore.java index 9d9bd9bdbb..b0e03a6f33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationPolicyStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/FederationPolicyStore.java @@ -45,7 +45,8 @@ public interface FederationPolicyStore { * * @param request the queue whose {@code SubClusterPolicyConfiguration} is * required - * @return the {@code SubClusterPolicyConfiguration} for the specified queue + * @return the {@code SubClusterPolicyConfiguration} for the specified queue, + * or {@code null} if there is no mapping for the queue * @throws YarnException if the request is invalid/fails */ GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index fbdb7bff90..7c06256a41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -165,9 +165,8 @@ public GetSubClusterInfoResponse getSubCluster( FederationMembershipStateStoreInputValidator.validate(request); SubClusterId subClusterId = request.getSubClusterId(); if (!membership.containsKey(subClusterId)) { - String errMsg = - "SubCluster " + subClusterId.toString() + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + LOG.warn("The queried SubCluster: {} does not exist.", subClusterId); + return null; } return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId)); @@ -274,8 +273,8 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( FederationPolicyStoreInputValidator.validate(request); String queue = request.getQueue(); if (!policies.containsKey(queue)) { - String errMsg = "Policy for queue " + queue + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + LOG.warn("Policy for queue: {} does not exist.", queue); + return null; } return GetSubClusterPolicyConfigurationResponse diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java index a849c6a3d2..63d8e42460 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -385,6 +385,12 @@ public GetSubClusterInfoResponse getSubCluster( String rmAdminAddress = cstmt.getString(4); String webAppAddress = cstmt.getString(5); + // first check if the subCluster exists + if((amRMAddress == null) || (clientRMAddress == null)) { + LOG.warn("The queried SubCluster: {} does not exist.", subClusterId); + return null; + } + Timestamp heartBeatTimeStamp = cstmt.getTimestamp(6, utcCalendar); long lastHeartBeat = heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0; @@ -788,9 +794,8 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( + subClusterPolicyConfiguration.toString()); } } else { - String errMsg = - "Policy for queue " + request.getQueue() + " does not exist"; - FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + LOG.warn("Policy for queue: {} does not exist.", request.getQueue()); + return null; } } catch (SQLException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index 569334275d..389c769427 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; @@ -221,7 +222,8 @@ public static FederationStateStoreFacade getInstance() { * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}. * * @param subClusterId the identifier of the sub-cluster - * @return the sub cluster information + * @return the sub cluster information, or + * {@code null} if there is no mapping for the subClusterId * @throws YarnException if the call to the state store is unsuccessful */ public SubClusterInfo getSubCluster(final SubClusterId subClusterId) @@ -229,9 +231,13 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId) if (isCachingEnabled()) { return getSubClusters(false).get(subClusterId); } else { - return stateStore - .getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId)) - .getSubClusterInfo(); + GetSubClusterInfoResponse response = stateStore + .getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId)); + if (response == null) { + return null; + } else { + return response.getSubClusterInfo(); + } } } @@ -282,7 +288,8 @@ public Map getSubClusters( * Returns the {@link SubClusterPolicyConfiguration} for the specified queue. * * @param queue the queue whose policy is required - * @return the corresponding configured policy + * @return the corresponding configured policy, or {@code null} if there is no + * mapping for the queue * @throws YarnException if the call to the state store is unsuccessful */ public SubClusterPolicyConfiguration getPolicyConfiguration( @@ -295,8 +302,7 @@ public SubClusterPolicyConfiguration getPolicyConfiguration( stateStore.getPolicyConfiguration( GetSubClusterPolicyConfigurationRequest.newInstance(queue)); if (response == null) { - throw new YarnException("The stateStore returned a null for " - + "GetSubClusterPolicyConfigurationResponse for queue " + queue); + return null; } else { return response.getPolicyConfiguration(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index db04592e3a..15cc0f0a00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; @@ -157,13 +158,8 @@ public void testGetSubClusterInfoUnknownSubCluster() throws Exception { GetSubClusterInfoRequest request = GetSubClusterInfoRequest.newInstance(subClusterId); - try { - stateStore.getSubCluster(request).getSubClusterInfo(); - Assert.fail(); - } catch (FederationStateStoreException e) { - Assert.assertTrue( - e.getMessage().startsWith("SubCluster SC does not exist")); - } + GetSubClusterInfoResponse response = stateStore.getSubCluster(request); + Assert.assertNull(response); } @Test @@ -473,13 +469,10 @@ public void testGetPolicyConfigurationUnknownQueue() throws Exception { GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest.newInstance("Queue"); - try { - stateStore.getPolicyConfiguration(request); - Assert.fail(); - } catch (FederationStateStoreException e) { - Assert.assertTrue( - e.getMessage().startsWith("Policy for queue Queue does not exist")); - } + + GetSubClusterPolicyConfigurationResponse response = + stateStore.getPolicyConfiguration(request); + Assert.assertNull(response); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java index 423bf86b31..5d4c8d5ad7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java @@ -51,6 +51,7 @@ public class FederationStateStoreTestUtil { public static final String SC_PREFIX = "SC-"; public static final String Q_PREFIX = "queue-"; public static final String POLICY_PREFIX = "policy-"; + public static final String INVALID = "dummy"; private FederationStateStore stateStore; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java index d46bef09b1..6328122e77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java @@ -47,9 +47,10 @@ public class TestFederationStateStoreFacade { @Parameters + @SuppressWarnings({"NoWhitespaceAfter"}) public static Collection getParameters() { return Arrays - .asList(new Boolean[][] {{Boolean.FALSE }, {Boolean.TRUE } }); + .asList(new Boolean[][] { { Boolean.FALSE }, { Boolean.TRUE } }); } private final long clusterTs = System.currentTimeMillis(); @@ -98,6 +99,13 @@ public void testGetSubCluster() throws YarnException { } } + @Test + public void testInvalidGetSubCluster() throws YarnException { + SubClusterId subClusterId = + SubClusterId.newInstance(FederationStateStoreTestUtil.INVALID); + Assert.assertNull(facade.getSubCluster(subClusterId)); + } + @Test public void testGetSubClusterFlushCache() throws YarnException { for (int i = 0; i < numSubClusters; i++) { @@ -127,6 +135,12 @@ public void testGetPolicyConfiguration() throws YarnException { } } + @Test + public void testInvalidGetPolicyConfiguration() throws YarnException { + Assert.assertNull( + facade.getPolicyConfiguration(FederationStateStoreTestUtil.INVALID)); + } + @Test public void testGetPoliciesConfigurations() throws YarnException { Map queuePolicies =