YARN-8990. Fix fair scheduler race condition in app submit and queue cleanup. (Contributed by Wilfred Spiegelenburg)
This commit is contained in:
parent
89b49167a5
commit
524a7523c4
@ -651,4 +651,18 @@ public void addAssignedApp(ApplicationId applicationId) {
|
|||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is called when an application is removed from this queue
|
||||||
|
* during the submit process.
|
||||||
|
* @param applicationId the application's id
|
||||||
|
*/
|
||||||
|
public void removeAssignedApp(ApplicationId applicationId) {
|
||||||
|
writeLock.lock();
|
||||||
|
try {
|
||||||
|
assignedApps.remove(applicationId);
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -473,7 +473,7 @@ protected void addApplication(ApplicationId applicationId,
|
|||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
RMApp rmApp = rmContext.getRMApps().get(applicationId);
|
||||||
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
|
FSLeafQueue queue = assignToQueue(rmApp, queueName, user, applicationId);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -499,6 +499,7 @@ protected void addApplication(ApplicationId applicationId,
|
|||||||
applicationId, queue.getName(),
|
applicationId, queue.getName(),
|
||||||
invalidAMResourceRequests, queue.getMaxShare());
|
invalidAMResourceRequests, queue.getMaxShare());
|
||||||
rejectApplicationWithMessage(applicationId, msg);
|
rejectApplicationWithMessage(applicationId, msg);
|
||||||
|
queue.removeAssignedApp(applicationId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -513,6 +514,7 @@ protected void addApplication(ApplicationId applicationId,
|
|||||||
+ " cannot submit applications to queue " + queue.getName()
|
+ " cannot submit applications to queue " + queue.getName()
|
||||||
+ "(requested queuename is " + queueName + ")";
|
+ "(requested queuename is " + queueName + ")";
|
||||||
rejectApplicationWithMessage(applicationId, msg);
|
rejectApplicationWithMessage(applicationId, msg);
|
||||||
|
queue.removeAssignedApp(applicationId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -520,7 +522,6 @@ protected void addApplication(ApplicationId applicationId,
|
|||||||
new SchedulerApplication<FSAppAttempt>(queue, user);
|
new SchedulerApplication<FSAppAttempt>(queue, user);
|
||||||
applications.put(applicationId, application);
|
applications.put(applicationId, application);
|
||||||
queue.getMetrics().submitApp(user);
|
queue.getMetrics().submitApp(user);
|
||||||
queue.addAssignedApp(applicationId);
|
|
||||||
|
|
||||||
LOG.info("Accepted application " + applicationId + " from user: " + user
|
LOG.info("Accepted application " + applicationId + " from user: " + user
|
||||||
+ ", in queue: " + queue.getName()
|
+ ", in queue: " + queue.getName()
|
||||||
@ -597,11 +598,19 @@ protected void addApplicationAttempt(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper method that attempts to assign the app to a queue. The method is
|
* Helper method for the tests to assign the app to a queue.
|
||||||
* responsible to call the appropriate event-handler if the app is rejected.
|
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
|
FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
|
||||||
|
return assignToQueue(rmApp, queueName, user, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method that attempts to assign the app to a queue. The method is
|
||||||
|
* responsible to call the appropriate event-handler if the app is rejected.
|
||||||
|
*/
|
||||||
|
private FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user,
|
||||||
|
ApplicationId applicationId) {
|
||||||
FSLeafQueue queue = null;
|
FSLeafQueue queue = null;
|
||||||
String appRejectMsg = null;
|
String appRejectMsg = null;
|
||||||
|
|
||||||
@ -611,7 +620,7 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
|
|||||||
if (queueName == null) {
|
if (queueName == null) {
|
||||||
appRejectMsg = "Application rejected by queue placement policy";
|
appRejectMsg = "Application rejected by queue placement policy";
|
||||||
} else {
|
} else {
|
||||||
queue = queueMgr.getLeafQueue(queueName, true);
|
queue = queueMgr.getLeafQueue(queueName, true, applicationId);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
appRejectMsg = queueName + " is not a leaf queue";
|
appRejectMsg = queueName + " is not a leaf queue";
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
import org.xml.sax.SAXException;
|
import org.xml.sax.SAXException;
|
||||||
@ -71,7 +72,7 @@ private void execute() {
|
|||||||
Boolean removed =
|
Boolean removed =
|
||||||
removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null);
|
removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null);
|
||||||
if (Boolean.TRUE.equals(removed)) {
|
if (Boolean.TRUE.equals(removed)) {
|
||||||
FSQueue queue = getQueue(queueToCreate, true, queueType, false);
|
FSQueue queue = getQueue(queueToCreate, true, queueType, false, null);
|
||||||
if (queue != null &&
|
if (queue != null &&
|
||||||
// if queueToCreate is present in the allocation config, set it
|
// if queueToCreate is present in the allocation config, set it
|
||||||
// to static
|
// to static
|
||||||
@ -124,30 +125,49 @@ public void initialize(Configuration conf) throws IOException,
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a leaf queue by name, creating it if the create param is
|
* Get a leaf queue by name, creating it if the create param is
|
||||||
* true and is necessary.
|
* <code>true</code> and the queue does not exist.
|
||||||
* If the queue is not or can not be a leaf queue, i.e. it already exists as a
|
* If the queue is not or can not be a leaf queue, i.e. it already exists as
|
||||||
* parent queue, or one of the parents in its name is already a leaf queue,
|
* a parent queue, or one of the parents in its name is already a leaf queue,
|
||||||
* null is returned.
|
* <code>null</code> is returned.
|
||||||
*
|
*
|
||||||
* The root part of the name is optional, so a queue underneath the root
|
* The root part of the name is optional, so a queue underneath the root
|
||||||
* named "queue1" could be referred to as just "queue1", and a queue named
|
* named "queue1" could be referred to as just "queue1", and a queue named
|
||||||
* "queue2" underneath a parent named "parent1" that is underneath the root
|
* "queue2" underneath a parent named "parent1" that is underneath the root
|
||||||
* could be referred to as just "parent1.queue2".
|
* could be referred to as just "parent1.queue2".
|
||||||
|
* @param name name of the queue
|
||||||
|
* @param create <code>true</code> if the queue must be created if it does
|
||||||
|
* not exist, <code>false</code> otherwise
|
||||||
|
* @return the leaf queue or <code>null</code> if the queue cannot be found
|
||||||
*/
|
*/
|
||||||
public FSLeafQueue getLeafQueue(String name, boolean create) {
|
public FSLeafQueue getLeafQueue(String name, boolean create) {
|
||||||
return getLeafQueue(name, create, true);
|
return getLeafQueue(name, create, null, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
private FSLeafQueue getLeafQueue(
|
/**
|
||||||
String name,
|
* Get a leaf queue by name, creating it if the create param is
|
||||||
boolean create,
|
* <code>true</code> and the queue does not exist.
|
||||||
|
* If the queue is not or can not be a leaf queue, i.e. it already exists as
|
||||||
|
* a parent queue, or one of the parents in its name is already a leaf queue,
|
||||||
|
* <code>null</code> is returned.
|
||||||
|
*
|
||||||
|
* If the application will be assigned to the queue if the applicationId is
|
||||||
|
* not <code>null</code>
|
||||||
|
* @param name name of the queue
|
||||||
|
* @param create <code>true</code> if the queue must be created if it does
|
||||||
|
* not exist, <code>false</code> otherwise
|
||||||
|
* @param applicationId the application ID to assign to the queue
|
||||||
|
* @return the leaf queue or <code>null</code> if teh queue cannot be found
|
||||||
|
*/
|
||||||
|
public FSLeafQueue getLeafQueue(String name, boolean create,
|
||||||
|
ApplicationId applicationId) {
|
||||||
|
return getLeafQueue(name, create, applicationId, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private FSLeafQueue getLeafQueue(String name, boolean create,
|
||||||
|
ApplicationId applicationId,
|
||||||
boolean recomputeSteadyShares) {
|
boolean recomputeSteadyShares) {
|
||||||
FSQueue queue = getQueue(
|
FSQueue queue = getQueue(name, create, FSQueueType.LEAF,
|
||||||
name,
|
recomputeSteadyShares, applicationId);
|
||||||
create,
|
|
||||||
FSQueueType.LEAF,
|
|
||||||
recomputeSteadyShares
|
|
||||||
);
|
|
||||||
if (queue instanceof FSParentQueue) {
|
if (queue instanceof FSParentQueue) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -168,42 +188,55 @@ public boolean removeLeafQueue(String name) {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a parent queue by name, creating it if the create param is
|
* Get a parent queue by name, creating it if the create param is
|
||||||
* true and is necessary.
|
* <code>true</code> and the queue does not exist.
|
||||||
* If the queue is not or can not be a parent queue,
|
* If the queue is not or can not be a parent queue, i.e. it already exists
|
||||||
* i.e. it already exists as a
|
* as a leaf queue, or one of the parents in its name is already a leaf
|
||||||
* leaf queue, or one of the parents in its name is already a leaf queue,
|
* queue, <code>null</code> is returned.
|
||||||
* null is returned.
|
|
||||||
*
|
*
|
||||||
* The root part of the name is optional, so a queue underneath the root
|
* The root part of the name is optional, so a queue underneath the root
|
||||||
* named "queue1" could be referred to as just "queue1", and a queue named
|
* named "queue1" could be referred to as just "queue1", and a queue named
|
||||||
* "queue2" underneath a parent named "parent1" that is underneath the root
|
* "queue2" underneath a parent named "parent1" that is underneath the root
|
||||||
* could be referred to as just "parent1.queue2".
|
* could be referred to as just "parent1.queue2".
|
||||||
|
* @param name name of the queue
|
||||||
|
* @param create <code>true</code> if the queue must be created if it does
|
||||||
|
* not exist, <code>false</code> otherwise
|
||||||
|
* @return the parent queue or <code>null</code> if the queue cannot be found
|
||||||
*/
|
*/
|
||||||
public FSParentQueue getParentQueue(String name, boolean create) {
|
public FSParentQueue getParentQueue(String name, boolean create) {
|
||||||
return getParentQueue(name, create, true);
|
return getParentQueue(name, create, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public FSParentQueue getParentQueue(
|
/**
|
||||||
String name,
|
* Get a parent queue by name, creating it if the create param is
|
||||||
boolean create,
|
* <code>true</code> and the queue does not exist.
|
||||||
|
* If the queue is not or can not be a parent queue, i.e. it already exists
|
||||||
|
* as a leaf queue, or one of the parents in its name is already a leaf
|
||||||
|
* queue, <code>null</code> is returned.
|
||||||
|
*
|
||||||
|
* The root part of the name is optional, so a queue underneath the root
|
||||||
|
* named "queue1" could be referred to as just "queue1", and a queue named
|
||||||
|
* "queue2" underneath a parent named "parent1" that is underneath the root
|
||||||
|
* could be referred to as just "parent1.queue2".
|
||||||
|
* @param name name of the queue
|
||||||
|
* @param create <code>true</code> if the queue must be created if it does
|
||||||
|
* not exist, <code>false</code> otherwise
|
||||||
|
* @param recomputeSteadyShares <code>true</code> if the steady fair share
|
||||||
|
* should be recalculated when a queue is added,
|
||||||
|
* <code>false</code> otherwise
|
||||||
|
* @return the parent queue or <code>null</code> if the queue cannot be found
|
||||||
|
*/
|
||||||
|
public FSParentQueue getParentQueue(String name, boolean create,
|
||||||
boolean recomputeSteadyShares) {
|
boolean recomputeSteadyShares) {
|
||||||
FSQueue queue = getQueue(
|
FSQueue queue = getQueue(name, create, FSQueueType.PARENT,
|
||||||
name,
|
recomputeSteadyShares, null);
|
||||||
create,
|
|
||||||
FSQueueType.PARENT,
|
|
||||||
recomputeSteadyShares
|
|
||||||
);
|
|
||||||
if (queue instanceof FSLeafQueue) {
|
if (queue instanceof FSLeafQueue) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return (FSParentQueue) queue;
|
return (FSParentQueue) queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
private FSQueue getQueue(
|
private FSQueue getQueue(String name, boolean create, FSQueueType queueType,
|
||||||
String name,
|
boolean recomputeSteadyShares, ApplicationId applicationId) {
|
||||||
boolean create,
|
|
||||||
FSQueueType queueType,
|
|
||||||
boolean recomputeSteadyShares) {
|
|
||||||
boolean recompute = recomputeSteadyShares;
|
boolean recompute = recomputeSteadyShares;
|
||||||
name = ensureRootPrefix(name);
|
name = ensureRootPrefix(name);
|
||||||
FSQueue queue;
|
FSQueue queue;
|
||||||
@ -215,8 +248,14 @@ private FSQueue getQueue(
|
|||||||
} else {
|
} else {
|
||||||
recompute = false;
|
recompute = false;
|
||||||
}
|
}
|
||||||
|
// At this point the queue exists and we need to assign the app if to the
|
||||||
|
// but only to a leaf queue
|
||||||
|
if (applicationId != null && queue instanceof FSLeafQueue) {
|
||||||
|
((FSLeafQueue)queue).addAssignedApp(applicationId);
|
||||||
}
|
}
|
||||||
if (recompute) {
|
}
|
||||||
|
// Don't recompute if it is an existing queue or no change was made
|
||||||
|
if (recompute && queue != null) {
|
||||||
rootQueue.recomputeSteadyShares();
|
rootQueue.recomputeSteadyShares();
|
||||||
}
|
}
|
||||||
return queue;
|
return queue;
|
||||||
@ -614,7 +653,7 @@ private void ensureQueueExistsAndIsCompatibleAndIsStatic(
|
|||||||
incompatibleQueuesPendingRemoval.add(
|
incompatibleQueuesPendingRemoval.add(
|
||||||
new IncompatibleQueueRemovalTask(name, queueType));
|
new IncompatibleQueueRemovalTask(name, queueType));
|
||||||
} else {
|
} else {
|
||||||
FSQueue queue = getQueue(name, true, queueType, false);
|
FSQueue queue = getQueue(name, true, queueType, false, null);
|
||||||
if (queue != null) {
|
if (queue != null) {
|
||||||
queue.setDynamic(false);
|
queue.setDynamic(false);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user