YARN-4502. Fix issue where two AM containers get allocated when the AM restarts. (Vinod Kumar Vavilapalli via wangda)

This commit is contained in:
Wangda Tan 2016-01-19 09:30:04 +08:00
parent 150f5ae034
commit a44ce3f14f
23 changed files with 275 additions and 202 deletions

View File

@ -70,10 +70,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -957,7 +957,7 @@ public synchronized List<Container> getTransferredContainers(
} }
@Override @Override
protected void completedContainer(RMContainer rmContainer, protected void completedContainerInternal(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) { ContainerStatus containerStatus, RMContainerEventType event) {
// do nothing // do nothing
} }

View File

@ -1277,6 +1277,9 @@ Release 2.8.0 - UNRELEASED
YARN-4596. SystemMetricPublisher should not swallow error messages from YARN-4596. SystemMetricPublisher should not swallow error messages from
TimelineClient#putEntities. (Li Lu via jianhe) TimelineClient#putEntities. (Li Lu via jianhe)
YARN-4502. Fix two AM containers get allocated when AM restart.
(Vinod Kumar Vavilapalli via wangda)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -43,7 +43,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@ -51,6 +50,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
@ -257,7 +257,7 @@ private void containerBasedPreemptOrKill(CSQueue root,
// kill it // kill it
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container, new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.KILL_CONTAINER)); SchedulerEventType.KILL_PREEMPTED_CONTAINER));
preempted.remove(container); preempted.remove(container);
} else { } else {
if (preempted.get(container) != null) { if (preempted.get(container) != null) {
@ -268,7 +268,7 @@ private void containerBasedPreemptOrKill(CSQueue root,
//otherwise just send preemption events //otherwise just send preemption events
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent(appAttemptId, container, new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.PREEMPT_CONTAINER)); SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
preempted.put(container, clock.getTime()); preempted.put(container, clock.getTime());
} }
} }
@ -764,7 +764,7 @@ private void preemptFrom(FiCaSchedulerApp app,
if (!observeOnly) { if (!observeOnly) {
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
new ContainerPreemptEvent( new ContainerPreemptEvent(
appId, c, SchedulerEventType.DROP_RESERVATION)); appId, c, SchedulerEventType.KILL_RESERVED_CONTAINER));
} }
} }

View File

