YARN-5391. PolicyManager to tie together Router/AMRM Federation policies. (Carlo Curino via Subru).

(cherry picked from commit 20893682eced98dfba55d88edd63296993087c85)
This commit is contained in:
Subru Krishnan 2016-11-01 19:54:18 -07:00 committed by Carlo Curino
parent 1dadd0b45a
commit 93d9fdeca6
10 changed files with 545 additions and 3 deletions

View File

@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class provides basic implementation for common methods that multiple
* policies will need to implement.
*/
public abstract class AbstractPolicyManager implements
FederationPolicyManager {
private String queue;
@SuppressWarnings("checkstyle:visibilitymodifier")
protected Class routerFederationPolicy;
@SuppressWarnings("checkstyle:visibilitymodifier")
protected Class amrmProxyFederationPolicy;
public static final Logger LOG =
LoggerFactory.getLogger(AbstractPolicyManager.class);
/**
* This default implementation validates the
* {@link FederationPolicyInitializationContext},
* then checks whether it needs to reinstantiate the class (null or
* mismatching type), and reinitialize the policy.
*
* @param federationPolicyContext the current context
* @param oldInstance the existing (possibly null) instance.
*
* @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy}
* instance
*
* @throws FederationPolicyInitializationException if the reinitalization is
* not valid, and ensure
* previous state is preserved
*/
public FederationAMRMProxyPolicy getAMRMPolicy(
FederationPolicyInitializationContext federationPolicyContext,
FederationAMRMProxyPolicy oldInstance)
throws FederationPolicyInitializationException {
if (amrmProxyFederationPolicy == null) {
throw new FederationPolicyInitializationException("The parameter "
+ "amrmProxyFederationPolicy should be initialized in "
+ this.getClass().getSimpleName() + " constructor.");
}
try {
return (FederationAMRMProxyPolicy) internalPolicyGetter(
federationPolicyContext, oldInstance, amrmProxyFederationPolicy);
} catch (ClassCastException e) {
throw new FederationPolicyInitializationException(e);
}
}
/**
* This default implementation validates the
* {@link FederationPolicyInitializationContext},
* then checks whether it needs to reinstantiate the class (null or
* mismatching type), and reinitialize the policy.
*
* @param federationPolicyContext the current context
* @param oldInstance the existing (possibly null) instance.
*
* @return a valid and fully reinitalized {@link FederationRouterPolicy}
* instance
*
* @throws FederationPolicyInitializationException if the reinitalization is
* not valid, and ensure
* previous state is preserved
*/
public FederationRouterPolicy getRouterPolicy(
FederationPolicyInitializationContext federationPolicyContext,
FederationRouterPolicy oldInstance)
throws FederationPolicyInitializationException {
//checks that sub-types properly initialize the types of policies
if (routerFederationPolicy == null) {
throw new FederationPolicyInitializationException("The policy "
+ "type should be initialized in " + this.getClass().getSimpleName()
+ " constructor.");
}
try {
return (FederationRouterPolicy) internalPolicyGetter(
federationPolicyContext, oldInstance, routerFederationPolicy);
} catch (ClassCastException e) {
throw new FederationPolicyInitializationException(e);
}
}
@Override
public String getQueue() {
return queue;
}
@Override
public void setQueue(String queue) {
this.queue = queue;
}
/**
* Common functionality to instantiate a reinitialize a {@link
* ConfigurableFederationPolicy}.
*/
private ConfigurableFederationPolicy internalPolicyGetter(
final FederationPolicyInitializationContext federationPolicyContext,
ConfigurableFederationPolicy oldInstance, Class policy)
throws FederationPolicyInitializationException {
FederationPolicyInitializationContextValidator
.validate(federationPolicyContext, this.getClass().getCanonicalName());
if (oldInstance == null || !oldInstance.getClass().equals(policy)) {
try {
oldInstance = (ConfigurableFederationPolicy) policy.newInstance();
} catch (InstantiationException e) {
throw new FederationPolicyInitializationException(e);
} catch (IllegalAccessException e) {
throw new FederationPolicyInitializationException(e);
}
}
//copying the context to avoid side-effects
FederationPolicyInitializationContext modifiedContext =
updateContext(federationPolicyContext,
oldInstance.getClass().getCanonicalName());
oldInstance.reinitialize(modifiedContext);
return oldInstance;
}
/**
* This method is used to copy-on-write the context, that will be passed
* downstream to the router/amrmproxy policies.
*/
private FederationPolicyInitializationContext updateContext(
FederationPolicyInitializationContext federationPolicyContext,
String type) {
// copying configuration and context to avoid modification of original
SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration
.newInstance(federationPolicyContext
.getSubClusterPolicyConfiguration());
newConf.setType(type);
return new FederationPolicyInitializationContext(newConf,
federationPolicyContext.getFederationSubclusterResolver(),
federationPolicyContext.getFederationStateStoreFacade(),
federationPolicyContext.getHomeSubcluster());
}
}

