YARN-3293. Track and display capacity scheduler health metrics in web

UI. Contributed by Varun Vasudev
This commit is contained in:
Xuan 2015-04-09 23:38:04 -07:00
parent 987c9e12e1
commit afa5d4715a
16 changed files with 1178 additions and 74 deletions

View File

@ -104,6 +104,9 @@ Release 2.8.0 - UNRELEASED
YARN-3294. Allow dumping of Capacity Scheduler debug logs via
web UI for a fixed time period. (Varun Vasudev via xgong)
YARN-3293. Track and display capacity scheduler health metrics
in web UI. (Varun Vasudev via xgong)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -552,4 +552,12 @@ public int getActiveApps() {
public MetricsSystem getMetricsSystem() {
return metricsSystem;
}
public long getAggregateAllocatedContainers() {
return aggregateContainersAllocated.value();
}
public long getAggegatedReleasedContainers() {
return aggregateContainersReleased.value();
}
}

View File

@ -0,0 +1,236 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SchedulerHealth {
static public class DetailedInformation {
long timestamp;
NodeId nodeId;
ContainerId containerId;
String queue;
public DetailedInformation(long timestamp, NodeId nodeId,
ContainerId containerId, String queue) {
this.timestamp = timestamp;
this.nodeId = nodeId;
this.containerId = containerId;
this.queue = queue;
}
public long getTimestamp() {
return timestamp;
}
public NodeId getNodeId() {
return nodeId;
}
public ContainerId getContainerId() {
return containerId;
}
public String getQueue() {
return queue;
}
}
enum Operation {
ALLOCATION, RELEASE, PREEMPTION, RESERVATION, FULFILLED_RESERVATION
}
long lastSchedulerRunTime;
Map<Operation, Resource> lastSchedulerRunDetails;
Map<Operation, DetailedInformation> schedulerHealthDetails;
Map<Operation, Long> schedulerOperationCounts;
// this is for counts since the RM started, never reset
Map<Operation, Long> schedulerOperationAggregateCounts;
public SchedulerHealth() {
lastSchedulerRunDetails = new ConcurrentHashMap<>();
schedulerHealthDetails = new ConcurrentHashMap<>();
schedulerOperationCounts = new ConcurrentHashMap<>();
schedulerOperationAggregateCounts = new ConcurrentHashMap<>();
for (Operation op : Operation.values()) {
lastSchedulerRunDetails.put(op, Resource.newInstance(0, 0));
schedulerOperationCounts.put(op, 0L);
schedulerHealthDetails.put(op, new DetailedInformation(0, null, null,
null));
schedulerOperationAggregateCounts.put(op, 0L);
}
}
public void updateAllocation(long timestamp, NodeId nodeId,
ContainerId containerId, String queue) {
DetailedInformation di =
new DetailedInformation(timestamp, nodeId, containerId, queue);
schedulerHealthDetails.put(Operation.ALLOCATION, di);
}
public void updateRelease(long timestamp, NodeId nodeId,
ContainerId containerId, String queue) {
DetailedInformation di =
new DetailedInformation(timestamp, nodeId, containerId, queue);
schedulerHealthDetails.put(Operation.RELEASE, di);
}
public void updatePreemption(long timestamp, NodeId nodeId,
ContainerId containerId, String queue) {
DetailedInformation di =
new DetailedInformation(timestamp, nodeId, containerId, queue);
schedulerHealthDetails.put(Operation.PREEMPTION, di);
}
public void updateReservation(long timestamp, NodeId nodeId,
ContainerId containerId, String queue) {
DetailedInformation di =
new DetailedInformation(timestamp, nodeId, containerId, queue);
schedulerHealthDetails.put(Operation.RESERVATION, di);
}
public void updateSchedulerRunDetails(long timestamp, Resource allocated,
Resource reserved) {
lastSchedulerRunTime = timestamp;
lastSchedulerRunDetails.put(Operation.ALLOCATION, allocated);
lastSchedulerRunDetails.put(Operation.RESERVATION, reserved);
}
public void updateSchedulerReleaseDetails(long timestamp, Resource released) {
lastSchedulerRunTime = timestamp;
lastSchedulerRunDetails.put(Operation.RELEASE, released);
}
public void updateSchedulerReleaseCounts(long count) {
updateCounts(Operation.RELEASE, count);
}
public void updateSchedulerAllocationCounts(long count) {
updateCounts(Operation.ALLOCATION, count);
}
public void updateSchedulerReservationCounts(long count) {
updateCounts(Operation.RESERVATION, count);
}
public void updateSchedulerFulfilledReservationCounts(long count) {
updateCounts(Operation.FULFILLED_RESERVATION, count);
}
public void updateSchedulerPreemptionCounts(long count) {
updateCounts(Operation.PREEMPTION, count);
}
private void updateCounts(Operation op, long count) {
schedulerOperationCounts.put(op, count);
Long tmp = schedulerOperationAggregateCounts.get(op);
schedulerOperationAggregateCounts.put(op, tmp + count);
}
public long getLastSchedulerRunTime() {
return lastSchedulerRunTime;
}
private Resource getResourceDetails(Operation op) {
return lastSchedulerRunDetails.get(op);
}
public Resource getResourcesAllocated() {
return getResourceDetails(Operation.ALLOCATION);
}
public Resource getResourcesReserved() {
return getResourceDetails(Operation.RESERVATION);
}
public Resource getResourcesReleased() {
return getResourceDetails(Operation.RELEASE);
}
private DetailedInformation getDetailedInformation(Operation op) {
return schedulerHealthDetails.get(op);
}
public DetailedInformation getLastAllocationDetails() {
return getDetailedInformation(Operation.ALLOCATION);
}
public DetailedInformation getLastReleaseDetails() {
return getDetailedInformation(Operation.RELEASE);
}
public DetailedInformation getLastReservationDetails() {
return getDetailedInformation(Operation.RESERVATION);
}
public DetailedInformation getLastPreemptionDetails() {
return getDetailedInformation(Operation.PREEMPTION);
}
private Long getOperationCount(Operation op) {
return schedulerOperationCounts.get(op);
}
public Long getAllocationCount() {
return getOperationCount(Operation.ALLOCATION);
}
public Long getReleaseCount() {
return getOperationCount(Operation.RELEASE);
}
public Long getReservationCount() {
return getOperationCount(Operation.RESERVATION);
}
public Long getPreemptionCount() {
return getOperationCount(Operation.PREEMPTION);
}
private Long getAggregateOperationCount(Operation op) {
return schedulerOperationAggregateCounts.get(op);
}
public Long getAggregateAllocationCount() {
return getAggregateOperationCount(Operation.ALLOCATION);
}
public Long getAggregateReleaseCount() {
return getAggregateOperationCount(Operation.RELEASE);
}
public Long getAggregateReservationCount() {
return getAggregateOperationCount(Operation.RESERVATION);
}
public Long getAggregatePreemptionCount() {
return getAggregateOperationCount(Operation.PREEMPTION);
}
public Long getAggregateFulFilledReservationsCount() {
return getAggregateOperationCount(Operation.FULFILLED_RESERVATION);
}
}

