YARN-6815. FederationStateStoreFacade return behavior should be consistent irrespective of whether caching is enabled or not.

(cherry picked from commit 8820693cd769065eed83193b673ec5d919d69500)
This commit is contained in:
Subru Krishnan 2017-07-13 18:51:06 -07:00 committed by Carlo Curino
parent f427e4201f
commit 825108d08a
9 changed files with 84 additions and 57 deletions

View File

@ -23,13 +23,10 @@
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
@ -38,6 +35,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -47,8 +46,8 @@
*/ */
public class RouterPolicyFacade { public class RouterPolicyFacade {
private static final Log LOG = private static final Logger LOG =
LogFactory.getLog(LocalityMulticastAMRMProxyPolicy.class); LoggerFactory.getLogger(RouterPolicyFacade.class);
private final SubClusterResolver subClusterResolver; private final SubClusterResolver subClusterResolver;
private final FederationStateStoreFacade federationFacade; private final FederationStateStoreFacade federationFacade;
@ -68,10 +67,10 @@ public RouterPolicyFacade(Configuration conf,
this.globalPolicyMap = new ConcurrentHashMap<>(); this.globalPolicyMap = new ConcurrentHashMap<>();
// load default behavior from store if possible // load default behavior from store if possible
String defaulKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; String defaultKey = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
SubClusterPolicyConfiguration configuration = null; SubClusterPolicyConfiguration configuration = null;
try { try {
configuration = federationFacade.getPolicyConfiguration(defaulKey); configuration = federationFacade.getPolicyConfiguration(defaultKey);
} catch (YarnException e) { } catch (YarnException e) {
LOG.warn("No fallback behavior defined in store, defaulting to XML " LOG.warn("No fallback behavior defined in store, defaulting to XML "
+ "configuration fallback behavior."); + "configuration fallback behavior.");
@ -88,7 +87,7 @@ public RouterPolicyFacade(Configuration conf,
ByteBuffer defaultPolicyParam = ByteBuffer ByteBuffer defaultPolicyParam = ByteBuffer
.wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8)); .wrap(defaultPolicyParamString.getBytes(StandardCharsets.UTF_8));
configuration = SubClusterPolicyConfiguration.newInstance(defaulKey, configuration = SubClusterPolicyConfiguration.newInstance(defaultKey,
defaultFederationPolicyManager, defaultPolicyParam); defaultFederationPolicyManager, defaultPolicyParam);
} }
@ -98,12 +97,12 @@ public RouterPolicyFacade(Configuration conf,
subClusterResolver, federationFacade, homeSubcluster); subClusterResolver, federationFacade, homeSubcluster);
FederationPolicyManager fallbackPolicyManager = FederationPolicyManager fallbackPolicyManager =
FederationPolicyUtils.instantiatePolicyManager(configuration.getType()); FederationPolicyUtils.instantiatePolicyManager(configuration.getType());
fallbackPolicyManager.setQueue(defaulKey); fallbackPolicyManager.setQueue(defaultKey);
// add to the cache the fallback behavior // add to the cache the fallback behavior
globalConfMap.put(defaulKey, globalConfMap.put(defaultKey,
fallbackContext.getSubClusterPolicyConfiguration()); fallbackContext.getSubClusterPolicyConfiguration());
globalPolicyMap.put(defaulKey, globalPolicyMap.put(defaultKey,
fallbackPolicyManager.getRouterPolicy(fallbackContext, null)); fallbackPolicyManager.getRouterPolicy(fallbackContext, null));
} }
@ -155,28 +154,36 @@ public SubClusterId getHomeSubcluster(
try { try {
configuration = federationFacade.getPolicyConfiguration(queue); configuration = federationFacade.getPolicyConfiguration(queue);
} catch (YarnException e) { } catch (YarnException e) {
LOG.debug(e); String errMsg = "There is no policy configured for the queue: " + queue
+ ", falling back to defaults.";
LOG.warn(errMsg, e);
} }
// If there is no policy configured for this queue, fall back to the baseline // If there is no policy configured for this queue, fall back to the baseline
// policy that is configured either in the store or via XML config (and // policy that is configured either in the store or via XML config (and
// cached) // cached)
if (configuration == null) { if (configuration == null) {
try {
LOG.warn("There is no policies configured for queue: " + queue + " we" LOG.warn("There is no policies configured for queue: " + queue + " we"
+ " fallback to default policy for: " + " fallback to default policy for: "
+ YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); + YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY; queue = YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY;
configuration = federationFacade.getPolicyConfiguration( try {
YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); configuration = federationFacade.getPolicyConfiguration(queue);
} catch (YarnException e) { } catch (YarnException e) {
String errMsg = "Cannot retrieve policy configured for the queue: "
+ queue + ", falling back to defaults.";
LOG.warn(errMsg, e);
}
}
// the fallback is not configured via store, but via XML, using // the fallback is not configured via store, but via XML, using
// previously loaded configuration. // previously loaded configuration.
if (configuration == null) {
configuration = configuration =
cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY); cachedConfs.get(YarnConfiguration.DEFAULT_FEDERATION_POLICY_KEY);
} }
}
// if the configuration has changed since last loaded, reinit the policy // if the configuration has changed since last loaded, reinit the policy
// based on current configuration // based on current configuration

View File

@ -94,7 +94,8 @@ SubClusterHeartbeatResponse subClusterHeartbeat(
* endpoint and current capabilities as represented by {@code SubClusterInfo}. * endpoint and current capabilities as represented by {@code SubClusterInfo}.
* *
* @param subClusterRequest the subcluster whose information is required * @param subClusterRequest the subcluster whose information is required
* @return the {@code SubClusterInfo} * @return the {@code SubClusterInfo}, or {@code null} if there is no mapping
* for the subcluster
* @throws YarnException if the request is invalid/fails * @throws YarnException if the request is invalid/fails
*/ */
GetSubClusterInfoResponse getSubCluster( GetSubClusterInfoResponse getSubCluster(

View File

@ -45,7 +45,8 @@ public interface FederationPolicyStore {
* *
* @param request the queue whose {@code SubClusterPolicyConfiguration} is * @param request the queue whose {@code SubClusterPolicyConfiguration} is
* required * required
* @return the {@code SubClusterPolicyConfiguration} for the specified queue * @return the {@code SubClusterPolicyConfiguration} for the specified queue,
* or {@code null} if there is no mapping for the queue
* @throws YarnException if the request is invalid/fails * @throws YarnException if the request is invalid/fails
*/ */
GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(

View File

@ -165,9 +165,8 @@ public GetSubClusterInfoResponse getSubCluster(
FederationMembershipStateStoreInputValidator.validate(request); FederationMembershipStateStoreInputValidator.validate(request);
SubClusterId subClusterId = request.getSubClusterId(); SubClusterId subClusterId = request.getSubClusterId();
if (!membership.containsKey(subClusterId)) { if (!membership.containsKey(subClusterId)) {
String errMsg = LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
"SubCluster " + subClusterId.toString() + " does not exist"; return null;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
} }
return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId)); return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId));
@ -274,8 +273,8 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
FederationPolicyStoreInputValidator.validate(request); FederationPolicyStoreInputValidator.validate(request);
String queue = request.getQueue(); String queue = request.getQueue();
if (!policies.containsKey(queue)) { if (!policies.containsKey(queue)) {
String errMsg = "Policy for queue " + queue + " does not exist"; LOG.warn("Policy for queue: {} does not exist.", queue);
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); return null;
} }
return GetSubClusterPolicyConfigurationResponse return GetSubClusterPolicyConfigurationResponse

View File

@ -385,6 +385,12 @@ public GetSubClusterInfoResponse getSubCluster(
String rmAdminAddress = cstmt.getString(4); String rmAdminAddress = cstmt.getString(4);
String webAppAddress = cstmt.getString(5); String webAppAddress = cstmt.getString(5);
// first check if the subCluster exists
if((amRMAddress == null) || (clientRMAddress == null)) {
LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
return null;
}
Timestamp heartBeatTimeStamp = cstmt.getTimestamp(6, utcCalendar); Timestamp heartBeatTimeStamp = cstmt.getTimestamp(6, utcCalendar);
long lastHeartBeat = long lastHeartBeat =
heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0; heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0;
@ -788,9 +794,8 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+ subClusterPolicyConfiguration.toString()); + subClusterPolicyConfiguration.toString());
} }
} else { } else {
String errMsg = LOG.warn("Policy for queue: {} does not exist.", request.getQueue());
"Policy for queue " + request.getQueue() + " does not exist"; return null;
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
} }
} catch (SQLException e) { } catch (SQLException e) {

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
@ -221,7 +222,8 @@ public static FederationStateStoreFacade getInstance() {
* Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}. * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}.
* *
* @param subClusterId the identifier of the sub-cluster * @param subClusterId the identifier of the sub-cluster
* @return the sub cluster information * @return the sub cluster information, or
* {@code null} if there is no mapping for the subClusterId
* @throws YarnException if the call to the state store is unsuccessful * @throws YarnException if the call to the state store is unsuccessful
*/ */
public SubClusterInfo getSubCluster(final SubClusterId subClusterId) public SubClusterInfo getSubCluster(final SubClusterId subClusterId)
@ -229,9 +231,13 @@ public SubClusterInfo getSubCluster(final SubClusterId subClusterId)
if (isCachingEnabled()) { if (isCachingEnabled()) {
return getSubClusters(false).get(subClusterId); return getSubClusters(false).get(subClusterId);
} else { } else {
return stateStore GetSubClusterInfoResponse response = stateStore
.getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId)) .getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId));
.getSubClusterInfo(); if (response == null) {
return null;
} else {
return response.getSubClusterInfo();
}
} }
} }
@ -282,7 +288,8 @@ public Map<SubClusterId, SubClusterInfo> getSubClusters(
* Returns the {@link SubClusterPolicyConfiguration} for the specified queue. * Returns the {@link SubClusterPolicyConfiguration} for the specified queue.
* *
* @param queue the queue whose policy is required * @param queue the queue whose policy is required
* @return the corresponding configured policy * @return the corresponding configured policy, or {@code null} if there is no
* mapping for the queue
* @throws YarnException if the call to the state store is unsuccessful * @throws YarnException if the call to the state store is unsuccessful
*/ */
public SubClusterPolicyConfiguration getPolicyConfiguration( public SubClusterPolicyConfiguration getPolicyConfiguration(
@ -295,8 +302,7 @@ public SubClusterPolicyConfiguration getPolicyConfiguration(
stateStore.getPolicyConfiguration( stateStore.getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest.newInstance(queue)); GetSubClusterPolicyConfigurationRequest.newInstance(queue));
if (response == null) { if (response == null) {
throw new YarnException("The stateStore returned a null for " return null;
+ "GetSubClusterPolicyConfigurationResponse for queue " + queue);
} else { } else {
return response.getPolicyConfiguration(); return response.getPolicyConfiguration();
} }

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
@ -157,13 +158,8 @@ public void testGetSubClusterInfoUnknownSubCluster() throws Exception {
GetSubClusterInfoRequest request = GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId); GetSubClusterInfoRequest.newInstance(subClusterId);
try { GetSubClusterInfoResponse response = stateStore.getSubCluster(request);
stateStore.getSubCluster(request).getSubClusterInfo(); Assert.assertNull(response);
Assert.fail();
} catch (FederationStateStoreException e) {
Assert.assertTrue(
e.getMessage().startsWith("SubCluster SC does not exist"));
}
} }
@Test @Test
@ -473,13 +469,10 @@ public void testGetPolicyConfigurationUnknownQueue() throws Exception {
GetSubClusterPolicyConfigurationRequest request = GetSubClusterPolicyConfigurationRequest request =
GetSubClusterPolicyConfigurationRequest.newInstance("Queue"); GetSubClusterPolicyConfigurationRequest.newInstance("Queue");
try {
GetSubClusterPolicyConfigurationResponse response =
stateStore.getPolicyConfiguration(request); stateStore.getPolicyConfiguration(request);
Assert.fail(); Assert.assertNull(response);
} catch (FederationStateStoreException e) {
Assert.assertTrue(
e.getMessage().startsWith("Policy for queue Queue does not exist"));
}
} }
@Test @Test

