YARN-7788. Factor out management of temp tags from AllocationTagsManager. (Arun Suresh via kkaranasos)

This commit is contained in:
Konstantinos Karanasos 2018-01-22 23:51:02 -08:00 committed by Arun Suresh
parent 8bf7c44436
commit adbe87abf8
5 changed files with 336 additions and 170 deletions

View File

@ -24,17 +24,14 @@
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -61,9 +58,6 @@ public class AllocationTagsManager {
// Application's tags to Rack // Application's tags to Rack
private Map<ApplicationId, TypeToCountedTags> perAppRackMappings = private Map<ApplicationId, TypeToCountedTags> perAppRackMappings =
new HashMap<>(); new HashMap<>();
// Application's Temporary containers mapping
private Map<ApplicationId, Map<NodeId, Map<ContainerId, Set<String>>>>
appTempMappings = new HashMap<>();
// Global tags to node mapping (used to fast return aggregated tags // Global tags to node mapping (used to fast return aggregated tags
// cardinality across apps) // cardinality across apps)
@ -76,7 +70,7 @@ public class AllocationTagsManager {
* Currently used both for NodeId to Tag, Count and Rack to Tag, Count * Currently used both for NodeId to Tag, Count and Rack to Tag, Count
*/ */
@VisibleForTesting @VisibleForTesting
static class TypeToCountedTags<T> { public static class TypeToCountedTags<T> {
// Map<Type, Map<Tag, Count>> // Map<Type, Map<Tag, Count>>
private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>(); private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
@ -214,7 +208,7 @@ public Map<T, Map<String, Long>> getTypeToTagsWithCount() {
} }
@VisibleForTesting @VisibleForTesting
Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() { public Map<ApplicationId, TypeToCountedTags> getPerAppNodeMappings() {
return perAppNodeMappings; return perAppNodeMappings;
} }
@ -233,12 +227,6 @@ TypeToCountedTags getGlobalRackMapping() {
return globalRackMapping; return globalRackMapping;
} }
@VisibleForTesting
public Map<NodeId, Map<ContainerId, Set<String>>> getAppTempMappings(
ApplicationId applicationId) {
return appTempMappings.get(applicationId);
}
public AllocationTagsManager(RMContext context) { public AllocationTagsManager(RMContext context) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock(); readLock = lock.readLock();
@ -246,39 +234,6 @@ public AllocationTagsManager(RMContext context) {
rmContext = context; rmContext = context;
} }
//
/**
* Method adds a temporary fake-container tag to Node mapping.
* Used by the constrained placement algorithm to keep track of containers
* that are currently placed on nodes but are not yet allocated.
* @param nodeId
* @param applicationId
* @param allocationTags
*/
public void addTempContainer(NodeId nodeId, ApplicationId applicationId,
Set<String> allocationTags) {
ContainerId tmpContainer = ContainerId.newContainerId(
ApplicationAttemptId.newInstance(applicationId, 1), System.nanoTime());
writeLock.lock();
try {
Map<NodeId, Map<ContainerId, Set<String>>> appTempMapping =
appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>());
Map<ContainerId, Set<String>> containerTempMapping =
appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>());
containerTempMapping.put(tmpContainer, allocationTags);
if (LOG.isDebugEnabled()) {
LOG.debug("Added TEMP container=" + tmpContainer + " with tags=["
+ StringUtils.join(allocationTags, ",") + "]");
}
} finally {
writeLock.unlock();
}
addContainer(nodeId, tmpContainer, allocationTags);
}
/** /**
* Notify container allocated on a node. * Notify container allocated on a node.
* *
@ -297,6 +252,15 @@ public void addContainer(NodeId nodeId, ContainerId containerId,
} }
ApplicationId applicationId = ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId(); containerId.getApplicationAttemptId().getApplicationId();
addTags(nodeId, applicationId, allocationTags);
if (LOG.isDebugEnabled()) {
LOG.debug("Added container=" + containerId + " with tags=["
+ StringUtils.join(allocationTags, ",") + "]");
}
}
public void addTags(NodeId nodeId, ApplicationId applicationId,
Set<String> allocationTags) {
writeLock.lock(); writeLock.lock();
try { try {
TypeToCountedTags perAppTagsMapping = perAppNodeMappings TypeToCountedTags perAppTagsMapping = perAppNodeMappings
@ -312,11 +276,6 @@ public void addContainer(NodeId nodeId, ContainerId containerId,
perAppRackTagsMapping.addTags(nodeRack, allocationTags); perAppRackTagsMapping.addTags(nodeRack, allocationTags);
globalNodeMapping.addTags(nodeId, allocationTags); globalNodeMapping.addTags(nodeId, allocationTags);
globalRackMapping.addTags(nodeRack, allocationTags); globalRackMapping.addTags(nodeRack, allocationTags);
if (LOG.isDebugEnabled()) {
LOG.debug("Added container=" + containerId + " with tags=["
+ StringUtils.join(allocationTags, ",") + "]");
}
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -339,6 +298,21 @@ public void removeContainer(NodeId nodeId,
ApplicationId applicationId = ApplicationId applicationId =
containerId.getApplicationAttemptId().getApplicationId(); containerId.getApplicationAttemptId().getApplicationId();
removeTags(nodeId, applicationId, allocationTags);
if (LOG.isDebugEnabled()) {
LOG.debug("Removed container=" + containerId + " with tags=["
+ StringUtils.join(allocationTags, ",") + "]");
}
}
/**
* Helper method to just remove the tags associated with a container.
* @param nodeId
* @param applicationId
* @param allocationTags
*/
public void removeTags(NodeId nodeId, ApplicationId applicationId,
Set<String> allocationTags) {
writeLock.lock(); writeLock.lock();
try { try {
TypeToCountedTags perAppTagsMapping = TypeToCountedTags perAppTagsMapping =
@ -364,43 +338,11 @@ public void removeContainer(NodeId nodeId,
if (perAppRackTagsMapping.isEmpty()) { if (perAppRackTagsMapping.isEmpty()) {
perAppRackMappings.remove(applicationId); perAppRackMappings.remove(applicationId);
} }
if (LOG.isDebugEnabled()) {
LOG.debug("Removed container=" + containerId + " with tags=["
+ StringUtils.join(allocationTags, ",") + "]");
}
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
} }
/**
* Method removes temporary containers associated with an application
* Used by the placement algorithm to clean temporary tags at the end of
* a placement cycle.
* @param applicationId Application Id.
*/
public void cleanTempContainers(ApplicationId applicationId) {
if (!appTempMappings.get(applicationId).isEmpty()) {
appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> {
nodeE.getValue().entrySet().stream().forEach(containerE -> {
removeContainer(nodeE.getKey(), containerE.getKey(),
containerE.getValue());
});
});
writeLock.lock();
try {
appTempMappings.remove(applicationId);
if (LOG.isDebugEnabled()) {
LOG.debug("Removed TEMP containers of app=" + applicationId);
}
} finally {
writeLock.unlock();
}
}
}
/** /**
* Get Node cardinality for a specific tag. * Get Node cardinality for a specific tag.

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil;
@ -53,13 +52,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
// Number of times to re-attempt placing a single scheduling request. // Number of times to re-attempt placing a single scheduling request.
private static final int RE_ATTEMPT_COUNT = 2; private static final int RE_ATTEMPT_COUNT = 2;
private AllocationTagsManager tagsManager; private LocalAllocationTagsManager tagsManager;
private PlacementConstraintManager constraintManager; private PlacementConstraintManager constraintManager;
private NodeCandidateSelector nodeSelector; private NodeCandidateSelector nodeSelector;
@Override @Override
public void init(RMContext rmContext) { public void init(RMContext rmContext) {
this.tagsManager = rmContext.getAllocationTagsManager(); this.tagsManager = new LocalAllocationTagsManager(
rmContext.getAllocationTagsManager());
this.constraintManager = rmContext.getPlacementConstraintManager(); this.constraintManager = rmContext.getPlacementConstraintManager();
this.nodeSelector = this.nodeSelector =
filter -> ((AbstractYarnScheduler) (rmContext).getScheduler()) filter -> ((AbstractYarnScheduler) (rmContext).getScheduler())
@ -143,7 +143,7 @@ private void doPlacement(BatchedRequests requests,
numAllocs = numAllocs =
schedulingRequest.getResourceSizing().getNumAllocations(); schedulingRequest.getResourceSizing().getNumAllocations();
// Add temp-container tags for current placement cycle // Add temp-container tags for current placement cycle
this.tagsManager.addTempContainer(node.getNodeID(), this.tagsManager.addTempTags(node.getNodeID(),
requests.getApplicationId(), requests.getApplicationId(),
schedulingRequest.getAllocationTags()); schedulingRequest.getAllocationTags());
lastSatisfiedNode = node; lastSatisfiedNode = node;

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongBinaryOperator;
class LocalAllocationTagsManager extends AllocationTagsManager {
private static final Logger LOG =
LoggerFactory.getLogger(LocalAllocationTagsManager.class);
private final AllocationTagsManager tagsManager;
// Application's Temporary containers mapping
private Map<ApplicationId, Map<NodeId, Map<String, AtomicInteger>>>
appTempMappings = new HashMap<>();
LocalAllocationTagsManager(
AllocationTagsManager allocationTagsManager) {
super(null);
this.tagsManager = allocationTagsManager;
}
void addTempTags(NodeId nodeId,
ApplicationId applicationId, Set<String> allocationTags) {
Map<NodeId, Map<String, AtomicInteger>> appTempMapping =
appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>());
Map<String, AtomicInteger> containerTempMapping =
appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>());
for (String tag : allocationTags) {
containerTempMapping.computeIfAbsent(tag,
k -> new AtomicInteger(0)).incrementAndGet();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Added TEMP container with tags=["
+ StringUtils.join(allocationTags, ",") + "]");
}
tagsManager.addTags(nodeId, applicationId, allocationTags);
}
void removeTempTags(NodeId nodeId, ApplicationId applicationId,
Set<String> allocationTags) {
Map<NodeId, Map<String, AtomicInteger>> appTempMapping =
appTempMappings.get(applicationId);
if (appTempMapping != null) {
Map<String, AtomicInteger> containerTempMap =
appTempMapping.get(nodeId);
if (containerTempMap != null) {
for (String tag : allocationTags) {
AtomicInteger count = containerTempMap.get(tag);
if (count != null) {
if (count.decrementAndGet() <= 0) {
containerTempMap.remove(tag);
}
}
}
}
}
if (allocationTags != null) {
removeTags(nodeId, applicationId, allocationTags);
}
}
/**
* Method removes temporary containers associated with an application
* Used by the placement algorithm to clean temporary tags at the end of
* a placement cycle.
* @param applicationId Application Id.
*/
public void cleanTempContainers(ApplicationId applicationId) {
if (!appTempMappings.get(applicationId).isEmpty()) {
appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> {
nodeE.getValue().entrySet().stream().forEach(tagE -> {
for (int i = 0; i < tagE.getValue().get(); i++) {
removeTags(nodeE.getKey(), applicationId,
Collections.singleton(tagE.getKey()));
}
});
});
appTempMappings.remove(applicationId);
if (LOG.isDebugEnabled()) {
LOG.debug("Removed TEMP containers of app=" + applicationId);
}
}
}
@Override
public void addContainer(NodeId nodeId, ContainerId containerId,
Set<String> allocationTags) {
tagsManager.addContainer(nodeId, containerId, allocationTags);
}
@Override
public void removeContainer(NodeId nodeId, ContainerId containerId,
Set<String> allocationTags) {
tagsManager.removeContainer(nodeId, containerId, allocationTags);
}
@Override
public void removeTags(NodeId nodeId, ApplicationId applicationId,
Set<String> allocationTags) {
tagsManager.removeTags(nodeId, applicationId, allocationTags);
}
@Override
public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId,
String tag) throws InvalidAllocationTagsQueryException {
return tagsManager.getNodeCardinality(nodeId, applicationId, tag);
}
@Override
public long getRackCardinality(String rack, ApplicationId applicationId,
String tag) throws InvalidAllocationTagsQueryException {
return tagsManager.getRackCardinality(rack, applicationId, tag);
}
@Override
public boolean allocationTagExistsOnNode(NodeId nodeId,
ApplicationId applicationId, String tag)
throws InvalidAllocationTagsQueryException {
return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag);
}
@Override
public long getNodeCardinalityByOp(NodeId nodeId,
ApplicationId applicationId, Set<String> tags, LongBinaryOperator op)
throws InvalidAllocationTagsQueryException {
return tagsManager.getNodeCardinalityByOp(nodeId, applicationId, tags, op);
}
@Override
public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
Set<String> tags, LongBinaryOperator op)
throws InvalidAllocationTagsQueryException {
return tagsManager.getRackCardinalityByOp(rack, applicationId, tags, op);
}
}

