MAPREDUCE-3050. Add ability to get resource usage information for applications and nodes. Contributed by Robert Evans.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1177859 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
960d7643fb
commit
dcf9d475e0
@ -1487,6 +1487,9 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-2996. Add uber-ness information to JobHistory. (Jonathan Eagles
|
||||
via acmurthy)
|
||||
|
||||
MAPREDUCE-3050. Add ability to get resource usage information for
|
||||
applications and nodes. (Robert Evans via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -351,7 +351,7 @@ private NodeReport createNodeReports(RMNode rmNode) {
|
||||
report.setNodeHealthStatus(rmNode.getNodeHealthStatus());
|
||||
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport schedulerNodeReport = scheduler
|
||||
.getNodeReport(rmNode.getNodeID());
|
||||
report.setUsed(schedulerNodeReport.getUsedResources());
|
||||
report.setUsed(schedulerNodeReport.getUsedResource());
|
||||
report.setNumContainers(schedulerNodeReport.getNumContainers());
|
||||
return report;
|
||||
}
|
||||
|
@ -280,27 +280,27 @@ public void unreserveResource(String user, Resource res) {
|
||||
parent.unreserveResource(user, res);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public int getAppsSubmitted() {
|
||||
return appsSubmitted.value();
|
||||
}
|
||||
|
||||
|
||||
public int getAppsRunning() {
|
||||
return appsRunning.value();
|
||||
}
|
||||
|
||||
|
||||
public int getAppsPending() {
|
||||
return appsPending.value();
|
||||
}
|
||||
|
||||
|
||||
public int getAppsCompleted() {
|
||||
return appsCompleted.value();
|
||||
}
|
||||
|
||||
|
||||
public int getAppsKilled() {
|
||||
return appsKilled.value();
|
||||
}
|
||||
|
||||
|
||||
public int getAppsFailed() {
|
||||
return appsFailed.value();
|
||||
}
|
||||
@ -312,7 +312,7 @@ public int getAllocatedGB() {
|
||||
public int getAllocatedContainers() {
|
||||
return allocatedContainers.value();
|
||||
}
|
||||
|
||||
|
||||
public int getAvailableGB() {
|
||||
return availableGB.value();
|
||||
}
|
||||
|
@ -136,6 +136,10 @@ public Resource getResource(Priority priority) {
|
||||
return this.appSchedulingInfo.getResource(priority);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this application pending?
|
||||
* @return true if it is else false.
|
||||
*/
|
||||
public boolean isPending() {
|
||||
return this.appSchedulingInfo.isPending();
|
||||
}
|
||||
@ -144,6 +148,10 @@ public String getQueueName() {
|
||||
return this.appSchedulingInfo.getQueueName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of live containers
|
||||
* @return All of the live containers
|
||||
*/
|
||||
public synchronized Collection<RMContainer> getLiveContainers() {
|
||||
return new ArrayList<RMContainer>(liveContainers.values());
|
||||
}
|
||||
@ -419,7 +427,11 @@ public synchronized float getLocalityWaitFactor(
|
||||
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
|
||||
}
|
||||
|
||||
public synchronized List<RMContainer> getAllReservedContainers() {
|
||||
/**
|
||||
* Get the list of reserved containers
|
||||
* @return All of the reserved containers.
|
||||
*/
|
||||
public synchronized List<RMContainer> getReservedContainers() {
|
||||
List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
|
||||
for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
|
||||
this.reservedContainers.entrySet()) {
|
||||
@ -447,5 +459,4 @@ public synchronized Resource getHeadroom() {
|
||||
public Queue getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,68 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
||||
/**
|
||||
* Represents an application attempt, and the resources that the attempt is
|
||||
* using.
|
||||
*/
|
||||
@Evolving
|
||||
@LimitedPrivate("yarn")
|
||||
public class SchedulerAppReport {
|
||||
|
||||
private final Collection<RMContainer> live;
|
||||
private final Collection<RMContainer> reserved;
|
||||
private final boolean pending;
|
||||
|
||||
public SchedulerAppReport(SchedulerApp app) {
|
||||
this.live = app.getLiveContainers();
|
||||
this.reserved = app.getReservedContainers();
|
||||
this.pending = app.isPending();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of live containers
|
||||
* @return All of the live containers
|
||||
*/
|
||||
public Collection<RMContainer> getLiveContainers() {
|
||||
return live;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of reserved containers
|
||||
* @return All of the reserved containers.
|
||||
*/
|
||||
public Collection<RMContainer> getReservedContainers() {
|
||||
return reserved;
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this application pending?
|
||||
* @return true if it is else false.
|
||||
*/
|
||||
public boolean isPending() {
|
||||
return pending;
|
||||
}
|
||||
}
|
@ -28,19 +28,34 @@
|
||||
@Private
|
||||
@Stable
|
||||
public class SchedulerNodeReport {
|
||||
private final Resource usedResources;
|
||||
private final int numContainers;
|
||||
private final Resource used;
|
||||
private final Resource avail;
|
||||
private final int num;
|
||||
|
||||
public SchedulerNodeReport(Resource used, int numContainers) {
|
||||
this.usedResources = used;
|
||||
this.numContainers = numContainers;
|
||||
public SchedulerNodeReport(SchedulerNode node) {
|
||||
this.used = node.getUsedResource();
|
||||
this.avail = node.getAvailableResource();
|
||||
this.num = node.getNumContainers();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the amount of resources currently used by the node.
|
||||
*/
|
||||
public Resource getUsedResource() {
|
||||
return used;
|
||||
}
|
||||
|
||||
public Resource getUsedResources() {
|
||||
return usedResources;
|
||||
/**
|
||||
* @return the amount of resources currently available on the node
|
||||
*/
|
||||
public Resource getAvailableResource() {
|
||||
return avail;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of containers currently running on this node.
|
||||
*/
|
||||
public int getNumContainers() {
|
||||
return numContainers;
|
||||
return num;
|
||||
}
|
||||
}
|
||||
|
@ -21,8 +21,9 @@
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
@ -57,7 +58,6 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
|
||||
/**
|
||||
* Get acls for queues for current user.
|
||||
* @return acls for queues for current user
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
@ -101,26 +101,24 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
|
||||
* @param nodeId
|
||||
* @return the {@link SchedulerNodeReport} for the node
|
||||
*/
|
||||
@Private
|
||||
@LimitedPrivate("yarn")
|
||||
@Stable
|
||||
public SchedulerNodeReport getNodeReport(NodeId nodeId);
|
||||
|
||||
/**
|
||||
* Get used resources on the node
|
||||
* @param nodeId node
|
||||
* @return used resources on the node
|
||||
* Get the Scheduler app for a given app attempt Id.
|
||||
* @param appAttemptId the id of the application attempt
|
||||
* @return SchedulerApp for this given attempt.
|
||||
*/
|
||||
@Private
|
||||
@LimitedPrivate("yarn")
|
||||
@Stable
|
||||
Resource getUsedResource(NodeId nodeId);
|
||||
SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId appAttemptId);
|
||||
|
||||
/**
|
||||
* Get available resources on the node
|
||||
* @param nodeId node
|
||||
* @return available resources on the node
|
||||
* Get the root queue for the scheduler.
|
||||
* @return the root queue for the scheduler.
|
||||
*/
|
||||
@Private
|
||||
@Stable
|
||||
Resource getAvailableResource(NodeId nodeId);
|
||||
|
||||
@LimitedPrivate("yarn")
|
||||
@Evolving
|
||||
QueueMetrics getRootQueueMetrics();
|
||||
}
|
||||
|
@ -58,8 +58,10 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
@ -128,10 +130,15 @@ public int compare(SchedulerApp a1, SchedulerApp a2) {
|
||||
|
||||
public CapacityScheduler() {}
|
||||
|
||||
@Override
|
||||
public QueueMetrics getRootQueueMetrics() {
|
||||
return root.getMetrics();
|
||||
}
|
||||
|
||||
public CSQueue getRootQueue() {
|
||||
return root;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CapacitySchedulerConfiguration getConfiguration() {
|
||||
return conf;
|
||||
@ -152,14 +159,6 @@ public Resource getMaximumResourceCapability() {
|
||||
return maximumAllocation;
|
||||
}
|
||||
|
||||
public synchronized Resource getUsedResource(NodeId nodeId) {
|
||||
return nodes.get(nodeId).getUsedResource();
|
||||
}
|
||||
|
||||
public synchronized Resource getAvailableResource(NodeId nodeId) {
|
||||
return nodes.get(nodeId).getAvailableResource();
|
||||
}
|
||||
|
||||
public synchronized int getNumClusterNodes() {
|
||||
return numNodeManagers;
|
||||
}
|
||||
@ -401,7 +400,7 @@ private synchronized void doneApplication(
|
||||
}
|
||||
|
||||
// Release all reserved containers
|
||||
for (RMContainer rmContainer : application.getAllReservedContainers()) {
|
||||
for (RMContainer rmContainer : application.getReservedContainers()) {
|
||||
completedContainer(rmContainer,
|
||||
SchedulerUtils.createAbnormalContainerStatus(
|
||||
rmContainer.getContainerId(),
|
||||
@ -733,6 +732,13 @@ SchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
|
||||
return applications.get(applicationAttemptId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchedulerAppReport getSchedulerAppInfo(
|
||||
ApplicationAttemptId applicationAttemptId) {
|
||||
SchedulerApp app = getApplication(applicationAttemptId);
|
||||
return app == null ? null : new SchedulerAppReport(app);
|
||||
}
|
||||
|
||||
@Lock(Lock.NoLock.class)
|
||||
SchedulerNode getNode(NodeId nodeId) {
|
||||
return nodes.get(nodeId);
|
||||
@ -764,8 +770,7 @@ public void recover(RMState state) throws Exception {
|
||||
@Override
|
||||
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
|
||||
SchedulerNode node = getNode(nodeId);
|
||||
return new SchedulerNodeReport(
|
||||
node.getUsedResource(), node.getNumContainers());
|
||||
return node == null ? null : new SchedulerNodeReport(node);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1052,6 +1052,7 @@ private Container getContainer(RMContainer rmContainer,
|
||||
createContainer(application, node, capability, priority);
|
||||
}
|
||||
|
||||
|
||||
public Container createContainer(SchedulerApp application, SchedulerNode node,
|
||||
Resource capability, Priority priority) {
|
||||
Container container =
|
||||
|
@ -141,7 +141,7 @@ public ParentQueue(CapacitySchedulerContext cs,
|
||||
maximumCapacity, absoluteMaxCapacity, state, acls);
|
||||
|
||||
this.queueComparator = comparator;
|
||||
this.childQueues = new TreeSet<CSQueue>(comparator);
|
||||
this.childQueues = new TreeSet<CSQueue>(queueComparator);
|
||||
|
||||
LOG.info("Initialized parent-queue " + queueName +
|
||||
" name=" + queueName +
|
||||
|
@ -70,6 +70,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
@ -285,6 +286,13 @@ private SchedulerApp getApplication(
|
||||
return applications.get(applicationAttemptId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchedulerAppReport getSchedulerAppInfo(
|
||||
ApplicationAttemptId applicationAttemptId) {
|
||||
SchedulerApp app = getApplication(applicationAttemptId);
|
||||
return app == null ? null : new SchedulerAppReport(app);
|
||||
}
|
||||
|
||||
private SchedulerNode getNode(NodeId nodeId) {
|
||||
return nodes.get(nodeId);
|
||||
}
|
||||
@ -762,8 +770,7 @@ public void recover(RMState state) {
|
||||
@Override
|
||||
public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
|
||||
SchedulerNode node = getNode(nodeId);
|
||||
return new SchedulerNodeReport(
|
||||
node.getUsedResource(), node.getNumContainers());
|
||||
return node == null ? null : new SchedulerNodeReport(node);
|
||||
}
|
||||
|
||||
private RMContainer getRMContainer(ContainerId containerId) {
|
||||
@ -772,4 +779,9 @@ private RMContainer getRMContainer(ContainerId containerId) {
|
||||
return (application == null) ? null : application.getRMContainer(containerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueueMetrics getRootQueueMetrics() {
|
||||
return DEFAULT_QUEUE.getMetrics();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -28,10 +28,12 @@
|
||||
import org.apache.hadoop.yarn.api.records.AMResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
@ -71,8 +73,9 @@ public void test() throws Exception {
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
Assert.assertEquals(2 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm1.getNodeId()).getMemory());
|
||||
SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
|
||||
nm1.getNodeId());
|
||||
Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
|
||||
|
||||
RMApp app2 = rm.submitApp(2048);
|
||||
// kick the scheduling, 2GB given to AM, remaining 2 GB on nm2
|
||||
@ -80,8 +83,9 @@ public void test() throws Exception {
|
||||
RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
|
||||
MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
|
||||
am2.registerAppAttempt();
|
||||
Assert.assertEquals(2 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm2.getNodeId()).getMemory());
|
||||
SchedulerNodeReport report_nm2 = rm.getResourceScheduler().getNodeReport(
|
||||
nm2.getNodeId());
|
||||
Assert.assertEquals(2 * GB, report_nm2.getUsedResource().getMemory());
|
||||
|
||||
// add request for containers
|
||||
am1.addRequests(new String[] { "h1", "h2" }, GB, 1, 1);
|
||||
@ -114,16 +118,14 @@ public void test() throws Exception {
|
||||
Assert.assertEquals(1, allocated2.size());
|
||||
Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory());
|
||||
Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId());
|
||||
|
||||
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||
report_nm2 = rm.getResourceScheduler().getNodeReport(nm2.getNodeId());
|
||||
Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory());
|
||||
Assert.assertEquals(2 * GB, report_nm2.getAvailableResource().getMemory());
|
||||
|
||||
Assert.assertEquals(0, rm.getResourceScheduler().getAvailableResource(
|
||||
nm1.getNodeId()).getMemory());
|
||||
Assert.assertEquals(2 * GB, rm.getResourceScheduler()
|
||||
.getAvailableResource(nm2.getNodeId()).getMemory());
|
||||
|
||||
Assert.assertEquals(6 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm1.getNodeId()).getMemory());
|
||||
Assert.assertEquals(2 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm2.getNodeId()).getMemory());
|
||||
Assert.assertEquals(6 * GB, report_nm1.getUsedResource().getMemory());
|
||||
Assert.assertEquals(2 * GB, report_nm2.getUsedResource().getMemory());
|
||||
|
||||
Container c1 = allocated1.get(0);
|
||||
Assert.assertEquals(GB, c1.getResource().getMemory());
|
||||
@ -138,8 +140,8 @@ public void test() throws Exception {
|
||||
}
|
||||
Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
|
||||
Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
|
||||
Assert.assertEquals(5 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm1.getNodeId()).getMemory());
|
||||
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||
Assert.assertEquals(5 * GB, report_nm1.getUsedResource().getMemory());
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user