@ -49,7 +49,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition;
@ -97,7 +96,7 @@ RMContainerEventType.ACQUIRED, new AcquiredTransition())
.addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED, .addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED,
RMContainerEventType.EXPIRE, new FinishedTransition()) RMContainerEventType.EXPIRE, new FinishedTransition())
.addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED, .addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED,
RMContainerEventType.KILL, new ContainerRescheduledTransition()) RMContainerEventType.KILL, new FinishedTransition())
// Transitions from ACQUIRED state // Transitions from ACQUIRED state
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
@ -521,7 +520,8 @@ private static final class AcquiredTransition extends BaseTransition {
@Override @Override
public void transition(RMContainerImpl container, RMContainerEvent event) { public void transition(RMContainerImpl container, RMContainerEvent event) {
// Clear ResourceRequest stored in RMContainer // Clear ResourceRequest stored in RMContainer, we don't need to remember
// this anymore.
container.setResourceRequests(null); container.setResourceRequests(null);
// Register with containerAllocationExpirer. // Register with containerAllocationExpirer.
@ -597,17 +597,6 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
} }
} }
private static final class ContainerRescheduledTransition extends
FinishedTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Tell scheduler to recover request of this container to app
container.eventHandler.handle(new ContainerRescheduledEvent(container));
super.transition(container, event);
}
}
private static class FinishedTransition extends BaseTransition { private static class FinishedTransition extends BaseTransition {
@Override @Override

View File

@ -511,20 +511,28 @@ private RMContainer recoverAndCreateContainer(NMContainerStatus status,
* Recover resource request back from RMContainer when a container is * Recover resource request back from RMContainer when a container is
* preempted before AM pulled the same. If container is pulled by * preempted before AM pulled the same. If container is pulled by
* AM, then RMContainer will not have resource request to recover. * AM, then RMContainer will not have resource request to recover.
* @param rmContainer * @param rmContainer rmContainer
*/ */
protected void recoverResourceRequestForContainer(RMContainer rmContainer) { private void recoverResourceRequestForContainer(RMContainer rmContainer) {
List<ResourceRequest> requests = rmContainer.getResourceRequests(); List<ResourceRequest> requests = rmContainer.getResourceRequests();
// If container state is moved to ACQUIRED, request will be empty. // If container state is moved to ACQUIRED, request will be empty.
if (requests == null) { if (requests == null) {
return; return;
} }
// Add resource request back to Scheduler.
SchedulerApplicationAttempt schedulerAttempt // Add resource request back to Scheduler ApplicationAttempt.
= getCurrentAttemptForContainer(rmContainer.getContainerId());
// We lookup the application-attempt here again using
// getCurrentApplicationAttempt() because there is only one app-attempt at
// any point in the scheduler. But in corner cases, AMs can crash,
// corresponding containers get killed and recovered to the same-attempt,
// but because the app-attempt is extinguished right after, the recovered
// requests don't serve any purpose, but that's okay.
SchedulerApplicationAttempt schedulerAttempt =
getCurrentAttemptForContainer(rmContainer.getContainerId());
if (schedulerAttempt != null) { if (schedulerAttempt != null) {
schedulerAttempt.recoverResourceRequests(requests); schedulerAttempt.recoverResourceRequestsForContainer(requests);
} }
} }
@ -559,8 +567,30 @@ public void clearPendingContainerCache() {
} }
} }
@VisibleForTesting
@Private
// clean up a completed container // clean up a completed container
protected abstract void completedContainer(RMContainer rmContainer, public void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Container " + containerStatus.getContainerId()
+ " completed with event " + event
+ ", but corresponding RMContainer doesn't exist.");
return;
}
completedContainerInternal(rmContainer, containerStatus, event);
// If the container is getting killed in ACQUIRED state, the requester (AM
// for regular containers and RM itself for AM container) will not know what
// happened. Simply add the ResourceRequest back again so that requester
// doesn't need to do anything conditionally.
recoverResourceRequestForContainer(rmContainer);
}
// clean up a completed container
protected abstract void completedContainerInternal(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event); ContainerStatus containerStatus, RMContainerEventType event);
protected void releaseContainers(List<ContainerId> containers, protected void releaseContainers(List<ContainerId> containers,

View File

@ -289,12 +289,15 @@ public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
* application, by asking for more resources and releasing resources acquired * application, by asking for more resources and releasing resources acquired
* by the application. * by the application.
* *
* @param requests resources to be acquired * @param requests
* @param recoverPreemptedRequest recover ResourceRequest on preemption * resources to be acquired
* @param recoverPreemptedRequestForAContainer
* recover ResourceRequest on preemption
* @return true if any resource was updated, false otherwise * @return true if any resource was updated, false otherwise
*/ */
public synchronized boolean updateResourceRequests( public synchronized boolean updateResourceRequests(
List<ResourceRequest> requests, boolean recoverPreemptedRequest) { List<ResourceRequest> requests,
boolean recoverPreemptedRequestForAContainer) {
// Flag to track if any incoming requests update "ANY" requests // Flag to track if any incoming requests update "ANY" requests
boolean anyResourcesUpdated = false; boolean anyResourcesUpdated = false;
@ -315,7 +318,7 @@ public synchronized boolean updateResourceRequests(
// Increment number of containers if recovering preempted resources // Increment number of containers if recovering preempted resources
ResourceRequest lastRequest = asks.get(resourceName); ResourceRequest lastRequest = asks.get(resourceName);
if (recoverPreemptedRequest && lastRequest != null) { if (recoverPreemptedRequestForAContainer && lastRequest != null) {
request.setNumContainers(lastRequest.getNumContainers() + 1); request.setNumContainers(lastRequest.getNumContainers() + 1);
} }

View File

@ -31,7 +31,7 @@ public interface PreemptableResourceScheduler extends ResourceScheduler {
* ask the scheduler to drop the reservation for the given container. * ask the scheduler to drop the reservation for the given container.
* @param container Reference to reserved container allocation. * @param container Reference to reserved container allocation.
*/ */
void dropContainerReservation(RMContainer container); void killReservedContainer(RMContainer container);
/** /**
* Ask the scheduler to obtain back the container from a specific application * Ask the scheduler to obtain back the container from a specific application
@ -39,12 +39,12 @@ public interface PreemptableResourceScheduler extends ResourceScheduler {
* @param aid the application from which we want to get a container back * @param aid the application from which we want to get a container back
* @param container the container we want back * @param container the container we want back
*/ */
void preemptContainer(ApplicationAttemptId aid, RMContainer container); void markContainerForPreemption(ApplicationAttemptId aid, RMContainer container);
/** /**
* Ask the scheduler to forcibly interrupt the container given as input * Ask the scheduler to forcibly interrupt the container given as input
* @param container * @param container
*/ */
void killContainer(RMContainer container); void killPreemptedContainer(RMContainer container);
} }

View File

@ -322,7 +322,7 @@ public synchronized boolean updateResourceRequests(
return false; return false;
} }
public synchronized void recoverResourceRequests( public synchronized void recoverResourceRequestsForContainer(
List<ResourceRequest> requests) { List<ResourceRequest> requests) {
if (!isStopped) { if (!isStopped) {
appSchedulingInfo.updateResourceRequests(requests, true); appSchedulingInfo.updateResourceRequests(requests, true);

View File

@ -93,7 +93,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@ -115,13 +114,14 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -865,7 +865,7 @@ private synchronized void doneApplicationAttempt(
LOG.info("Skip killing " + rmContainer.getContainerId()); LOG.info("Skip killing " + rmContainer.getContainerId());
continue; continue;
} }
completedContainer( super.completedContainer(
rmContainer, rmContainer,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
@ -874,7 +874,7 @@ private synchronized void doneApplicationAttempt(
// Release all reserved containers // Release all reserved containers
for (RMContainer rmContainer : attempt.getReservedContainers()) { for (RMContainer rmContainer : attempt.getReservedContainers()) {
completedContainer( super.completedContainer(
rmContainer, rmContainer,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(), "Application Complete"), rmContainer.getContainerId(), "Application Complete"),
@ -1047,7 +1047,7 @@ private synchronized void nodeUpdate(RMNode nm) {
for (ContainerStatus completedContainer : completedContainers) { for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId(); ContainerId containerId = completedContainer.getContainerId();
RMContainer container = getRMContainer(containerId); RMContainer container = getRMContainer(containerId);
completedContainer(container, completedContainer, super.completedContainer(container, completedContainer,
RMContainerEventType.FINISHED); RMContainerEventType.FINISHED);
if (container != null) { if (container != null) {
releasedContainers++; releasedContainers++;
@ -1128,7 +1128,7 @@ private synchronized void updateLabelsOnNode(NodeId nodeId,
// Unreserve container on this node // Unreserve container on this node
RMContainer reservedContainer = node.getReservedContainer(); RMContainer reservedContainer = node.getReservedContainer();
if (null != reservedContainer) { if (null != reservedContainer) {
dropContainerReservation(reservedContainer); killReservedContainer(reservedContainer);
} }
// Update node labels after we've done this // Update node labels after we've done this
@ -1372,42 +1372,35 @@ public void handle(SchedulerEvent event) {
ContainerExpiredSchedulerEvent containerExpiredEvent = ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event; (ContainerExpiredSchedulerEvent) event;
ContainerId containerId = containerExpiredEvent.getContainerId(); ContainerId containerId = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerId), super.completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
containerId, containerId,
SchedulerUtils.EXPIRED_CONTAINER), SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE); RMContainerEventType.EXPIRE);
} }
break; break;
case DROP_RESERVATION: case KILL_RESERVED_CONTAINER:
{ {
ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event; ContainerPreemptEvent killReservedContainerEvent =
RMContainer container = dropReservationEvent.getContainer(); (ContainerPreemptEvent) event;
dropContainerReservation(container); RMContainer container = killReservedContainerEvent.getContainer();
killReservedContainer(container);
} }
break; break;
case PREEMPT_CONTAINER: case MARK_CONTAINER_FOR_PREEMPTION:
{ {
ContainerPreemptEvent preemptContainerEvent = ContainerPreemptEvent preemptContainerEvent =
(ContainerPreemptEvent)event; (ContainerPreemptEvent)event;
ApplicationAttemptId aid = preemptContainerEvent.getAppId(); ApplicationAttemptId aid = preemptContainerEvent.getAppId();
RMContainer containerToBePreempted = preemptContainerEvent.getContainer(); RMContainer containerToBePreempted = preemptContainerEvent.getContainer();
preemptContainer(aid, containerToBePreempted); markContainerForPreemption(aid, containerToBePreempted);
} }
break; break;
case KILL_CONTAINER: case KILL_PREEMPTED_CONTAINER:
{ {
ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event; ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
RMContainer containerToBeKilled = killContainerEvent.getContainer(); RMContainer containerToBeKilled = killContainerEvent.getContainer();
killContainer(containerToBeKilled); killPreemptedContainer(containerToBeKilled);
}
break;
case CONTAINER_RESCHEDULED:
{
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
} }
break; break;
default: default:
@ -1462,7 +1455,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
// Remove running containers // Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers(); List<RMContainer> runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) { for (RMContainer container : runningContainers) {
completedContainer(container, super.completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), container.getContainerId(),
SchedulerUtils.LOST_CONTAINER), SchedulerUtils.LOST_CONTAINER),
@ -1472,7 +1465,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
// Remove reservations, if any // Remove reservations, if any
RMContainer reservedContainer = node.getReservedContainer(); RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) { if (reservedContainer != null) {
completedContainer(reservedContainer, super.completedContainer(reservedContainer,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
reservedContainer.getContainerId(), reservedContainer.getContainerId(),
SchedulerUtils.LOST_CONTAINER), SchedulerUtils.LOST_CONTAINER),
@ -1488,13 +1481,9 @@ private synchronized void removeNode(RMNode nodeInfo) {
@Lock(CapacityScheduler.class) @Lock(CapacityScheduler.class)
@Override @Override
protected synchronized void completedContainer(RMContainer rmContainer, protected synchronized void completedContainerInternal(
ContainerStatus containerStatus, RMContainerEventType event) { RMContainer rmContainer, ContainerStatus containerStatus,
if (rmContainer == null) { RMContainerEventType event) {
LOG.info("Container " + containerStatus.getContainerId() +
" completed with event " + event);
return;
}
Container container = rmContainer.getContainer(); Container container = rmContainer.getContainer();
@ -1596,11 +1585,14 @@ public void recover(RMState state) throws Exception {
} }
@Override @Override
public void dropContainerReservation(RMContainer container) { public void killReservedContainer(RMContainer container) {
if(LOG.isDebugEnabled()){ if(LOG.isDebugEnabled()){
LOG.debug("DROP_RESERVATION:" + container.toString()); LOG.debug(SchedulerEventType.KILL_RESERVED_CONTAINER + ":"
+ container.toString());
} }
completedContainer(container, // To think: What happens if this is no longer a reserved container, for
// e.g if the reservation became an allocation.
super.completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), container.getContainerId(),
SchedulerUtils.UNRESERVED_CONTAINER), SchedulerUtils.UNRESERVED_CONTAINER),
@ -1608,25 +1600,28 @@ public void dropContainerReservation(RMContainer container) {
} }
@Override @Override
public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) { public void markContainerForPreemption(ApplicationAttemptId aid,
RMContainer cont) {
if(LOG.isDebugEnabled()){ if(LOG.isDebugEnabled()){
LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() + LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION
" container: " + cont.toString()); + ": appAttempt:" + aid.toString() + " container: "
+ cont.toString());
} }
FiCaSchedulerApp app = getApplicationAttempt(aid); FiCaSchedulerApp app = getApplicationAttempt(aid);
if (app != null) { if (app != null) {
app.addPreemptContainer(cont.getContainerId()); app.markContainerForPreemption(cont.getContainerId());
} }
} }
@Override @Override
public void killContainer(RMContainer cont) { public void killPreemptedContainer(RMContainer cont) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("KILL_CONTAINER: container" + cont.toString()); LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container"
+ cont.toString());
} }
completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( super.completedContainer(cont, SchedulerUtils
cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), .createPreemptedContainerStatus(cont.getContainerId(),
RMContainerEventType.KILL); SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
} }
@Override @Override

View File

@ -301,7 +301,7 @@ public synchronized Resource getTotalPendingRequests() {
return ret; return ret;
} }
public synchronized void addPreemptContainer(ContainerId cont) { public synchronized void markContainerForPreemption(ContainerId cont) {
// ignore already completed containers // ignore already completed containers
if (liveContainers.containsKey(cont)) { if (liveContainers.containsKey(cont)) {
containersToPreempt.add(cont); containersToPreempt.add(cont);

View File

@ -16,15 +16,14 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
/** /**
* Simple event class used to communicate containers unreservations, preemption, killing * Simple event class used to communicate kill reserved containers, mark
* containers for preemption and kill already preemption-marked containers.
*/ */
public class ContainerPreemptEvent extends SchedulerEvent { public class ContainerPreemptEvent extends SchedulerEvent {

View File

@ -1,35 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
public class ContainerRescheduledEvent extends SchedulerEvent {
private RMContainer container;
public ContainerRescheduledEvent(RMContainer container) {
super(SchedulerEventType.CONTAINER_RESCHEDULED);
this.container = container;
}
public RMContainer getContainer() {
return container;
}
}

View File

@ -38,11 +38,10 @@ public enum SchedulerEventType {
// Source: ContainerAllocationExpirer // Source: ContainerAllocationExpirer
CONTAINER_EXPIRED, CONTAINER_EXPIRED,
// Source: RMContainer
CONTAINER_RESCHEDULED,
// Source: SchedulingEditPolicy // Source: SchedulingEditPolicy
DROP_RESERVATION, KILL_RESERVED_CONTAINER,
PREEMPT_CONTAINER, MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption
KILL_CONTAINER // in the near future
KILL_PREEMPTED_CONTAINER // Kill a container previously marked for
// preemption
} }

View File

@ -85,7 +85,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@ -498,7 +497,7 @@ protected void warnOrKillContainer(RMContainer container) {
// TODO: Not sure if this ever actually adds this to the list of cleanup // TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()). // containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL); super.completedContainer(container, status, RMContainerEventType.KILL);
LOG.info("Killing container" + container + LOG.info("Killing container" + container +
" (after waiting for preemption for " + " (after waiting for preemption for " +
(getClock().getTime() - time) + "ms)"); (getClock().getTime() - time) + "ms)");
@ -807,7 +806,7 @@ private synchronized void removeApplicationAttempt(
LOG.info("Skip killing " + rmContainer.getContainerId()); LOG.info("Skip killing " + rmContainer.getContainerId());
continue; continue;
} }
completedContainer(rmContainer, super.completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(), rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION), SchedulerUtils.COMPLETED_APPLICATION),
@ -816,7 +815,7 @@ private synchronized void removeApplicationAttempt(
// Release all reserved containers // Release all reserved containers
for (RMContainer rmContainer : attempt.getReservedContainers()) { for (RMContainer rmContainer : attempt.getReservedContainers()) {
completedContainer(rmContainer, super.completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(), rmContainer.getContainerId(),
"Application Complete"), "Application Complete"),
@ -843,13 +842,9 @@ private synchronized void removeApplicationAttempt(
* Clean up a completed container. * Clean up a completed container.
*/ */
@Override @Override
protected synchronized void completedContainer(RMContainer rmContainer, protected synchronized void completedContainerInternal(
ContainerStatus containerStatus, RMContainerEventType event) { RMContainer rmContainer, ContainerStatus containerStatus,
if (rmContainer == null) { RMContainerEventType event) {
LOG.info("Container " + containerStatus.getContainerId()
+ " completed with event " + event);
return;
}
Container container = rmContainer.getContainer(); Container container = rmContainer.getContainer();
@ -919,7 +914,7 @@ private synchronized void removeNode(RMNode rmNode) {
// Remove running containers // Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers(); List<RMContainer> runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) { for (RMContainer container : runningContainers) {
completedContainer(container, super.completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), container.getContainerId(),
SchedulerUtils.LOST_CONTAINER), SchedulerUtils.LOST_CONTAINER),
@ -929,7 +924,7 @@ private synchronized void removeNode(RMNode rmNode) {
// Remove reservations, if any // Remove reservations, if any
RMContainer reservedContainer = node.getReservedContainer(); RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) { if (reservedContainer != null) {
completedContainer(reservedContainer, super.completedContainer(reservedContainer,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
reservedContainer.getContainerId(), reservedContainer.getContainerId(),
SchedulerUtils.LOST_CONTAINER), SchedulerUtils.LOST_CONTAINER),
@ -1057,7 +1052,7 @@ private synchronized void nodeUpdate(RMNode nm) {
for (ContainerStatus completedContainer : completedContainers) { for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId(); ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId); LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId), super.completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED); completedContainer, RMContainerEventType.FINISHED);
} }
@ -1302,21 +1297,12 @@ public void handle(SchedulerEvent event) {
ContainerExpiredSchedulerEvent containerExpiredEvent = ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent)event; (ContainerExpiredSchedulerEvent)event;
ContainerId containerId = containerExpiredEvent.getContainerId(); ContainerId containerId = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerId), super.completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
containerId, containerId,
SchedulerUtils.EXPIRED_CONTAINER), SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE); RMContainerEventType.EXPIRE);
break; break;
case CONTAINER_RESCHEDULED:
if (!(event instanceof ContainerRescheduledEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
break;
default: default:
LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
} }

View File

@ -74,10 +74,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -86,7 +86,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@ -468,7 +467,7 @@ private synchronized void doneApplicationAttempt(
LOG.info("Skip killing " + container.getContainerId()); LOG.info("Skip killing " + container.getContainerId());
continue; continue;
} }
completedContainer(container, super.completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL); RMContainerEventType.KILL);
@ -739,7 +738,7 @@ private synchronized void nodeUpdate(RMNode rmNode) {
for (ContainerStatus completedContainer : completedContainers) { for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId(); ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId); LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId), super.completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED); completedContainer, RMContainerEventType.FINISHED);
} }
@ -858,21 +857,13 @@ public void handle(SchedulerEvent event) {
ContainerExpiredSchedulerEvent containerExpiredEvent = ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event; (ContainerExpiredSchedulerEvent) event;
ContainerId containerid = containerExpiredEvent.getContainerId(); ContainerId containerid = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerid), super.completedContainer(getRMContainer(containerid),
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
containerid, containerid,
SchedulerUtils.EXPIRED_CONTAINER), SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE); RMContainerEventType.EXPIRE);
} }
break; break;
case CONTAINER_RESCHEDULED:
{
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
}
break;
default: default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
} }
@ -880,12 +871,9 @@ public void handle(SchedulerEvent event) {
@Lock(FifoScheduler.class) @Lock(FifoScheduler.class)
@Override @Override
protected synchronized void completedContainer(RMContainer rmContainer, protected synchronized void completedContainerInternal(
ContainerStatus containerStatus, RMContainerEventType event) { RMContainer rmContainer, ContainerStatus containerStatus,
if (rmContainer == null) { RMContainerEventType event) {
LOG.info("Null container completed...");
return;
}
// Get the application for the finished container // Get the application for the finished container
Container container = rmContainer.getContainer(); Container container = rmContainer.getContainer();
@ -931,7 +919,7 @@ private synchronized void removeNode(RMNode nodeInfo) {
} }
// Kill running containers // Kill running containers
for(RMContainer container : node.getRunningContainers()) { for(RMContainer container : node.getRunningContainers()) {
completedContainer(container, super.completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), container.getContainerId(),
SchedulerUtils.LOST_CONTAINER), SchedulerUtils.LOST_CONTAINER),

View File

@ -275,7 +275,8 @@ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
} }
container = getResourceScheduler().getRMContainer(containerId); container = getResourceScheduler().getRMContainer(containerId);
System.out.println("Waiting for container " + containerId + " to be allocated."); System.out.println("Waiting for container " + containerId + " to be "
+ containerState + ", container is null right now.");
Thread.sleep(100); Thread.sleep(100);
if (timeoutMillisecs <= timeoutSecs * 100) { if (timeoutMillisecs <= timeoutSecs * 100) {

View File

@ -29,8 +29,8 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.junit.Assert; import org.junit.Assert;
@ -55,20 +55,22 @@ public void testSchedulerEventDispatcherForPreemptionEvents() {
ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class);
RMContainer container = mock(RMContainer.class); RMContainer container = mock(RMContainer.class);
ContainerPreemptEvent event1 = new ContainerPreemptEvent( ContainerPreemptEvent event1 = new ContainerPreemptEvent(
appAttemptId, container, SchedulerEventType.DROP_RESERVATION); appAttemptId, container, SchedulerEventType.KILL_RESERVED_CONTAINER);
rmDispatcher.getEventHandler().handle(event1); rmDispatcher.getEventHandler().handle(event1);
ContainerPreemptEvent event2 = new ContainerPreemptEvent( ContainerPreemptEvent event2 =
appAttemptId, container, SchedulerEventType.KILL_CONTAINER); new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.KILL_PREEMPTED_CONTAINER);
rmDispatcher.getEventHandler().handle(event2); rmDispatcher.getEventHandler().handle(event2);
ContainerPreemptEvent event3 = new ContainerPreemptEvent( ContainerPreemptEvent event3 =
appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER); new ContainerPreemptEvent(appAttemptId, container,
SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION);
rmDispatcher.getEventHandler().handle(event3); rmDispatcher.getEventHandler().handle(event3);
// Wait for events to be processed by scheduler dispatcher. // Wait for events to be processed by scheduler dispatcher.
Thread.sleep(1000); Thread.sleep(1000);
verify(sched, times(3)).handle(any(SchedulerEvent.class)); verify(sched, times(3)).handle(any(SchedulerEvent.class));
verify(sched).dropContainerReservation(container); verify(sched).killReservedContainer(container);
verify(sched).preemptContainer(appAttemptId, container); verify(sched).markContainerForPreemption(appAttemptId, container);
verify(sched).killContainer(container); verify(sched).killPreemptedContainer(container);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Assert.fail(); Assert.fail();
} finally { } finally {

View File

@ -566,7 +566,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
ContainerId amContainer = ContainerId amContainer =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Preempt the first attempt; // Preempt the first attempt;
scheduler.killContainer(scheduler.getRMContainer(amContainer)); scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED); am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());
@ -582,7 +582,7 @@ public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
// Preempt the second attempt. // Preempt the second attempt.
ContainerId amContainer2 = ContainerId amContainer2 =
ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
scheduler.killContainer(scheduler.getRMContainer(amContainer2)); scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2));
am2.waitForState(RMAppAttemptState.FAILED); am2.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry());
@ -677,7 +677,7 @@ public void testPreemptedAMRestartOnRMRestart() throws Exception {
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
// Forcibly preempt the am container; // Forcibly preempt the am container;
scheduler.killContainer(scheduler.getRMContainer(amContainer)); scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer));
am1.waitForState(RMAppAttemptState.FAILED); am1.waitForState(RMAppAttemptState.FAILED);
Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry());

