YARN-460. CS user left in list of active users for the queue even when application finished (tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1462486 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1b87baa06f
commit
01aabf7363
@ -499,6 +499,9 @@ Release 0.23.7 - UNRELEASED
|
|||||||
YARN-109. .tmp file is not deleted for localized archives (Mayank Bansal
|
YARN-109. .tmp file is not deleted for localized archives (Mayank Bansal
|
||||||
via bobby)
|
via bobby)
|
||||||
|
|
||||||
|
YARN-460. CS user left in list of active users for the queue even when
|
||||||
|
application finished (tgraves)
|
||||||
|
|
||||||
Release 0.23.6 - UNRELEASED
|
Release 0.23.6 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -504,6 +504,14 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
|
|||||||
|
|
||||||
synchronized (application) {
|
synchronized (application) {
|
||||||
|
|
||||||
|
// make sure we aren't stopping/removing the application
|
||||||
|
// when the allocate comes in
|
||||||
|
if (application.isStopped()) {
|
||||||
|
LOG.info("Calling allocate on a stopped " +
|
||||||
|
"application " + applicationAttemptId);
|
||||||
|
return EMPTY_ALLOCATION;
|
||||||
|
}
|
||||||
|
|
||||||
if (!ask.isEmpty()) {
|
if (!ask.isEmpty()) {
|
||||||
|
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
|
@ -92,6 +92,9 @@ public class FiCaSchedulerApp extends SchedulerApplication {
|
|||||||
final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
|
final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
|
||||||
new HashMap<Priority, Map<NodeId, RMContainer>>();
|
new HashMap<Priority, Map<NodeId, RMContainer>>();
|
||||||
|
|
||||||
|
private boolean isStopped = false;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Count how many times the application has been given an opportunity
|
* Count how many times the application has been given an opportunity
|
||||||
* to schedule a task at each priority. Each time the scheduler
|
* to schedule a task at each priority. Each time the scheduler
|
||||||
@ -132,7 +135,9 @@ public String getUser() {
|
|||||||
|
|
||||||
public synchronized void updateResourceRequests(
|
public synchronized void updateResourceRequests(
|
||||||
List<ResourceRequest> requests) {
|
List<ResourceRequest> requests) {
|
||||||
this.appSchedulingInfo.updateResourceRequests(requests);
|
if (!isStopped) {
|
||||||
|
this.appSchedulingInfo.updateResourceRequests(requests);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
|
public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
|
||||||
@ -168,6 +173,10 @@ public boolean isPending() {
|
|||||||
return this.appSchedulingInfo.isPending();
|
return this.appSchedulingInfo.isPending();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized boolean isStopped() {
|
||||||
|
return this.isStopped;
|
||||||
|
}
|
||||||
|
|
||||||
public String getQueueName() {
|
public String getQueueName() {
|
||||||
return this.appSchedulingInfo.getQueueName();
|
return this.appSchedulingInfo.getQueueName();
|
||||||
}
|
}
|
||||||
@ -183,6 +192,7 @@ public synchronized Collection<RMContainer> getLiveContainers() {
|
|||||||
|
|
||||||
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
|
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
|
||||||
// Cleanup all scheduling information
|
// Cleanup all scheduling information
|
||||||
|
this.isStopped = true;
|
||||||
this.appSchedulingInfo.stop(rmAppAttemptFinalState);
|
this.appSchedulingInfo.stop(rmAppAttemptFinalState);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,6 +245,10 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
|
|||||||
Priority priority, ResourceRequest request,
|
Priority priority, ResourceRequest request,
|
||||||
Container container) {
|
Container container) {
|
||||||
|
|
||||||
|
if (isStopped) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
// Required sanity check - AM can call 'allocate' to update resource
|
// Required sanity check - AM can call 'allocate' to update resource
|
||||||
// request without locking the scheduler, hence we need to check
|
// request without locking the scheduler, hence we need to check
|
||||||
if (getTotalRequiredResources(priority) <= 0) {
|
if (getTotalRequiredResources(priority) <= 0) {
|
||||||
|
@ -252,6 +252,15 @@ public Allocation allocate(
|
|||||||
}
|
}
|
||||||
|
|
||||||
synchronized (application) {
|
synchronized (application) {
|
||||||
|
|
||||||
|
// make sure we aren't stopping/removing the application
|
||||||
|
// when the allocate comes in
|
||||||
|
if (application.isStopped()) {
|
||||||
|
LOG.info("Calling allocate on a stopped " +
|
||||||
|
"application " + applicationAttemptId);
|
||||||
|
return EMPTY_ALLOCATION;
|
||||||
|
}
|
||||||
|
|
||||||
if (!ask.isEmpty()) {
|
if (!ask.isEmpty()) {
|
||||||
LOG.debug("allocate: pre-update" +
|
LOG.debug("allocate: pre-update" +
|
||||||
" applicationId=" + applicationAttemptId +
|
" applicationId=" + applicationAttemptId +
|
||||||
|
Loading…
Reference in New Issue
Block a user