YARN-8191. Fair scheduler: queue deletion without RM restart. (Gergo Repas via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-05-24 17:07:21 -07:00
parent d9852eb589
commit 86bc6425d4
7 changed files with 596 additions and 81 deletions

View File

@ -87,7 +87,7 @@ public class AllocationFileLoaderService extends AbstractService {
private Path allocFile;
private FileSystem fs;
private Listener reloadListener;
private final Listener reloadListener;
@VisibleForTesting
long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
@ -95,15 +95,16 @@ public class AllocationFileLoaderService extends AbstractService {
private Thread reloadThread;
private volatile boolean running = true;
public AllocationFileLoaderService() {
this(SystemClock.getInstance());
public AllocationFileLoaderService(Listener reloadListener) {
this(reloadListener, SystemClock.getInstance());
}
private List<Permission> defaultPermissions;
public AllocationFileLoaderService(Clock clock) {
public AllocationFileLoaderService(Listener reloadListener, Clock clock) {
super(AllocationFileLoaderService.class.getName());
this.clock = clock;
this.reloadListener = reloadListener;
}
@Override
@ -114,6 +115,7 @@ public void serviceInit(Configuration conf) throws Exception {
reloadThread = new Thread(() -> {
while (running) {
try {
reloadListener.onCheck();
long time = clock.getTime();
long lastModified =
fs.getFileStatus(allocFile).getModificationTime();
@ -207,10 +209,6 @@ public Path getAllocationFile(Configuration conf)
return allocPath;
}
public synchronized void setReloadListener(Listener reloadListener) {
this.reloadListener = reloadListener;
}
/**
* Updates the allocation list from the allocation config file. This file is
* expected to be in the XML format specified in the design doc.
@ -351,5 +349,7 @@ protected List<Permission> getDefaultPermissions() {
public interface Listener {
void onReload(AllocationConfiguration info) throws IOException;
void onCheck();
}
}

View File

@ -21,7 +21,9 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -34,6 +36,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
@ -56,6 +59,8 @@ public class FSLeafQueue extends FSQueue {
// apps that are runnable
private final List<FSAppAttempt> runnableApps = new ArrayList<>();
private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>();
// assignedApps keeps track of applications that have no appAttempts
private final Set<ApplicationId> assignedApps = new HashSet<>();
// get a lock with fair distribution for app list updates
private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
private final Lock readLock = rwl.readLock();
@ -89,6 +94,9 @@ void addApp(FSAppAttempt app, boolean runnable) {
} else {
nonRunnableApps.add(app);
}
// when an appAttempt is created for an application, we'd like to move
// it over from assignedApps to either runnableApps or nonRunnableApps
assignedApps.remove(app.getApplicationId());
incUsedResource(app.getResourceUsage());
} finally {
writeLock.unlock();
@ -440,6 +448,15 @@ public int getNumPendingApps() {
return numPendingApps;
}
public int getNumAssignedApps() {
readLock.lock();
try {
return assignedApps.size();
} finally {
readLock.unlock();
}
}
/**
* TODO: Based on how frequently this is called, we might want to club
* counting pending and active apps in the same method.
@ -609,4 +626,18 @@ protected void dumpStateInternal(StringBuilder sb) {
", LastTimeAtMinShare: " + lastTimeAtMinShare +
"}");
}
/**
* This method is called when an application is assigned to this queue
* for book-keeping purposes (to be able to determine if the queue is empty).
* @param applicationId the application's id
*/
public void addAssignedApp(ApplicationId applicationId) {
writeLock.lock();
try {
assignedApps.add(applicationId);
} finally {
writeLock.unlock();
}
}
}

View File

@ -83,6 +83,7 @@ public abstract class FSQueue implements Queue, Schedulable {
private long minSharePreemptionTimeout = Long.MAX_VALUE;
private float fairSharePreemptionThreshold = 0.5f;
private boolean preemptable = true;
private boolean isDynamic = true;
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
@ -585,4 +586,12 @@ public String dumpState() {
* @param sb the {code StringBuilder} which holds queue states
*/
protected abstract void dumpStateInternal(StringBuilder sb);
public boolean isDynamic() {
return isDynamic;
}
public void setDynamic(boolean dynamic) {
this.isDynamic = dynamic;
}
}

View File

@ -99,6 +99,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
@ -207,7 +208,8 @@ public class FairScheduler extends
public FairScheduler() {
super(FairScheduler.class.getName());
context = new FSContext(this);
allocsLoader = new AllocationFileLoaderService();
allocsLoader =
new AllocationFileLoaderService(new AllocationReloadListener());
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
@ -516,6 +518,7 @@ protected void addApplication(ApplicationId applicationId,
new SchedulerApplication<FSAppAttempt>(queue, user);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
queue.addAssignedApp(applicationId);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queue.getName()
@ -1435,7 +1438,6 @@ private void initScheduler(Configuration conf) throws IOException {
}
allocsLoader.init(conf);
allocsLoader.setReloadListener(new AllocationReloadListener());
// If we fail to load allocations file on initialize, we want to fail
// immediately. After a successful load, exceptions on future reloads
// will just result in leaving things as they are.
@ -1589,6 +1591,7 @@ public void onReload(AllocationConfiguration queueInfo)
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
Set<String> removedStaticQueues = getRemovedStaticQueues(queueInfo);
writeLock.lock();
try {
if (queueInfo == null) {
@ -1599,6 +1602,7 @@ public void onReload(AllocationConfiguration queueInfo)
setQueueAcls(allocConf.getQueueAcls());
allocConf.getDefaultSchedulingPolicy().initialize(getContext());
queueMgr.updateAllocationConfiguration(allocConf);
queueMgr.setQueuesToDynamic(removedStaticQueues);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();
}
@ -1606,6 +1610,27 @@ public void onReload(AllocationConfiguration queueInfo)
writeLock.unlock();
}
}
private Set<String> getRemovedStaticQueues(
AllocationConfiguration queueInfo) {
if (queueInfo == null || allocConf == null) {
return Collections.emptySet();
}
Set<String> removedStaticQueues = new HashSet<>();
for (Set<String> queues : allocConf.getConfiguredQueues().values()) {
removedStaticQueues.addAll(queues);
}
for (Set<String> queues : queueInfo.getConfiguredQueues().values()) {
removedStaticQueues.removeAll(queues);
}
return removedStaticQueues;
}
@Override
public void onCheck() {
queueMgr.removeEmptyDynamicQueues();
queueMgr.removePendingIncompatibleQueues();
}
}
private void setQueueAcls(

View File

@ -22,13 +22,17 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.ParserConfigurationException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -52,6 +56,36 @@ public class QueueManager {
public static final Log LOG = LogFactory.getLog(
QueueManager.class.getName());
private final class IncompatibleQueueRemovalTask {
private final String queueToCreate;
private final FSQueueType queueType;
private IncompatibleQueueRemovalTask(String queueToCreate,
FSQueueType queueType) {
this.queueToCreate = queueToCreate;
this.queueType = queueType;
}
private void execute() {
Boolean removed =
removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null);
if (Boolean.TRUE.equals(removed)) {
FSQueue queue = getQueue(queueToCreate, true, queueType, false);
if (queue != null &&
// if queueToCreate is present in the allocation config, set it
// to static
scheduler.allocConf.configuredQueues.values().stream()
.anyMatch(s -> s.contains(queueToCreate))) {
queue.setDynamic(false);
}
}
if (!Boolean.FALSE.equals(removed)) {
incompatibleQueuesPendingRemoval.remove(this);
}
}
}
public static final String ROOT_QUEUE = "root";
private final FairScheduler scheduler;
@ -59,6 +93,8 @@ public class QueueManager {
private final Collection<FSLeafQueue> leafQueues =
new CopyOnWriteArrayList<FSLeafQueue>();
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private Set<IncompatibleQueueRemovalTask> incompatibleQueuesPendingRemoval =
new HashSet<>();
private FSParentQueue rootQueue;
public QueueManager(FairScheduler scheduler) {
@ -75,10 +111,13 @@ public void initialize(Configuration conf) throws IOException,
// SchedulingPolicy.DEFAULT_POLICY since the allocation file hasn't been
// loaded yet.
rootQueue = new FSParentQueue("root", scheduler, null);
rootQueue.setDynamic(false);
queues.put(rootQueue.getName(), rootQueue);
// Create the default queue
getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
FSLeafQueue defaultQueue =
getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
defaultQueue.setDynamic(false);
// Recursively reinitialize to propagate queue properties
rootQueue.reinit(true);
}
@ -121,7 +160,8 @@ public FSLeafQueue getLeafQueue(
*/
public boolean removeLeafQueue(String name) {
name = ensureRootPrefix(name);
return removeEmptyIncompatibleQueues(name, FSQueueType.PARENT);
return !Boolean.FALSE.equals(
removeEmptyIncompatibleQueues(name, FSQueueType.PARENT).orElse(null));
}
@ -346,9 +386,13 @@ void setChildResourceLimits(FSParentQueue parent, FSQueue child,
*
* We will never remove the root queue or the default queue in this way.
*
* @return true if we can create queueToCreate or it already exists.
* @return Optional.of(Boolean.TRUE) if there was an incompatible queue that
* has been removed,
* Optional.of(Boolean.FALSE) if there was an incompatible queue that
* have not be removed,
* Optional.empty() if there is no incompatible queue.
*/
private boolean removeEmptyIncompatibleQueues(String queueToCreate,
private Optional<Boolean> removeEmptyIncompatibleQueues(String queueToCreate,
FSQueueType queueType) {
queueToCreate = ensureRootPrefix(queueToCreate);
@ -357,7 +401,7 @@ private boolean removeEmptyIncompatibleQueues(String queueToCreate,
if (queueToCreate.equals(ROOT_QUEUE) ||
queueToCreate.startsWith(
ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
return false;
return Optional.empty();
}
FSQueue queue = queues.get(queueToCreate);
@ -365,19 +409,18 @@ private boolean removeEmptyIncompatibleQueues(String queueToCreate,
if (queue != null) {
if (queue instanceof FSLeafQueue) {
if (queueType == FSQueueType.LEAF) {
// if queue is already a leaf then return true
return true;
return Optional.empty();
}
// remove incompatibility since queue is a leaf currently
// needs to change to a parent.
return removeQueueIfEmpty(queue);
return Optional.of(removeQueueIfEmpty(queue));
} else {
if (queueType == FSQueueType.PARENT) {
return true;
return Optional.empty();
}
// If it's an existing parent queue and needs to change to leaf,
// remove it if it's empty.
return removeQueueIfEmpty(queue);
return Optional.of(removeQueueIfEmpty(queue));
}
}
@ -389,11 +432,51 @@ private boolean removeEmptyIncompatibleQueues(String queueToCreate,
String prefixString = queueToCreate.substring(0, sepIndex);
FSQueue prefixQueue = queues.get(prefixString);
if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
return removeQueueIfEmpty(prefixQueue);
return Optional.of(removeQueueIfEmpty(prefixQueue));
}
sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
}
return true;
return Optional.empty();
}
/**
* Removes all empty dynamic queues (including empty dynamic parent queues).
*/
public void removeEmptyDynamicQueues() {
synchronized (queues) {
Set<FSParentQueue> parentQueuesToCheck = new HashSet<>();
for (FSQueue queue : getQueues()) {
if (queue.isDynamic() && queue.getChildQueues().isEmpty()) {
boolean removed = removeQueueIfEmpty(queue);
if (removed && queue.getParent().isDynamic()) {
parentQueuesToCheck.add(queue.getParent());
}
}
}
while (!parentQueuesToCheck.isEmpty()) {
FSParentQueue queue = parentQueuesToCheck.iterator().next();
if (queue.getChildQueues().isEmpty()) {
removeQueue(queue);
if (queue.getParent().isDynamic()) {
parentQueuesToCheck.add(queue.getParent());
}
}
parentQueuesToCheck.remove(queue);
}
}
}
/**
* Re-checking incompatible queues that could not be removed earlier due to
* not being empty, and removing those that became empty.
*/
public void removePendingIncompatibleQueues() {
synchronized (queues) {
for (IncompatibleQueueRemovalTask removalTask :
ImmutableSet.copyOf(incompatibleQueuesPendingRemoval)) {
removalTask.execute();
}
}
}
/**
@ -435,7 +518,8 @@ protected boolean isEmpty(FSQueue queue) {
if (queue instanceof FSLeafQueue) {
FSLeafQueue leafQueue = (FSLeafQueue)queue;
return queue.getNumRunnableApps() == 0 &&
leafQueue.getNumNonRunnableApps() == 0;
leafQueue.getNumNonRunnableApps() == 0 &&
leafQueue.getNumAssignedApps() == 0;
} else {
for (FSQueue child : queue.getChildQueues()) {
if (!isEmpty(child)) {
@ -501,21 +585,13 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
LOG.error("Setting scheduling policies for existing queues failed!");
}
for (String name : queueConf.getConfiguredQueues().get(
FSQueueType.LEAF)) {
if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
getLeafQueue(name, true, false);
}
}
ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf, FSQueueType.LEAF);
// At this point all leaves and 'parents with
// at least one child' would have been created.
// Now create parents with no configured leaf.
for (String name : queueConf.getConfiguredQueues().get(
FSQueueType.PARENT)) {
if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
getParentQueue(name, true, false);
}
}
ensureQueueExistsAndIsCompatibleAndIsStatic(queueConf,
FSQueueType.PARENT);
}
// Initialize all queues recursively
@ -524,6 +600,35 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
rootQueue.recomputeSteadyShares();
}
private void ensureQueueExistsAndIsCompatibleAndIsStatic(
AllocationConfiguration queueConf, FSQueueType queueType) {
for (String name : queueConf.getConfiguredQueues().get(queueType)) {
Boolean removed =
removeEmptyIncompatibleQueues(name, queueType).orElse(null);
if (Boolean.FALSE.equals(removed)) {
incompatibleQueuesPendingRemoval.add(
new IncompatibleQueueRemovalTask(name, queueType));
} else {
FSQueue queue = getQueue(name, true, queueType, false);
if (queue != null) {
queue.setDynamic(false);
}
}
}
}
/**
* Setting a set of queues to dynamic.
* @param queueNames The names of the queues to be set to dynamic
*/
protected void setQueuesToDynamic(Set<String> queueNames) {
synchronized (queues) {
for (String queueName : queueNames) {
queues.get(queueName).setDynamic(true);
}
}
}
/**
* Check whether queue name is valid,
* return true if it is valid, otherwise return false.

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService.Listener;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
@ -32,6 +33,8 @@
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
@ -79,7 +82,8 @@ public void testGetAllocationFileFromFileSystem()
fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(Mockito.mock(Listener.class));
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(fsAllocPath, allocationFile.toString());
assertTrue(fs.exists(allocationFile));
@ -92,7 +96,8 @@ public void testDenyGetAllocationFileFromUnsupportedFileSystem()
throws UnsupportedFileSystemException {
Configuration conf = new YarnConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(Mockito.mock(Listener.class));
allocLoader.getAllocationFile(conf);
}
@ -105,7 +110,7 @@ public void testGetAllocationFileFromClasspath() {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
TEST_FAIRSCHED_XML);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService();
new AllocationFileLoaderService(Mockito.mock(Listener.class));
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
assertTrue(fs.exists(allocationFile));
@ -134,12 +139,11 @@ public void testReload() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(
clock);
ReloadListener confHolder = new ReloadListener();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder, clock);
allocLoader.reloadIntervalMs = 5;
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
@ -205,7 +209,9 @@ public void testReload() throws Exception {
public void testAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
ReloadListener confHolder = new ReloadListener();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
AllocationFileWriter
.create()
@ -278,8 +284,6 @@ public void testAllocationFileParsing() throws Exception {
.writeToFile(ALLOC_FILE);
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
@ -427,7 +431,9 @@ public void testAllocationFileParsing() throws Exception {
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
ReloadListener confHolder = new ReloadListener();
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
@ -473,8 +479,6 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
out.close();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
@ -550,10 +554,10 @@ public void testSimplePlacementPolicyFromConf() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
@ -584,10 +588,10 @@ public void testQueueAlongsideRoot() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@ -608,10 +612,10 @@ public void testQueueNameContainingPeriods() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@ -632,10 +636,10 @@ public void testQueueNameContainingOnlyWhitespace() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@ -654,10 +658,10 @@ public void testParentTagWithReservation() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
try {
allocLoader.reloadAllocations();
} catch (AllocationConfigurationException ex) {
@ -685,10 +689,10 @@ public void testParentWithReservation() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
try {
allocLoader.reloadAllocations();
} catch (AllocationConfigurationException ex) {
@ -714,10 +718,10 @@ public void testParentTagWithChild() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
// Check whether queue 'parent' and 'child' are loaded successfully
@ -745,10 +749,10 @@ public void testQueueNameContainingNBWhitespace() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@ -767,10 +771,10 @@ public void testDefaultQueueSchedulingModeIsFIFO() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@ -793,10 +797,10 @@ public void testReservableQueue() throws Exception {
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
allocLoader.reloadAllocations();
AllocationConfiguration allocConf = confHolder.allocConf;
@ -853,10 +857,10 @@ public void testReservableCannotBeCombinedWithDynamicUserQueue()
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener();
allocLoader.setReloadListener(confHolder);
AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(confHolder);
allocLoader.init(conf);
allocLoader.reloadAllocations();
}
@ -867,5 +871,9 @@ private class ReloadListener implements AllocationFileLoaderService.Listener {
public void onReload(AllocationConfiguration info) {
allocConf = info;
}
@Override
public void onCheck() {
}
}
}

View File

@ -20,15 +20,22 @@
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
public class TestQueueManager {
@ -305,4 +312,334 @@ public void testCreateParentQueueAndParent() {
assertEquals("createQueue() returned wrong queue",
"root.queue1.queue2", q2.getName());
}
@Test
public void testRemovalOfDynamicLeafQueue() {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
queueManager.updateAllocationConfiguration(allocConf);
FSQueue q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", true);
assertNotNull("Queue root.test.childB.dynamic1 was not created", q1);
assertEquals("createQueue() returned wrong queue",
"root.test.childB.dynamic1", q1.getName());
assertTrue("root.test.childB.dynamic1 is not a dynamic queue",
q1.isDynamic());
// an application is submitted to root.test.childB.dynamic1
notEmptyQueues.add(q1);
// root.test.childB.dynamic1 is not empty and should not be removed
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", false);
assertNotNull("Queue root.test.childB.dynamic1 was deleted", q1);
// the application finishes, the next removeEmptyDynamicQueues() should
// clean root.test.childB.dynamic1 up, but keep its static parent
notEmptyQueues.remove(q1);
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", false);
assertNull("Queue root.test.childB.dynamic1 was not deleted", q1);
assertNotNull("The static parent of root.test.childB.dynamic1 was deleted",
queueManager.getParentQueue("root.test.childB", false));
}
@Test
public void testRemovalOfDynamicParentQueue() {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
queueManager.updateAllocationConfiguration(allocConf);
FSQueue q1 = queueManager.getLeafQueue("root.parent1.dynamic1", true);
assertNotNull("Queue root.parent1.dynamic1 was not created", q1);
assertEquals("createQueue() returned wrong queue",
"root.parent1.dynamic1", q1.getName());
assertTrue("root.parent1.dynamic1 is not a dynamic queue", q1.isDynamic());
FSQueue p1 = queueManager.getParentQueue("root.parent1", false);
assertNotNull("Queue root.parent1 was not created", p1);
assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic());
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q1 = queueManager.getLeafQueue("root.parent1.dynamic1", false);
p1 = queueManager.getParentQueue("root.parent1", false);
assertNull("Queue root.parent1.dynamic1 was not deleted", q1);
assertNull("Queue root.parent1 was not deleted", p1);
}
@Test
public void testNonEmptyDynamicQueueBecomingStaticQueue() {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
queueManager.updateAllocationConfiguration(allocConf);
FSLeafQueue q1 = queueManager.getLeafQueue("root.leaf1", true);
assertNotNull("Queue root.leaf1 was not created", q1);
assertEquals("createQueue() returned wrong queue",
"root.leaf1", q1.getName());
assertTrue("root.leaf1 is not a dynamic queue", q1.isDynamic());
// pretend that we submitted an app to the queue
notEmptyQueues.add(q1);
// non-empty queues should not be deleted
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q1 = queueManager.getLeafQueue("root.leaf1", false);
assertNotNull("Queue root.leaf1 was deleted", q1);
// next we add leaf1 under root in the allocation config
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.leaf1");
queueManager.updateAllocationConfiguration(allocConf);
// updateAllocationConfiguration() should make root.leaf1 a dynamic queue
assertFalse("root.leaf1 is not a static queue", q1.isDynamic());
// application finished now and the queue is empty, but since leaf1 is a
// static queue at this point, hence not affected by
// removeEmptyDynamicQueues()
notEmptyQueues.clear();
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q1 = queueManager.getLeafQueue("root.leaf1", false);
assertNotNull("Queue root.leaf1 was deleted", q1);
assertFalse("root.leaf1 is not a static queue", q1.isDynamic());
}
@Test
public void testNonEmptyStaticQueueBecomingDynamicQueue() {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
queueManager.updateAllocationConfiguration(allocConf);
FSLeafQueue q1 = queueManager.getLeafQueue("root.test.childA", false);
assertNotNull("Queue root.test.childA does not exist", q1);
assertEquals("createQueue() returned wrong queue",
"root.test.childA", q1.getName());
assertFalse("root.test.childA is not a static queue", q1.isDynamic());
// we submitted an app to the queue
notEmptyQueues.add(q1);
// the next removeEmptyDynamicQueues() call should not modify
// root.test.childA
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q1 = queueManager.getLeafQueue("root.test.childA", false);
assertNotNull("Queue root.test.childA was deleted", q1);
assertFalse("root.test.childA is not a dynamic queue", q1.isDynamic());
// next we remove all queues from the allocation config,
// this causes all queues to change to dynamic
for (Set<String> queueNames : allocConf.configuredQueues.values()) {
queueManager.setQueuesToDynamic(queueNames);
queueNames.clear();
}
queueManager.updateAllocationConfiguration(allocConf);
q1 = queueManager.getLeafQueue("root.test.childA", false);
assertNotNull("Queue root.test.childA was deleted", q1);
assertTrue("root.test.childA is not a dynamic queue", q1.isDynamic());
// application finished - the queue does not have runnable app
// the next removeEmptyDynamicQueues() call should remove the queues
notEmptyQueues.remove(q1);
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q1 = queueManager.getLeafQueue("root.test.childA", false);
assertNull("Queue root.test.childA was not deleted", q1);
FSParentQueue p1 = queueManager.getParentQueue("root.test", false);
assertNull("Queue root.test was not deleted", p1);
}
@Test
public void testRemovalOfChildlessParentQueue() {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
queueManager.updateAllocationConfiguration(allocConf);
FSParentQueue q1 = queueManager.getParentQueue("root.test.childB", false);
assertNotNull("Queue root.test.childB was not created", q1);
assertEquals("createQueue() returned wrong queue",
"root.test.childB", q1.getName());
assertFalse("root.test.childB is a dynamic queue", q1.isDynamic());
// static queues should not be deleted
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q1 = queueManager.getParentQueue("root.test.childB", false);
assertNotNull("Queue root.test.childB was deleted", q1);
// next we remove root.test.childB from the allocation config
allocConf.configuredQueues.get(FSQueueType.PARENT)
.remove("root.test.childB");
queueManager.updateAllocationConfiguration(allocConf);
queueManager.setQueuesToDynamic(Collections.singleton("root.test.childB"));
// the next removeEmptyDynamicQueues() call should clean
// root.test.childB up
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q1 = queueManager.getParentQueue("root.leaf1", false);
assertNull("Queue root.leaf1 was not deleted", q1);
}
@Test
public void testQueueTypeChange() {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
queueManager.updateAllocationConfiguration(allocConf);
FSQueue q1 = queueManager.getLeafQueue("root.parent1.leaf1", true);
assertNotNull("Queue root.parent1.leaf1 was not created", q1);
assertEquals("createQueue() returned wrong queue",
"root.parent1.leaf1", q1.getName());
assertTrue("root.parent1.leaf1 is not a dynamic queue", q1.isDynamic());
FSQueue p1 = queueManager.getParentQueue("root.parent1", false);
assertNotNull("Queue root.parent1 was not created", p1);
assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic());
// adding root.parent1.leaf1 and root.parent1 to the allocation config
allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.parent1");
allocConf.configuredQueues.get(FSQueueType.LEAF)
.add("root.parent1.leaf1");
// updateAllocationConfiguration() should change both queues over to static
queueManager.updateAllocationConfiguration(allocConf);
q1 = queueManager.getLeafQueue("root.parent1.leaf1", false);
assertFalse("root.parent1.leaf1 is not a static queue", q1.isDynamic());
p1 = queueManager.getParentQueue("root.parent1", false);
assertFalse("root.parent1 is not a static queue", p1.isDynamic());
// removing root.parent1.leaf1 and root.parent1 from the allocation
// config
allocConf.configuredQueues.get(FSQueueType.PARENT).remove("root.parent1");
allocConf.configuredQueues.get(FSQueueType.LEAF)
.remove("root.parent1.leaf1");
// updateAllocationConfiguration() should change both queues
// to dynamic
queueManager.updateAllocationConfiguration(allocConf);
queueManager.setQueuesToDynamic(
ImmutableSet.of("root.parent1", "root.parent1.leaf1"));
q1 = queueManager.getLeafQueue("root.parent1.leaf1", false);
assertTrue("root.parent1.leaf1 is not a dynamic queue", q1.isDynamic());
p1 = queueManager.getParentQueue("root.parent1", false);
assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic());
}
@Test
public void testApplicationAssignmentPreventsRemovalOfDynamicQueue()
throws Exception {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
queueManager = new QueueManager(scheduler);
queueManager.initialize(conf);
queueManager.updateAllocationConfiguration(allocConf);
FSLeafQueue q = queueManager.getLeafQueue("root.leaf1", true);
assertNotNull("root.leaf1 does not exist", q);
assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q));
// assigning an application (without an appAttempt so far) to the queue
// removeEmptyDynamicQueues() should not remove the queue
ApplicationId applicationId = ApplicationId.newInstance(1L, 0);
q.addAssignedApp(applicationId);
q = queueManager.getLeafQueue("root.leaf1", false);
assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q = queueManager.getLeafQueue("root.leaf1", false);
assertNotNull("root.leaf1 has been removed", q);
assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, 0);
ActiveUsersManager activeUsersManager =
Mockito.mock(ActiveUsersManager.class);
RMContext rmContext = Mockito.mock(RMContext.class);
// the appAttempt is created
// removeEmptyDynamicQueues() should not remove the queue
FSAppAttempt appAttempt = new FSAppAttempt(scheduler, applicationAttemptId,
"a_user", q, activeUsersManager, rmContext);
q.addApp(appAttempt, true);
queueManager.removeEmptyDynamicQueues();
q = queueManager.getLeafQueue("root.leaf1", false);
assertNotNull("root.leaf1 has been removed", q);
assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
// the appAttempt finished, the queue should be empty
q.removeApp(appAttempt);
q = queueManager.getLeafQueue("root.leaf1", false);
assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q));
// removeEmptyDynamicQueues() should remove the queue
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q = queueManager.getLeafQueue("root.leaf1", false);
assertNull("root.leaf1 has not been removed", q);
}
@Test
public void testRemovalOfIncompatibleNonEmptyQueue()
throws Exception {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a");
scheduler.allocConf = allocConf;
queueManager.updateAllocationConfiguration(allocConf);
FSLeafQueue q = queueManager.getLeafQueue("root.a", true);
assertNotNull("root.a does not exist", q);
assertTrue("root.a is not empty", queueManager.isEmpty(q));
// we start to run an application on root.a
notEmptyQueues.add(q);
q = queueManager.getLeafQueue("root.a", false);
assertNotNull("root.a does not exist", q);
assertFalse("root.a is empty", queueManager.isEmpty(q));
// root.a should not be removed by removeEmptyDynamicQueues or by
// removePendingIncompatibleQueues
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
q = queueManager.getLeafQueue("root.a", false);
assertNotNull("root.a does not exist", q);
// let's introduce queue incompatibility
allocConf.configuredQueues.get(FSQueueType.LEAF).remove("root.a");
allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.a");
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a.b");
queueManager.updateAllocationConfiguration(allocConf);
// since root.a has running applications, it should be still a leaf queue
q = queueManager.getLeafQueue("root.a", false);
assertNotNull("root.a has been removed", q);
assertFalse("root.a is empty", queueManager.isEmpty(q));
// removePendingIncompatibleQueues should still keep root.a as a leaf queue
queueManager.removePendingIncompatibleQueues();
q = queueManager.getLeafQueue("root.a", false);
assertNotNull("root.a has been removed", q);
assertFalse("root.a is empty", queueManager.isEmpty(q));
// when the application finishes, root.a should be a parent queue
notEmptyQueues.clear();
queueManager.removePendingIncompatibleQueues();
queueManager.removeEmptyDynamicQueues();
FSParentQueue p = queueManager.getParentQueue("root.a", false);
assertNotNull("root.a does not exist", p);
}
}