View File

@ -23,8 +23,8 @@
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -67,7 +67,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@ -77,6 +76,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
@ -289,7 +289,7 @@ public void testExpireKill() {
List<ContainerPreemptEvent> events = evtCaptor.getAllValues(); List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
for (ContainerPreemptEvent e : events.subList(20, 20)) { for (ContainerPreemptEvent e : events.subList(20, 20)) {
assertEquals(appC, e.getAppId()); assertEquals(appC, e.getAppId());
assertEquals(KILL_CONTAINER, e.getType()); assertEquals(KILL_PREEMPTED_CONTAINER, e.getType());
} }
} }
@ -935,7 +935,7 @@ static class IsPreemptionRequestFor
private final ApplicationAttemptId appAttId; private final ApplicationAttemptId appAttId;
private final SchedulerEventType type; private final SchedulerEventType type;
IsPreemptionRequestFor(ApplicationAttemptId appAttId) { IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
this(appAttId, PREEMPT_CONTAINER); this(appAttId, MARK_CONTAINER_FOR_PREEMPTION);
} }
IsPreemptionRequestFor(ApplicationAttemptId appAttId, IsPreemptionRequestFor(ApplicationAttemptId appAttId,
SchedulerEventType type) { SchedulerEventType type) {

View File

@ -29,6 +29,7 @@
import java.util.Map; import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
@ -46,11 +47,15 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -493,6 +498,114 @@ public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
} }
} }
/**
* Test to verify that ResourceRequests recovery back to the right app-attempt
* after a container gets killed at ACQUIRED state: YARN-4502.
*
* @throws Exception
*/
@Test
public void testResourceRequestRecoveryToTheRightAppAttempt()
throws Exception {
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm = new MockRM(conf);
try {
rm.start();
RMApp rmApp =
rm.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1,
null, "Test", false, true);
MockNM node =
new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService());
node.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(rmApp, rm, node);
ApplicationAttemptId applicationAttemptOneID =
am1.getApplicationAttemptId();
ContainerId am1ContainerID =
ContainerId.newContainerId(applicationAttemptOneID, 1);
// allocate NUM_CONTAINERS containers
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
node.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != 1) {
node.nodeHeartbeat(true);
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
// launch a 2nd container, for testing running-containers transfer.
node.nodeHeartbeat(applicationAttemptOneID, 2, ContainerState.RUNNING);
ContainerId runningContainerID =
ContainerId.newContainerId(applicationAttemptOneID, 2);
rm.waitForState(node, runningContainerID, RMContainerState.RUNNING);
// 3rd container is in Allocated state.
int ALLOCATED_CONTAINER_PRIORITY = 1047;
am1.allocate("127.0.0.1", 1024, 1, ALLOCATED_CONTAINER_PRIORITY,
new ArrayList<ContainerId>(), null);
node.nodeHeartbeat(true);
ContainerId allocatedContainerID =
ContainerId.newContainerId(applicationAttemptOneID, 3);
rm.waitForContainerAllocated(node, allocatedContainerID);
rm.waitForState(node, allocatedContainerID, RMContainerState.ALLOCATED);
RMContainer allocatedContainer =
rm.getResourceScheduler().getRMContainer(allocatedContainerID);
// Capture scheduler app-attempt before AM crash.
SchedulerApplicationAttempt firstSchedulerAppAttempt =
((AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm
.getResourceScheduler())
.getApplicationAttempt(applicationAttemptOneID);
// AM crashes, and a new app-attempt gets created
node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE);
rm.waitForState(node, am1ContainerID, RMContainerState.COMPLETED);
RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm);
ApplicationAttemptId applicationAttemptTwoID =
rmAppAttempt2.getAppAttemptId();
Assert.assertEquals(2, applicationAttemptTwoID.getAttemptId());
// All outstanding allocated containers will be killed (irrespective of
// keep-alive of container across app-attempts)
Assert.assertEquals(RMContainerState.KILLED,
allocatedContainer.getState());
// The core part of this test
// The killed containers' ResourceRequests are recovered back to the
// original app-attempt, not the new one
for (ResourceRequest request : firstSchedulerAppAttempt
.getAppSchedulingInfo().getAllResourceRequests()) {
if (request.getPriority().getPriority() == 0) {
Assert.assertEquals(0, request.getNumContainers());
} else if (request.getPriority().getPriority() == ALLOCATED_CONTAINER_PRIORITY) {
Assert.assertEquals(1, request.getNumContainers());
}
}
// Also, only one running container should be transferred after AM
// launches
MockRM.launchAM(rmApp, rm, node);
List<Container> transferredContainers =
rm.getResourceScheduler().getTransferredContainers(
applicationAttemptTwoID);
Assert.assertEquals(1, transferredContainers.size());
Assert.assertEquals(runningContainerID, transferredContainers.get(0)
.getId());
} finally {
rm.stop();
}
}
private void verifyMaximumResourceCapability( private void verifyMaximumResourceCapability(
Resource expectedMaximumResource, YarnScheduler scheduler) { Resource expectedMaximumResource, YarnScheduler scheduler) {

View File

@ -205,7 +205,7 @@ public void testApplicationPriorityAllocation() throws Exception {
if (++counter > 2) { if (++counter > 2) {
break; break;
} }
cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
} }
// check node report, 12 GB used and 4 GB available // check node report, 12 GB used and 4 GB available
@ -513,7 +513,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
if (++counter > 2) { if (++counter > 2) {
break; break;
} }
cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove(); iterator.remove();
} }
@ -543,7 +543,7 @@ public void testApplicationPriorityAllocationWithChangeInPriority()
if (++counter > 1) { if (++counter > 1) {
break; break;
} }
cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove(); iterator.remove();
} }