View File

@ -22,40 +22,46 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
public class CSAssignment {
final private Resource resource;
private NodeType type;
private final RMContainer excessReservation;
private final FiCaSchedulerApp application;
private final boolean skipped;
private boolean fulfilledReservation;
private final AssignmentInformation assignmentInformation;
public CSAssignment(Resource resource, NodeType type) {
this(resource, type, null, null, false, false);
}
public CSAssignment(FiCaSchedulerApp application,
RMContainer excessReservation) {
this(excessReservation.getContainer().getResource(), NodeType.NODE_LOCAL,
excessReservation, application, false, false);
}
public CSAssignment(boolean skipped) {
this(Resource.newInstance(0, 0), NodeType.NODE_LOCAL, null, null, skipped,
false);
}
public CSAssignment(Resource resource, NodeType type,
RMContainer excessReservation, FiCaSchedulerApp application,
boolean skipped, boolean fulfilledReservation) {
this.resource = resource;
this.type = type;
this.application = null;
this.excessReservation = null;
this.skipped = false;
}
public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) {
this.resource = excessReservation.getContainer().getResource();
this.type = NodeType.NODE_LOCAL;
this.application = application;
this.excessReservation = excessReservation;
this.skipped = false;
}
public CSAssignment(boolean skipped) {
this.resource = Resources.createResource(0, 0);
this.type = NodeType.NODE_LOCAL;
this.application = null;
this.excessReservation = null;
this.application = application;
this.skipped = skipped;
this.fulfilledReservation = fulfilledReservation;
this.assignmentInformation = new AssignmentInformation();
}
public Resource getResource() {
@ -84,6 +90,35 @@ public boolean getSkipped() {
@Override
public String toString() {
return resource.getMemory() + ":" + type;
String ret = "resource:" + resource.toString();
ret += "; type:" + type;
ret += "; excessReservation:" + excessReservation;
ret +=
"; applicationid:"
+ (application != null ? application.getApplicationId().toString()
: "null");
ret += "; skipped:" + skipped;
ret += "; fulfilled reservation:" + fulfilledReservation;
ret +=
"; allocations(count/resource):"
+ assignmentInformation.getNumAllocations() + "/"
+ assignmentInformation.getAllocated().toString();
ret +=
"; reservations(count/resource):"
+ assignmentInformation.getNumReservations() + "/"
+ assignmentInformation.getReserved().toString();
return ret;
}
public void setFulfilledReservation(boolean fulfilledReservation) {
this.fulfilledReservation = fulfilledReservation;
}
public boolean isFulfilledReservation() {
return this.fulfilledReservation;
}
public AssignmentInformation getAssignmentInformation() {
return this.assignmentInformation;
}
}

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -84,6 +85,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -91,9 +93,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -213,7 +217,8 @@ public Configuration getConf() {
private boolean scheduleAsynchronously;
private AsyncScheduleThread asyncSchedulerThread;
private RMNodeLabelsManager labelManager;
private SchedulerHealth schedulerHealth = new SchedulerHealth();
long lastNodeUpdateTime;
/**
* EXPERT
*/
@ -955,6 +960,8 @@ private synchronized void nodeUpdate(RMNode nm) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
Resource releaseResources = Resource.newInstance(0, 0);
FiCaSchedulerNode node = getNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
@ -971,13 +978,30 @@ private synchronized void nodeUpdate(RMNode nm) {
}
// Process completed containers
int releasedContainers = 0;
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
RMContainer container = getRMContainer(containerId);
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
completedContainer(container, completedContainer,
RMContainerEventType.FINISHED);
if (container != null) {
releasedContainers++;
Resource rs = container.getAllocatedResource();
if (rs != null) {
Resources.addTo(releaseResources, rs);
}
rs = container.getReservedResource();
if (rs != null) {
Resources.addTo(releaseResources, rs);
}
}
}
schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
releaseResources);
schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
// Now node data structures are upto date and ready for scheduling.
if(LOG.isDebugEnabled()) {
LOG.debug("Node being looked for scheduling " + nm
@ -1040,11 +1064,47 @@ private synchronized void updateLabelsOnNode(NodeId nodeId,
node.updateLabels(newLabels);
}
private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
CSAssignment assignment) {
NodeId nodeId = node.getNodeID();
List<AssignmentInformation.AssignmentDetails> allocations =
assignment.getAssignmentInformation().getAllocationDetails();
List<AssignmentInformation.AssignmentDetails> reservations =
assignment.getAssignmentInformation().getReservationDetails();
if (!allocations.isEmpty()) {
ContainerId allocatedContainerId =
allocations.get(allocations.size() - 1).containerId;
String allocatedQueue = allocations.get(allocations.size() - 1).queue;
schedulerHealth.updateAllocation(now, nodeId, allocatedContainerId,
allocatedQueue);
}
if (!reservations.isEmpty()) {
ContainerId reservedContainerId =
reservations.get(reservations.size() - 1).containerId;
String reservedQueue = reservations.get(reservations.size() - 1).queue;
schedulerHealth.updateReservation(now, nodeId, reservedContainerId,
reservedQueue);
}
schedulerHealth.updateSchedulerReservationCounts(assignment
.getAssignmentInformation().getNumReservations());
schedulerHealth.updateSchedulerAllocationCounts(assignment
.getAssignmentInformation().getNumAllocations());
schedulerHealth.updateSchedulerRunDetails(now, assignment
.getAssignmentInformation().getAllocated(), assignment
.getAssignmentInformation().getReserved());
}
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
}
// reset allocation and reservation stats before we start doing any work
updateSchedulerHealth(lastNodeUpdateTime, node,
new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
CSAssignment assignment;
// Assign new containers...
// 1. Check for reserved applications
@ -1061,15 +1121,26 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
CSAssignment assignment =
queue.assignContainers(
assignment = queue.assignContainers(
clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));
if (assignment.isFulfilledReservation()) {
CSAssignment tmp =
new CSAssignment(reservedContainer.getReservedResource(),
assignment.getType());
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
reservedContainer.getReservedResource());
tmp.getAssignmentInformation().addAllocationDetails(
reservedContainer.getContainerId(), queue.getQueuePath());
tmp.getAssignmentInformation().incrAllocations();
updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
}
RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
Container container = excessReservation.getContainer();
@ -1092,13 +1163,14 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
root.assignContainers(
assignment = root.assignContainers(
clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));
updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
@ -1151,6 +1223,7 @@ public void handle(SchedulerEvent event) {
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
RMNode node = nodeUpdatedEvent.getRMNode();
setLastNodeUpdateTime(Time.now());
nodeUpdate(node);
if (!scheduleAsynchronously) {
allocateContainersToNode(getNode(node.getNodeID()));
@ -1319,6 +1392,14 @@ protected synchronized void completedContainer(RMContainer rmContainer,
LOG.info("Application attempt " + application.getApplicationAttemptId()
+ " released container " + container.getId() + " on node: " + node
+ " with event: " + event);
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
schedulerHealth.updatePreemption(Time.now(), container.getNodeId(),
container.getId(), queue.getQueuePath());
schedulerHealth.updateSchedulerPreemptionCounts(1);
} else {
schedulerHealth.updateRelease(lastNodeUpdateTime, container.getNodeId(),
container.getId(), queue.getQueuePath());
}
}
@Lock(Lock.NoLock.class)
@ -1648,4 +1729,12 @@ public Set<String> getPlanQueues() {
}
return ret;
}
public SchedulerHealth getSchedulerHealth() {
return this.schedulerHealth;
}
private synchronized void setLastNodeUpdateTime(long time) {
this.lastNodeUpdateTime = time;
}
}

