YARN-325. RM CapacityScheduler can deadlock when getQueueInfo() is called and a container is completing (Arun C Murthy via tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1431070 13f79535-47bb-0310-9956-ffa450edef68
parent ba5b19fb5d
commit 106e2e27ff
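Note (not part of the patch): the structural change is that LeafQueue.assignReservedContainer no longer calls completedContainer() while the LeafQueue monitor is held. It now returns a CSAssignment carrying the application and the excess RMContainer, and CapacityScheduler.nodeUpdate performs the release after queue.assignContainers() has returned, which presumably breaks the lock-ordering cycle with getQueueInfo(). Below is a minimal sketch of that shape, using hypothetical names (DeferredReleaseSketch, Assignment, Queue) rather than the real YARN classes.

public class DeferredReleaseSketch {

  /** Result holder in the spirit of CSAssignment: carries what the caller must clean up. */
  static class Assignment {
    final String excessReservation;   // null when there is nothing to release
    Assignment(String excessReservation) {
      this.excessReservation = excessReservation;
    }
  }

  static class Queue {
    private String reservedContainer = "container_0001";  // pretend reservation

    // Holds only the queue monitor; the release is no longer performed in here.
    synchronized Assignment assignContainers() {
      if (reservedContainer != null) {
        String excess = reservedContainer;
        reservedContainer = null;
        return new Assignment(excess);   // hand it back instead of completing it here
      }
      return new Assignment(null);
    }

    // In the real scheduler this path can take other scheduler/queue locks,
    // so it must run only after assignContainers() has released the monitor.
    void completedContainer(String container) {
      System.out.println("released " + container);
    }
  }

  public static void main(String[] args) {
    Queue queue = new Queue();
    Assignment assignment = queue.assignContainers();  // queue monitor released here
    if (assignment.excessReservation != null) {
      queue.completedContainer(assignment.excessReservation);
    }
  }
}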
CHANGES.txt
@@ -290,6 +290,9 @@ Release 0.23.6 - UNRELEASED
     YARN-320. RM should always be able to renew its own tokens.
     (Daryn Sharp via sseth)
 
+    YARN-325. RM CapacityScheduler can deadlock when getQueueInfo() is
+    called and a container is completing (Arun C Murthy via tgraves)
+
 Release 0.23.5 - UNRELEASED
 
   INCOMPATIBLE CHANGES
CSAssignment.java
@@ -20,19 +20,33 @@
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 
 @Private
 @Unstable
 public class CSAssignment {
   final private Resource resource;
   private NodeType type;
+  private final RMContainer excessReservation;
+  private final FiCaSchedulerApp application;
 
   public CSAssignment(Resource resource, NodeType type) {
     this.resource = resource;
     this.type = type;
+    this.application = null;
+    this.excessReservation = null;
   }
+
+  public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) {
+    this.resource = excessReservation.getContainer().getResource();
+    this.type = NodeType.NODE_LOCAL;
+    this.application = application;
+    this.excessReservation = excessReservation;
+  }
+
 
   public Resource getResource() {
     return resource;
   }
@@ -45,6 +59,14 @@ public void setType(NodeType type) {
     this.type = type;
   }
 
+  public FiCaSchedulerApp getApplication() {
+    return application;
+  }
+
+  public RMContainer getExcessReservation() {
+    return excessReservation;
+  }
+
   @Override
   public String toString() {
     return resource.getMemory() + ":" + type;
CapacityScheduler.java
@@ -604,7 +604,20 @@ private synchronized void nodeUpdate(RMNode nm,
           reservedApplication.getApplicationId() + " on node: " + nm);
 
       LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      queue.assignContainers(clusterResource, node);
+      CSAssignment assignment = queue.assignContainers(clusterResource, node);
+
+      RMContainer excessReservation = assignment.getExcessReservation();
+      if (excessReservation != null) {
+        Container container = excessReservation.getContainer();
+        queue.completedContainer(
+            clusterResource, assignment.getApplication(), node,
+            excessReservation,
+            SchedulerUtils.createAbnormalContainerStatus(
+                container.getId(),
+                SchedulerUtils.UNRESERVED_CONTAINER),
+            RMContainerEventType.RELEASED);
+      }
+
     }
 
     // Try to schedule more if there are no reservations to fulfill
LeafQueue.java
@@ -62,7 +62,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -781,11 +780,9 @@ private synchronized FiCaSchedulerApp getApplication(
     if (reservedContainer != null) {
       FiCaSchedulerApp application =
           getApplication(reservedContainer.getApplicationAttemptId());
-      return new CSAssignment(
-          assignReservedContainer(application, node, reservedContainer,
-              clusterResource),
-          NodeType.NODE_LOCAL); // Don't care about locality constraints
-                                // for reserved containers
+      return
+          assignReservedContainer(application, node, reservedContainer,
+              clusterResource);
     }
 
     // Try to assign containers to applications in order
@@ -873,20 +870,14 @@ private synchronized FiCaSchedulerApp getApplication(
 
   }
 
-  private synchronized Resource assignReservedContainer(FiCaSchedulerApp application,
+  private synchronized CSAssignment
+      assignReservedContainer(FiCaSchedulerApp application,
       FiCaSchedulerNode node, RMContainer rmContainer, Resource clusterResource) {
     // Do we still need this reservation?
     Priority priority = rmContainer.getReservedPriority();
     if (application.getTotalRequiredResources(priority) == 0) {
       // Release
-      Container container = rmContainer.getContainer();
-      completedContainer(clusterResource, application, node,
-          rmContainer,
-          SchedulerUtils.createAbnormalContainerStatus(
-              container.getId(),
-              SchedulerUtils.UNRESERVED_CONTAINER),
-          RMContainerEventType.RELEASED);
-      return container.getResource(); // Ugh, return resource to force re-sort
+      return new CSAssignment(application, rmContainer);
     }
 
     // Try to assign if we have sufficient resources
@@ -895,7 +886,7 @@ private synchronized Resource assignReservedContainer(FiCaSchedulerApp applicati
 
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
-    return Resources.none();
+    return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
   }
 
   private synchronized boolean assignToQueue(Resource clusterResource,
TestLeafQueue.java
@@ -1181,12 +1181,14 @@ public void testReservationExchange() throws Exception {
     // Now finish another container from app_0 and see the reservation cancelled
     a.completedContainer(clusterResource, app_0, node_0,
         app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
-    a.assignContainers(clusterResource, node_0);
-    assertEquals(4*GB, a.getUsedResources().getMemory());
+    CSAssignment assignment = a.assignContainers(clusterResource, node_0);
+    assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
-    assertEquals(0*GB, app_1.getCurrentReservation().getMemory());
+    assertEquals(4*GB, app_1.getCurrentReservation().getMemory());
     assertEquals(0*GB, node_0.getUsedResource().getMemory());
+    assertEquals(4*GB,
+        assignment.getExcessReservation().getContainer().getResource().getMemory());
   }
 
 
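Note (not part of the patch): the revised assertions in testReservationExchange follow from the new contract. Because assignContainers() no longer completes the excess reservation itself, the queue still accounts for app_1's 4 GB reservation when the call returns (8 GB used and a 4 GB current reservation, instead of 4 GB and 0 GB), and the test now checks that the 4 GB container is surfaced through assignment.getExcessReservation() for the caller to release.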