YARN-7172. ResourceCalculator.fitsIn() should not take a cluster resource parameter. (Sen Zhao via wangda)

Change-Id: Icc3670c9381ce7591ca69ec12da5aa52d3612d34
This commit is contained in:
Wangda Tan 2017-09-17 21:20:43 -07:00
parent 8d7cc22ac2
commit e81596d06d
13 changed files with 55 additions and 78 deletions

View File

@ -123,8 +123,7 @@ public Resource multiplyAndNormalizeDown(Resource r, double by,
}
@Override
public boolean fitsIn(Resource cluster,
Resource smaller, Resource bigger) {
public boolean fitsIn(Resource smaller, Resource bigger) {
return smaller.getMemorySize() <= bigger.getMemorySize();
}

View File

@ -538,7 +538,7 @@ private Resource multiplyAndNormalize(Resource r, double by,
}
@Override
public boolean fitsIn(Resource cluster, Resource smaller, Resource bigger) {
public boolean fitsIn(Resource smaller, Resource bigger) {
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) {
ResourceInformation sResourceInformation = smaller

View File

@ -225,8 +225,7 @@ public abstract float divide(
/**
* Check if a smaller resource can be contained by bigger resource.
*/
public abstract boolean fitsIn(Resource cluster,
Resource smaller, Resource bigger);
public abstract boolean fitsIn(Resource smaller, Resource bigger);
/**
* Check if resource has any major resource types (which are all NodeManagers

View File

@ -437,9 +437,9 @@ public static boolean fitsIn(Resource smaller, Resource bigger) {
return true;
}
public static boolean fitsIn(ResourceCalculator rc, Resource cluster,
public static boolean fitsIn(ResourceCalculator rc,
Resource smaller, Resource bigger) {
return rc.fitsIn(cluster, smaller, bigger);
return rc.fitsIn(smaller, bigger);
}
public static Resource componentwiseMin(Resource lhs, Resource rhs) {

View File

@ -24,14 +24,13 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class TestResourceCalculator {
@ -64,29 +63,28 @@ public TestResourceCalculator(ResourceCalculator rs) {
@Test(timeout = 10000)
public void testFitsIn() {
Resource cluster = Resource.newInstance(1024, 1);
if (resourceCalculator instanceof DefaultResourceCalculator) {
assertTrue(resourceCalculator.fitsIn(cluster,
Assert.assertTrue(resourceCalculator.fitsIn(
Resource.newInstance(1, 2), Resource.newInstance(2, 1)));
assertTrue(resourceCalculator.fitsIn(cluster,
Assert.assertTrue(resourceCalculator.fitsIn(
Resource.newInstance(1, 2), Resource.newInstance(2, 2)));
assertTrue(resourceCalculator.fitsIn(cluster,
Assert.assertTrue(resourceCalculator.fitsIn(
Resource.newInstance(1, 2), Resource.newInstance(1, 2)));
assertTrue(resourceCalculator.fitsIn(cluster,
Assert.assertTrue(resourceCalculator.fitsIn(
Resource.newInstance(1, 2), Resource.newInstance(1, 1)));
assertFalse(resourceCalculator.fitsIn(cluster,
Assert.assertFalse(resourceCalculator.fitsIn(
Resource.newInstance(2, 1), Resource.newInstance(1, 2)));
} else if (resourceCalculator instanceof DominantResourceCalculator) {
assertFalse(resourceCalculator.fitsIn(cluster,
Assert.assertFalse(resourceCalculator.fitsIn(
Resource.newInstance(1, 2), Resource.newInstance(2, 1)));
assertTrue(resourceCalculator.fitsIn(cluster,
Assert.assertTrue(resourceCalculator.fitsIn(
Resource.newInstance(1, 2), Resource.newInstance(2, 2)));
assertTrue(resourceCalculator.fitsIn(cluster,
Assert.assertTrue(resourceCalculator.fitsIn(
Resource.newInstance(1, 2), Resource.newInstance(1, 2)));
assertFalse(resourceCalculator.fitsIn(cluster,
Assert.assertFalse(resourceCalculator.fitsIn(
Resource.newInstance(1, 2), Resource.newInstance(1, 1)));
assertFalse(resourceCalculator.fitsIn(cluster,
Assert.assertFalse(resourceCalculator.fitsIn(
Resource.newInstance(2, 1), Resource.newInstance(1, 2)));
}
}

View File

@ -300,8 +300,7 @@ public static void checkSchedContainerChangeRequest(
// Target resource of the increase request is more than NM can offer
ResourceScheduler scheduler = rmContext.getScheduler();
RMNode rmNode = request.getSchedulerNode().getRMNode();
if (!Resources.fitsIn(scheduler.getResourceCalculator(),
scheduler.getClusterResource(), targetResource,
if (!Resources.fitsIn(scheduler.getResourceCalculator(), targetResource,
rmNode.getTotalCapability())) {
String msg = "Target resource=" + targetResource + " of containerId="
+ containerId + " is more than node's total resource="

View File

@ -156,8 +156,8 @@ public static boolean tryPreemptContainerAndDeductResToObtain(
if (null != toObtainByPartition
&& Resources.greaterThan(rc, clusterResource, toObtainByPartition,
Resources.none())
&& Resources.fitsIn(rc, clusterResource,
rmContainer.getAllocatedResource(), totalPreemptionAllowed)
&& Resources.fitsIn(rc, rmContainer.getAllocatedResource(),
totalPreemptionAllowed)
&& !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) {
Resources.subtractFrom(toObtainByPartition,
rmContainer.getAllocatedResource());

View File

@ -229,8 +229,7 @@ private boolean canPreemptEnoughResourceForAsked(Resource requiredResource,
// If we already can allocate the reserved container after preemption,
// skip following steps
if (Resources.fitsIn(rc, clusterResource, lacking,
Resources.none())) {
if (Resources.fitsIn(rc, lacking, Resources.none())) {
return true;
}
@ -270,7 +269,7 @@ private boolean canPreemptEnoughResourceForAsked(Resource requiredResource,
}
// Lacking <= 0 means we can allocate the reserved container
if (Resources.fitsIn(rc, clusterResource, lacking, Resources.none())) {
if (Resources.fitsIn(rc, lacking, Resources.none())) {
return true;
}
}

View File

@ -87,8 +87,8 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
// Get list of nodes for preemption, ordered by preemption cost
List<NodeForPreemption> nodesForPreemption = getNodesForPreemption(
clusterResource, queueToPreemptableResourceByPartition,
selectedCandidates, totalPreemptedResourceAllowed);
queueToPreemptableResourceByPartition, selectedCandidates,
totalPreemptedResourceAllowed);
for (NodeForPreemption nfp : nodesForPreemption) {
RMContainer reservedContainer = nfp.schedulerNode.getReservedContainer();
@ -97,9 +97,8 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
}
NodeForPreemption preemptionResult = getPreemptionCandidatesOnNode(
nfp.schedulerNode, clusterResource,
queueToPreemptableResourceByPartition, selectedCandidates,
totalPreemptedResourceAllowed, false);
nfp.schedulerNode, queueToPreemptableResourceByPartition,
selectedCandidates, totalPreemptedResourceAllowed, false);
if (null != preemptionResult) {
for (RMContainer c : preemptionResult.selectedContainers) {
ApplicationAttemptId appId = c.getApplicationAttemptId();
@ -135,8 +134,7 @@ private Resource getPreemptableResource(String queueName,
return preemptable;
}
private boolean tryToPreemptFromQueue(Resource cluster, String queueName,
String partitionName,
private boolean tryToPreemptFromQueue(String queueName, String partitionName,
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
Resource required, Resource totalPreemptionAllowed, boolean readOnly) {
Resource preemptable = getPreemptableResource(queueName, partitionName,
@ -145,11 +143,11 @@ private boolean tryToPreemptFromQueue(Resource cluster, String queueName,
return false;
}
if (!Resources.fitsIn(rc, cluster, required, preemptable)) {
if (!Resources.fitsIn(rc, required, preemptable)) {
return false;
}
if (!Resources.fitsIn(rc, cluster, required, totalPreemptionAllowed)) {
if (!Resources.fitsIn(rc, required, totalPreemptionAllowed)) {
return false;
}
@ -165,7 +163,6 @@ private boolean tryToPreemptFromQueue(Resource cluster, String queueName,
/**
* Try to check if we can preempt resources for reserved container in given node
* @param node
* @param cluster
* @param queueToPreemptableResourceByPartition it's a map of
* <queueName, <partition, preemptable-resource>>
* @param readOnly do we want to modify preemptable resource after we selected
@ -174,7 +171,7 @@ private boolean tryToPreemptFromQueue(Resource cluster, String queueName,
* to satisfy reserved resource
*/
private NodeForPreemption getPreemptionCandidatesOnNode(
FiCaSchedulerNode node, Resource cluster,
FiCaSchedulerNode node,
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource totalPreemptionAllowed, boolean readOnly) {
@ -204,8 +201,7 @@ private NodeForPreemption getPreemptionCandidatesOnNode(
String partition = node.getPartition();
// Avoid preempt any container if required <= available + killable
if (Resources.fitsIn(rc, cluster, reservedContainer.getReservedResource(),
cur)) {
if (Resources.fitsIn(rc, reservedContainer.getReservedResource(), cur)) {
return null;
}
@ -232,9 +228,9 @@ private NodeForPreemption getPreemptionCandidatesOnNode(
// Can we preempt container c?
// Check if we have quota to preempt this container
boolean canPreempt = tryToPreemptFromQueue(cluster, containerQueueName,
partition, queueToPreemptableResourceByPartition,
c.getAllocatedResource(), totalPreemptionAllowed, readOnly);
boolean canPreempt = tryToPreemptFromQueue(containerQueueName, partition,
queueToPreemptableResourceByPartition, c.getAllocatedResource(),
totalPreemptionAllowed, readOnly);
// If we can, add to selected container, and change resource accordingly.
if (canPreempt) {
@ -246,7 +242,7 @@ private NodeForPreemption getPreemptionCandidatesOnNode(
Resources.addTo(totalSelected, c.getAllocatedResource());
}
Resources.addTo(cur, c.getAllocatedResource());
if (Resources.fitsIn(rc, cluster,
if (Resources.fitsIn(rc,
reservedContainer.getReservedResource(), cur)) {
canAllocateReservedContainer = true;
break;
@ -282,7 +278,7 @@ private NodeForPreemption getPreemptionCandidatesOnNode(
return nfp;
}
private List<NodeForPreemption> getNodesForPreemption(Resource cluster,
private List<NodeForPreemption> getNodesForPreemption(
Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource totalPreemptionAllowed) {
@ -292,7 +288,7 @@ private List<NodeForPreemption> getNodesForPreemption(Resource cluster,
for (FiCaSchedulerNode node : preemptionContext.getScheduler()
.getAllNodes()) {
if (node.getReservedContainer() != null) {
NodeForPreemption nfp = getPreemptionCandidatesOnNode(node, cluster,
NodeForPreemption nfp = getPreemptionCandidatesOnNode(node,
queueToPreemptableResourceByPartition, selectedCandidates,
totalPreemptionAllowed, true);
if (null != nfp) {

View File

@ -909,7 +909,7 @@ public boolean accept(Resource cluster,
maxResourceLimit = labelManager.getResourceByLabel(
schedulerContainer.getNodePartition(), cluster);
}
if (!Resources.fitsIn(resourceCalculator, cluster,
if (!Resources.fitsIn(resourceCalculator,
Resources.add(queueUsage.getUsed(partition), netAllocated),
maxResourceLimit)) {
if (LOG.isDebugEnabled()) {

View File

@ -548,10 +548,7 @@ private ContainerAllocation assignContainer(Resource clusterResource,
toKillContainers.add(killableContainer);
Resources.addTo(availableAndKillable,
killableContainer.getAllocatedResource());
if (Resources.fitsIn(rc,
clusterResource,
capability,
availableAndKillable)) {
if (Resources.fitsIn(rc, capability, availableAndKillable)) {
// Stop if we find enough spaces
availableContainers = 1;
break;
@ -579,8 +576,7 @@ private ContainerAllocation assignContainer(Resource clusterResource,
// under the limit.
resourceNeedToUnReserve = capability;
}
unreservedContainer =
application.findNodeToUnreserve(clusterResource, node,
unreservedContainer = application.findNodeToUnreserve(node,
schedulerKey, resourceNeedToUnReserve);
// When (minimum-unreserved-resource > 0 OR we cannot allocate
// new/reserved

View File

@ -314,7 +314,6 @@ private boolean anyContainerInFinalState(
}
private boolean commonCheckContainerAllocation(
Resource cluster,
ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation,
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
// Make sure node is not reserved by anyone else
@ -355,8 +354,7 @@ private boolean commonCheckContainerAllocation(
}
}
}
if (!Resources.fitsIn(rc, cluster,
allocation.getAllocatedOrReservedResource(),
if (!Resources.fitsIn(rc, allocation.getAllocatedOrReservedResource(),
availableResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Node doesn't have enough available resource, asked="
@ -419,8 +417,7 @@ public boolean accept(Resource cluster,
// Common part of check container allocation regardless if it is a
// increase container or regular container
commonCheckContainerAllocation(cluster, allocation,
schedulerContainer);
commonCheckContainerAllocation(allocation, schedulerContainer);
} else {
// Container reserved first time will be NEW, after the container
// accepted & confirmed, it will become RESERVED state
@ -721,9 +718,8 @@ public Allocation getAllocation(ResourceCalculator resourceCalculator,
}
@VisibleForTesting
public NodeId getNodeIdToUnreserve(
SchedulerRequestKey schedulerKey, Resource resourceNeedUnreserve,
ResourceCalculator rc, Resource clusterResource) {
public NodeId getNodeIdToUnreserve(SchedulerRequestKey schedulerKey,
Resource resourceNeedUnreserve, ResourceCalculator resourceCalculator) {
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
@ -738,7 +734,7 @@ public NodeId getNodeIdToUnreserve(
// make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
if (Resources.fitsIn(rc, clusterResource, resourceNeedUnreserve,
if (Resources.fitsIn(resourceCalculator, resourceNeedUnreserve,
reservedResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
@ -806,14 +802,13 @@ public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node,
}
@VisibleForTesting
public RMContainer findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
Resource minimumUnreservedResource) {
public RMContainer findNodeToUnreserve(FiCaSchedulerNode node,
SchedulerRequestKey schedulerKey, Resource minimumUnreservedResource) {
try {
readLock.lock();
// need to unreserve some other container first
NodeId idToUnreserve = getNodeIdToUnreserve(schedulerKey,
minimumUnreservedResource, rc, clusterResource);
minimumUnreservedResource, rc);
if (idToUnreserve == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("checked to see if could unreserve for app but nothing "

View File

@ -896,8 +896,7 @@ public void testGetAppToUnreserve() throws Exception {
String host_1 = "host_1";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
8 * GB);
Resource clusterResource = Resources.createResource(2 * 8 * GB);
// Setup resource-requests
Priority p = TestUtils.createMockPriority(5);
@ -933,28 +932,27 @@ public void testGetAppToUnreserve() throws Exception {
node_0.getNodeID(), "user", rmContext);
// no reserved containers
NodeId unreserveId =
app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator());
assertEquals(null, unreserveId);
// no reserved containers - reserve then unreserve
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
app_0.unreserve(priorityMap, node_0, rmContainer_1);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
cs.getResourceCalculator());
assertEquals(null, unreserveId);
// no container large enough is reserved
app_0.reserve(node_0, priorityMap, rmContainer_1, container_1);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
cs.getResourceCalculator());
assertEquals(null, unreserveId);
// reserve one that is now large enough
app_0.reserve(node_1, priorityMap, rmContainer, container);
unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability,
cs.getResourceCalculator(), clusterResource);
cs.getResourceCalculator());
assertEquals(node_1.getNodeID(), unreserveId);
}
@ -1001,16 +999,14 @@ public void testFindNodeToUnreserve() throws Exception {
node_1.getNodeID(), "user", rmContext);
// nothing reserved
RMContainer toUnreserveContainer =
app_0.findNodeToUnreserve(csContext.getClusterResource(), node_1,
RMContainer toUnreserveContainer = app_0.findNodeToUnreserve(node_1,
priorityMap, capability);
assertTrue(toUnreserveContainer == null);
// reserved but scheduler doesn't know about that node.
app_0.reserve(node_1, priorityMap, rmContainer, container);
node_1.reserveResource(app_0, priorityMap, rmContainer);
toUnreserveContainer =
app_0.findNodeToUnreserve(csContext.getClusterResource(), node_1,
toUnreserveContainer = app_0.findNodeToUnreserve(node_1,
priorityMap, capability);
assertTrue(toUnreserveContainer == null);
}