YARN-11614. [Federation] Add Federation PolicyManager Validation Rules. (#6271) Contributed by Shilun Fan.
Reviewed-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
parent
abd550cff4
commit
8a610e616f
@ -822,4 +822,12 @@ public static boolean isRouterWebProxyEnable(Configuration conf) {
|
||||
return conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PROXY_ENABLE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PROXY_ENABLE);
|
||||
}
|
||||
|
||||
public static boolean checkPolicyManagerValid(String policyManager,
|
||||
List<String> supportWeightList) throws YarnException {
|
||||
if (supportWeightList.contains(policyManager)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
@ -75,6 +75,9 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.DeleteFederationApplicationResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedHomePolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
@ -92,6 +95,7 @@
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
@ -107,6 +111,8 @@
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.router.RouterServerUtil.checkPolicyManagerValid;
|
||||
|
||||
public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor {
|
||||
|
||||
private static final Logger LOG =
|
||||
@ -115,6 +121,10 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
|
||||
private static final String COMMA = ",";
|
||||
private static final String COLON = ":";
|
||||
|
||||
private static final List<String> SUPPORT_WEIGHT_MANAGERS =
|
||||
new ArrayList<>(Arrays.asList(WeightedLocalityPolicyManager.class.getName(),
|
||||
PriorityBroadcastPolicyManager.class.getName(), WeightedHomePolicyManager.class.getName()));
|
||||
|
||||
private Map<SubClusterId, ResourceManagerAdministrationProtocol> adminRMProxies;
|
||||
private FederationStateStoreFacade federationFacade;
|
||||
private final Clock clock = new MonotonicClock();
|
||||
@ -924,6 +934,13 @@ public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
|
||||
RouterServerUtil.logAndThrowException("Missing Queue information.", null);
|
||||
}
|
||||
|
||||
String policyManagerClassName = request.getPolicyManagerClassName();
|
||||
if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) {
|
||||
routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException(policyManagerClassName +
|
||||
" does not support the use of queue weights.", null);
|
||||
}
|
||||
|
||||
String amRmWeight = federationQueueWeight.getAmrmWeight();
|
||||
FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);
|
||||
|
||||
@ -935,9 +952,6 @@ public SaveFederationQueuePolicyResponse saveFederationQueuePolicy(
|
||||
|
||||
try {
|
||||
long startTime = clock.getTime();
|
||||
// Step1, get parameters.
|
||||
String policyManagerClassName = request.getPolicyManagerClassName();
|
||||
|
||||
|
||||
// Step2, parse amRMPolicyWeights.
|
||||
Map<SubClusterIdInfo, Float> amRMPolicyWeights = getSubClusterWeightMap(amRmWeight);
|
||||
@ -1346,6 +1360,12 @@ private void saveFederationQueuePolicy(FederationQueueWeight federationQueueWeig
|
||||
RouterServerUtil.logAndThrowException("Missing PolicyManagerClassName information.", null);
|
||||
}
|
||||
|
||||
if (!checkPolicyManagerValid(policyManagerClassName, SUPPORT_WEIGHT_MANAGERS)) {
|
||||
routerMetrics.incrSaveFederationQueuePolicyFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException(policyManagerClassName +
|
||||
"does not support the use of queue weights.", null);
|
||||
}
|
||||
|
||||
String amRmWeight = federationQueueWeight.getAmrmWeight();
|
||||
FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amRmWeight);
|
||||
|
||||
|
@ -643,15 +643,27 @@ public void testSaveFederationQueuePolicyErrorRequest() throws Exception {
|
||||
LambdaTestUtils.intercept(YarnException.class, "Missing Queue information.",
|
||||
() -> interceptor.saveFederationQueuePolicy(request));
|
||||
|
||||
// routerWeight / amrmWeight
|
||||
// The sum of the routerWeight is not equal to 1.
|
||||
// PolicyManager needs to support weight
|
||||
FederationQueueWeight federationQueueWeight2 = FederationQueueWeight.newInstance(
|
||||
"SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
|
||||
SaveFederationQueuePolicyRequest request2 =
|
||||
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2, "-");
|
||||
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight2,
|
||||
"TestPolicyManager");
|
||||
LambdaTestUtils.intercept(YarnException.class,
|
||||
"TestPolicyManager does not support the use of queue weights.",
|
||||
() -> interceptor.saveFederationQueuePolicy(request2));
|
||||
|
||||
// routerWeight / amrmWeight
|
||||
// The sum of the routerWeight is not equal to 1.
|
||||
String policyTypeName = WeightedLocalityPolicyManager.class.getCanonicalName();
|
||||
FederationQueueWeight federationQueueWeight3 = FederationQueueWeight.newInstance(
|
||||
"SC-1:0.7,SC-2:0.3", "SC-1:0.8,SC-2:0.3", "1.0");
|
||||
SaveFederationQueuePolicyRequest request3 =
|
||||
SaveFederationQueuePolicyRequest.newInstance("root.a", federationQueueWeight3,
|
||||
policyTypeName);
|
||||
LambdaTestUtils.intercept(YarnException.class,
|
||||
"The sum of ratios for all subClusters must be equal to 1.",
|
||||
() -> interceptor.saveFederationQueuePolicy(request2));
|
||||
() -> interceptor.saveFederationQueuePolicy(request3));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user