View File

@ -51,6 +51,7 @@ public class FederationStateStoreTestUtil {
public static final String SC_PREFIX = "SC-"; public static final String SC_PREFIX = "SC-";
public static final String Q_PREFIX = "queue-"; public static final String Q_PREFIX = "queue-";
public static final String POLICY_PREFIX = "policy-"; public static final String POLICY_PREFIX = "policy-";
public static final String INVALID = "dummy";
private FederationStateStore stateStore; private FederationStateStore stateStore;

View File

@ -47,9 +47,10 @@
public class TestFederationStateStoreFacade { public class TestFederationStateStoreFacade {
@Parameters @Parameters
@SuppressWarnings({"NoWhitespaceAfter"})
public static Collection<Boolean[]> getParameters() { public static Collection<Boolean[]> getParameters() {
return Arrays return Arrays
.asList(new Boolean[][] {{Boolean.FALSE }, {Boolean.TRUE } }); .asList(new Boolean[][] { { Boolean.FALSE }, { Boolean.TRUE } });
} }
private final long clusterTs = System.currentTimeMillis(); private final long clusterTs = System.currentTimeMillis();
@ -98,6 +99,13 @@ public void testGetSubCluster() throws YarnException {
} }
} }
/**
 * Checks that the facade returns {@code null} when asked for the
 * sub-cluster whose id is built from
 * {@link FederationStateStoreTestUtil#INVALID}.
 *
 * @throws YarnException if the facade lookup fails
 */