View File

@ -1170,7 +1170,7 @@ public void testPreemptionInfo() throws Exception {
// kill the 3 containers // kill the 3 containers
for (Container c : allocatedContainers) { for (Container c : allocatedContainers) {
cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
} }
// check values // check values
@ -1179,7 +1179,7 @@ public void testPreemptionInfo() throws Exception {
Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);
// kill app0-attempt0 AM container // kill app0-attempt0 AM container
cs.killContainer(schedulerAppAttempt.getRMContainer(app0 cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0
.getCurrentAppAttempt().getMasterContainer().getId())); .getCurrentAppAttempt().getMasterContainer().getId()));
// wait for app0 failed // wait for app0 failed
@ -1202,7 +1202,7 @@ public void testPreemptionInfo() throws Exception {
allocatedContainers = allocatedContainers =
am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);
for (Container c : allocatedContainers) { for (Container c : allocatedContainers) {
cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId()));
} }
// check values // check values
@ -1251,7 +1251,7 @@ public void testRecoverRequestAfterPreemption() throws Exception {
} }
// Call killContainer to preempt the container // Call killContainer to preempt the container
cs.killContainer(rmContainer); cs.killPreemptedContainer(rmContainer);
Assert.assertEquals(3, requests.size()); Assert.assertEquals(3, requests.size());
for (ResourceRequest request : requests) { for (ResourceRequest request : requests) {

View File

@ -18,13 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
@ -52,6 +50,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
@ -95,10 +94,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@ -4779,7 +4779,7 @@ public void testRecoverRequestAfterPreemption() throws Exception {
assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
.size()); .size());
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId);
// ResourceRequest will be empty once NodeUpdate is completed // ResourceRequest will be empty once NodeUpdate is completed
Assert.assertNull(app.getResourceRequest(priority, host)); Assert.assertNull(app.getResourceRequest(priority, host));
@ -4797,7 +4797,8 @@ public void testRecoverRequestAfterPreemption() throws Exception {
scheduler.warnOrKillContainer(rmContainer); scheduler.warnOrKillContainer(rmContainer);
// Trigger container rescheduled event // Trigger container rescheduled event
scheduler.handle(new ContainerRescheduledEvent(rmContainer)); scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
SchedulerEventType.KILL_PREEMPTED_CONTAINER));
List<ResourceRequest> requests = rmContainer.getResourceRequests(); List<ResourceRequest> requests = rmContainer.getResourceRequests();
// Once recovered, resource request will be present again in app // Once recovered, resource request will be present again in app
@ -4820,7 +4821,6 @@ Collections.<ResourceRequest> emptyList(),
Assert.assertTrue(containers.size() == 1); Assert.assertTrue(containers.size() == 1);
} }
@SuppressWarnings("resource")
@Test @Test
public void testBlacklistNodes() throws Exception { public void testBlacklistNodes() throws Exception {
scheduler.init(conf); scheduler.init(conf);