YARN-6370. Properly handle rack requests for non-active subclusters in LocalityMulticastAMRMProxyPolicy. (Contributed by Botong Huang via curino).
(cherry picked from commit ce419881c32b178c48c3a01b5a15e4e3a3e750f5)
This commit is contained in:
parent
86b2bec56e
commit
1c64e1709b
@ -261,7 +261,11 @@ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
|
|||||||
|
|
||||||
// If home-subcluster is not active, ignore node/rack request
|
// If home-subcluster is not active, ignore node/rack request
|
||||||
if (bookkeeper.isActiveAndEnabled(homeSubcluster)) {
|
if (bookkeeper.isActiveAndEnabled(homeSubcluster)) {
|
||||||
bookkeeper.addLocalizedNodeRR(homeSubcluster, rr);
|
if (targetIds != null && targetIds.size() > 0) {
|
||||||
|
bookkeeper.addRackRR(homeSubcluster, rr);
|
||||||
|
} else {
|
||||||
|
bookkeeper.addLocalizedNodeRR(homeSubcluster, rr);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are "
|
LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are "
|
||||||
|
@ -339,19 +339,20 @@ public void testSplitAllocateRequest() throws Exception {
|
|||||||
validateSplit(response, resourceRequests);
|
validateSplit(response, resourceRequests);
|
||||||
prettyPrintRequests(response);
|
prettyPrintRequests(response);
|
||||||
|
|
||||||
// we expect 4 entry for home subcluster (3 for request-id 4, and a part
|
// we expect 7 entries for home subcluster (2 for request-id 4, 3 for
|
||||||
// of the broadcast of request-id 2
|
// request-id 5, and a part of the broadcast of request-id 2
|
||||||
checkExpectedAllocation(response, getHomeSubCluster().getId(), 4, 23);
|
checkExpectedAllocation(response, getHomeSubCluster().getId(), 7, 29);
|
||||||
|
|
||||||
// for subcluster0 we expect 3 entry from request-id 0, and 3 from
|
// for subcluster0 we expect 10 entries, 3 from request-id 0, and 3 from
|
||||||
// request-id 3, as well as part of the request-id 2 broadast
|
// request-id 3, 3 entries from request-id 5, as well as part of the
|
||||||
checkExpectedAllocation(response, "subcluster0", 7, 26);
|
// request-id 2 broadast
|
||||||
|
checkExpectedAllocation(response, "subcluster0", 10, 32);
|
||||||
|
|
||||||
// we expect 5 entry for subcluster1 (4 from request-id 1, and part
|
// we expect 5 entries for subcluster1 (4 from request-id 1, and part
|
||||||
// of the broadcast of request-id 2
|
// of the broadcast of request-id 2
|
||||||
checkExpectedAllocation(response, "subcluster1", 5, 26);
|
checkExpectedAllocation(response, "subcluster1", 5, 26);
|
||||||
|
|
||||||
// sub-cluster 2 should contain 3 entry from request-id 1 and 1 from the
|
// sub-cluster 2 should contain 3 entries from request-id 1 and 1 from the
|
||||||
// broadcast of request-id 2, and no request-id 0
|
// broadcast of request-id 2, and no request-id 0
|
||||||
checkExpectedAllocation(response, "subcluster2", 4, 23);
|
checkExpectedAllocation(response, "subcluster2", 4, 23);
|
||||||
|
|
||||||
@ -364,28 +365,33 @@ public void testSplitAllocateRequest() throws Exception {
|
|||||||
|
|
||||||
// check that the allocations that show up are what expected
|
// check that the allocations that show up are what expected
|
||||||
for (ResourceRequest rr : response.get(getHomeSubCluster())) {
|
for (ResourceRequest rr : response.get(getHomeSubCluster())) {
|
||||||
Assert.assertTrue(rr.getAllocationRequestId() == 4L
|
Assert.assertTrue(
|
||||||
|| rr.getAllocationRequestId() == 2L);
|
rr.getAllocationRequestId() == 2L || rr.getAllocationRequestId() == 4L
|
||||||
}
|
|| rr.getAllocationRequestId() == 5L);
|
||||||
|
|
||||||
for (ResourceRequest rr : response.get(getHomeSubCluster())) {
|
|
||||||
Assert.assertTrue(rr.getAllocationRequestId() != 1L);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
List<ResourceRequest> rrs =
|
List<ResourceRequest> rrs =
|
||||||
response.get(SubClusterId.newInstance("subcluster0"));
|
response.get(SubClusterId.newInstance("subcluster0"));
|
||||||
for (ResourceRequest rr : rrs) {
|
for (ResourceRequest rr : rrs) {
|
||||||
Assert.assertTrue(rr.getAllocationRequestId() != 1L);
|
Assert.assertTrue(rr.getAllocationRequestId() != 1L);
|
||||||
|
Assert.assertTrue(rr.getAllocationRequestId() != 4L);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ResourceRequest rr : response
|
||||||
|
.get(SubClusterId.newInstance("subcluster1"))) {
|
||||||
|
Assert.assertTrue(rr.getAllocationRequestId() == 1L
|
||||||
|
|| rr.getAllocationRequestId() == 2L);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ResourceRequest rr : response
|
for (ResourceRequest rr : response
|
||||||
.get(SubClusterId.newInstance("subcluster2"))) {
|
.get(SubClusterId.newInstance("subcluster2"))) {
|
||||||
Assert.assertTrue(rr.getAllocationRequestId() != 0L);
|
Assert.assertTrue(rr.getAllocationRequestId() == 1L
|
||||||
|
|| rr.getAllocationRequestId() == 2L);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ResourceRequest rr : response
|
for (ResourceRequest rr : response
|
||||||
.get(SubClusterId.newInstance("subcluster5"))) {
|
.get(SubClusterId.newInstance("subcluster5"))) {
|
||||||
Assert.assertTrue(rr.getAllocationRequestId() >= 2);
|
Assert.assertTrue(rr.getAllocationRequestId() == 2);
|
||||||
Assert.assertTrue(rr.getRelaxLocality());
|
Assert.assertTrue(rr.getRelaxLocality());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -555,7 +561,7 @@ private List<ResourceRequest> createComplexRequest() throws Exception {
|
|||||||
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
|
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
|
||||||
"subcluster2-rack3", 1024, 1, 1, 1, null, false));
|
"subcluster2-rack3", 1024, 1, 1, 1, null, false));
|
||||||
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
|
out.add(FederationPoliciesTestUtil.createResourceRequest(1L,
|
||||||
ResourceRequest.ANY, 1024, 1, 1, 2, null, false));
|
ResourceRequest.ANY, 1024, 1, 1, 3, null, false));
|
||||||
|
|
||||||
// create a non-local ANY request that can span anything
|
// create a non-local ANY request that can span anything
|
||||||
out.add(FederationPoliciesTestUtil.createResourceRequest(2L,
|
out.add(FederationPoliciesTestUtil.createResourceRequest(2L,
|
||||||
@ -578,6 +584,19 @@ private List<ResourceRequest> createComplexRequest() throws Exception {
|
|||||||
out.add(FederationPoliciesTestUtil.createResourceRequest(4L,
|
out.add(FederationPoliciesTestUtil.createResourceRequest(4L,
|
||||||
ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
|
ResourceRequest.ANY, 1024, 1, 1, 1, null, false));
|
||||||
|
|
||||||
|
// create a request of two hosts, an unknown node and a known node, both in
|
||||||
|
// a known rack, and expect the unknown node to show up in homesubcluster
|
||||||
|
out.add(FederationPoliciesTestUtil.createResourceRequest(5L,
|
||||||
|
"subcluster0-rack0-host0", 1024, 1, 1, 2, null, false));
|
||||||
|
out.add(FederationPoliciesTestUtil.createResourceRequest(5L,
|
||||||
|
"subcluster0-rack0", 1024, 1, 1, 2, null, false));
|
||||||
|
out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "node4", 1024,
|
||||||
|
1, 1, 2, null, false));
|
||||||
|
out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "rack2", 1024,
|
||||||
|
1, 1, 2, null, false));
|
||||||
|
out.add(FederationPoliciesTestUtil.createResourceRequest(5L,
|
||||||
|
ResourceRequest.ANY, 1024, 1, 1, 4, null, false));
|
||||||
|
|
||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user