diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 4d0c2301df..06a1d00d95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; @@ -109,6 +110,7 @@ public class RMActiveServiceContext { private RMAppLifetimeMonitor rmAppLifetimeMonitor; private QueueLimitCalculator queueLimitCalculator; private AllocationTagsManager allocationTagsManager; + private PlacementConstraintManager placementConstraintManager; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -411,6 +413,19 @@ public class RMActiveServiceContext { this.allocationTagsManager = allocationTagsManager; } + @Private + @Unstable + public PlacementConstraintManager getPlacementConstraintManager() { + return placementConstraintManager; + } + + @Private + @Unstable + public void setPlacementConstraintManager( + PlacementConstraintManager placementConstraintManager) { + this.placementConstraintManager = placementConstraintManager; + } + @Private @Unstable public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 00da10895d..eb91a311a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; @@ -171,4 +172,9 @@ public interface RMContext extends ApplicationMasterServiceContext { AllocationTagsManager getAllocationTagsManager(); void setAllocationTagsManager(AllocationTagsManager allocationTagsManager); + + PlacementConstraintManager getPlacementConstraintManager(); + + void setPlacementConstraintManager( + PlacementConstraintManager placementConstraintManager); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index da50ef859b..0b6be722ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; @@ -515,6 +516,18 @@ public class RMContextImpl implements RMContext { activeServiceContext.setAllocationTagsManager(allocationTagsManager); } + @Override + public PlacementConstraintManager getPlacementConstraintManager() { + return activeServiceContext.getPlacementConstraintManager(); + } + + @Override + public void setPlacementConstraintManager( + PlacementConstraintManager placementConstraintManager) { + activeServiceContext + .setPlacementConstraintManager(placementConstraintManager); + } + @Override public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() { return activeServiceContext.getRMDelegatedNodeLabelsUpdater(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 1d838f0453..5140c9fa55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -97,6 +97,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -498,6 +500,12 @@ public class ResourceManager extends CompositeService implements Recoverable { protected AllocationTagsManager createAllocationTagsManager() { return new AllocationTagsManager(this.rmContext); } + + protected PlacementConstraintManagerService + createPlacementConstraintManager() { + // Use the in memory Placement Constraint Manager. + return new MemoryPlacementConstraintManager(); + } protected DelegationTokenRenewer createDelegationTokenRenewer() { return new DelegationTokenRenewer(); @@ -628,6 +636,11 @@ public class ResourceManager extends CompositeService implements Recoverable { createAllocationTagsManager(); rmContext.setAllocationTagsManager(allocationTagsManager); + PlacementConstraintManagerService placementConstraintManager = + createPlacementConstraintManager(); + addService(placementConstraintManager); + rmContext.setPlacementConstraintManager(placementConstraintManager); + RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = createRMDelegatedNodeLabelsUpdater(); if (delegatedNodeLabelsUpdater != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java new file mode 100644 index 0000000000..ceff6f6881 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java @@ -0,0 +1,282 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * In memory implementation of the {@link PlacementConstraintManagerService}. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class MemoryPlacementConstraintManager + extends PlacementConstraintManagerService { + + private static final Logger LOG = + LoggerFactory.getLogger(MemoryPlacementConstraintManager.class); + + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + + /** + * Stores the global constraints that will be manipulated by the cluster + * admin. The key of each entry is the tag that will enable the corresponding + * constraint. + */ + private Map globalConstraints; + /** + * Stores the constraints for each application, along with the allocation tags + * that will enable each of the constraints for a given application. + */ + private Map> appConstraints; + + public MemoryPlacementConstraintManager() { + this.globalConstraints = new HashMap<>(); + this.appConstraints = new HashMap<>(); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + } + + @Override + public void registerApplication(ApplicationId appId, + Map, PlacementConstraint> constraintMap) { + // Check if app already exists. If not, prepare its constraint map. + Map constraintsForApp = new HashMap<>(); + try { + readLock.lock(); + if (appConstraints.get(appId) != null) { + LOG.warn("Application {} has already been registered.", appId); + return; + } + // Go over each sourceTag-constraint pair, validate it, and add it to the + // constraint map for this app. + for (Map.Entry, PlacementConstraint> entry : constraintMap + .entrySet()) { + Set sourceTags = entry.getKey(); + PlacementConstraint constraint = entry.getValue(); + if (validateConstraint(sourceTags, constraint)) { + String sourceTag = getValidSourceTag(sourceTags); + constraintsForApp.put(sourceTag, constraint); + } + } + } finally { + readLock.unlock(); + } + + if (constraintsForApp.isEmpty()) { + LOG.info("Application {} was registered, but no constraints were added.", + appId); + } + // Update appConstraints. + try { + writeLock.lock(); + appConstraints.put(appId, constraintsForApp); + } finally { + writeLock.unlock(); + } + } + + @Override + public void addConstraint(ApplicationId appId, Set sourceTags, + PlacementConstraint placementConstraint, boolean replace) { + try { + writeLock.lock(); + Map constraintsForApp = + appConstraints.get(appId); + if (constraintsForApp == null) { + LOG.info("Cannot add constraint to application {}, as it has not " + + "been registered yet.", appId); + return; + } + + addConstraintToMap(constraintsForApp, sourceTags, placementConstraint, + replace); + } finally { + writeLock.unlock(); + } + } + + @Override + public void addGlobalConstraint(Set sourceTags, + PlacementConstraint placementConstraint, boolean replace) { + try { + writeLock.lock(); + addConstraintToMap(globalConstraints, sourceTags, placementConstraint, + replace); + } finally { + writeLock.unlock(); + } + } + + /** + * Helper method that adds a constraint to a map for a given source tag. + * Assumes there is already a lock on the constraint map. + * + * @param constraintMap constraint map to which the constraint will be added + * @param sourceTags the source tags that will enable this constraint + * @param placementConstraint the new constraint to be added + * @param replace if true, an existing constraint for these sourceTags will be + * replaced with the new one + */ + private void addConstraintToMap( + Map constraintMap, Set sourceTags, + PlacementConstraint placementConstraint, boolean replace) { + if (validateConstraint(sourceTags, placementConstraint)) { + String sourceTag = getValidSourceTag(sourceTags); + if (constraintMap.get(sourceTag) == null || replace) { + if (replace) { + LOG.info("Replacing the constraint associated with tag {} with {}.", + sourceTag, placementConstraint); + } + constraintMap.put(sourceTag, placementConstraint); + } else { + LOG.info("Constraint {} will not be added. There is already a " + + "constraint associated with tag {}.", + placementConstraint, sourceTag); + } + } + } + + @Override + public Map, PlacementConstraint> getConstraints( + ApplicationId appId) { + try { + readLock.lock(); + if (appConstraints.get(appId) == null) { + LOG.info("Application {} is not registered in the Placement " + + "Constraint Manager.", appId); + return null; + } + + // Copy to a new map and return an unmodifiable version of it. + // Each key of the map is a set with a single source tag. + Map, PlacementConstraint> constraintMap = + appConstraints.get(appId).entrySet().stream() + .collect(Collectors.toMap( + e -> Stream.of(e.getKey()).collect(Collectors.toSet()), + e -> e.getValue())); + + return Collections.unmodifiableMap(constraintMap); + } finally { + readLock.unlock(); + } + } + + @Override + public PlacementConstraint getConstraint(ApplicationId appId, + Set sourceTags) { + if (!validateSourceTags(sourceTags)) { + return null; + } + String sourceTag = getValidSourceTag(sourceTags); + try { + readLock.lock(); + if (appConstraints.get(appId) == null) { + LOG.info("Application {} is not registered in the Placement " + + "Constraint Manager.", appId); + return null; + } + // TODO: Merge this constraint with the global one for this tag, if one + // exists. + return appConstraints.get(appId).get(sourceTag); + } finally { + readLock.unlock(); + } + } + + @Override + public PlacementConstraint getGlobalConstraint(Set sourceTags) { + if (!validateSourceTags(sourceTags)) { + return null; + } + String sourceTag = getValidSourceTag(sourceTags); + try { + readLock.lock(); + return globalConstraints.get(sourceTag); + } finally { + readLock.unlock(); + } + } + + @Override + public void unregisterApplication(ApplicationId appId) { + try { + writeLock.lock(); + appConstraints.remove(appId); + } finally { + writeLock.unlock(); + } + } + + @Override + public void removeGlobalConstraint(Set sourceTags) { + if (!validateSourceTags(sourceTags)) { + return; + } + String sourceTag = getValidSourceTag(sourceTags); + try { + writeLock.lock(); + globalConstraints.remove(sourceTag); + } finally { + writeLock.unlock(); + } + } + + @Override + public int getNumRegisteredApplications() { + try { + readLock.lock(); + return appConstraints.size(); + } finally { + readLock.unlock(); + } + } + + @Override + public int getNumGlobalConstraints() { + try { + readLock.lock(); + return globalConstraints.size(); + } finally { + readLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java new file mode 100644 index 0000000000..7725d0d1a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java @@ -0,0 +1,151 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; + +/** + * Interface for storing and retrieving placement constraints (see + * {@link PlacementConstraint}). + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface PlacementConstraintManager { + + /** + * Register all placement constraints of an application. + * + * @param appId the application ID + * @param constraintMap the map of allocation tags to constraints for this + * application + */ + void registerApplication(ApplicationId appId, + Map, PlacementConstraint> constraintMap); + + /** + * Add a placement constraint for a given application and a given set of + * (source) allocation tags. The constraint will be used on Scheduling + * Requests that carry this set of allocation tags. + * TODO: Support merge and not only replace when adding a constraint. + * + * @param appId the application ID + * @param sourceTags the set of allocation tags that will enable this + * constraint + * @param placementConstraint the constraint + * @param replace if true, an existing constraint for these tags will be + * replaced by the given one + */ + void addConstraint(ApplicationId appId, Set sourceTags, + PlacementConstraint placementConstraint, boolean replace); + + /** + * Add a placement constraint that will be used globally. These constraints + * are added by the cluster administrator. + * TODO: Support merge and not only replace when adding a constraint. + * + * @param sourceTags the allocation tags that will enable this constraint + * @param placementConstraint the constraint + * @param replace if true, an existing constraint for these tags will be + * replaced by the given one + */ + void addGlobalConstraint(Set sourceTags, + PlacementConstraint placementConstraint, boolean replace); + + /** + * Retrieve all constraints for a given application, along with the allocation + * tags that enable each constraint. + * + * @param appId the application ID + * @return the constraints for this application with the associated tags + */ + Map, PlacementConstraint> getConstraints(ApplicationId appId); + + /** + * Retrieve the placement constraint that is associated with a set of + * allocation tags for a given application. + * + * @param appId the application ID + * @param sourceTags the allocation tags that enable this constraint + * @return the constraint + */ + PlacementConstraint getConstraint(ApplicationId appId, + Set sourceTags); + + /** + * Retrieve a global constraint that is associated with a given set of + * allocation tags. + * + * @param sourceTags the allocation tags that enable this constraint + * @return the constraint + */ + PlacementConstraint getGlobalConstraint(Set sourceTags); + + /** + * Remove the constraints that correspond to a given application. + * + * @param appId the application that will be removed. + */ + void unregisterApplication(ApplicationId appId); + + /** + * Remove a global constraint that is associated with the given allocation + * tags. + * + * @param sourceTags the allocation tags + */ + void removeGlobalConstraint(Set sourceTags); + + /** + * Returns the number of currently registered applications in the Placement + * Constraint Manager. + * + * @return number of registered applications. + */ + int getNumRegisteredApplications(); + + /** + * Returns the number of global constraints registered in the Placement + * Constraint Manager. + * + * @return number of global constraints. + */ + int getNumGlobalConstraints(); + + /** + * Validate a placement constraint and the set of allocation tags that will + * enable it. + * + * @param sourceTags the associated allocation tags + * @param placementConstraint the constraint + * @return true if constraint and tags are valid + */ + default boolean validateConstraint(Set sourceTags, + PlacementConstraint placementConstraint) { + return true; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java new file mode 100644 index 0000000000..967f251442 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java @@ -0,0 +1,93 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; + +/** + * The service that implements the {@link PlacementConstraintManager} interface. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class PlacementConstraintManagerService extends AbstractService + implements PlacementConstraintManager { + + protected static final Log LOG = + LogFactory.getLog(PlacementConstraintManagerService.class); + + private PlacementConstraintManager placementConstraintManager = null; + + public PlacementConstraintManagerService() { + super(PlacementConstraintManagerService.class.getName()); + } + + @Override + public boolean validateConstraint(Set sourceTags, + PlacementConstraint placementConstraint) { + if (!validateSourceTags(sourceTags)) { + return false; + } + // TODO: Perform actual validation of the constraint (in YARN-6621). + // TODO: Perform satisfiability check for constraint. + return true; + } + + /** + * Validates whether the allocation tags that will enable a constraint have + * the expected format. At the moment we support a single allocation tag per + * constraint. + * + * @param sourceTags the source allocation tags + * @return true if the tags have the expected format + */ + protected boolean validateSourceTags(Set sourceTags) { + if (sourceTags.isEmpty()) { + LOG.warn("A placement constraint cannot be associated with an empty " + + "set of tags."); + return false; + } + if (sourceTags.size() > 1) { + LOG.warn("Only a single tag can be associated with a placement " + + "constraint currently."); + return false; + } + return true; + } + + /** + * This method will return a single allocation tag. It should be called after + * validating the tags by calling {@link #validateSourceTags}. + * + * @param sourceTags the source allocation tags + * @return the single source tag + */ + protected String getValidSourceTag(Set sourceTags) { + return sourceTags.iterator().next(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java new file mode 100644 index 0000000000..cbb7a55564 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement + * contains classes related to scheduling containers using placement + * constraints. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java new file mode 100644 index 0000000000..abcab1a3a0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; + +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.nodeAttribute; + +import java.util.AbstractMap.SimpleEntry; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for {@link PlacementConstraintManagerService}. + */ +public class TestPlacementConstraintManagerService { + + private PlacementConstraintManagerService pcm; + + protected PlacementConstraintManagerService createPCM() { + return new MemoryPlacementConstraintManager(); + } + + private ApplicationId appId1, appId2; + private PlacementConstraint c1, c2, c3, c4; + private Set sourceTag1, sourceTag2, sourceTag3, sourceTag4; + private Map, PlacementConstraint> constraintMap1, constraintMap2; + + @Before + public void before() { + this.pcm = createPCM(); + + // Build appIDs, constraints, source tags, and constraint map. + long ts = System.currentTimeMillis(); + appId1 = BuilderUtils.newApplicationId(ts, 123); + appId2 = BuilderUtils.newApplicationId(ts, 234); + + c1 = PlacementConstraints.build(targetIn(NODE, allocationTag("hbase-m"))); + c2 = PlacementConstraints.build(targetIn(RACK, allocationTag("hbase-rs"))); + c3 = PlacementConstraints + .build(targetNotIn(NODE, nodeAttribute("java", "1.8"))); + c4 = PlacementConstraints + .build(targetCardinality(RACK, 2, 10, allocationTag("zk"))); + + sourceTag1 = new HashSet<>(Arrays.asList("spark")); + sourceTag2 = new HashSet<>(Arrays.asList("zk")); + sourceTag3 = new HashSet<>(Arrays.asList("storm")); + sourceTag4 = new HashSet<>(Arrays.asList("hbase-m", "hbase-sec")); + + constraintMap1 = Stream + .of(new SimpleEntry<>(sourceTag1, c1), + new SimpleEntry<>(sourceTag2, c2)) + .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); + + constraintMap2 = Stream.of(new SimpleEntry<>(sourceTag3, c4)) + .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); + } + + @Test + public void testRegisterUnregisterApps() { + Assert.assertEquals(0, pcm.getNumRegisteredApplications()); + + // Register two applications. + pcm.registerApplication(appId1, constraintMap1); + Assert.assertEquals(1, pcm.getNumRegisteredApplications()); + Map, PlacementConstraint> constrMap = + pcm.getConstraints(appId1); + Assert.assertNotNull(constrMap); + Assert.assertEquals(2, constrMap.size()); + Assert.assertNotNull(constrMap.get(sourceTag1)); + Assert.assertNotNull(constrMap.get(sourceTag2)); + + pcm.registerApplication(appId2, constraintMap2); + Assert.assertEquals(2, pcm.getNumRegisteredApplications()); + constrMap = pcm.getConstraints(appId2); + Assert.assertNotNull(constrMap); + Assert.assertEquals(1, constrMap.size()); + Assert.assertNotNull(constrMap.get(sourceTag3)); + Assert.assertNull(constrMap.get(sourceTag2)); + + // Try to register the same app again. + pcm.registerApplication(appId2, constraintMap1); + Assert.assertEquals(2, pcm.getNumRegisteredApplications()); + + // Unregister appId1. + pcm.unregisterApplication(appId1); + Assert.assertEquals(1, pcm.getNumRegisteredApplications()); + Assert.assertNull(pcm.getConstraints(appId1)); + Assert.assertNotNull(pcm.getConstraints(appId2)); + } + + @Test + public void testAddConstraint() { + // Cannot add constraint to unregistered app. + Assert.assertEquals(0, pcm.getNumRegisteredApplications()); + pcm.addConstraint(appId1, sourceTag1, c1, false); + Assert.assertEquals(0, pcm.getNumRegisteredApplications()); + + // Register application. + pcm.registerApplication(appId1, new HashMap<>()); + Assert.assertEquals(1, pcm.getNumRegisteredApplications()); + Assert.assertEquals(0, pcm.getConstraints(appId1).size()); + + // Add two constraints. + pcm.addConstraint(appId1, sourceTag1, c1, false); + pcm.addConstraint(appId1, sourceTag2, c3, false); + Assert.assertEquals(2, pcm.getConstraints(appId1).size()); + + // Constraint for sourceTag1 should not be replaced. + pcm.addConstraint(appId1, sourceTag1, c2, false); + Assert.assertEquals(2, pcm.getConstraints(appId1).size()); + Assert.assertEquals(c1, pcm.getConstraint(appId1, sourceTag1)); + Assert.assertNotEquals(c2, pcm.getConstraint(appId1, sourceTag1)); + + // Now c2 should replace c1 for sourceTag1. + pcm.addConstraint(appId1, sourceTag1, c2, true); + Assert.assertEquals(2, pcm.getConstraints(appId1).size()); + Assert.assertEquals(c2, pcm.getConstraint(appId1, sourceTag1)); + } + + @Test + public void testGlobalConstraints() { + Assert.assertEquals(0, pcm.getNumGlobalConstraints()); + pcm.addGlobalConstraint(sourceTag1, c1, false); + Assert.assertEquals(1, pcm.getNumGlobalConstraints()); + Assert.assertNotNull(pcm.getGlobalConstraint(sourceTag1)); + + // Constraint for sourceTag1 should not be replaced. + pcm.addGlobalConstraint(sourceTag1, c2, false); + Assert.assertEquals(1, pcm.getNumGlobalConstraints()); + Assert.assertEquals(c1, pcm.getGlobalConstraint(sourceTag1)); + Assert.assertNotEquals(c2, pcm.getGlobalConstraint(sourceTag1)); + + // Now c2 should replace c1 for sourceTag1. + pcm.addGlobalConstraint(sourceTag1, c2, true); + Assert.assertEquals(1, pcm.getNumGlobalConstraints()); + Assert.assertEquals(c2, pcm.getGlobalConstraint(sourceTag1)); + + pcm.removeGlobalConstraint(sourceTag1); + Assert.assertEquals(0, pcm.getNumGlobalConstraints()); + } + + @Test + public void testValidateConstraint() { + // At the moment we only disallow multiple source tags to be associated with + // a constraint. TODO: More tests to be added for YARN-6621. + Assert.assertTrue(pcm.validateConstraint(sourceTag1, c1)); + Assert.assertFalse(pcm.validateConstraint(sourceTag4, c1)); + } +}