View File

@ -23,7 +23,6 @@
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -362,87 +361,6 @@ public void testAllocationTagsManagerMemoryAfterCleanup()
Assert.assertEquals(0, atm.getPerAppRackMappings().size()); Assert.assertEquals(0, atm.getPerAppRackMappings().size());
} }
@Test
public void testTempContainerAllocations()
throws InvalidAllocationTagsQueryException {
/**
* Construct both TEMP and normal containers: Node1: TEMP container_1_1
* (mapper/reducer/app_1) container_1_2 (service/app_1)
*
* Node2: container_1_3 (reducer/app_1) TEMP container_2_1 (service/app_2)
*/
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
// 3 Containers from app1
atm.addTempContainer(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer"));
// 1 Container from app2
atm.addTempContainer(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"));
// Expect tag mappings to be present including temp Tags
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::sum));
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
Long::sum));
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
Long::sum));
// Do a temp Tag cleanup on app2
atm.cleanTempContainers(TestUtils.getMockApplicationId(2));
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
Long::sum));
// Expect app1 to be unaffected
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::sum));
// Do a cleanup on app1 as well
atm.cleanTempContainers(TestUtils.getMockApplicationId(1));
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::sum));
// Non temp-tags should be unaffected
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
Long::sum));
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
Long::sum));
// Expect app2 with no containers, and app1 with 2 containers across 2 nodes
Assert.assertEquals(2,
atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1))
.getTypeToTagsWithCount().size());
Assert.assertNull(
atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2)));
}
@Test @Test
public void testQueryCardinalityWithIllegalParameters() public void testQueryCardinalityWithIllegalParameters()
throws InvalidAllocationTagsQueryException { throws InvalidAllocationTagsQueryException {

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
* Tests the LocalAllocationTagsManager.
*/
public class TestLocalAllocationTagsManager {
private RMContext rmContext;
@Before
public void setup() {
MockRM rm = new MockRM();
rm.start();
MockNodes.resetHostIds();
List<RMNode> rmNodes =
MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
for (RMNode rmNode : rmNodes) {
rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode);
}
rmContext = rm.getRMContext();
}
@Test
public void testTempContainerAllocations()
throws InvalidAllocationTagsQueryException {
/**
* Construct both TEMP and normal containers: Node1: TEMP container_1_1
* (mapper/reducer/app_1) container_1_2 (service/app_1)
*
* Node2: container_1_3 (reducer/app_1) TEMP container_2_1 (service/app_2)
*/
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
LocalAllocationTagsManager ephAtm =
new LocalAllocationTagsManager(atm);
// 3 Containers from app1
ephAtm.addTempTags(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1),
ImmutableSet.of("mapper", "reducer"));
atm.addContainer(NodeId.fromString("host1:123"),
TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service"));
atm.addContainer(NodeId.fromString("host2:123"),
TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer"));
// 1 Container from app2
ephAtm.addTempTags(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"));
// Expect tag mappings to be present including temp Tags
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::sum));
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
Long::sum));
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
Long::sum));
// Do a temp Tag cleanup on app2
ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(2));
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
Long::sum));
// Expect app1 to be unaffected
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::sum));
// Do a cleanup on app1 as well
ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(1));
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
Long::sum));
// Non temp-tags should be unaffected
Assert.assertEquals(1,
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
Long::sum));
Assert.assertEquals(0,
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
Long::sum));
// Expect app2 with no containers, and app1 with 2 containers across 2 nodes
Assert.assertEquals(2,
atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1))
.getTypeToTagsWithCount().size());
Assert.assertNull(
atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2)));
}
}