YARN-6042. Dump scheduler and queue state information into FairScheduler DEBUG log. (Yufei Gu via rchiang)
This commit is contained in:
parent
229c7c9f89
commit
4db9cc70d0
@ -321,3 +321,12 @@ log4j.appender.EWMA=org.apache.hadoop.yarn.util.Log4jWarningErrorMetricsAppender
|
||||
log4j.appender.EWMA.cleanupInterval=${yarn.ewma.cleanupInterval}
|
||||
log4j.appender.EWMA.messageAgeLimitSeconds=${yarn.ewma.messageAgeLimitSeconds}
|
||||
log4j.appender.EWMA.maxUniqueMessages=${yarn.ewma.maxUniqueMessages}
|
||||
|
||||
# Fair scheduler state dump: route the FairScheduler.statedump logger to its own rolling log file
|
||||
log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.statedump=DEBUG,FSLOGGER
|
||||
log4j.appender.FSLOGGER=org.apache.log4j.RollingFileAppender
|
||||
log4j.appender.FSLOGGER.File=${hadoop.log.dir}/fairscheduler-statedump.log
|
||||
log4j.appender.FSLOGGER.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.FSLOGGER.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
|
||||
log4j.appender.FSLOGGER.MaxFileSize=${hadoop.log.maxfilesize}
|
||||
log4j.appender.FSLOGGER.MaxBackupIndex=${hadoop.log.maxbackupindex}
|
@ -835,25 +835,27 @@ private Resource assignContainer(
|
||||
return capability;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Resource request: " + capability + " exceeds the available"
|
||||
+ " resources of the node.");
|
||||
}
|
||||
|
||||
// The desired container won't fit here, so reserve
|
||||
if (isReservable(capability) &&
|
||||
reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
|
||||
type, schedulerKey)) {
|
||||
if (isWaitingForAMContainer()) {
|
||||
updateAMDiagnosticMsg(capability,
|
||||
" exceed the available resources of the node and the request is"
|
||||
+ " reserved");
|
||||
updateAMDiagnosticMsg(capability, " exceeds the available resources of "
|
||||
+ "the node and the request is reserved)");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getName() + "'s resource request is reserved.");
|
||||
}
|
||||
return FairScheduler.CONTAINER_RESERVED;
|
||||
} else {
|
||||
if (isWaitingForAMContainer()) {
|
||||
updateAMDiagnosticMsg(capability,
|
||||
" exceed the available resources of the node and the request cannot"
|
||||
+ " be reserved");
|
||||
}
|
||||
updateAMDiagnosticMsg(capability, " exceeds the available resources of "
|
||||
+ "the node and the request cannot be reserved)");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Couldn't creating reservation for " +
|
||||
getName() + ",at priority " + schedulerKey.getPriority());
|
||||
LOG.debug("Couldn't create reservation for app: " + getName()
|
||||
+ ", at priority " + schedulerKey.getPriority());
|
||||
}
|
||||
return Resources.none();
|
||||
}
|
||||
@ -1034,10 +1036,9 @@ private boolean hasContainerForNode(SchedulerRequestKey key,
|
||||
ret = false;
|
||||
} else if (!getQueue().fitsInMaxShare(resource)) {
|
||||
// The requested container must fit in queue maximum share
|
||||
if (isWaitingForAMContainer()) {
|
||||
updateAMDiagnosticMsg(resource,
|
||||
" exceeds current queue or its parents maximum resource allowed).");
|
||||
}
|
||||
updateAMDiagnosticMsg(resource,
|
||||
" exceeds current queue or its parents maximum resource allowed).");
|
||||
|
||||
ret = false;
|
||||
}
|
||||
|
||||
@ -1309,15 +1310,13 @@ public void updateDemand() {
|
||||
@Override
|
||||
public Resource assignContainer(FSSchedulerNode node) {
|
||||
if (isOverAMShareLimit()) {
|
||||
if (isWaitingForAMContainer()) {
|
||||
PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
|
||||
updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
|
||||
" exceeds maximum AM resource allowed).");
|
||||
}
|
||||
|
||||
PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
|
||||
updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
|
||||
" exceeds maximum AM resource allowed).");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skipping allocation because maxAMShare limit would " +
|
||||
"be exceeded");
|
||||
LOG.debug("AM resource request: " + amAsk.getPerAllocationResource()
|
||||
+ " exceeds maximum AM resource allowed, "
|
||||
+ getQueue().dumpState());
|
||||
}
|
||||
return Resources.none();
|
||||
}
|
||||
@ -1331,6 +1330,10 @@ public Resource assignContainer(FSSchedulerNode node) {
|
||||
* @param reason the reason why AM doesn't get the resource
|
||||
*/
|
||||
private void updateAMDiagnosticMsg(Resource resource, String reason) {
|
||||
if (!isWaitingForAMContainer()) {
|
||||
return;
|
||||
}
|
||||
|
||||
StringBuilder diagnosticMessageBldr = new StringBuilder();
|
||||
diagnosticMessageBldr.append(" (Resource request: ");
|
||||
diagnosticMessageBldr.append(resource);
|
||||
|
@ -617,4 +617,25 @@ private boolean isStarvedForFairShare() {
|
||||
boolean isStarved() {
|
||||
return isStarvedForMinShare() || isStarvedForFairShare();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void dumpStateInternal(StringBuilder sb) {
|
||||
sb.append("{Name: " + getName() +
|
||||
", Weight: " + weights +
|
||||
", Policy: " + policy.getName() +
|
||||
", FairShare: " + getFairShare() +
|
||||
", SteadyFairShare: " + getSteadyFairShare() +
|
||||
", MaxShare: " + maxShare +
|
||||
", MinShare: " + minShare +
|
||||
", ResourceUsage: " + getResourceUsage() +
|
||||
", Demand: " + getDemand() +
|
||||
", Runnable: " + getNumRunnableApps() +
|
||||
", NumPendingApps: " + getNumPendingApps() +
|
||||
", NonRunnable: " + getNumNonRunnableApps() +
|
||||
", MaxAMShare: " + maxAMShare +
|
||||
", MaxAMResource: " + computeMaxAMResource() +
|
||||
", AMResourceUsage: " + getAmResourceUsage() +
|
||||
", LastTimeAtMinShare: " + lastTimeAtMinShare +
|
||||
"}");
|
||||
}
|
||||
}
|
||||
|
@ -292,4 +292,25 @@ public void recoverContainer(Resource clusterResource,
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void dumpStateInternal(StringBuilder sb) {
|
||||
sb.append("{Name: " + getName() +
|
||||
", Weight: " + weights +
|
||||
", Policy: " + policy.getName() +
|
||||
", FairShare: " + getFairShare() +
|
||||
", SteadyFairShare: " + getSteadyFairShare() +
|
||||
", MaxShare: " + maxShare +
|
||||
", MinShare: " + minShare +
|
||||
", ResourceUsage: " + getResourceUsage() +
|
||||
", Demand: " + getDemand() +
|
||||
", MaxAMShare: " + maxAMShare +
|
||||
", Runnable: " + getNumRunnableApps() +
|
||||
"}");
|
||||
|
||||
for(FSQueue child : getChildQueues()) {
|
||||
sb.append(", ");
|
||||
child.dumpStateInternal(sb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -393,11 +393,22 @@ public abstract void collectSchedulerApplications(
|
||||
* @return true if check passes (can assign) or false otherwise
|
||||
*/
|
||||
boolean assignContainerPreCheck(FSSchedulerNode node) {
|
||||
if (!Resources.fitsIn(getResourceUsage(), maxShare)
|
||||
|| node.getReservedContainer() != null) {
|
||||
if (node.getReservedContainer() != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Assigning container failed on node '" + node.getNodeName()
|
||||
+ " because it has reserved containers.");
|
||||
}
|
||||
return false;
|
||||
} else if (!Resources.fitsIn(getResourceUsage(), maxShare)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Assigning container failed on node '" + node.getNodeName()
|
||||
+ " because queue resource usage is larger than MaxShare: "
|
||||
+ dumpState());
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -453,6 +464,11 @@ boolean fitsInMaxShare(Resource additionalResource) {
|
||||
Resources.add(getResourceUsage(), additionalResource);
|
||||
|
||||
if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Resource usage plus resource request: " + usagePlusAddition
|
||||
+ " exceeds maximum resource allowed:" + getMaxShare()
|
||||
+ " in queue " + getName());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -491,4 +507,23 @@ public boolean verifyAndSetPolicyFromConf(AllocationConfiguration queueConf) {
|
||||
setPolicy(queuePolicy);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively dump states of all queues.
|
||||
*
|
||||
* @return a string which holds all queue states
|
||||
*/
|
||||
public String dumpState() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
dumpStateInternal(sb);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Recursively dump states of all queues.
|
||||
*
|
||||
* @param sb the {@code StringBuilder} which holds queue states
|
||||
*/
|
||||
protected abstract void dumpStateInternal(StringBuilder sb);
|
||||
}
|
||||
|
@ -139,7 +139,9 @@ public class FairScheduler extends
|
||||
private boolean usePortForNodeName;
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
||||
|
||||
private static final Log STATE_DUMP_LOG =
|
||||
LogFactory.getLog(FairScheduler.class.getName() + ".statedump");
|
||||
|
||||
private static final ResourceCalculator RESOURCE_CALCULATOR =
|
||||
new DefaultResourceCalculator();
|
||||
private static final ResourceCalculator DOMINANT_RESOURCE_CALCULATOR =
|
||||
@ -151,7 +153,7 @@ public class FairScheduler extends
|
||||
|
||||
// How often fair shares are re-calculated (ms)
|
||||
protected long updateInterval;
|
||||
private final int UPDATE_DEBUG_FREQUENCY = 5;
|
||||
private final int UPDATE_DEBUG_FREQUENCY = 25;
|
||||
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
||||
|
||||
@VisibleForTesting
|
||||
@ -344,6 +346,21 @@ public void run() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dump scheduler state including states of all queues.
|
||||
*/
|
||||
private void dumpSchedulerState() {
|
||||
FSQueue rootQueue = queueMgr.getRootQueue();
|
||||
Resource clusterResource = getClusterResource();
|
||||
LOG.debug("FairScheduler state: Cluster Capacity: " + clusterResource +
|
||||
" Allocations: " + rootMetrics.getAllocatedResources() +
|
||||
" Availability: " + Resource.newInstance(
|
||||
rootMetrics.getAvailableMB(), rootMetrics.getAvailableVirtualCores()) +
|
||||
" Demand: " + rootQueue.getDemand());
|
||||
|
||||
STATE_DUMP_LOG.debug(rootQueue.dumpState());
|
||||
}
|
||||
|
||||
/**
|
||||
* Recompute the internal variables used by the scheduler - per-job weights,
|
||||
* fair shares, deficits, minimum slot allocations, and amount of used and
|
||||
@ -368,12 +385,7 @@ public void update() {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (--updatesToSkipForDebug < 0) {
|
||||
updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
|
||||
LOG.debug("Cluster Capacity: " + clusterResource +
|
||||
" Allocations: " + rootMetrics.getAllocatedResources() +
|
||||
" Availability: " + Resource.newInstance(
|
||||
rootMetrics.getAvailableMB(),
|
||||
rootMetrics.getAvailableVirtualCores()) +
|
||||
" Demand: " + rootQueue.getDemand());
|
||||
dumpSchedulerState();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -5146,4 +5146,75 @@ public void testUpdateDemand() throws IOException {
|
||||
Resources.equals(aQueue.getDemand(), maxResource) &&
|
||||
Resources.equals(bQueue.getDemand(), maxResource));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDumpState() throws IOException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"parent\">");
|
||||
out.println(" <queue name=\"child1\">");
|
||||
out.println(" <weight>1</weight>");
|
||||
out.println(" </queue>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
ControlledClock clock = new ControlledClock();
|
||||
scheduler.setClock(clock);
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
FSLeafQueue child1 =
|
||||
scheduler.getQueueManager().getLeafQueue("parent.child1", false);
|
||||
Resource resource = Resource.newInstance(4 * GB, 4);
|
||||
child1.setMaxShare(resource);
|
||||
FSAppAttempt app = mock(FSAppAttempt.class);
|
||||
Mockito.when(app.getDemand()).thenReturn(resource);
|
||||
Mockito.when(app.getResourceUsage()).thenReturn(resource);
|
||||
child1.addAppSchedulable(app);
|
||||
child1.updateDemand();
|
||||
|
||||
String childQueueString = "{Name: root.parent.child1,"
|
||||
+ " Weight: <memory weight=1.0, cpu weight=1.0>,"
|
||||
+ " Policy: fair,"
|
||||
+ " FairShare: <memory:0, vCores:0>,"
|
||||
+ " SteadyFairShare: <memory:0, vCores:0>,"
|
||||
+ " MaxShare: <memory:4096, vCores:4>,"
|
||||
+ " MinShare: <memory:0, vCores:0>,"
|
||||
+ " ResourceUsage: <memory:4096, vCores:4>,"
|
||||
+ " Demand: <memory:4096, vCores:4>,"
|
||||
+ " Runnable: 1,"
|
||||
+ " NumPendingApps: 0,"
|
||||
+ " NonRunnable: 0,"
|
||||
+ " MaxAMShare: 0.5,"
|
||||
+ " MaxAMResource: <memory:0, vCores:0>,"
|
||||
+ " AMResourceUsage: <memory:0, vCores:0>,"
|
||||
+ " LastTimeAtMinShare: " + clock.getTime()
|
||||
+ "}";
|
||||
|
||||
assertTrue(child1.dumpState().equals(childQueueString));
|
||||
FSParentQueue parent =
|
||||
scheduler.getQueueManager().getParentQueue("parent", false);
|
||||
parent.setMaxShare(resource);
|
||||
|
||||
String parentQueueString = "{Name: root.parent,"
|
||||
+ " Weight: <memory weight=1.0, cpu weight=1.0>,"
|
||||
+ " Policy: fair,"
|
||||
+ " FairShare: <memory:0, vCores:0>,"
|
||||
+ " SteadyFairShare: <memory:0, vCores:0>,"
|
||||
+ " MaxShare: <memory:4096, vCores:4>,"
|
||||
+ " MinShare: <memory:0, vCores:0>,"
|
||||
+ " ResourceUsage: <memory:4096, vCores:4>,"
|
||||
+ " Demand: <memory:0, vCores:0>,"
|
||||
+ " MaxAMShare: 0.5,"
|
||||
+ " Runnable: 0}";
|
||||
|
||||
assertTrue(parent.dumpState().equals(
|
||||
parentQueueString + ", " + childQueueString));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user