YARN-7411. Inter-Queue preemption's computeFixpointAllocation need to handle absolute resources while computing normalizedGuarantee. (Sunil G via wangda)
Change-Id: I41b1d7558c20fc4eb2050d40134175a2ef6330cb
This commit is contained in:
parent
aa3f62740f
commit
034b312d9f
@ -26,7 +26,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceInformationProto;
|
||||
@ -152,17 +151,6 @@ private void initResources() {
|
||||
.newInstance(ResourceInformation.VCORES);
|
||||
this.setMemorySize(p.getMemory());
|
||||
this.setVirtualCores(p.getVirtualCores());
|
||||
|
||||
// Update missing resource information on respective index.
|
||||
updateResourceInformationMap(types);
|
||||
}
|
||||
|
||||
private void updateResourceInformationMap(ResourceInformation[] types) {
|
||||
for (int i = 0; i < types.length; i++) {
|
||||
if (resources[i] == null) {
|
||||
resources[i] = ResourceInformation.newInstance(types[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static ResourceInformation newDefaultInformation(
|
||||
|
@ -111,6 +111,14 @@ public Resource multiplyAndNormalizeUp(Resource r, double by,
|
||||
stepFactor.getMemorySize()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource multiplyAndNormalizeUp(Resource r, double[] by,
|
||||
Resource stepFactor) {
|
||||
return Resources.createResource(
|
||||
roundUp((long) (r.getMemorySize() * by[0] + 0.5),
|
||||
stepFactor.getMemorySize()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource multiplyAndNormalizeDown(Resource r, double by,
|
||||
Resource stepFactor) {
|
||||
|
@ -495,6 +495,27 @@ private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource multiplyAndNormalizeUp(Resource r, double[] by,
|
||||
Resource stepFactor) {
|
||||
Resource ret = Resource.newInstance(r);
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation rResourceInformation = r.getResourceInformation(i);
|
||||
ResourceInformation stepFactorResourceInformation = stepFactor
|
||||
.getResourceInformation(i);
|
||||
|
||||
long rValue = rResourceInformation.getValue();
|
||||
long stepFactorValue = UnitsConversionUtil.convert(
|
||||
stepFactorResourceInformation.getUnits(),
|
||||
rResourceInformation.getUnits(),
|
||||
stepFactorResourceInformation.getValue());
|
||||
ret.setResourceValue(i, ResourceCalculator
|
||||
.roundUp((long) Math.ceil(rValue * by[i]), stepFactorValue));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource multiplyAndNormalizeUp(Resource r, double by,
|
||||
Resource stepFactor) {
|
||||
|
@ -125,7 +125,19 @@ public abstract long computeAvailableContainers(
|
||||
*/
|
||||
public abstract Resource multiplyAndNormalizeUp(
|
||||
Resource r, double by, Resource stepFactor);
|
||||
|
||||
|
||||
/**
|
||||
* Multiply resource <code>r</code> by factor <code>by</code>
|
||||
* and normalize up using step-factor <code>stepFactor</code>.
|
||||
*
|
||||
* @param r resource to be multiplied
|
||||
* @param by multiplier array for all resource types
|
||||
* @param stepFactor factor by which to normalize up
|
||||
* @return resulting normalized resource
|
||||
*/
|
||||
public abstract Resource multiplyAndNormalizeUp(
|
||||
Resource r, double[] by, Resource stepFactor);
|
||||
|
||||
/**
|
||||
* Multiply resource <code>r</code> by factor <code>by</code>
|
||||
* and normalize down using step-factor <code>stepFactor</code>.
|
||||
|
@ -347,6 +347,11 @@ public static Resource multiplyAndAddTo(
|
||||
return lhs;
|
||||
}
|
||||
|
||||
public static Resource multiplyAndNormalizeUp(ResourceCalculator calculator,
|
||||
Resource lhs, double[] by, Resource factor) {
|
||||
return calculator.multiplyAndNormalizeUp(lhs, by, factor);
|
||||
}
|
||||
|
||||
public static Resource multiplyAndNormalizeUp(
|
||||
ResourceCalculator calculator,Resource lhs, double by, Resource factor) {
|
||||
return calculator.multiplyAndNormalizeUp(lhs, by, factor);
|
||||
|
@ -19,8 +19,11 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
|
||||
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -198,18 +201,33 @@ protected void computeFixpointAllocation(Resource totGuarant,
|
||||
private void resetCapacity(Resource clusterResource,
|
||||
Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
|
||||
Resource activeCap = Resource.newInstance(0, 0);
|
||||
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
|
||||
|
||||
if (ignoreGuar) {
|
||||
for (TempQueuePerPartition q : queues) {
|
||||
q.normalizedGuarantee = 1.0f / queues.size();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
q.normalizedGuarantee[i] = 1.0f / queues.size();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (TempQueuePerPartition q : queues) {
|
||||
Resources.addTo(activeCap, q.getGuaranteed());
|
||||
}
|
||||
for (TempQueuePerPartition q : queues) {
|
||||
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
|
||||
q.getGuaranteed(), activeCap);
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
ResourceInformation nResourceInformation = q.getGuaranteed()
|
||||
.getResourceInformation(i);
|
||||
ResourceInformation dResourceInformation = activeCap
|
||||
.getResourceInformation(i);
|
||||
|
||||
long nValue = nResourceInformation.getValue();
|
||||
long dValue = UnitsConversionUtil.convert(
|
||||
dResourceInformation.getUnits(), nResourceInformation.getUnits(),
|
||||
dResourceInformation.getValue());
|
||||
if (dValue != 0) {
|
||||
q.normalizedGuarantee[i] = (float) nValue / dValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,9 +22,11 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
@ -46,7 +48,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
|
||||
Resource untouchableExtra;
|
||||
Resource preemptableExtra;
|
||||
|
||||
double normalizedGuarantee;
|
||||
double[] normalizedGuarantee;
|
||||
|
||||
private Resource effMinRes;
|
||||
private Resource effMaxRes;
|
||||
@ -88,7 +90,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
|
||||
pendingDeductReserved = Resources.createResource(0);
|
||||
}
|
||||
|
||||
this.normalizedGuarantee = Float.NaN;
|
||||
this.normalizedGuarantee = new double[ResourceUtils
|
||||
.getNumberOfKnownResourceTypes()];
|
||||
this.children = new ArrayList<>();
|
||||
this.apps = new ArrayList<>();
|
||||
this.untouchableExtra = Resource.newInstance(0, 0);
|
||||
@ -240,8 +243,9 @@ public String toString() {
|
||||
sb.append(" NAME: " + queueName).append(" CUR: ").append(current)
|
||||
.append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
|
||||
.append(" GAR: ").append(getGuaranteed()).append(" NORM: ")
|
||||
.append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ")
|
||||
.append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted)
|
||||
.append(Arrays.toString(normalizedGuarantee))
|
||||
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
|
||||
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
|
||||
.append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
|
||||
.append(" UNTOUCHABLE: ").append(untouchableExtra)
|
||||
.append(" PREEMPTABLE: ").append(preemptableExtra).append("\n");
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
@ -56,6 +57,7 @@
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -533,6 +535,18 @@ private Resource parseResourceFromString(String p) {
|
||||
} else {
|
||||
res = Resources.createResource(Integer.valueOf(resource[0]),
|
||||
Integer.valueOf(resource[1]));
|
||||
if (resource.length > 2) {
|
||||
// Using the same order of resources from ResourceUtils, set resource
|
||||
// informations.
|
||||
ResourceInformation[] storedResourceInfo = ResourceUtils
|
||||
.getResourceTypesArray();
|
||||
for (int i = 2; i < resource.length; i++) {
|
||||
res.setResourceInformation(storedResourceInfo[i].getName(),
|
||||
ResourceInformation.newInstance(storedResourceInfo[i].getName(),
|
||||
storedResourceInfo[i].getUnits(),
|
||||
Integer.valueOf(resource[i])));
|
||||
}
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -18,11 +18,17 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Mockito.never;
|
||||
@ -613,4 +619,74 @@ public void testNodePartitionPreemptionWithVCoreResource() throws IOException {
|
||||
verify(mDisp, never()).handle(
|
||||
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNormalizeGuaranteeWithMultipleResource() throws IOException {
|
||||
// Initialize resource map
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
String RESOURCE_1 = "res1";
|
||||
|
||||
// Initialize mandatory resources
|
||||
ResourceInformation memory = ResourceInformation.newInstance(
|
||||
ResourceInformation.MEMORY_MB.getName(),
|
||||
ResourceInformation.MEMORY_MB.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||
ResourceInformation vcores = ResourceInformation.newInstance(
|
||||
ResourceInformation.VCORES.getName(),
|
||||
ResourceInformation.VCORES.getUnits(),
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||
riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
|
||||
ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
|
||||
/**
|
||||
* Queue structure is:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* / \ / \
|
||||
* a1 a2 b1 b2
|
||||
* </pre>
|
||||
*
|
||||
* a1 and b2 are using most of resources.
|
||||
* a2 and b1 needs more resources. Both are under served.
|
||||
* hence demand will consider both queue's need while trying to
|
||||
* do preemption.
|
||||
*/
|
||||
String labelsConfig =
|
||||
"=100,true;";
|
||||
String nodesConfig =
|
||||
"n1=;"; // n1 is default partition
|
||||
String queuesConfig =
|
||||
// guaranteed,max,used,pending
|
||||
"root(=[100:100:10 100:100:10 100:100:10 100:100:10]);" + //root
|
||||
"-a(=[50:80:4 100:100:10 80:90:10 30:20:4]);" + // a
|
||||
"--a1(=[25:30:2 100:50:10 80:90:10 0]);" + // a1
|
||||
"--a2(=[25:50:2 100:50:10 0 30:20:4]);" + // a2
|
||||
"-b(=[50:20:6 100:100:10 20:10 40:50:8]);" + // b
|
||||
"--b1(=[25:5:4 100:20:10 0 20:10:4]);" + // b1
|
||||
"--b2(=[25:15:2 100:20:10 20:10 20:10:4])"; // b2
|
||||
String appsConfig=
|
||||
//queueName\t(priority,resource,host,expression,#repeat,reserved)
|
||||
"a1\t" // app1 in a1
|
||||
+ "(1,8:9:1,n1,,10,false);" +
|
||||
"b2\t" // app2 in b2
|
||||
+ "(1,2:1,n1,,10,false)"; // 80 of y
|
||||
|
||||
buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
|
||||
policy.editSchedule();
|
||||
|
||||
verify(mDisp, times(7)).handle(
|
||||
argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
|
||||
|
||||
riMap.remove(RESOURCE_1);
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user