YARN-5890. FairScheduler should log information about AM-resource-usage and max-AM-share for queues
(Contributed by Yufei Gu via Daniel Templeton)
This commit is contained in:
parent
b407d53195
commit
3b9d3acd20
@ -81,6 +81,7 @@ public FSLeafQueue(String name, FairScheduler scheduler,
|
|||||||
this.lastTimeAtMinShare = scheduler.getClock().getTime();
|
this.lastTimeAtMinShare = scheduler.getClock().getTime();
|
||||||
activeUsersManager = new ActiveUsersManager(getMetrics());
|
activeUsersManager = new ActiveUsersManager(getMetrics());
|
||||||
amResourceUsage = Resource.newInstance(0, 0);
|
amResourceUsage = Resource.newInstance(0, 0);
|
||||||
|
getMetrics().setAMResourceUsage(amResourceUsage);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addApp(FSAppAttempt app, boolean runnable) {
|
void addApp(FSAppAttempt app, boolean runnable) {
|
||||||
@ -132,6 +133,7 @@ boolean removeApp(FSAppAttempt app) {
|
|||||||
// running an unmanaged AM.
|
// running an unmanaged AM.
|
||||||
if (runnable && app.isAmRunning()) {
|
if (runnable && app.isAmRunning()) {
|
||||||
Resources.subtractFrom(amResourceUsage, app.getAMResource());
|
Resources.subtractFrom(amResourceUsage, app.getAMResource());
|
||||||
|
getMetrics().setAMResourceUsage(amResourceUsage);
|
||||||
}
|
}
|
||||||
|
|
||||||
return runnable;
|
return runnable;
|
||||||
@ -468,19 +470,14 @@ public ActiveUsersManager getActiveUsersManager() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check whether this queue can run this application master under the
|
* Compute the maximum resource AM can use. The value is the result of
|
||||||
* maxAMShare limit.
|
* multiplying FairShare and maxAMShare. If FairShare is zero, use
|
||||||
*
|
* min(maxShare, available resource) instead to prevent zero value for
|
||||||
* @param amResource resources required to run the AM
|
* maximum AM resource since it forbids any job running in the queue.
|
||||||
* @return true if this queue can run
|
*
|
||||||
*/
|
* @return the maximum resource AM can use
|
||||||
boolean canRunAppAM(Resource amResource) {
|
*/
|
||||||
if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
|
private Resource computeMaxAMResource() {
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If FairShare is zero, use min(maxShare, available resource) to compute
|
|
||||||
// maxAMResource
|
|
||||||
Resource maxResource = Resources.clone(getFairShare());
|
Resource maxResource = Resources.clone(getFairShare());
|
||||||
if (maxResource.getMemorySize() == 0) {
|
if (maxResource.getMemorySize() == 0) {
|
||||||
maxResource.setMemorySize(
|
maxResource.setMemorySize(
|
||||||
@ -494,7 +491,23 @@ boolean canRunAppAM(Resource amResource) {
|
|||||||
getMaxShare().getVirtualCores()));
|
getMaxShare().getVirtualCores()));
|
||||||
}
|
}
|
||||||
|
|
||||||
Resource maxAMResource = Resources.multiply(maxResource, maxAMShare);
|
return Resources.multiply(maxResource, maxAMShare);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether this queue can run the Application Master under the
|
||||||
|
* maxAMShare limit.
|
||||||
|
*
|
||||||
|
* @param amResource resources required to run the AM
|
||||||
|
* @return true if this queue can run
|
||||||
|
*/
|
||||||
|
public boolean canRunAppAM(Resource amResource) {
|
||||||
|
if (Math.abs(maxAMShare - -1.0f) < 0.0001) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
Resource maxAMResource = computeMaxAMResource();
|
||||||
|
getMetrics().setMaxAMShare(maxAMResource);
|
||||||
Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
|
Resource ifRunAMResource = Resources.add(amResourceUsage, amResource);
|
||||||
return Resources.fitsIn(ifRunAMResource, maxAMResource);
|
return Resources.fitsIn(ifRunAMResource, maxAMResource);
|
||||||
}
|
}
|
||||||
@ -502,6 +515,7 @@ boolean canRunAppAM(Resource amResource) {
|
|||||||
void addAMResourceUsage(Resource amResource) {
|
void addAMResourceUsage(Resource amResource) {
|
||||||
if (amResource != null) {
|
if (amResource != null) {
|
||||||
Resources.addTo(amResourceUsage, amResource);
|
Resources.addTo(amResourceUsage, amResource);
|
||||||
|
getMetrics().setAMResourceUsage(amResourceUsage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +41,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract class FSQueue implements Queue, Schedulable {
|
public abstract class FSQueue implements Queue, Schedulable {
|
||||||
@ -160,6 +162,11 @@ public int getMaxRunningApps() {
|
|||||||
return maxRunningApps;
|
return maxRunningApps;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected float getMaxAMShare() {
|
||||||
|
return maxAMShare;
|
||||||
|
}
|
||||||
|
|
||||||
public void setMaxAMShare(float maxAMShare){
|
public void setMaxAMShare(float maxAMShare){
|
||||||
this.maxAMShare = maxAMShare;
|
this.maxAMShare = maxAMShare;
|
||||||
}
|
}
|
||||||
|
@ -41,6 +41,10 @@ public class FSQueueMetrics extends QueueMetrics {
|
|||||||
@Metric("Maximum share of memory in MB") MutableGaugeLong maxShareMB;
|
@Metric("Maximum share of memory in MB") MutableGaugeLong maxShareMB;
|
||||||
@Metric("Maximum share of CPU in vcores") MutableGaugeLong maxShareVCores;
|
@Metric("Maximum share of CPU in vcores") MutableGaugeLong maxShareVCores;
|
||||||
@Metric("Maximum number of applications") MutableGaugeInt maxApps;
|
@Metric("Maximum number of applications") MutableGaugeInt maxApps;
|
||||||
|
@Metric("Maximum AM share of memory in MB") MutableGaugeLong maxAMShareMB;
|
||||||
|
@Metric("Maximum AM share of CPU in vcores") MutableGaugeInt maxAMShareVCores;
|
||||||
|
@Metric("AM resource usage of memory in MB") MutableGaugeLong amResourceUsageMB;
|
||||||
|
@Metric("AM resource usage of CPU in vcores") MutableGaugeInt amResourceUsageVCores;
|
||||||
|
|
||||||
private String schedulingPolicy;
|
private String schedulingPolicy;
|
||||||
|
|
||||||
@ -109,6 +113,62 @@ public void setMaxApps(int max) {
|
|||||||
maxApps.set(max);
|
maxApps.set(max);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the maximum memory size AM can use in MB.
|
||||||
|
*
|
||||||
|
* @return the maximum memory size AM can use
|
||||||
|
*/
|
||||||
|
public long getMaxAMShareMB() {
|
||||||
|
return maxAMShareMB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the maximum number of VCores AM can use.
|
||||||
|
*
|
||||||
|
* @return the maximum number of VCores AM can use
|
||||||
|
*/
|
||||||
|
public int getMaxAMShareVCores() {
|
||||||
|
return maxAMShareVCores.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the maximum resource AM can use.
|
||||||
|
*
|
||||||
|
* @param resource the maximum resource AM can use
|
||||||
|
*/
|
||||||
|
public void setMaxAMShare(Resource resource) {
|
||||||
|
maxAMShareMB.set(resource.getMemorySize());
|
||||||
|
maxAMShareVCores.set(resource.getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the AM memory usage in MB.
|
||||||
|
*
|
||||||
|
* @return the AM memory usage
|
||||||
|
*/
|
||||||
|
public long getAMResourceUsageMB() {
|
||||||
|
return amResourceUsageMB.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the AM VCore usage.
|
||||||
|
*
|
||||||
|
* @return the AM VCore usage
|
||||||
|
*/
|
||||||
|
public int getAMResourceUsageVCores() {
|
||||||
|
return amResourceUsageVCores.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the AM resource usage.
|
||||||
|
*
|
||||||
|
* @param resource the AM resource usage
|
||||||
|
*/
|
||||||
|
public void setAMResourceUsage(Resource resource) {
|
||||||
|
amResourceUsageMB.set(resource.getMemorySize());
|
||||||
|
amResourceUsageVCores.set(resource.getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
public String getSchedulingPolicy() {
|
public String getSchedulingPolicy() {
|
||||||
return schedulingPolicy;
|
return schedulingPolicy;
|
||||||
}
|
}
|
||||||
|
@ -594,6 +594,143 @@ public void testFairShareWithZeroWeight() throws IOException {
|
|||||||
assertEquals(0, queue.getFairShare().getMemorySize());
|
assertEquals(0, queue.getFairShare().getMemorySize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if we compute the maximum AM resource correctly.
|
||||||
|
*
|
||||||
|
* @throws IOException if scheduler reinitialization fails
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testComputeMaxAMResource() throws IOException {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"queueFSZeroWithMax\">");
|
||||||
|
out.println("<weight>0</weight>");
|
||||||
|
out.println("<maxAMShare>0.5</maxAMShare>");
|
||||||
|
out.println("<maxResources>4096 mb 4 vcores</maxResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queueFSZeroWithAVL\">");
|
||||||
|
out.println("<weight>0.0</weight>");
|
||||||
|
out.println("<maxAMShare>0.5</maxAMShare>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queueFSNonZero\">");
|
||||||
|
out.println("<weight>1</weight>");
|
||||||
|
out.println("<maxAMShare>0.5</maxAMShare>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<defaultQueueSchedulingPolicy>drf" +
|
||||||
|
"</defaultQueueSchedulingPolicy>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
long memCapacity = 20 * GB;
|
||||||
|
int cpuCapacity = 20;
|
||||||
|
RMNode node =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(memCapacity,
|
||||||
|
cpuCapacity), 0, "127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
||||||
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
||||||
|
scheduler.handle(nodeEvent);
|
||||||
|
scheduler.update();
|
||||||
|
|
||||||
|
Resource amResource = Resource.newInstance(1 * GB, 1);
|
||||||
|
int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority();
|
||||||
|
|
||||||
|
// queueFSZeroWithMax
|
||||||
|
FSLeafQueue queueFSZeroWithMax = scheduler.getQueueManager().
|
||||||
|
getLeafQueue("queueFSZeroWithMax", true);
|
||||||
|
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
|
||||||
|
createApplicationWithAMResource(attId1, "queueFSZeroWithMax", "user1",
|
||||||
|
amResource);
|
||||||
|
createSchedulingRequestExistingApplication(1 * GB, 1, amPriority, attId1);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// queueFSZeroWithMax's weight is 0.0, so its fair share should be 0, we use
|
||||||
|
// the min(maxShare, available resource) to compute maxAMShare, in this
|
||||||
|
// case, we use maxShare, since it is smaller than available resource.
|
||||||
|
assertEquals("QueueFSZeroWithMax's fair share should be zero",
|
||||||
|
0, queueFSZeroWithMax.getFairShare().getMemorySize());
|
||||||
|
assertEquals("QueueFSZeroWithMax's maximum AM resource should be "
|
||||||
|
+ "maxShare * maxAMShare",
|
||||||
|
(long)(queueFSZeroWithMax.getMaxShare().getMemorySize() *
|
||||||
|
queueFSZeroWithMax.getMaxAMShare()),
|
||||||
|
queueFSZeroWithMax.getMetrics().getMaxAMShareMB());
|
||||||
|
assertEquals("QueueFSZeroWithMax's maximum AM resource should be "
|
||||||
|
+ "maxShare * maxAMShare",
|
||||||
|
(long)(queueFSZeroWithMax.getMaxShare().getVirtualCores() *
|
||||||
|
queueFSZeroWithMax.getMaxAMShare()),
|
||||||
|
queueFSZeroWithMax.getMetrics().getMaxAMShareVCores());
|
||||||
|
assertEquals("QueueFSZeroWithMax's AM resource usage should be the same to "
|
||||||
|
+ "AM resource request",
|
||||||
|
amResource.getMemorySize(),
|
||||||
|
queueFSZeroWithMax.getMetrics().getAMResourceUsageMB());
|
||||||
|
|
||||||
|
// queueFSZeroWithAVL
|
||||||
|
amResource = Resources.createResource(1 * GB, 1);
|
||||||
|
FSLeafQueue queueFSZeroWithAVL = scheduler.getQueueManager().
|
||||||
|
getLeafQueue("queueFSZeroWithAVL", true);
|
||||||
|
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
|
||||||
|
createApplicationWithAMResource(attId2, "queueFSZeroWithAVL", "user1",
|
||||||
|
amResource);
|
||||||
|
createSchedulingRequestExistingApplication(1 * GB, 1, amPriority, attId2);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// queueFSZeroWithAVL's weight is 0.0, so its fair share is 0, and we use
|
||||||
|
// the min(maxShare, available resource) to compute maxAMShare, in this
|
||||||
|
// case, we use available resource since it is smaller than the
|
||||||
|
// default maxShare.
|
||||||
|
assertEquals("QueueFSZeroWithAVL's fair share should be zero",
|
||||||
|
0, queueFSZeroWithAVL.getFairShare().getMemorySize());
|
||||||
|
assertEquals("QueueFSZeroWithAVL's maximum AM resource should be "
|
||||||
|
+ " available resource * maxAMShare",
|
||||||
|
(long) ((memCapacity - amResource.getMemorySize()) *
|
||||||
|
queueFSZeroWithAVL.getMaxAMShare()),
|
||||||
|
queueFSZeroWithAVL.getMetrics().getMaxAMShareMB());
|
||||||
|
assertEquals("QueueFSZeroWithAVL's maximum AM resource should be "
|
||||||
|
+ " available resource * maxAMShare",
|
||||||
|
(long) ((cpuCapacity - amResource.getVirtualCores()) *
|
||||||
|
queueFSZeroWithAVL.getMaxAMShare()),
|
||||||
|
queueFSZeroWithAVL.getMetrics().getMaxAMShareVCores());
|
||||||
|
assertEquals("QueueFSZeroWithMax's AM resource usage should be the same to "
|
||||||
|
+ "AM resource request",
|
||||||
|
amResource.getMemorySize(),
|
||||||
|
queueFSZeroWithAVL.getMetrics().getAMResourceUsageMB());
|
||||||
|
|
||||||
|
// queueFSNonZero
|
||||||
|
amResource = Resources.createResource(1 * GB, 1);
|
||||||
|
FSLeafQueue queueFSNonZero = scheduler.getQueueManager().
|
||||||
|
getLeafQueue("queueFSNonZero", true);
|
||||||
|
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
|
||||||
|
createApplicationWithAMResource(attId3, "queueFSNonZero", "user1",
|
||||||
|
amResource);
|
||||||
|
createSchedulingRequestExistingApplication(1 * GB, 1, amPriority, attId3);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// queueFSNonZero's weight is 1, so its fair share is not 0, and we use the
|
||||||
|
// fair share to compute maxAMShare
|
||||||
|
assertNotEquals("QueueFSNonZero's fair share shouldn't be zero",
|
||||||
|
0, queueFSNonZero.getFairShare().getMemorySize());
|
||||||
|
assertEquals("QueueFSNonZero's maximum AM resource should be "
|
||||||
|
+ " fair share * maxAMShare",
|
||||||
|
(long)(memCapacity * queueFSNonZero.getMaxAMShare()),
|
||||||
|
queueFSNonZero.getMetrics().getMaxAMShareMB());
|
||||||
|
assertEquals("QueueFSNonZero's maximum AM resource should be "
|
||||||
|
+ " fair share * maxAMShare",
|
||||||
|
(long)(cpuCapacity * queueFSNonZero.getMaxAMShare()),
|
||||||
|
queueFSNonZero.getMetrics().getMaxAMShareVCores());
|
||||||
|
assertEquals("QueueFSNonZero's AM resource usage should be the same to "
|
||||||
|
+ "AM resource request",
|
||||||
|
amResource.getMemorySize(),
|
||||||
|
queueFSNonZero.getMetrics().getAMResourceUsageMB());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException {
|
public void testFairShareWithZeroWeightNoneZeroMinRes() throws IOException {
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
Loading…
Reference in New Issue
Block a user