YARN-8697. LocalityMulticastAMRMProxyPolicy should fallback to random sub-cluster when cannot resolve resource. Contributed by Botong Huang.

This commit is contained in:
Giovanni Matteo Fumarola 2018-08-28 16:01:35 -07:00
parent 3e18b957eb
commit 7ed458b255
2 changed files with 125 additions and 33 deletions

View File

@ -21,8 +21,11 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -123,6 +126,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class); LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class);
private static Random rand = new Random();
private Map<SubClusterId, Float> weights; private Map<SubClusterId, Float> weights;
private SubClusterResolver resolver; private SubClusterResolver resolver;
@ -275,26 +280,18 @@ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
} }
// Handle node/rack requests that the SubClusterResolver cannot map to // Handle node/rack requests that the SubClusterResolver cannot map to
// any cluster. Defaulting to home subcluster. // any cluster. Pick a random sub-cluster from active and enabled ones.
targetId = getSubClusterForUnResolvedRequest(bookkeeper,
rr.getAllocationRequestId());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("ERROR resolving sub-cluster for resourceName: " LOG.debug("ERROR resolving sub-cluster for resourceName: "
+ rr.getResourceName() + " we are falling back to homeSubCluster:" + rr.getResourceName() + ", picked a random subcluster to forward:"
+ homeSubcluster); + targetId);
} }
if (targetIds != null && targetIds.size() > 0) {
// If home-subcluster is not active, ignore node/rack request bookkeeper.addRackRR(targetId, rr);
if (bookkeeper.isActiveAndEnabled(homeSubcluster)) {
if (targetIds != null && targetIds.size() > 0) {
bookkeeper.addRackRR(homeSubcluster, rr);
} else {
bookkeeper.addLocalizedNodeRR(homeSubcluster, rr);
}
} else { } else {
if (LOG.isDebugEnabled()) { bookkeeper.addLocalizedNodeRR(targetId, rr);
LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are "
+ "defaulting to is not active, the ResourceRequest "
+ "will be ignored.");
}
} }
} }
@ -313,6 +310,14 @@ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
return bookkeeper.getAnswer(); return bookkeeper.getAnswer();
} }
/**
* For unit test to override.
*/
protected SubClusterId getSubClusterForUnResolvedRequest(
AllocationBookkeeper bookKeeper, long allocationId) {
return bookKeeper.getSubClusterForUnResolvedRequest(allocationId);
}
/** /**
* It splits a list of non-localized resource requests among sub-clusters. * It splits a list of non-localized resource requests among sub-clusters.
*/ */
@ -512,10 +517,11 @@ private float getHeadroomWeighting(SubClusterId targetId,
* This helper class is used to book-keep the requests made to each * This helper class is used to book-keep the requests made to each
* subcluster, and maintain useful statistics to split ANY requests. * subcluster, and maintain useful statistics to split ANY requests.
*/ */
private final class AllocationBookkeeper { protected final class AllocationBookkeeper {
// the answer being accumulated // the answer being accumulated
private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>(); private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>();
private Map<SubClusterId, Set<Long>> maskForRackDeletion = new HashMap<>();
// stores how many containers we have allocated in each RM for localized // stores how many containers we have allocated in each RM for localized
// asks, used to correctly "spread" the corresponding ANY // asks, used to correctly "spread" the corresponding ANY
@ -523,6 +529,10 @@ private final class AllocationBookkeeper {
new HashMap<>(); new HashMap<>();
private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>(); private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<>();
// Store the randomly selected subClusterId for unresolved resource requests
// keyed by requestId
private Map<Long, SubClusterId> unResolvedRequestLocation = new HashMap<>();
private Set<SubClusterId> activeAndEnabledSC = new HashSet<>(); private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
private float totHeadroomMemory = 0; private float totHeadroomMemory = 0;
private int totHeadRoomEnabledRMs = 0; private int totHeadRoomEnabledRMs = 0;
@ -538,6 +548,7 @@ private void reinitialize(
// reset data structures // reset data structures
answer.clear(); answer.clear();
maskForRackDeletion.clear();
countContainersPerRM.clear(); countContainersPerRM.clear();
totNumLocalizedContainers.clear(); totNumLocalizedContainers.clear();
activeAndEnabledSC.clear(); activeAndEnabledSC.clear();
@ -628,16 +639,16 @@ private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
.addAndGet(rr.getNumContainers()); .addAndGet(rr.getNumContainers());
} }
internalAddToAnswer(targetId, rr); internalAddToAnswer(targetId, rr, false);
} }
/** /**
* Add a rack-local request to the final asnwer. * Add a rack-local request to the final asnwer.
*/ */
public void addRackRR(SubClusterId targetId, ResourceRequest rr) { private void addRackRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions Preconditions
.checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName())); .checkArgument(!ResourceRequest.isAnyLocation(rr.getResourceName()));
internalAddToAnswer(targetId, rr); internalAddToAnswer(targetId, rr, true);
} }
/** /**
@ -646,17 +657,45 @@ public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
private void addAnyRR(SubClusterId targetId, ResourceRequest rr) { private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
Preconditions Preconditions
.checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName())); .checkArgument(ResourceRequest.isAnyLocation(rr.getResourceName()));
internalAddToAnswer(targetId, rr); internalAddToAnswer(targetId, rr, false);
} }
private void internalAddToAnswer(SubClusterId targetId, private void internalAddToAnswer(SubClusterId targetId,
ResourceRequest partialRR) { ResourceRequest partialRR, boolean isRack) {
if (!isRack) {
if (!maskForRackDeletion.containsKey(targetId)) {
maskForRackDeletion.put(targetId, new HashSet<Long>());
}
maskForRackDeletion.get(targetId)
.add(partialRR.getAllocationRequestId());
}
if (!answer.containsKey(targetId)) { if (!answer.containsKey(targetId)) {
answer.put(targetId, new ArrayList<ResourceRequest>()); answer.put(targetId, new ArrayList<ResourceRequest>());
} }
answer.get(targetId).add(partialRR); answer.get(targetId).add(partialRR);
} }
/**
* For requests whose location cannot be resolved, choose an active and
* enabled sub-cluster to forward this requestId to.
*/
private SubClusterId getSubClusterForUnResolvedRequest(long allocationId) {
if (unResolvedRequestLocation.containsKey(allocationId)) {
return unResolvedRequestLocation.get(allocationId);
}
int id = rand.nextInt(activeAndEnabledSC.size());
for (SubClusterId subclusterId : activeAndEnabledSC) {
if (id == 0) {
unResolvedRequestLocation.put(allocationId, subclusterId);
return subclusterId;
}
id--;
}
throw new RuntimeException(
"Should not be here. activeAndEnabledSC size = "
+ activeAndEnabledSC.size() + " id = " + id);
}
/** /**
* Return all known subclusters associated with an allocation id. * Return all known subclusters associated with an allocation id.
* *
@ -678,6 +717,28 @@ private Set<SubClusterId> getSubClustersForId(long allocationId) {
* @return the answer * @return the answer
*/ */
private Map<SubClusterId, List<ResourceRequest>> getAnswer() { private Map<SubClusterId, List<ResourceRequest>> getAnswer() {
Iterator<Entry<SubClusterId, List<ResourceRequest>>> answerIter =
answer.entrySet().iterator();
// Remove redundant rack RR before returning the answer
while (answerIter.hasNext()) {
Entry<SubClusterId, List<ResourceRequest>> entry = answerIter.next();
SubClusterId scId = entry.getKey();
Set<Long> mask = maskForRackDeletion.get(scId);
if (mask != null) {
Iterator<ResourceRequest> rrIter = entry.getValue().iterator();
while (rrIter.hasNext()) {
ResourceRequest rr = rrIter.next();
if (!mask.contains(rr.getAllocationRequestId())) {
rrIter.remove();
}
}
}
if (mask == null || entry.getValue().size() == 0) {
answerIter.remove();
LOG.info("removing {} from output because it has only rack RR",
scId);
}
}
return answer; return answer;
} }

