YARN-8521. NPE in AllocationTagsManager when a container is removed more than once. Contributed by Weiwei Yang.
This commit is contained in:
parent
f5dbbfe2e9
commit
08d5060605
@ -115,6 +115,11 @@ private void addTag(T type, String tag) {
|
|||||||
|
|
||||||
private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) {
|
private void removeTagFromInnerMap(Map<String, Long> innerMap, String tag) {
|
||||||
Long count = innerMap.get(tag);
|
Long count = innerMap.get(tag);
|
||||||
|
if (count == null) {
|
||||||
|
LOG.warn("Trying to remove tags, however the tag " + tag
|
||||||
|
+ " no longer exists on this node/rack.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (count > 1) {
|
if (count > 1) {
|
||||||
innerMap.put(tag, count - 1);
|
innerMap.put(tag, count - 1);
|
||||||
} else {
|
} else {
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
@ -38,6 +39,7 @@
|
|||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
@ -60,6 +62,41 @@ public void setup() {
|
|||||||
rmContext = rm.getRMContext();
|
rmContext = rm.getRMContext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleAddRemoveContainer() {
|
||||||
|
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
|
||||||
|
|
||||||
|
NodeId nodeId = NodeId.fromString("host1:123");
|
||||||
|
ContainerId cid1 = TestUtils.getMockContainerId(1, 1);
|
||||||
|
ContainerId cid2 = TestUtils.getMockContainerId(1, 2);
|
||||||
|
ContainerId cid3 = TestUtils.getMockContainerId(1, 3);
|
||||||
|
Set<String> tags1 = ImmutableSet.of("mapper", "reducer");
|
||||||
|
Set<String> tags2 = ImmutableSet.of("mapper");
|
||||||
|
Set<String> tags3 = ImmutableSet.of("zk");
|
||||||
|
|
||||||
|
// node - mapper : 2
|
||||||
|
// - reduce : 1
|
||||||
|
atm.addContainer(nodeId, cid1, tags1);
|
||||||
|
atm.addContainer(nodeId, cid2, tags2);
|
||||||
|
atm.addContainer(nodeId, cid3, tags3);
|
||||||
|
Assert.assertEquals(2L,
|
||||||
|
(long) atm.getAllocationTagsWithCount(nodeId).get("mapper"));
|
||||||
|
Assert.assertEquals(1L,
|
||||||
|
(long) atm.getAllocationTagsWithCount(nodeId).get("reducer"));
|
||||||
|
|
||||||
|
// remove container1
|
||||||
|
atm.removeContainer(nodeId, cid1, tags1);
|
||||||
|
Assert.assertEquals(1L,
|
||||||
|
(long) atm.getAllocationTagsWithCount(nodeId).get("mapper"));
|
||||||
|
Assert.assertNull(atm.getAllocationTagsWithCount(nodeId).get("reducer"));
|
||||||
|
|
||||||
|
// remove the same container again, the reducer no longer exists,
|
||||||
|
// make sure there is no NPE here
|
||||||
|
atm.removeContainer(nodeId, cid1, tags1);
|
||||||
|
Assert.assertNull(atm.getAllocationTagsWithCount(nodeId).get("mapper"));
|
||||||
|
Assert.assertNull(atm.getAllocationTagsWithCount(nodeId).get("reducer"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAllocationTagsManagerSimpleCases()
|
public void testAllocationTagsManagerSimpleCases()
|
||||||
throws InvalidAllocationTagsQueryException {
|
throws InvalidAllocationTagsQueryException {
|
||||||
|
@ -163,6 +163,11 @@ private ContainerId newContainerId(ApplicationId appId) {
|
|||||||
ApplicationAttemptId.newInstance(appId, 0), 0);
|
ApplicationAttemptId.newInstance(appId, 0), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ContainerId newContainerId(ApplicationId appId, int containerId) {
|
||||||
|
return ContainerId.newContainerId(
|
||||||
|
ApplicationAttemptId.newInstance(appId, 0), containerId);
|
||||||
|
}
|
||||||
|
|
||||||
private SchedulerNode newSchedulerNode(String hostname, String rackName,
|
private SchedulerNode newSchedulerNode(String hostname, String rackName,
|
||||||
NodeId nodeId) {
|
NodeId nodeId) {
|
||||||
SchedulerNode node = mock(SchedulerNode.class);
|
SchedulerNode node = mock(SchedulerNode.class);
|
||||||
@ -271,12 +276,10 @@ public void testMultiTagsPlacementConstraints()
|
|||||||
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
|
SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(),
|
||||||
n3_r2.getRackName(), n3_r2.getNodeID());
|
n3_r2.getRackName(), n3_r2.getNodeID());
|
||||||
|
|
||||||
ContainerId ca = ContainerId
|
ContainerId ca = newContainerId(appId1, 0);
|
||||||
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
|
||||||
tm.addContainer(n0_r1.getNodeID(), ca, ImmutableSet.of("A"));
|
tm.addContainer(n0_r1.getNodeID(), ca, ImmutableSet.of("A"));
|
||||||
|
|
||||||
ContainerId cb = ContainerId
|
ContainerId cb = newContainerId(appId1, 1);
|
||||||
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
|
||||||
tm.addContainer(n1_r1.getNodeID(), cb, ImmutableSet.of("B"));
|
tm.addContainer(n1_r1.getNodeID(), cb, ImmutableSet.of("B"));
|
||||||
|
|
||||||
// n0 and n1 has A/B so they cannot satisfy the PC
|
// n0 and n1 has A/B so they cannot satisfy the PC
|
||||||
@ -297,11 +300,9 @@ public void testMultiTagsPlacementConstraints()
|
|||||||
* n2: A(1), B(1)
|
* n2: A(1), B(1)
|
||||||
* n3:
|
* n3:
|
||||||
*/
|
*/
|
||||||
ContainerId ca1 = ContainerId
|
ContainerId ca1 = newContainerId(appId1, 2);
|
||||||
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
|
||||||
tm.addContainer(n2_r2.getNodeID(), ca1, ImmutableSet.of("A"));
|
tm.addContainer(n2_r2.getNodeID(), ca1, ImmutableSet.of("A"));
|
||||||
ContainerId cb1 = ContainerId
|
ContainerId cb1 = newContainerId(appId1, 3);
|
||||||
.newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0);
|
|
||||||
tm.addContainer(n2_r2.getNodeID(), cb1, ImmutableSet.of("B"));
|
tm.addContainer(n2_r2.getNodeID(), cb1, ImmutableSet.of("B"));
|
||||||
|
|
||||||
// Only n2 has both A and B so only it can satisfy the PC
|
// Only n2 has both A and B so only it can satisfy the PC
|
||||||
@ -468,9 +469,9 @@ public void testORConstraintAssignment()
|
|||||||
* n3: ""
|
* n3: ""
|
||||||
*/
|
*/
|
||||||
tm.addContainer(n0r1.getNodeID(),
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
newContainerId(appId1), ImmutableSet.of("hbase-m"));
|
newContainerId(appId1, 1), ImmutableSet.of("hbase-m"));
|
||||||
tm.addContainer(n2r2.getNodeID(),
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
newContainerId(appId1), ImmutableSet.of("hbase-rs"));
|
newContainerId(appId1, 2), ImmutableSet.of("hbase-rs"));
|
||||||
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
|
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
|
||||||
.get("hbase-m").longValue());
|
.get("hbase-m").longValue());
|
||||||
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
|
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
|
||||||
@ -504,7 +505,7 @@ public void testORConstraintAssignment()
|
|||||||
* n3: hbase-rs(1)
|
* n3: hbase-rs(1)
|
||||||
*/
|
*/
|
||||||
tm.addContainer(n3r2.getNodeID(),
|
tm.addContainer(n3r2.getNodeID(),
|
||||||
newContainerId(appId1), ImmutableSet.of("hbase-rs"));
|
newContainerId(appId1, 2), ImmutableSet.of("hbase-rs"));
|
||||||
// n3 is qualified now because it is allocated with hbase-rs tag
|
// n3 is qualified now because it is allocated with hbase-rs tag
|
||||||
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
|
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
|
||||||
@ -518,7 +519,7 @@ public void testORConstraintAssignment()
|
|||||||
*/
|
*/
|
||||||
// Place
|
// Place
|
||||||
tm.addContainer(n2r2.getNodeID(),
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
newContainerId(appId1), ImmutableSet.of("spark"));
|
newContainerId(appId1, 3), ImmutableSet.of("spark"));
|
||||||
// According to constraint, "zk" is allowed to be placed on a node
|
// According to constraint, "zk" is allowed to be placed on a node
|
||||||
// has "hbase-m" tag OR a node has both "hbase-rs" and "spark" tags.
|
// has "hbase-m" tag OR a node has both "hbase-rs" and "spark" tags.
|
||||||
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
|
||||||
@ -552,9 +553,9 @@ public void testANDConstraintAssignment()
|
|||||||
* n3: ""
|
* n3: ""
|
||||||
*/
|
*/
|
||||||
tm.addContainer(n0r1.getNodeID(),
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
newContainerId(appId1), ImmutableSet.of("hbase-m"));
|
newContainerId(appId1, 0), ImmutableSet.of("hbase-m"));
|
||||||
tm.addContainer(n2r2.getNodeID(),
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
newContainerId(appId1), ImmutableSet.of("hbase-m"));
|
newContainerId(appId1, 1), ImmutableSet.of("hbase-m"));
|
||||||
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
|
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
|
||||||
.get("hbase-m").longValue());
|
.get("hbase-m").longValue());
|
||||||
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
|
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
|
||||||
@ -589,7 +590,7 @@ public void testANDConstraintAssignment()
|
|||||||
*/
|
*/
|
||||||
for (int i=0; i<4; i++) {
|
for (int i=0; i<4; i++) {
|
||||||
tm.addContainer(n1r1.getNodeID(),
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
newContainerId(appId1), ImmutableSet.of("spark"));
|
newContainerId(appId1, i+2), ImmutableSet.of("spark"));
|
||||||
}
|
}
|
||||||
Assert.assertEquals(4L, tm.getAllocationTagsWithCount(n1r1.getNodeID())
|
Assert.assertEquals(4L, tm.getAllocationTagsWithCount(n1r1.getNodeID())
|
||||||
.get("spark").longValue());
|
.get("spark").longValue());
|
||||||
@ -633,19 +634,19 @@ public void testGlobalAppConstraints()
|
|||||||
* n3: ""
|
* n3: ""
|
||||||
*/
|
*/
|
||||||
tm.addContainer(n0r1.getNodeID(),
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
newContainerId(application1), ImmutableSet.of("A"));
|
newContainerId(application1, 0), ImmutableSet.of("A"));
|
||||||
tm.addContainer(n0r1.getNodeID(),
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
newContainerId(application2), ImmutableSet.of("A"));
|
newContainerId(application2, 1), ImmutableSet.of("A"));
|
||||||
tm.addContainer(n1r1.getNodeID(),
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
newContainerId(application3), ImmutableSet.of("A"));
|
newContainerId(application3, 2), ImmutableSet.of("A"));
|
||||||
tm.addContainer(n1r1.getNodeID(),
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
newContainerId(application3), ImmutableSet.of("A"));
|
newContainerId(application3, 3), ImmutableSet.of("A"));
|
||||||
tm.addContainer(n1r1.getNodeID(),
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
newContainerId(application3), ImmutableSet.of("A"));
|
newContainerId(application3, 4), ImmutableSet.of("A"));
|
||||||
tm.addContainer(n2r2.getNodeID(),
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
newContainerId(application1), ImmutableSet.of("A"));
|
newContainerId(application1, 5), ImmutableSet.of("A"));
|
||||||
tm.addContainer(n2r2.getNodeID(),
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
newContainerId(application1), ImmutableSet.of("A"));
|
newContainerId(application1, 6), ImmutableSet.of("A"));
|
||||||
|
|
||||||
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
|
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
|
||||||
n0r1.getRackName(), n0r1.getNodeID());
|
n0r1.getRackName(), n0r1.getNodeID());
|
||||||
@ -888,9 +889,9 @@ public void testInterAppConstraintsByAppID()
|
|||||||
* n3: ""
|
* n3: ""
|
||||||
*/
|
*/
|
||||||
tm.addContainer(n0r1.getNodeID(),
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
newContainerId(application1), ImmutableSet.of("hbase-m"));
|
newContainerId(application1, 0), ImmutableSet.of("hbase-m"));
|
||||||
tm.addContainer(n2r2.getNodeID(),
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
newContainerId(application1), ImmutableSet.of("hbase-m"));
|
newContainerId(application1, 1), ImmutableSet.of("hbase-m"));
|
||||||
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
|
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
|
||||||
.get("hbase-m").longValue());
|
.get("hbase-m").longValue());
|
||||||
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
|
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
|
||||||
@ -958,7 +959,7 @@ application2, createSchedulingRequest(srcTags2),
|
|||||||
* n3: ""
|
* n3: ""
|
||||||
*/
|
*/
|
||||||
tm.addContainer(n0r1.getNodeID(),
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
newContainerId(application3), ImmutableSet.of("hbase-m"));
|
newContainerId(application3, 0), ImmutableSet.of("hbase-m"));
|
||||||
|
|
||||||
// Anti-affinity to self/hbase-m
|
// Anti-affinity to self/hbase-m
|
||||||
Assert.assertFalse(PlacementConstraintsUtil
|
Assert.assertFalse(PlacementConstraintsUtil
|
||||||
|
Loading…
Reference in New Issue
Block a user