View File

@ -913,12 +913,17 @@ private synchronized CSAssignment assignReservedContainer(
}
// Try to assign if we have sufficient resources
assignContainersOnNode(clusterResource, node, application, priority,
rmContainer);
CSAssignment tmp =
assignContainersOnNode(clusterResource, node, application, priority,
rmContainer);
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
if (tmp.getAssignmentInformation().getNumAllocations() > 0) {
ret.setFulfilledReservation(true);
}
return ret;
}
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
@ -1172,7 +1177,8 @@ resourceCalculator, required, getMaximumAllocation()
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
Resource assigned = Resources.none();
CSAssignment assigned;
NodeType requestType = null;
MutableObject allocatedContainer = new MutableObject();
@ -1186,14 +1192,15 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
assigned.getResource(), Resources.none())) {
//update locality statistics
if (allocatedContainer.getValue() != null) {
application.incNumAllocatedContainers(NodeType.NODE_LOCAL,
requestType);
}
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
assigned.setType(NodeType.NODE_LOCAL);
return assigned;
}
}
@ -1214,14 +1221,15 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
assigned.getResource(), Resources.none())) {
//update locality statistics
if (allocatedContainer.getValue() != null) {
application.incNumAllocatedContainers(NodeType.RACK_LOCAL,
requestType);
}
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
assigned.setType(NodeType.RACK_LOCAL);
return assigned;
}
}
@ -1246,7 +1254,8 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
if (allocatedContainer.getValue() != null) {
application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
}
return new CSAssignment(assigned, NodeType.OFF_SWITCH);
assigned.setType(NodeType.OFF_SWITCH);
return assigned;
}
return SKIP_ASSIGNMENT;
@ -1255,10 +1264,9 @@ private CSAssignment assignContainersOnNode(Resource clusterResource,
private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
// First we need to get minimum resource we need unreserve
// minimum-resource-need-unreserve = used + asked - limit
Resource minimumUnreservedResource =
Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
currentResourceLimits.getLimit());
return minimumUnreservedResource;
return Resources.subtract(
Resources.add(queueUsage.getUsed(), askedResource),
currentResourceLimits.getLimit());
}
@Private
@ -1334,7 +1342,7 @@ protected boolean checkLimitsToReserve(Resource clusterResource,
}
private Resource assignNodeLocalContainers(Resource clusterResource,
private CSAssignment assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer) {
@ -1344,11 +1352,11 @@ private Resource assignNodeLocalContainers(Resource clusterResource,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
allocatedContainer);
}
return Resources.none();
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
}
private Resource assignRackLocalContainers(Resource clusterResource,
private CSAssignment assignRackLocalContainers(Resource clusterResource,
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer) {
@ -1358,11 +1366,11 @@ private Resource assignRackLocalContainers(Resource clusterResource,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
allocatedContainer);
}
return Resources.none();
return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
}
private Resource assignOffSwitchContainers(Resource clusterResource,
private CSAssignment assignOffSwitchContainers(Resource clusterResource,
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer) {
@ -1373,7 +1381,7 @@ private Resource assignOffSwitchContainers(Resource clusterResource,
allocatedContainer);
}
return Resources.none();
return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
}
boolean canAssign(FiCaSchedulerApp application, Priority priority,
@ -1443,15 +1451,13 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
.getApplicationAttemptId(), application.getNewContainerId());
// Create the container
Container container =
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
.getHttpAddress(), capability, priority, null);
return container;
return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
.getHttpAddress(), capability, priority, null);
}
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
MutableObject createdContainer) {
@ -1472,7 +1478,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
if (rmContainer != null) {
unreserve(application, priority, node, rmContainer);
}
return Resources.none();
return new CSAssignment(Resources.none(), type);
}
Resource capability = request.getCapability();
@ -1484,7 +1490,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
return Resources.none();
return new CSAssignment(Resources.none(), type);
}
assert Resources.greaterThan(
@ -1497,7 +1503,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
// something went wrong getting/creating the container
if (container == null) {
LOG.warn("Couldn't get container for allocation!");
return Resources.none();
return new CSAssignment(Resources.none(), type);
}
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
@ -1529,7 +1535,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
// container (That means we *have to* unreserve some resource to
// continue)). If we failed to unreserve some resource,
if (!containerUnreserved) {
return Resources.none();
return new CSAssignment(Resources.none(), type);
}
}
}
@ -1540,7 +1546,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
// Does the application need this resource?
if (allocatedContainer == null) {
return Resources.none();
return new CSAssignment(Resources.none(), type);
}
// Inform the node
@ -1552,7 +1558,13 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
" queue=" + this +
" clusterResource=" + clusterResource);
createdContainer.setValue(allocatedContainer);
return container.getResource();
CSAssignment assignment = new CSAssignment(container.getResource(), type);
assignment.getAssignmentInformation().addAllocationDetails(
container.getId(), getQueuePath());
assignment.getAssignmentInformation().incrAllocations();
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
container.getResource());
return assignment;
} else {
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
@ -1566,7 +1578,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
// reserve the new container
if (!checkLimitsToReserve(clusterResource,
application, capability)) {
return Resources.none();
return new CSAssignment(Resources.none(), type);
}
}
@ -1581,10 +1593,16 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
" used=" + queueUsage.getUsed() +
" cluster=" + clusterResource);
return request.getCapability();
CSAssignment assignment =
new CSAssignment(request.getCapability(), type);
assignment.getAssignmentInformation().addReservationDetails(
container.getId(), getQueuePath());
assignment.getAssignmentInformation().incrReservations();
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
request.getCapability());
return assignment;
}
return Resources.none();
return new CSAssignment(Resources.none(), type);
}
}

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -415,7 +416,27 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
nodeLabels);
// Track resource utilization in this pass of the scheduler
Resources.addTo(assignment.getResource(), assignedToChild.getResource());
Resources
.addTo(assignment.getResource(), assignedToChild.getResource());
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
assignedToChild.getAssignmentInformation().getAllocated());
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
assignedToChild.getAssignmentInformation().getReserved());
assignment.getAssignmentInformation().incrAllocations(
assignedToChild.getAssignmentInformation().getNumAllocations());
assignment.getAssignmentInformation().incrReservations(
assignedToChild.getAssignmentInformation().getNumReservations());
assignment
.getAssignmentInformation()
.getAllocationDetails()
.addAll(
assignedToChild.getAssignmentInformation().getAllocationDetails());
assignment
.getAssignmentInformation()
.getReservationDetails()
.addAll(
assignedToChild.getAssignmentInformation()
.getReservationDetails());
LOG.info("assignedContainer" +
" queue=" + getQueueName() +

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class AssignmentInformation {
public enum Operation {
ALLOCATION, RESERVATION
}
public static class AssignmentDetails {
public ContainerId containerId;
public String queue;
public AssignmentDetails(ContainerId containerId, String queue) {
this.containerId = containerId;
this.queue = queue;
}
}
private final Map<Operation, Integer> operationCounts;
private final Map<Operation, Resource> operationResources;
private final Map<Operation, List<AssignmentDetails>> operationDetails;
public AssignmentInformation() {
this.operationCounts = new HashMap<>();
this.operationResources = new HashMap<>();
this.operationDetails = new HashMap<>();
for (Operation op : Operation.values()) {
operationCounts.put(op, 0);
operationResources.put(op, Resource.newInstance(0, 0));
operationDetails.put(op, new ArrayList<AssignmentDetails>());
}
}
public int getNumAllocations() {
return operationCounts.get(Operation.ALLOCATION);
}
public void incrAllocations() {
increment(Operation.ALLOCATION, 1);
}
public void incrAllocations(int by) {
increment(Operation.ALLOCATION, by);
}
public int getNumReservations() {
return operationCounts.get(Operation.RESERVATION);
}
public void incrReservations() {
increment(Operation.RESERVATION, 1);
}
public void incrReservations(int by) {
increment(Operation.RESERVATION, by);
}
private void increment(Operation op, int by) {
operationCounts.put(op, operationCounts.get(op) + by);
}
public Resource getAllocated() {
return operationResources.get(Operation.ALLOCATION);
}
public Resource getReserved() {
return operationResources.get(Operation.RESERVATION);
}
private void addAssignmentDetails(Operation op, ContainerId containerId,
String queue) {
operationDetails.get(op).add(new AssignmentDetails(containerId, queue));
}
public void addAllocationDetails(ContainerId containerId, String queue) {
addAssignmentDetails(Operation.ALLOCATION, containerId, queue);
}
public void addReservationDetails(ContainerId containerId, String queue) {
addAssignmentDetails(Operation.RESERVATION, containerId, queue);
}
public List<AssignmentDetails> getAllocationDetails() {
return operationDetails.get(Operation.ALLOCATION);
}
public List<AssignmentDetails> getReservationDetails() {
return operationDetails.get(Operation.RESERVATION);
}
}

View File

@ -21,17 +21,19 @@
import static org.apache.hadoop.yarn.util.StringHelper.join;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.webapp.AppsBlock;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.ResponseInfo;
import org.apache.hadoop.yarn.webapp.SubView;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
@ -244,7 +246,7 @@ public void render(Block html) {
span(".q", "default")._()._();
} else {
CSQueue root = cs.getRootQueue();
CapacitySchedulerInfo sinfo = new CapacitySchedulerInfo(root);
CapacitySchedulerInfo sinfo = new CapacitySchedulerInfo(root, cs);
csqinfo.csinfo = sinfo;
csqinfo.qinfo = null;
@ -274,6 +276,95 @@ public void render(Block html) {
script().$type("text/javascript").
_("$('#cs').hide();")._()._().
_(RMAppsBlock.class);
html._(HealthBlock.class);
}
}
public static class HealthBlock extends HtmlBlock {
final CapacityScheduler cs;
@Inject
HealthBlock(ResourceManager rm) {
cs = (CapacityScheduler) rm.getResourceScheduler();
}
@Override
public void render(HtmlBlock.Block html) {
SchedulerHealth healthInfo = cs.getSchedulerHealth();
DIV<Hamlet> div = html.div("#health");
div.h4("Aggregate scheduler counts");
TBODY<TABLE<DIV<Hamlet>>> tbody =
div.table("#lastrun").thead().$class("ui-widget-header").tr().th()
.$class("ui-state-default")._("Total Container Allocations(count)")
._().th().$class("ui-state-default")
._("Total Container Releases(count)")._().th()
.$class("ui-state-default")
._("Total Fulfilled Reservations(count)")._().th()
.$class("ui-state-default")._("Total Container Preemptions(count)")
._()._()._().tbody();
tbody
.$class("ui-widget-content")
.tr()
.td(
String.valueOf(cs.getRootQueueMetrics()
.getAggregateAllocatedContainers()))
.td(
String.valueOf(cs.getRootQueueMetrics()
.getAggegatedReleasedContainers()))
.td(healthInfo.getAggregateFulFilledReservationsCount().toString())
.td(healthInfo.getAggregatePreemptionCount().toString())._()._()._();
div.h4("Last scheduler run");
tbody =
div.table("#lastrun").thead().$class("ui-widget-header").tr().th()
.$class("ui-state-default")._("Time")._().th()
.$class("ui-state-default")._("Allocations(count - resources)")._()
.th().$class("ui-state-default")._("Reservations(count - resources)")
._().th().$class("ui-state-default")._("Releases(count - resources)")
._()._()._().tbody();
tbody
.$class("ui-widget-content")
.tr()
.td(Times.format(healthInfo.getLastSchedulerRunTime()))
.td(
healthInfo.getAllocationCount().toString() + " - "
+ healthInfo.getResourcesAllocated().toString())
.td(
healthInfo.getReservationCount().toString() + " - "
+ healthInfo.getResourcesReserved().toString())
.td(
healthInfo.getReleaseCount().toString() + " - "
+ healthInfo.getResourcesReleased().toString())._()._()._();
Map<String, SchedulerHealth.DetailedInformation> info = new HashMap<>();
info.put("Allocation", healthInfo.getLastAllocationDetails());
info.put("Reservation", healthInfo.getLastReservationDetails());
info.put("Release", healthInfo.getLastReleaseDetails());
info.put("Preemption", healthInfo.getLastPreemptionDetails());
for (Map.Entry<String, SchedulerHealth.DetailedInformation> entry : info
.entrySet()) {
String containerId = "N/A";
String nodeId = "N/A";
String queue = "N/A";
String table = "#" + entry.getKey();
div.h4("Last " + entry.getKey());
tbody =
div.table(table).thead().$class("ui-widget-header").tr().th()
.$class("ui-state-default")._("Time")._().th()
.$class("ui-state-default")._("Container Id")._().th()
.$class("ui-state-default")._("Node Id")._().th()
.$class("ui-state-default")._("Queue")._()._()._().tbody();
SchedulerHealth.DetailedInformation di = entry.getValue();
if (di.getTimestamp() != 0) {
containerId = di.getContainerId().toString();
nodeId = di.getNodeId().toString();
queue = di.getQueue();
}
tbody.$class("ui-widget-content").tr()
.td(Times.format(di.getTimestamp())).td(containerId).td(nodeId)
.td(queue)._()._()._();
}
div._();
}
}

View File

@ -53,7 +53,7 @@ public JAXBContextResolver() throws Exception {
NodesInfo.class, RemoteExceptionData.class,
CapacitySchedulerQueueInfoList.class, ResourceInfo.class,
UsersInfo.class, UserInfo.class, ApplicationStatisticsInfo.class,
StatisticsItemInfo.class };
StatisticsItemInfo.class, CapacitySchedulerHealthInfo.class };
// these dao classes need root unwrapping
final Class[] rootUnwrappedTypes =
{ NewApplication.class, ApplicationSubmissionContextInfo.class,

View File

@ -228,7 +228,7 @@ public SchedulerTypeInfo getSchedulerInfo() {
if (rs instanceof CapacityScheduler) {
CapacityScheduler cs = (CapacityScheduler) rs;
CSQueue root = cs.getRootQueue();
sinfo = new CapacitySchedulerInfo(root);
sinfo = new CapacitySchedulerInfo(root, cs);
} else if (rs instanceof FairScheduler) {
FairScheduler fs = (FairScheduler) rs;
sinfo = new FairSchedulerInfo(fs);

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@XmlAccessorType(XmlAccessType.FIELD)
public class CapacitySchedulerHealthInfo {
@XmlAccessorType(XmlAccessType.FIELD)
public static class OperationInformation {
String nodeId;
String containerId;
String queue;
OperationInformation() {
}
OperationInformation(SchedulerHealth.DetailedInformation di) {
this.nodeId = di.getNodeId() == null ? "N/A" : di.getNodeId().toString();
this.containerId =
di.getContainerId() == null ? "N/A" : di.getContainerId().toString();
this.queue = di.getQueue() == null ? "N/A" : di.getQueue();
}
public String getNodeId() {
return nodeId;
}
public String getContainerId() {
return containerId;
}
public String getQueue() {
return queue;
}
}
@XmlAccessorType(XmlAccessType.FIELD)
public static class LastRunDetails {
String operation;
long count;
ResourceInfo resources;
LastRunDetails() {
}
LastRunDetails(String operation, long count, Resource resource) {
this.operation = operation;
this.count = count;
this.resources = new ResourceInfo(resource);
}
public String getOperation() {
return operation;
}
public long getCount() {
return count;
}
public ResourceInfo getResources() {
return resources;
}
}
long lastrun;
Map<String, OperationInformation> operationsInfo;
List<LastRunDetails> lastRunDetails;
CapacitySchedulerHealthInfo() {
}
public long getLastrun() {
return lastrun;
}
CapacitySchedulerHealthInfo(CapacityScheduler cs) {
SchedulerHealth ht = cs.getSchedulerHealth();
lastrun = ht.getLastSchedulerRunTime();
operationsInfo = new HashMap<>();
operationsInfo.put("last-allocation",
new OperationInformation(ht.getLastAllocationDetails()));
operationsInfo.put("last-release",
new OperationInformation(ht.getLastReleaseDetails()));
operationsInfo.put("last-preemption",
new OperationInformation(ht.getLastPreemptionDetails()));
operationsInfo.put("last-reservation",
new OperationInformation(ht.getLastReservationDetails()));
lastRunDetails = new ArrayList<>();
lastRunDetails.add(new LastRunDetails("releases", ht.getReleaseCount(), ht
.getResourcesReleased()));
lastRunDetails.add(new LastRunDetails("allocations", ht
.getAllocationCount(), ht.getResourcesAllocated()));
lastRunDetails.add(new LastRunDetails("reservations", ht
.getReservationCount(), ht.getResourcesReserved()));
}
}

View File

@ -25,6 +25,7 @@
import javax.xml.bind.annotation.XmlType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@XmlRootElement(name = "capacityScheduler")
@ -37,6 +38,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
protected float maxCapacity;
protected String queueName;
protected CapacitySchedulerQueueInfoList queues;
protected CapacitySchedulerHealthInfo health;
@XmlTransient
static final float EPSILON = 1e-8f;
@ -44,7 +46,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
public CapacitySchedulerInfo() {
} // JAXB needs this
public CapacitySchedulerInfo(CSQueue parent) {
public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) {
this.queueName = parent.getQueueName();
this.usedCapacity = parent.getUsedCapacity() * 100;
this.capacity = parent.getCapacity() * 100;
@ -54,6 +56,7 @@ public CapacitySchedulerInfo(CSQueue parent) {
this.maxCapacity = max * 100;
queues = getQueues(parent);
health = new CapacitySchedulerHealthInfo(cs);
}
public float getCapacity() {

View File

@ -0,0 +1,351 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assume.assumeTrue;
public class TestSchedulerHealth {
private ResourceManager resourceManager;
public void setup() {
resourceManager = new ResourceManager() {
@Override
protected RMNodeLabelsManager createNodeLabelManager() {
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(getConfig());
return mgr;
}
};
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
resourceManager.init(conf);
resourceManager.getRMContext().getContainerTokenSecretManager()
.rollMasterKey();
resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
}
@Test
public void testCounts() {
SchedulerHealth sh = new SchedulerHealth();
int value = 1;
for (int i = 0; i < 2; ++i) {
sh.updateSchedulerPreemptionCounts(value);
sh.updateSchedulerAllocationCounts(value);
sh.updateSchedulerReservationCounts(value);
sh.updateSchedulerReleaseCounts(value);
Assert.assertEquals(value, sh.getAllocationCount().longValue());
Assert.assertEquals(value, sh.getReleaseCount().longValue());
Assert.assertEquals(value, sh.getReservationCount().longValue());
Assert.assertEquals(value, sh.getPreemptionCount().longValue());
Assert.assertEquals(value * (i + 1), sh.getAggregateAllocationCount()
.longValue());
Assert.assertEquals(value * (i + 1), sh.getAggregateReleaseCount()
.longValue());
Assert.assertEquals(value * (i + 1), sh.getAggregateReservationCount()
.longValue());
Assert.assertEquals(value * (i + 1), sh.getAggregatePreemptionCount()
.longValue());
}
}
@Test
public void testOperationDetails() {
SchedulerHealth sh = new SchedulerHealth();
long now = Time.now();
sh.updateRelease(now, NodeId.newInstance("testhost", 1234),
ContainerId.fromString("container_1427562107907_0002_01_000001"),
"testqueue");
Assert.assertEquals("container_1427562107907_0002_01_000001", sh
.getLastReleaseDetails().getContainerId().toString());
Assert.assertEquals("testhost:1234", sh.getLastReleaseDetails().getNodeId()
.toString());
Assert.assertEquals("testqueue", sh.getLastReleaseDetails().getQueue());
Assert.assertEquals(now, sh.getLastReleaseDetails().getTimestamp());
Assert.assertEquals(0, sh.getLastSchedulerRunTime());
now = Time.now();
sh.updateReservation(now, NodeId.newInstance("testhost1", 1234),
ContainerId.fromString("container_1427562107907_0003_01_000001"),
"testqueue1");
Assert.assertEquals("container_1427562107907_0003_01_000001", sh
.getLastReservationDetails().getContainerId().toString());
Assert.assertEquals("testhost1:1234", sh.getLastReservationDetails()
.getNodeId().toString());
Assert
.assertEquals("testqueue1", sh.getLastReservationDetails().getQueue());
Assert.assertEquals(now, sh.getLastReservationDetails().getTimestamp());
Assert.assertEquals(0, sh.getLastSchedulerRunTime());
now = Time.now();
sh.updateAllocation(now, NodeId.newInstance("testhost2", 1234),
ContainerId.fromString("container_1427562107907_0004_01_000001"),
"testqueue2");
Assert.assertEquals("container_1427562107907_0004_01_000001", sh
.getLastAllocationDetails().getContainerId().toString());
Assert.assertEquals("testhost2:1234", sh.getLastAllocationDetails()
.getNodeId().toString());
Assert.assertEquals("testqueue2", sh.getLastAllocationDetails().getQueue());
Assert.assertEquals(now, sh.getLastAllocationDetails().getTimestamp());
Assert.assertEquals(0, sh.getLastSchedulerRunTime());
now = Time.now();
sh.updatePreemption(now, NodeId.newInstance("testhost3", 1234),
ContainerId.fromString("container_1427562107907_0005_01_000001"),
"testqueue3");
Assert.assertEquals("container_1427562107907_0005_01_000001", sh
.getLastPreemptionDetails().getContainerId().toString());
Assert.assertEquals("testhost3:1234", sh.getLastPreemptionDetails()
.getNodeId().toString());
Assert.assertEquals("testqueue3", sh.getLastPreemptionDetails().getQueue());
Assert.assertEquals(now, sh.getLastPreemptionDetails().getTimestamp());
Assert.assertEquals(0, sh.getLastSchedulerRunTime());
}
@Test
public void testResourceUpdate() {
SchedulerHealth sh = new SchedulerHealth();
long now = Time.now();
sh.updateSchedulerRunDetails(now, Resource.newInstance(1024, 1),
Resource.newInstance(2048, 1));
Assert.assertEquals(now, sh.getLastSchedulerRunTime());
Assert.assertEquals(Resource.newInstance(1024, 1),
sh.getResourcesAllocated());
Assert.assertEquals(Resource.newInstance(2048, 1),
sh.getResourcesReserved());
now = Time.now();
sh.updateSchedulerReleaseDetails(now, Resource.newInstance(3072, 1));
Assert.assertEquals(now, sh.getLastSchedulerRunTime());
Assert.assertEquals(Resource.newInstance(3072, 1),
sh.getResourcesReleased());
}
private NodeManager registerNode(String hostName, int containerManagerPort,
int httpPort, String rackName, Resource capability) throws IOException,
YarnException {
NodeManager nm =
new NodeManager(hostName, containerManagerPort, httpPort, rackName,
capability, resourceManager);
NodeAddedSchedulerEvent nodeAddEvent1 =
new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
.get(nm.getNodeId()));
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
return nm;
}
private void nodeUpdate(NodeManager nm) {
RMNode node =
resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
resourceManager.getResourceScheduler().handle(nodeUpdate);
}
@Test
public void testCapacitySchedulerAllocation() throws Exception {
setup();
boolean isCapacityScheduler =
resourceManager.getResourceScheduler() instanceof CapacityScheduler;
assumeTrue("This test is only supported on Capacity Scheduler",
isCapacityScheduler);
// Register node1
String host_0 = "host_0";
NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(5 * 1024, 1));
// ResourceRequest priorities
Priority priority_0 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
.create(0);
Priority priority_1 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
.create(1);
// Submit an application
Application application_0 =
new Application("user_0", "default", resourceManager);
application_0.submit();
application_0.addNodeManager(host_0, 1234, nm_0);
Resource capability_0_0 = Resources.createResource(1024, 1);
application_0.addResourceRequestSpec(priority_1, capability_0_0);
Resource capability_0_1 = Resources.createResource(2 * 1024, 1);
application_0.addResourceRequestSpec(priority_0, capability_0_1);
Task task_0_0 =
new Task(application_0, priority_1, new String[] { host_0 });
application_0.addTask(task_0_0);
Task task_0_1 =
new Task(application_0, priority_0, new String[] { host_0 });
application_0.addTask(task_0_1);
// Send resource requests to the scheduler
application_0.schedule();
// Send a heartbeat to kick the tires on the Scheduler
nodeUpdate(nm_0);
SchedulerHealth sh =
((CapacityScheduler) resourceManager.getResourceScheduler())
.getSchedulerHealth();
Assert.assertEquals(2, sh.getAllocationCount().longValue());
Assert.assertEquals(Resource.newInstance(3 * 1024, 2),
sh.getResourcesAllocated());
Assert.assertEquals(2, sh.getAggregateAllocationCount().longValue());
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
.getNodeId().toString());
Assert.assertEquals("root.default", sh.getLastAllocationDetails()
.getQueue());
Task task_0_2 =
new Task(application_0, priority_0, new String[] { host_0 });
application_0.addTask(task_0_2);
application_0.schedule();
nodeUpdate(nm_0);
Assert.assertEquals(1, sh.getAllocationCount().longValue());
Assert.assertEquals(Resource.newInstance(2 * 1024, 1),
sh.getResourcesAllocated());
Assert.assertEquals(3, sh.getAggregateAllocationCount().longValue());
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
.getNodeId().toString());
Assert.assertEquals("root.default", sh.getLastAllocationDetails()
.getQueue());
}
@Test
public void testCapacitySchedulerReservation() throws Exception {
setup();
boolean isCapacityScheduler =
resourceManager.getResourceScheduler() instanceof CapacityScheduler;
assumeTrue("This test is only supported on Capacity Scheduler",
isCapacityScheduler);
// Register nodes
String host_0 = "host_0";
NodeManager nm_0 =
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(2 * 1024, 1));
String host_1 = "host_1";
NodeManager nm_1 =
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
Resources.createResource(5 * 1024, 1));
nodeUpdate(nm_0);
nodeUpdate(nm_1);
// ResourceRequest priorities
Priority priority_0 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
.create(0);
Priority priority_1 =
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
.create(1);
// Submit an application
Application application_0 =
new Application("user_0", "default", resourceManager);
application_0.submit();
application_0.addNodeManager(host_0, 1234, nm_0);
application_0.addNodeManager(host_1, 1234, nm_1);
Resource capability_0_0 = Resources.createResource(1024, 1);
application_0.addResourceRequestSpec(priority_1, capability_0_0);
Resource capability_0_1 = Resources.createResource(2 * 1024, 1);
application_0.addResourceRequestSpec(priority_0, capability_0_1);
Task task_0_0 =
new Task(application_0, priority_1, new String[] { host_0 });
application_0.addTask(task_0_0);
// Send resource requests to the scheduler
application_0.schedule();
// Send a heartbeat to kick the tires on the Scheduler
nodeUpdate(nm_0);
SchedulerHealth sh =
((CapacityScheduler) resourceManager.getResourceScheduler())
.getSchedulerHealth();
Assert.assertEquals(1, sh.getAllocationCount().longValue());
Assert.assertEquals(Resource.newInstance(1024, 1),
sh.getResourcesAllocated());
Assert.assertEquals(1, sh.getAggregateAllocationCount().longValue());
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
.getNodeId().toString());
Assert.assertEquals("root.default", sh.getLastAllocationDetails()
.getQueue());
Task task_0_1 =
new Task(application_0, priority_0, new String[] { host_0 });
application_0.addTask(task_0_1);
application_0.schedule();
nodeUpdate(nm_0);
Assert.assertEquals(0, sh.getAllocationCount().longValue());
Assert.assertEquals(1, sh.getReservationCount().longValue());
Assert.assertEquals(Resource.newInstance(2 * 1024, 1),
sh.getResourcesReserved());
Assert.assertEquals(1, sh.getAggregateAllocationCount().longValue());
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
.getNodeId().toString());
Assert.assertEquals("root.default", sh.getLastAllocationDetails()
.getQueue());
}
}