View File

@ -69,12 +69,12 @@ public class TestLocalityMulticastAMRMProxyPolicy
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
setPolicy(new LocalityMulticastAMRMProxyPolicy()); setPolicy(new TestableLocalityMulticastAMRMProxyPolicy());
setPolicyInfo(new WeightedPolicyInfo()); setPolicyInfo(new WeightedPolicyInfo());
Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>(); Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>(); Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
// simulate 20 subclusters with a 5% chance of being inactive // Six sub-clusters with one inactive and one disabled
for (int i = 0; i < 6; i++) { for (int i = 0; i < 6; i++) {
SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i); SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i);
// sub-cluster 3 is not active // sub-cluster 3 is not active
@ -207,6 +207,7 @@ public void testStressPolicy() throws Exception {
getPolicyInfo().setHeadroomAlpha(1.0f); getPolicyInfo().setHeadroomAlpha(1.0f);
initializePolicy(); initializePolicy();
addHomeSubClusterAsActive();
int numRR = 1000; int numRR = 1000;
List<ResourceRequest> resourceRequests = createLargeRandomList(numRR); List<ResourceRequest> resourceRequests = createLargeRandomList(numRR);
@ -324,14 +325,11 @@ Collections.<NodeReport> emptyList(),
null, Collections.<NMToken> emptyList()); null, Collections.<NMToken> emptyList());
} }
@Test /**
public void testSplitAllocateRequest() throws Exception { * modify default initialization to include a "homesubcluster" which we will
* use as the default for when nodes or racks are unknown.
// Test a complex List<ResourceRequest> is split correctly */
initializePolicy(); private void addHomeSubClusterAsActive() {
// modify default initialization to include a "homesubcluster"
// which we will use as the default for when nodes or racks are unknown
SubClusterInfo sci = mock(SubClusterInfo.class); SubClusterInfo sci = mock(SubClusterInfo.class);
when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
when(sci.getSubClusterId()).thenReturn(getHomeSubCluster()); when(sci.getSubClusterId()).thenReturn(getHomeSubCluster());
@ -340,6 +338,14 @@ public void testSplitAllocateRequest() throws Exception {
getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f); getPolicyInfo().getRouterPolicyWeights().put(sc, 0.1f);
getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f); getPolicyInfo().getAMRMPolicyWeights().put(sc, 0.1f);
}
@Test
public void testSplitAllocateRequest() throws Exception {
// Test a complex List<ResourceRequest> is split correctly
initializePolicy();
addHomeSubClusterAsActive();
FederationPoliciesTestUtil.initializePolicyContext( FederationPoliciesTestUtil.initializePolicyContext(
getFederationPolicyContext(), getPolicy(), getPolicyInfo(), getFederationPolicyContext(), getPolicy(), getPolicyInfo(),
@ -502,7 +508,8 @@ private void validateSplit(Map<SubClusterId, List<ResourceRequest>> split,
// Test target Ids // Test target Ids
for (SubClusterId targetId : split.keySet()) { for (SubClusterId targetId : split.keySet()) {
Assert.assertTrue("Target subclusters should be in the active set", Assert.assertTrue(
"Target subcluster " + targetId + " should be in the active set",
getActiveSubclusters().containsKey(targetId)); getActiveSubclusters().containsKey(targetId));
Assert.assertTrue( Assert.assertTrue(
"Target subclusters (" + targetId + ") should have weight >0 in " "Target subclusters (" + targetId + ") should have weight >0 in "
@ -787,4 +794,28 @@ public void testSubClusterExpiry() throws Exception {
checkTotalContainerAllocation(response, 100); checkTotalContainerAllocation(response, 100);
} }
/**
* A testable version of LocalityMulticastAMRMProxyPolicy that
* deterministically falls back to home sub-cluster for unresolved requests.
*/
private class TestableLocalityMulticastAMRMProxyPolicy
extends LocalityMulticastAMRMProxyPolicy {
@Override
protected SubClusterId getSubClusterForUnResolvedRequest(
AllocationBookkeeper bookkeeper, long allocationId) {
SubClusterId originalResult =
super.getSubClusterForUnResolvedRequest(bookkeeper, allocationId);
Map<SubClusterId, SubClusterInfo> activeClusters = null;
try {
activeClusters = getActiveSubclusters();
} catch (YarnException e) {
throw new RuntimeException(e);
}
// The randomly selected sub-cluster should at least be active
Assert.assertTrue(activeClusters.containsKey(originalResult));
// Alwasy use home sub-cluster so that unit test is deterministic
return getHomeSubCluster();
}
}
} }