@Test
public void testInvalidGetSubCluster() throws YarnException {
  Assert.assertNull(
      facade.getSubCluster(
          SubClusterId.newInstance(FederationStateStoreTestUtil.INVALID)));
}
@Test @Test
public void testGetSubClusterFlushCache() throws YarnException { public void testGetSubClusterFlushCache() throws YarnException {
for (int i = 0; i < numSubClusters; i++) { for (int i = 0; i < numSubClusters; i++) {
@ -127,6 +135,12 @@ public void testGetPolicyConfiguration() throws YarnException {
} }
} }
/**
 * Checks that the facade returns {@code null} when asked for the policy
 * configuration of the queue named by
 * {@link FederationStateStoreTestUtil#INVALID}.
 *
 * @throws YarnException if the facade lookup fails
 */
@Test
public void testInvalidGetPolicyConfiguration() throws YarnException {
  String unknownQueue = FederationStateStoreTestUtil.INVALID;
  Assert.assertNull(facade.getPolicyConfiguration(unknownQueue));
}
@Test @Test
public void testGetPoliciesConfigurations() throws YarnException { public void testGetPoliciesConfigurations() throws YarnException {
Map<String, SubClusterPolicyConfiguration> queuePolicies = Map<String, SubClusterPolicyConfiguration> queuePolicies =