YARN-6190. Validation and synchronization fixes in LocalityMulticastAMRMProxyPolicy. (Botong Huang via curino)

(cherry picked from commit 5c486961cd3a175b122ef86275c99b72964f2c50)
This commit is contained in:
Carlo Curino 2017-02-28 17:04:20 -08:00
parent 193ec456d6
commit 8623644f45
5 changed files with 73 additions and 29 deletions

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
@ -143,10 +144,9 @@ public void reinitialize(
Map<SubClusterId, Float> newWeightsConverted = new HashMap<>();
boolean allInactive = true;
WeightedPolicyInfo policy = getPolicyInfo();
if (policy.getAMRMPolicyWeights() == null
|| policy.getAMRMPolicyWeights().size() == 0) {
allInactive = false;
} else {
if (policy.getAMRMPolicyWeights() != null
&& policy.getAMRMPolicyWeights().size() > 0) {
for (Map.Entry<SubClusterIdInfo, Float> e : policy.getAMRMPolicyWeights()
.entrySet()) {
if (e.getValue() > 0) {
@ -180,7 +180,6 @@ public void reinitialize(
this.federationFacade =
policyContext.getFederationStateStoreFacade();
this.bookkeeper = new AllocationBookkeeper();
this.homeSubcluster = policyContext.getHomeSubcluster();
}
@ -197,7 +196,9 @@ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests) throws YarnException {
// object used to accumulate statistics about the answer, initialize with
// active subclusters.
// active subclusters. Create a new instance per call because this method
// can be called concurrently.
bookkeeper = new AllocationBookkeeper();
bookkeeper.reinitialize(federationFacade.getSubClusters(true));
List<ResourceRequest> nonLocalizedRequests =
@ -238,13 +239,17 @@ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
// we log altogether later
}
if (targetIds != null && targetIds.size() > 0) {
boolean hasActive = false;
for (SubClusterId tid : targetIds) {
if (bookkeeper.isActiveAndEnabled(tid)) {
bookkeeper.addRackRR(tid, rr);
hasActive = true;
}
}
if (hasActive) {
continue;
}
}
// Handle node/rack requests that the SubClusterResolver cannot map to
// any cluster. Defaulting to home subcluster.
@ -347,7 +352,7 @@ private void splitIndividualAny(ResourceRequest originalResourceRequest,
originalResourceRequest.getExecutionTypeRequest());
out.setAllocationRequestId(allocationId);
out.setNumContainers((int) Math.ceil(numContainer));
if (out.isAnyLocation(out.getResourceName())) {
if (ResourceRequest.isAnyLocation(out.getResourceName())) {
allocationBookkeeper.addAnyRR(targetId, out);
} else {
allocationBookkeeper.addRackRR(targetId, out);
@ -362,7 +367,7 @@ private void splitIndividualAny(ResourceRequest originalResourceRequest,
*/
private float getLocalityBasedWeighting(long reqId, SubClusterId targetId,
AllocationBookkeeper allocationBookkeeper) {
float totWeight = allocationBookkeeper.getTotNumLocalizedContainers();
float totWeight = allocationBookkeeper.getTotNumLocalizedContainers(reqId);
float localWeight =
allocationBookkeeper.getNumLocalizedContainers(reqId, targetId);
return totWeight > 0 ? localWeight / totWeight : 0;
@ -375,7 +380,7 @@ private float getLocalityBasedWeighting(long reqId, SubClusterId targetId,
private float getPolicyConfigWeighting(SubClusterId targetId,
AllocationBookkeeper allocationBookkeeper) {
float totWeight = allocationBookkeeper.totPolicyWeight;
Float localWeight = weights.get(targetId);
Float localWeight = allocationBookkeeper.policyWeights.get(targetId);
return (localWeight != null && totWeight > 0) ? localWeight / totWeight : 0;
}
@ -424,29 +429,36 @@ private final class AllocationBookkeeper {
// asks, used to correctly "spread" the corresponding ANY
private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM =
new HashMap<>();
private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>();
private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
private long totNumLocalizedContainers = 0;
private float totHeadroomMemory = 0;
private int totHeadRoomEnabledRMs = 0;
private Map<SubClusterId, Float> policyWeights;
private float totPolicyWeight = 0;
private void reinitialize(
Map<SubClusterId, SubClusterInfo> activeSubclusters)
throws YarnException {
if (activeSubclusters == null) {
throw new YarnRuntimeException("null activeSubclusters received");
}
// reset data structures
answer.clear();
countContainersPerRM.clear();
totNumLocalizedContainers.clear();
activeAndEnabledSC.clear();
totNumLocalizedContainers = 0;
totHeadroomMemory = 0;
totHeadRoomEnabledRMs = 0;
// save the reference locally in case the weights get reinitialized
// concurrently
policyWeights = weights;
totPolicyWeight = 0;
// pre-compute the set of subclusters that are both active and enabled by
// the policy weights, and accumulate their total weight
for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) {
for (Map.Entry<SubClusterId, Float> entry : policyWeights.entrySet()) {
if (entry.getValue() > 0
&& activeSubclusters.containsKey(entry.getKey())) {
activeAndEnabledSC.add(entry.getKey());
@ -467,7 +479,6 @@ private void reinitialize(
totHeadRoomEnabledRMs++;
}
}
}
/**
@ -475,7 +486,8 @@ private void reinitialize(
* on a per-allocation-id and per-subcluster bases.
*/
private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
Preconditions
.checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>());
@ -488,7 +500,12 @@ private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
.addAndGet(rr.getNumContainers());
totNumLocalizedContainers += rr.getNumContainers();
if (!totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) {
totNumLocalizedContainers.put(rr.getAllocationRequestId(),
new AtomicLong(0));
}
totNumLocalizedContainers.get(rr.getAllocationRequestId())
.addAndGet(rr.getNumContainers());
internalAddToAnswer(targetId, rr);
}
@ -497,7 +514,8 @@ private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
* Add a rack-local request to the final asnwer.
*/
public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
Preconditions
.checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
internalAddToAnswer(targetId, rr);
}
@ -505,7 +523,8 @@ public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
* Add an ANY request to the final answer.
*/
private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName()));
Preconditions
.checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName()));
internalAddToAnswer(targetId, rr);
}
@ -552,10 +571,12 @@ private Set<SubClusterId> getActiveAndEnabledSC() {
}
/**
* Return the total number of container coming from localized requests.
* Return the total number of container coming from localized requests
* matching an allocation Id.
*/
private long getTotNumLocalizedContainers() {
return totNumLocalizedContainers;
private long getTotNumLocalizedContainers(long allocationId) {
AtomicLong c = totNumLocalizedContainers.get(allocationId);
return c == null ? 0 : c.get();
}
/**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -40,6 +41,7 @@
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -117,6 +119,21 @@ private void initializePolicy() throws YarnException {
getActiveSubclusters());
}
@Test(expected = FederationPolicyInitializationException.class)
public void testNullWeights() throws Exception {
getPolicyInfo().setAMRMPolicyWeights(null);
initializePolicy();
fail();
}
@Test(expected = FederationPolicyInitializationException.class)
public void testEmptyWeights() throws Exception {
getPolicyInfo()
.setAMRMPolicyWeights(new HashMap<SubClusterIdInfo, Float>());
initializePolicy();
fail();
}
@Test
public void testSplitBasedOnHeadroom() throws Exception {
@ -154,7 +171,7 @@ public void testSplitBasedOnHeadroom() throws Exception {
AllocateResponse ar = getAllocateResponseWithTargetHeadroom(100);
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
((FederationAMRMProxyPolicy) getPolicy())
response = ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
LOG.info("After headroom update");
@ -332,7 +349,7 @@ public void testSplitAllocateRequest() throws Exception {
// we expect 5 entry for subcluster1 (4 from request-id 1, and part
// of the broadcast of request-id 2
checkExpectedAllocation(response, "subcluster1", 5, 25);
checkExpectedAllocation(response, "subcluster1", 5, 26);
// sub-cluster 2 should contain 3 entry from request-id 1 and 1 from the
// broadcast of request-id 2, and no request-id 0

View File

@ -89,9 +89,6 @@ protected static void serializeAndDeserializePolicyManager(
FederationAMRMProxyPolicy federationAMRMProxyPolicy =
wfp2.getAMRMPolicy(context, null);
// needed only for tests (getARMRMPolicy change the "type" in conf)
fpc.setType(wfp.getClass().getCanonicalName());
FederationRouterPolicy federationRouterPolicy =
wfp2.getRouterPolicy(context, null);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.resolver;
import java.io.File;
import java.net.URL;
import java.util.HashSet;
import java.util.Set;
@ -46,8 +47,10 @@ public static void setUpGoodFile() {
throw new RuntimeException(
"Could not find 'nodes' dummy file in classpath");
}
// This will get rid of the beginning '/' in the url in Windows env
File file = new File(url.getPath());
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath());
resolver.setConf(conf);
resolver.load();
}
@ -62,8 +65,10 @@ private void setUpMalformedFile() {
throw new RuntimeException(
"Could not find 'nodes-malformed' dummy file in classpath");
}
// This will get rid of the beginning '/' in the url in Windows env
File file = new File(url.getPath());
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath());
resolver.setConf(conf);
resolver.load();
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.*;
import org.apache.hadoop.yarn.util.Records;
import java.io.File;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -162,7 +163,10 @@ public static SubClusterResolver initResolver() {
throw new RuntimeException(
"Could not find 'nodes' dummy file in classpath");
}
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, url.getPath());
// This will get rid of the beginning '/' in the url in Windows env
File file = new File(url.getPath());
conf.set(YarnConfiguration.FEDERATION_MACHINE_LIST, file.getPath());
resolver.setConf(conf);
resolver.load();
return resolver;