diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index 962e548809..7ad5e8c372 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -24,17 +24,14 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; -import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.log4j.Logger; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -61,9 +58,6 @@ public class AllocationTagsManager { // Application's tags to Rack private Map perAppRackMappings = new HashMap<>(); - // Application's Temporary containers mapping - private Map>>> - appTempMappings = new HashMap<>(); // Global tags to node mapping (used to fast return aggregated tags // cardinality across apps) @@ -76,7 +70,7 @@ public class AllocationTagsManager { * Currently used both for NodeId to Tag, Count and Rack to Tag, Count */ @VisibleForTesting - static class TypeToCountedTags { + public static class TypeToCountedTags { // Map> private Map> typeToTagsWithCount = new HashMap<>(); @@ -214,7 +208,7 @@ public Map> getTypeToTagsWithCount() { } @VisibleForTesting - Map getPerAppNodeMappings() { + public Map getPerAppNodeMappings() { return perAppNodeMappings; } @@ -233,12 +227,6 @@ TypeToCountedTags getGlobalRackMapping() { return globalRackMapping; } - @VisibleForTesting - public Map>> getAppTempMappings( - ApplicationId applicationId) { - return appTempMappings.get(applicationId); - } - public AllocationTagsManager(RMContext context) { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); @@ -246,39 +234,6 @@ public AllocationTagsManager(RMContext context) { rmContext = context; } - // - - /** - * Method adds a temporary fake-container tag to Node mapping. - * Used by the constrained placement algorithm to keep track of containers - * that are currently placed on nodes but are not yet allocated. - * @param nodeId - * @param applicationId - * @param allocationTags - */ - public void addTempContainer(NodeId nodeId, ApplicationId applicationId, - Set allocationTags) { - ContainerId tmpContainer = ContainerId.newContainerId( - ApplicationAttemptId.newInstance(applicationId, 1), System.nanoTime()); - - writeLock.lock(); - try { - Map>> appTempMapping = - appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>()); - Map> containerTempMapping = - appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>()); - containerTempMapping.put(tmpContainer, allocationTags); - if (LOG.isDebugEnabled()) { - LOG.debug("Added TEMP container=" + tmpContainer + " with tags=[" - + StringUtils.join(allocationTags, ",") + "]"); - } - } finally { - writeLock.unlock(); - } - - addContainer(nodeId, tmpContainer, allocationTags); - } - /** * Notify container allocated on a node. * @@ -297,6 +252,15 @@ public void addContainer(NodeId nodeId, ContainerId containerId, } ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); + addTags(nodeId, applicationId, allocationTags); + if (LOG.isDebugEnabled()) { + LOG.debug("Added container=" + containerId + " with tags=[" + + StringUtils.join(allocationTags, ",") + "]"); + } + } + + public void addTags(NodeId nodeId, ApplicationId applicationId, + Set allocationTags) { writeLock.lock(); try { TypeToCountedTags perAppTagsMapping = perAppNodeMappings @@ -312,11 +276,6 @@ public void addContainer(NodeId nodeId, ContainerId containerId, perAppRackTagsMapping.addTags(nodeRack, allocationTags); globalNodeMapping.addTags(nodeId, allocationTags); globalRackMapping.addTags(nodeRack, allocationTags); - - if (LOG.isDebugEnabled()) { - LOG.debug("Added container=" + containerId + " with tags=[" - + StringUtils.join(allocationTags, ",") + "]"); - } } finally { writeLock.unlock(); } @@ -339,6 +298,21 @@ public void removeContainer(NodeId nodeId, ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); + removeTags(nodeId, applicationId, allocationTags); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed container=" + containerId + " with tags=[" + + StringUtils.join(allocationTags, ",") + "]"); + } + } + + /** + * Helper method to just remove the tags associated with a container. + * @param nodeId + * @param applicationId + * @param allocationTags + */ + public void removeTags(NodeId nodeId, ApplicationId applicationId, + Set allocationTags) { writeLock.lock(); try { TypeToCountedTags perAppTagsMapping = @@ -364,43 +338,11 @@ public void removeContainer(NodeId nodeId, if (perAppRackTagsMapping.isEmpty()) { perAppRackMappings.remove(applicationId); } - - if (LOG.isDebugEnabled()) { - LOG.debug("Removed container=" + containerId + " with tags=[" - + StringUtils.join(allocationTags, ",") + "]"); - } } finally { writeLock.unlock(); } } - /** - * Method removes temporary containers associated with an application - * Used by the placement algorithm to clean temporary tags at the end of - * a placement cycle. - * @param applicationId Application Id. - */ - public void cleanTempContainers(ApplicationId applicationId) { - - if (!appTempMappings.get(applicationId).isEmpty()) { - appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> { - nodeE.getValue().entrySet().stream().forEach(containerE -> { - removeContainer(nodeE.getKey(), containerE.getKey(), - containerE.getValue()); - }); - }); - writeLock.lock(); - try { - appTempMappings.remove(applicationId); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed TEMP containers of app=" + applicationId); - } - } finally { - writeLock.unlock(); - } - } - } - /** * Get Node cardinality for a specific tag. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java index cf2ed15521..9887749e3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java @@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintsUtil; @@ -53,13 +52,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { // Number of times to re-attempt placing a single scheduling request. private static final int RE_ATTEMPT_COUNT = 2; - private AllocationTagsManager tagsManager; + private LocalAllocationTagsManager tagsManager; private PlacementConstraintManager constraintManager; private NodeCandidateSelector nodeSelector; @Override public void init(RMContext rmContext) { - this.tagsManager = rmContext.getAllocationTagsManager(); + this.tagsManager = new LocalAllocationTagsManager( + rmContext.getAllocationTagsManager()); this.constraintManager = rmContext.getPlacementConstraintManager(); this.nodeSelector = filter -> ((AbstractYarnScheduler) (rmContext).getScheduler()) @@ -143,7 +143,7 @@ private void doPlacement(BatchedRequests requests, numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); // Add temp-container tags for current placement cycle - this.tagsManager.addTempContainer(node.getNodeID(), + this.tagsManager.addTempTags(node.getNodeID(), requests.getApplicationId(), schedulingRequest.getAllocationTags()); lastSatisfiedNode = node; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java new file mode 100644 index 0000000000..9472719ae6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/LocalAllocationTagsManager.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.LongBinaryOperator; + +class LocalAllocationTagsManager extends AllocationTagsManager { + + private static final Logger LOG = + LoggerFactory.getLogger(LocalAllocationTagsManager.class); + + private final AllocationTagsManager tagsManager; + + // Application's Temporary containers mapping + private Map>> + appTempMappings = new HashMap<>(); + + LocalAllocationTagsManager( + AllocationTagsManager allocationTagsManager) { + super(null); + this.tagsManager = allocationTagsManager; + } + + void addTempTags(NodeId nodeId, + ApplicationId applicationId, Set allocationTags) { + Map> appTempMapping = + appTempMappings.computeIfAbsent(applicationId, k -> new HashMap<>()); + Map containerTempMapping = + appTempMapping.computeIfAbsent(nodeId, k -> new HashMap<>()); + for (String tag : allocationTags) { + containerTempMapping.computeIfAbsent(tag, + k -> new AtomicInteger(0)).incrementAndGet(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Added TEMP container with tags=[" + + StringUtils.join(allocationTags, ",") + "]"); + } + tagsManager.addTags(nodeId, applicationId, allocationTags); + } + + void removeTempTags(NodeId nodeId, ApplicationId applicationId, + Set allocationTags) { + Map> appTempMapping = + appTempMappings.get(applicationId); + if (appTempMapping != null) { + Map containerTempMap = + appTempMapping.get(nodeId); + if (containerTempMap != null) { + for (String tag : allocationTags) { + AtomicInteger count = containerTempMap.get(tag); + if (count != null) { + if (count.decrementAndGet() <= 0) { + containerTempMap.remove(tag); + } + } + } + } + } + if (allocationTags != null) { + removeTags(nodeId, applicationId, allocationTags); + } + } + + /** + * Method removes temporary containers associated with an application + * Used by the placement algorithm to clean temporary tags at the end of + * a placement cycle. + * @param applicationId Application Id. + */ + public void cleanTempContainers(ApplicationId applicationId) { + + if (!appTempMappings.get(applicationId).isEmpty()) { + appTempMappings.get(applicationId).entrySet().stream().forEach(nodeE -> { + nodeE.getValue().entrySet().stream().forEach(tagE -> { + for (int i = 0; i < tagE.getValue().get(); i++) { + removeTags(nodeE.getKey(), applicationId, + Collections.singleton(tagE.getKey())); + } + }); + }); + appTempMappings.remove(applicationId); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed TEMP containers of app=" + applicationId); + } + } + } + + @Override + public void addContainer(NodeId nodeId, ContainerId containerId, + Set allocationTags) { + tagsManager.addContainer(nodeId, containerId, allocationTags); + } + + @Override + public void removeContainer(NodeId nodeId, ContainerId containerId, + Set allocationTags) { + tagsManager.removeContainer(nodeId, containerId, allocationTags); + } + + @Override + public void removeTags(NodeId nodeId, ApplicationId applicationId, + Set allocationTags) { + tagsManager.removeTags(nodeId, applicationId, allocationTags); + } + + @Override + public long getNodeCardinality(NodeId nodeId, ApplicationId applicationId, + String tag) throws InvalidAllocationTagsQueryException { + return tagsManager.getNodeCardinality(nodeId, applicationId, tag); + } + + @Override + public long getRackCardinality(String rack, ApplicationId applicationId, + String tag) throws InvalidAllocationTagsQueryException { + return tagsManager.getRackCardinality(rack, applicationId, tag); + } + + @Override + public boolean allocationTagExistsOnNode(NodeId nodeId, + ApplicationId applicationId, String tag) + throws InvalidAllocationTagsQueryException { + return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag); + } + + @Override + public long getNodeCardinalityByOp(NodeId nodeId, + ApplicationId applicationId, Set tags, LongBinaryOperator op) + throws InvalidAllocationTagsQueryException { + return tagsManager.getNodeCardinalityByOp(nodeId, applicationId, tags, op); + } + + @Override + public long getRackCardinalityByOp(String rack, ApplicationId applicationId, + Set tags, LongBinaryOperator op) + throws InvalidAllocationTagsQueryException { + return tagsManager.getRackCardinalityByOp(rack, applicationId, tags, op); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java index 7afe4ef584..76f451e919 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -362,87 +361,6 @@ public void testAllocationTagsManagerMemoryAfterCleanup() Assert.assertEquals(0, atm.getPerAppRackMappings().size()); } - @Test - public void testTempContainerAllocations() - throws InvalidAllocationTagsQueryException { - /** - * Construct both TEMP and normal containers: Node1: TEMP container_1_1 - * (mapper/reducer/app_1) container_1_2 (service/app_1) - * - * Node2: container_1_3 (reducer/app_1) TEMP container_2_1 (service/app_2) - */ - - AllocationTagsManager atm = new AllocationTagsManager(rmContext); - - // 3 Containers from app1 - atm.addTempContainer(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), - ImmutableSet.of("mapper", "reducer")); - - atm.addContainer(NodeId.fromString("host1:123"), - TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service")); - - atm.addContainer(NodeId.fromString("host2:123"), - TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer")); - - // 1 Container from app2 - atm.addTempContainer(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of("service")); - - // Expect tag mappings to be present including temp Tags - Assert.assertEquals(1, - atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), - Long::sum)); - - Assert.assertEquals(1, - atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("service"), - Long::sum)); - - Assert.assertEquals(1, - atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), - Long::sum)); - - // Do a temp Tag cleanup on app2 - atm.cleanTempContainers(TestUtils.getMockApplicationId(2)); - Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), - Long::sum)); - // Expect app1 to be unaffected - Assert.assertEquals(1, - atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), - Long::sum)); - // Do a cleanup on app1 as well - atm.cleanTempContainers(TestUtils.getMockApplicationId(1)); - Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), - Long::sum)); - - // Non temp-tags should be unaffected - Assert.assertEquals(1, - atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), - TestUtils.getMockApplicationId(1), ImmutableSet.of("service"), - Long::sum)); - - Assert.assertEquals(0, - atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), - TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), - Long::sum)); - - // Expect app2 with no containers, and app1 with 2 containers across 2 nodes - Assert.assertEquals(2, - atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1)) - .getTypeToTagsWithCount().size()); - - Assert.assertNull( - atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2))); - } - @Test public void testQueryCardinalityWithIllegalParameters() throws InvalidAllocationTagsQueryException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java new file mode 100644 index 0000000000..0b9657f15d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/TestLocalAllocationTagsManager.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm; + +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +/** + * Tests the LocalAllocationTagsManager. + */ +public class TestLocalAllocationTagsManager { + + private RMContext rmContext; + + @Before + public void setup() { + MockRM rm = new MockRM(); + rm.start(); + MockNodes.resetHostIds(); + List rmNodes = + MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4)); + for (RMNode rmNode : rmNodes) { + rm.getRMContext().getRMNodes().putIfAbsent(rmNode.getNodeID(), rmNode); + } + rmContext = rm.getRMContext(); + } + + @Test + public void testTempContainerAllocations() + throws InvalidAllocationTagsQueryException { + /** + * Construct both TEMP and normal containers: Node1: TEMP container_1_1 + * (mapper/reducer/app_1) container_1_2 (service/app_1) + * + * Node2: container_1_3 (reducer/app_1) TEMP container_2_1 (service/app_2) + */ + + AllocationTagsManager atm = new AllocationTagsManager(rmContext); + LocalAllocationTagsManager ephAtm = + new LocalAllocationTagsManager(atm); + + // 3 Containers from app1 + ephAtm.addTempTags(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), + ImmutableSet.of("mapper", "reducer")); + + atm.addContainer(NodeId.fromString("host1:123"), + TestUtils.getMockContainerId(1, 2), ImmutableSet.of("service")); + + atm.addContainer(NodeId.fromString("host2:123"), + TestUtils.getMockContainerId(1, 3), ImmutableSet.of("reducer")); + + // 1 Container from app2 + ephAtm.addTempTags(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(2), ImmutableSet.of("service")); + + // Expect tag mappings to be present including temp Tags + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + Long::sum)); + + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("service"), + Long::sum)); + + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), + Long::sum)); + + // Do a temp Tag cleanup on app2 + ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(2)); + Assert.assertEquals(0, + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), + Long::sum)); + // Expect app1 to be unaffected + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + Long::sum)); + // Do a cleanup on app1 as well + ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(1)); + Assert.assertEquals(0, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"), + Long::sum)); + + // Non temp-tags should be unaffected + Assert.assertEquals(1, + atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"), + TestUtils.getMockApplicationId(1), ImmutableSet.of("service"), + Long::sum)); + + Assert.assertEquals(0, + atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + TestUtils.getMockApplicationId(2), ImmutableSet.of("service"), + Long::sum)); + + // Expect app2 with no containers, and app1 with 2 containers across 2 nodes + Assert.assertEquals(2, + atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(1)) + .getTypeToTagsWithCount().size()); + + Assert.assertNull( + atm.getPerAppNodeMappings().get(TestUtils.getMockApplicationId(2))); + } + +}