View File

@ -1660,7 +1660,7 @@ public void testMoveAppQueueMetricsCheck() throws Exception {
CapacityScheduler cs =
(CapacityScheduler) resourceManager.getResourceScheduler();
CSQueue origRootQ = cs.getRootQueue();
CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ);
CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ, cs);
int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
int origNumAppsRoot = origRootQ.getNumApplications();
@ -1669,7 +1669,7 @@ public void testMoveAppQueueMetricsCheck() throws Exception {
CSQueue newRootQ = cs.getRootQueue();
int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
int newNumAppsRoot = newRootQ.getNumApplications();
CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ);
CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ, cs);
CapacitySchedulerLeafQueueInfo origOldA1 =
(CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
CapacitySchedulerLeafQueueInfo origNewA1 =

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.StringReader;
@ -314,11 +315,14 @@ private void verifyClusterScheduler(JSONObject json) throws JSONException,
JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements", 1, info.length());
info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements", 6, info.length());
assertEquals("incorrect number of elements", 7, info.length());
verifyClusterSchedulerGeneric(info.getString("type"),
(float) info.getDouble("usedCapacity"),
(float) info.getDouble("capacity"),
(float) info.getDouble("maxCapacity"), info.getString("queueName"));
JSONObject health = info.getJSONObject("health");
assertNotNull(health);
assertEquals("incorrect number of elements", 3, health.length());
JSONArray arr = info.getJSONObject("queues").getJSONArray("queue");
assertEquals("incorrect number of elements", 2, arr.length());