View File

@ -41,10 +41,11 @@ public FederationPolicyInitializationContext() {
public FederationPolicyInitializationContext(
SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
FederationStateStoreFacade storeFacade) {
FederationStateStoreFacade storeFacade, SubClusterId home) {
this.federationPolicyConfiguration = policy;
this.federationSubclusterResolver = resolver;
this.federationStateStoreFacade = storeFacade;
this.homeSubcluster = home;
}
/**

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import java.nio.ByteBuffer;
/**
* This class represents a simple implementation of a {@code
* FederationPolicyManager}.
*
* It combines the basic policies: {@link UniformRandomRouterPolicy} and
* {@link BroadcastAMRMProxyPolicy}, which are designed to work together and
* "spread" the load among sub-clusters uniformly.
*
* This simple policy might impose heavy load on the RMs and return more
* containers than a job requested as all requests are (replicated and)
* broadcasted.
*/
public class UniformBroadcastPolicyManager
extends AbstractPolicyManager {
public UniformBroadcastPolicyManager() {
//this structurally hard-codes two compatible policies for Router and
// AMRMProxy.
routerFederationPolicy = UniformRandomRouterPolicy.class;
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
}
@Override
public SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException {
ByteBuffer buf = ByteBuffer.allocate(0);
return SubClusterPolicyConfiguration
.newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import java.nio.ByteBuffer;
/**
* Policy that allows operator to configure "weights" for routing. This picks a
* {@link WeightedRandomRouterPolicy} for the router and a {@link
* LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
* work together.
*/
public class WeightedLocalityPolicyManager
extends AbstractPolicyManager {
private WeightedPolicyInfo weightedPolicyInfo;
public WeightedLocalityPolicyManager() {
//this structurally hard-codes two compatible policies for Router and
// AMRMProxy.
routerFederationPolicy = WeightedRandomRouterPolicy.class;
amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
weightedPolicyInfo = new WeightedPolicyInfo();
}
@Override
public SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException {
ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
return SubClusterPolicyConfiguration
.newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
}
@VisibleForTesting
public WeightedPolicyInfo getWeightedPolicyInfo() {
return weightedPolicyInfo;
}
@VisibleForTesting
public void setWeightedPolicyInfo(
WeightedPolicyInfo weightedPolicyInfo) {
this.weightedPolicyInfo = weightedPolicyInfo;
}
}

View File

@ -40,6 +40,7 @@
@Unstable
public abstract class SubClusterPolicyConfiguration {
@Private
@Unstable
public static SubClusterPolicyConfiguration newInstance(String queue,
@ -52,6 +53,18 @@ public static SubClusterPolicyConfiguration newInstance(String queue,
return policy;
}
@Private
@Unstable
public static SubClusterPolicyConfiguration newInstance(
SubClusterPolicyConfiguration conf) {
SubClusterPolicyConfiguration policy =
Records.newRecord(SubClusterPolicyConfiguration.class);
policy.setQueue(conf.getQueue());
policy.setType(conf.getType());
policy.setParams(conf.getParams());
return policy;
}
/**
* Get the name of the queue for which we are configuring a policy.
*

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Test;
/**
* This class provides common test methods for testing {@code
* FederationPolicyManager}s.
*/
public abstract class BasePolicyManagerTest {
@SuppressWarnings("checkstyle:visibilitymodifier")
protected FederationPolicyManager wfp = null;
@SuppressWarnings("checkstyle:visibilitymodifier")
protected Class expectedPolicyManager;
@SuppressWarnings("checkstyle:visibilitymodifier")
protected Class expectedAMRMProxyPolicy;
@SuppressWarnings("checkstyle:visibilitymodifier")
protected Class expectedRouterPolicy;
@Test
public void testSerializeAndInstantiate() throws Exception {
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
expectedAMRMProxyPolicy,
expectedRouterPolicy);
}
@Test(expected = FederationPolicyInitializationException.class)
public void testSerializeAndInstantiateBad1() throws Exception {
serializeAndDeserializePolicyManager(wfp, String.class,
expectedAMRMProxyPolicy, expectedRouterPolicy);
}
@Test(expected = AssertionError.class)
public void testSerializeAndInstantiateBad2() throws Exception {
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
String.class, expectedRouterPolicy);
}
@Test(expected = AssertionError.class)
public void testSerializeAndInstantiateBad3() throws Exception {
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
expectedAMRMProxyPolicy, String.class);
}
protected static void serializeAndDeserializePolicyManager(
FederationPolicyManager wfp, Class policyManagerType,
Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
// serializeConf it in a context
SubClusterPolicyConfiguration fpc =
wfp.serializeConf();
fpc.setType(policyManagerType.getCanonicalName());
FederationPolicyInitializationContext context = new
FederationPolicyInitializationContext();
context.setSubClusterPolicyConfiguration(fpc);
context
.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
context.setFederationSubclusterResolver(
FederationPoliciesTestUtil.initResolver());
context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster"));
// based on the "context" created instantiate new class and use it
Class c = Class.forName(wfp.getClass().getCanonicalName());
FederationPolicyManager wfp2 = (FederationPolicyManager) c.newInstance();
FederationAMRMProxyPolicy federationAMRMProxyPolicy =
wfp2.getAMRMPolicy(context, null);
//needed only for tests (getARMRMPolicy change the "type" in conf)
fpc.setType(wfp.getClass().getCanonicalName());
FederationRouterPolicy federationRouterPolicy =
wfp2.getRouterPolicy(context, null);
Assert.assertEquals(federationAMRMProxyPolicy.getClass(),
expAMRMProxyPolicy);
Assert.assertEquals(federationRouterPolicy.getClass(),
expRouterPolicy);
}
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -38,6 +39,7 @@ public class TestFederationPolicyInitializationContextValidator {
private SubClusterPolicyConfiguration goodConfig;
private SubClusterResolver goodSR;
private FederationStateStoreFacade goodFacade;
private SubClusterId goodHome;
private FederationPolicyInitializationContext context;
@Before
@ -45,8 +47,9 @@ public void setUp() throws Exception {
goodFacade = FederationPoliciesTestUtil.initFacade();
goodConfig = new MockPolicyManager().serializeConf();
goodSR = FederationPoliciesTestUtil.initResolver();
goodHome = SubClusterId.newInstance("homesubcluster");
context = new FederationPolicyInitializationContext(goodConfig, goodSR,
goodFacade);
goodFacade, goodHome);
}
@Test

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
import org.junit.Before;
/**
* Simple test of {@link UniformBroadcastPolicyManager}.
*/
public class TestUniformBroadcastPolicyManager extends BasePolicyManagerTest {
@Before
public void setup() {
//config policy
wfp = new UniformBroadcastPolicyManager();
wfp.setQueue("queue1");
//set expected params that the base test class will use for tests
expectedPolicyManager = UniformBroadcastPolicyManager.class;
expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
expectedRouterPolicy = UniformRandomRouterPolicy.class;
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
* Simple test of {@link WeightedLocalityPolicyManager}.
*/
public class TestWeightedLocalityPolicyManager extends
BasePolicyManagerTest {
private WeightedPolicyInfo policyInfo;
@Before
public void setup() {
// configure a policy
wfp = new WeightedLocalityPolicyManager();
wfp.setQueue("queue1");
SubClusterId sc1 = SubClusterId.newInstance("sc1");
SubClusterId sc2 = SubClusterId.newInstance("sc2");
policyInfo = new WeightedPolicyInfo();
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
policyInfo.setRouterPolicyWeights(routerWeights);
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
amrmWeights.put(new SubClusterIdInfo(sc1), 0.2f);
amrmWeights.put(new SubClusterIdInfo(sc2), 0.8f);
policyInfo.setAMRMPolicyWeights(amrmWeights);
((WeightedLocalityPolicyManager) wfp).setWeightedPolicyInfo(
policyInfo);
//set expected params that the base test class will use for tests
expectedPolicyManager = WeightedLocalityPolicyManager.class;
expectedAMRMProxyPolicy = LocalityMulticastAMRMProxyPolicy.class;
expectedRouterPolicy = WeightedRandomRouterPolicy.class;
}
@Test
public void testPolicyInfoSetCorrectly() throws Exception {
serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
expectedAMRMProxyPolicy,
expectedRouterPolicy);
//check the policyInfo propagates through ser/der correctly
Assert.assertEquals(((WeightedLocalityPolicyManager) wfp)
.getWeightedPolicyInfo(), policyInfo);
}
}

View File

@ -143,7 +143,7 @@ public static void initializePolicyContext(
SubClusterInfo> activeSubclusters) throws YarnException {
FederationPolicyInitializationContext context =
new FederationPolicyInitializationContext(null, initResolver(),
initFacade());
initFacade(), SubClusterId.newInstance("homesubcluster"));
initializePolicyContext(context, policy, policyInfo, activeSubclusters);
}