YARN-5270. Solve miscellaneous issues caused by YARN-4844. Contributed by Wangda Tan

This commit is contained in:
Jian He 2016-07-11 22:36:20 -07:00
parent f292624bd8
commit 819224dcf9
37 changed files with 175 additions and 130 deletions

View File

@ -135,7 +135,6 @@
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@ -667,7 +666,7 @@ public TaskAttemptImpl(TaskId taskId, int i,
//TODO:create the resource reqt for this Task attempt
this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
this.resourceCapability.setMemory(
this.resourceCapability.setMemorySize(
getMemoryRequired(conf, taskId.getTaskType()));
this.resourceCapability.setVirtualCores(
getCpuRequired(conf, taskId.getTaskType()));

View File

@ -389,7 +389,7 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
}
}
// set the resources
reqEvent.getCapability().setMemory(mapResourceRequest.getMemorySize());
reqEvent.getCapability().setMemorySize(mapResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
mapResourceRequest.getVirtualCores());
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
@ -417,7 +417,7 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
}
}
// set the resources
reqEvent.getCapability().setMemory(reduceResourceRequest.getMemorySize());
reqEvent.getCapability().setMemorySize(reduceResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
reduceResourceRequest.getVirtualCores());
if (reqEvent.getEarlierAttemptFailed()) {

View File

@ -254,7 +254,7 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
// the job can reaches the final state when MRAppMaster shuts down.
this.successfullyUnregistered.set(unregistered);
this.assignedQueue = assignedQueue;
this.resource = Resource.newInstance(1234L, 2L);
this.resource = Resource.newInstance(1234L, 2);
}
@Override

View File

@ -89,7 +89,7 @@ public void testFromYarn() throws Exception {
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class);
r.setMemory(2048);
r.setMemorySize(2048);
appUsageRpt.setNeededResources(r);
appUsageRpt.setNumReservedContainers(1);
appUsageRpt.setNumUsedContainers(3);
@ -128,7 +128,7 @@ public void testFromYarnApplicationReport() {
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class);
r.setMemory(2048);
r.setMemorySize(2048);
appUsageRpt.setNeededResources(r);
appUsageRpt.setNumReservedContainers(1);
appUsageRpt.setNumUsedContainers(3);

View File

@ -337,7 +337,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
// Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemory(
capability.setMemorySize(
conf.getInt(
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
)

View File

@ -323,7 +323,7 @@ private void startAMFromSLSTraces(Resource containerResource,
if (jsonTask.containsKey("container.memory")) {
int containerMemory = Integer.parseInt(
jsonTask.get("container.memory").toString());
res.setMemory(containerMemory);
res.setMemorySize(containerMemory);
}
if (jsonTask.containsKey("container.vcores")) {

View File

@ -542,11 +542,11 @@ public Long getValue() {
}
);
metrics.register("variable.cluster.allocated.vcores",
new Gauge<Long>() {
new Gauge<Integer>() {
@Override
public Long getValue() {
public Integer getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0L;
return 0;
} else {
return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
}
@ -566,11 +566,11 @@ public Long getValue() {
}
);
metrics.register("variable.cluster.available.vcores",
new Gauge<Long>() {
new Gauge<Integer>() {
@Override
public Long getValue() {
public Integer getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0L;
return 0;
} else {
return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
}

View File

@ -519,11 +519,11 @@ public Long getValue() {
}
);
metrics.register("variable.cluster.allocated.vcores",
new Gauge<Long>() {
new Gauge<Integer>() {
@Override
public Long getValue() {
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0L;
return 0;
} else {
return getRootQueueMetrics().getAllocatedVirtualCores();
}
@ -543,11 +543,11 @@ public Long getValue() {
}
);
metrics.register("variable.cluster.available.vcores",
new Gauge<Long>() {
new Gauge<Integer>() {
@Override
public Long getValue() {
public Integer getValue() {
if(getRootQueueMetrics() == null) {
return 0l;
return 0;
} else {
return getRootQueueMetrics().getAvailableVirtualCores();
}

View File

@ -18,14 +18,14 @@
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.util.Records;
/**
* <p><code>Resource</code> models a set of computer resources in the
* cluster.</p>
@ -55,9 +55,18 @@ public abstract class Resource implements Comparable<Resource> {
@Public
@Stable
public static Resource newInstance(long memory, long vCores) {
public static Resource newInstance(int memory, int vCores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory);
resource.setMemorySize(memory);
resource.setVirtualCores(vCores);
return resource;
}
@Public
@Stable
public static Resource newInstance(long memory, int vCores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemorySize(memory);
resource.setVirtualCores(vCores);
return resource;
}
@ -77,17 +86,31 @@ public static Resource newInstance(long memory, long vCores) {
* Get <em>memory</em> of the resource.
* @return <em>memory</em> of the resource
*/
@Private
@Unstable
public abstract long getMemorySize();
@Public
@Stable
public long getMemorySize() {
throw new NotImplementedException(
"This method is implemented by ResourcePBImpl");
}
/**
* Set <em>memory</em> of the resource.
* @param memory <em>memory</em> of the resource
*/
@Public
@Deprecated
public abstract void setMemory(int memory);
/**
* Set <em>memory</em> of the resource.
* @param memory <em>memory</em> of the resource
*/
@Public
@Stable
public abstract void setMemory(long memory);
public void setMemorySize(long memory) {
throw new NotImplementedException(
"This method is implemented by ResourcePBImpl");
}
/**
@ -103,10 +126,6 @@ public static Resource newInstance(long memory, long vCores) {
@Public
@Evolving
public abstract int getVirtualCores();
@Public
@Unstable
public abstract long getVirtualCoresSize();
/**
* Set <em>number of virtual cpu cores</em> of the resource.
@ -120,7 +139,7 @@ public static Resource newInstance(long memory, long vCores) {
*/
@Public
@Evolving
public abstract void setVirtualCores(long vCores);
public abstract void setVirtualCores(int vCores);
@Override
public int hashCode() {

View File

@ -55,7 +55,7 @@ message ContainerIdProto {
message ResourceProto {
optional int64 memory = 1;
optional int64 virtual_cores = 2;
optional int32 virtual_cores = 2;
}
message ResourceUtilizationProto {

View File

@ -171,7 +171,7 @@ public void testSubmitApplicationOnHA() throws Exception {
Records.newRecord(ContainerLaunchContext.class);
appContext.setAMContainerSpec(amContainer);
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(10);
capability.setMemorySize(10);
capability.setVirtualCores(1);
appContext.setResource(capability);
ApplicationId appId = client.submitApplication(appContext);

View File

@ -67,24 +67,25 @@ public long getMemorySize() {
}
@Override
public void setMemory(long memory) {
@SuppressWarnings("deprecation")
public void setMemory(int memory) {
setMemorySize(memory);
}
@Override
public void setMemorySize(long memory) {
maybeInitBuilder();
builder.setMemory(memory);
}
@Override
public int getVirtualCores() {
return (int) getVirtualCoresSize();
}
@Override
public long getVirtualCoresSize() {
ResourceProtoOrBuilder p = viaProto ? proto : builder;
return p.getVirtualCores();
}
@Override
public void setVirtualCores(long vCores) {
public void setVirtualCores(int vCores) {
maybeInitBuilder();
builder.setVirtualCores(vCores);
}

View File

@ -56,7 +56,7 @@ public float ratio(Resource a, Resource b) {
}
@Override
public Resource divideAndCeil(Resource numerator, long denominator) {
public Resource divideAndCeil(Resource numerator, int denominator) {
return Resources.createResource(
divideAndCeil(numerator.getMemorySize(), denominator));
}

View File

@ -142,7 +142,7 @@ public float ratio(Resource a, Resource b) {
}
@Override
public Resource divideAndCeil(Resource numerator, long denominator) {
public Resource divideAndCeil(Resource numerator, int denominator) {
return Resources.createResource(
divideAndCeil(numerator.getMemorySize(), denominator),
divideAndCeil(numerator.getVirtualCores(), denominator)
@ -157,7 +157,7 @@ public Resource normalize(Resource r, Resource minimumResource,
Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
stepFactor.getMemorySize()),
maximumResource.getMemorySize());
long normalizedCores = Math.min(
int normalizedCores = Math.min(
roundUp(
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
stepFactor.getVirtualCores()),
@ -189,7 +189,7 @@ public Resource multiplyAndNormalizeUp(Resource r, double by,
roundUp(
(int)Math.ceil(r.getMemorySize() * by), stepFactor.getMemorySize()),
roundUp(
(int)Math.ceil(r.getVirtualCores() * by),
(int)Math.ceil(r.getVirtualCores() * by),
stepFactor.getVirtualCores())
);
}
@ -203,7 +203,7 @@ public Resource multiplyAndNormalizeDown(Resource r, double by,
stepFactor.getMemorySize()
),
roundDown(
(int)(r.getVirtualCores() * by),
(int)(r.getVirtualCores() * by),
stepFactor.getVirtualCores()
)
);

View File

@ -30,6 +30,13 @@ public abstract class ResourceCalculator {
public abstract int
compare(Resource clusterResource, Resource lhs, Resource rhs);
public static int divideAndCeil(int a, int b) {
if (b == 0) {
return 0;
}
return (a + (b - 1)) / b;
}
public static long divideAndCeil(long a, long b) {
if (b == 0) {
@ -38,6 +45,10 @@ public static long divideAndCeil(long a, long b) {
return (a + (b - 1)) / b;
}
public static int roundUp(int a, int b) {
return divideAndCeil(a, b) * b;
}
public static long roundUp(long a, long b) {
return divideAndCeil(a, b) * b;
}
@ -46,6 +57,10 @@ public static long roundDown(long a, long b) {
return (a / b) * b;
}
public static int roundDown(int a, int b) {
return (a / b) * b;
}
/**
* Compute the number of containers which can be allocated given
* <code>available</code> and <code>required</code> resources.
@ -169,7 +184,7 @@ public abstract float divide(
* @param denominator denominator
* @return resultant resource
*/
public abstract Resource divideAndCeil(Resource numerator, long denominator);
public abstract Resource divideAndCeil(Resource numerator, int denominator);
/**
* Check if a smaller resource can be contained by bigger resource.

View File

@ -42,7 +42,13 @@ public long getMemorySize() {
}
@Override
public void setMemory(long memory) {
public void setMemorySize(long memory) {
throw new RuntimeException("NONE cannot be modified!");
}
@Override
@SuppressWarnings("deprecation")
public void setMemory(int memory) {
throw new RuntimeException("NONE cannot be modified!");
}
@ -52,12 +58,7 @@ public int getVirtualCores() {
}
@Override
public long getVirtualCoresSize() {
return 0;
}
@Override
public void setVirtualCores(long cores) {
public void setVirtualCores(int cores) {
throw new RuntimeException("NONE cannot be modified!");
}
@ -86,7 +87,13 @@ public long getMemorySize() {
}
@Override
public void setMemory(long memory) {
@SuppressWarnings("deprecation")
public void setMemory(int memory) {
throw new RuntimeException("UNBOUNDED cannot be modified!");
}
@Override
public void setMemorySize(long memory) {
throw new RuntimeException("UNBOUNDED cannot be modified!");
}
@ -96,12 +103,7 @@ public int getVirtualCores() {
}
@Override
public long getVirtualCoresSize() {
return Long.MAX_VALUE;
}
@Override
public void setVirtualCores(long cores) {
public void setVirtualCores(int cores) {
throw new RuntimeException("UNBOUNDED cannot be modified!");
}
@ -109,20 +111,31 @@ public void setVirtualCores(long cores) {
public int compareTo(Resource o) {
long diff = Long.MAX_VALUE - o.getMemorySize();
if (diff == 0) {
diff = Long.MAX_VALUE - o.getVirtualCoresSize();
diff = Integer.MAX_VALUE - o.getVirtualCores();
}
return Long.signum(diff);
}
};
public static Resource createResource(int memory) {
return createResource(memory, (memory > 0) ? 1 : 0);
}
public static Resource createResource(int memory, int cores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemorySize(memory);
resource.setVirtualCores(cores);
return resource;
}
public static Resource createResource(long memory) {
return createResource(memory, (memory > 0) ? 1 : 0);
}
public static Resource createResource(long memory, long cores) {
public static Resource createResource(long memory, int cores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory);
resource.setMemorySize(memory);
resource.setVirtualCores(cores);
return resource;
}
@ -140,7 +153,7 @@ public static Resource clone(Resource res) {
}
public static Resource addTo(Resource lhs, Resource rhs) {
lhs.setMemory(lhs.getMemorySize() + rhs.getMemorySize());
lhs.setMemorySize(lhs.getMemorySize() + rhs.getMemorySize());
lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores());
return lhs;
}
@ -150,7 +163,7 @@ public static Resource add(Resource lhs, Resource rhs) {
}
public static Resource subtractFrom(Resource lhs, Resource rhs) {
lhs.setMemory(lhs.getMemorySize() - rhs.getMemorySize());
lhs.setMemorySize(lhs.getMemorySize() - rhs.getMemorySize());
lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores());
return lhs;
}
@ -164,7 +177,7 @@ public static Resource negate(Resource resource) {
}
public static Resource multiplyTo(Resource lhs, double by) {
lhs.setMemory((int)(lhs.getMemorySize() * by));
lhs.setMemorySize((long)(lhs.getMemorySize() * by));
lhs.setVirtualCores((int)(lhs.getVirtualCores() * by));
return lhs;
}
@ -179,7 +192,7 @@ public static Resource multiply(Resource lhs, double by) {
*/
public static Resource multiplyAndAddTo(
Resource lhs, Resource rhs, double by) {
lhs.setMemory(lhs.getMemorySize() + (int)(rhs.getMemorySize() * by));
lhs.setMemorySize(lhs.getMemorySize() + (long)(rhs.getMemorySize() * by));
lhs.setVirtualCores(lhs.getVirtualCores()
+ (int)(rhs.getVirtualCores() * by));
return lhs;
@ -197,7 +210,7 @@ public static Resource multiplyAndNormalizeDown(
public static Resource multiplyAndRoundDown(Resource lhs, double by) {
Resource out = clone(lhs);
out.setMemory((int)(lhs.getMemorySize() * by));
out.setMemorySize((long)(lhs.getMemorySize() * by));
out.setVirtualCores((int)(lhs.getVirtualCores() * by));
return out;
}

View File

@ -24,18 +24,18 @@
public class TestResources {
public Resource createResource(long memory, long vCores) {
public Resource createResource(long memory, int vCores) {
return Resource.newInstance(memory, vCores);
}
@Test(timeout=1000)
public void testCompareToWithUnboundedResource() {
assertTrue(Resources.unbounded().compareTo(
createResource(Long.MAX_VALUE, Long.MAX_VALUE)) == 0);
createResource(Long.MAX_VALUE, Integer.MAX_VALUE)) == 0);
assertTrue(Resources.unbounded().compareTo(
createResource(Long.MAX_VALUE, 0)) > 0);
assertTrue(Resources.unbounded().compareTo(
createResource(0, Long.MAX_VALUE)) > 0);
createResource(0, Integer.MAX_VALUE)) > 0);
}
@Test(timeout=1000)

View File

@ -64,7 +64,6 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.annotations.VisibleForTesting;
@ -442,9 +441,9 @@ public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
return report;
}
public static Resource newResource(long memory, long vCores) {
public static Resource newResource(long memory, int vCores) {
Resource resource = recordFactory.newRecordInstance(Resource.class);
resource.setMemory(memory);
resource.setMemorySize(memory);
resource.setVirtualCores(vCores);
return resource;
}

View File

@ -201,7 +201,7 @@ public void testRegisterNodeManagerRequestPBImpl() {
original.setHttpPort(8080);
original.setNodeId(getNodeId());
Resource resource = recordFactory.newRecordInstance(Resource.class);
resource.setMemory(10000);
resource.setMemorySize(10000);
resource.setVirtualCores(2);
original.setResource(resource);
RegisterNodeManagerRequestPBImpl copy = new RegisterNodeManagerRequestPBImpl(

View File

@ -39,7 +39,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@ -465,7 +464,7 @@ protected ResourceRequest createResourceRequest(String resource,
pri.setPriority(priority);
req.setPriority(pri);
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(memory);
capability.setMemorySize(memory);
capability.setVirtualCores(vCores);
req.setCapability(capability);
if (labelExpression != null) {

View File

@ -33,16 +33,16 @@ public class TestNodeManagerMetrics {
DefaultMetricsSystem.initialize("NodeManager");
NodeManagerMetrics metrics = NodeManagerMetrics.create();
Resource total = Records.newRecord(Resource.class);
total.setMemory(8*GiB);
total.setMemorySize(8*GiB);
total.setVirtualCores(16);
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(512); //512MiB
resource.setMemorySize(512); //512MiB
resource.setVirtualCores(2);
Resource largerResource = Records.newRecord(Resource.class);
largerResource.setMemory(1024);
largerResource.setMemorySize(1024);
largerResource.setVirtualCores(2);
Resource smallerResource = Records.newRecord(Resource.class);
smallerResource.setMemory(256);
smallerResource.setMemorySize(256);
smallerResource.setVirtualCores(1);
metrics.addResource(total);

View File

@ -57,7 +57,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
// Max allocation
private long maxNodeMemory = -1;
private long maxNodeVCores = -1;
private int maxNodeVCores = -1;
private Resource configuredMaxAllocation;
private boolean forceConfiguredMaxAllocation = true;
private long configuredMaxAllocationWaitTime;

View File

@ -61,7 +61,7 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of apps failed") MutableCounterInt appsFailed;
@Metric("Allocated memory in MB") MutableGaugeLong allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeLong allocatedVCores;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of allocated node-local containers")
@ -72,12 +72,12 @@ public class QueueMetrics implements MetricsSource {
MutableCounterLong aggregateOffSwitchContainersAllocated;
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
@Metric("Available memory in MB") MutableGaugeLong availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeLong availableVCores;
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeLong pendingVCores;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores;
@Metric("# of pending containers") MutableGaugeInt pendingContainers;
@Metric("# of reserved memory in MB") MutableGaugeLong reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeLong reservedVCores;
@Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;
@Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active applications") MutableGaugeInt activeApplications;
@ -364,7 +364,7 @@ public void incrPendingResources(String user, int containers, Resource res) {
private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCoresSize() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
}
public void decrPendingResources(String user, int containers, Resource res) {
@ -381,7 +381,7 @@ public void decrPendingResources(String user, int containers, Resource res) {
private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCoresSize() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
}
public void incrNodeTypeAggregations(String user, NodeType type) {
@ -409,7 +409,7 @@ public void allocateResources(String user, int containers, Resource res,
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCoresSize() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
if (decrPending) {
_decrPendingResources(containers, res);
}
@ -448,7 +448,7 @@ public void releaseResources(String user, int containers, Resource res) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCoresSize() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, containers, res);
@ -561,14 +561,15 @@ public int getAppsFailed() {
}
public Resource getAllocatedResources() {
return BuilderUtils.newResource(allocatedMB.value(), allocatedVCores.value());
return BuilderUtils.newResource(allocatedMB.value(),
(int) allocatedVCores.value());
}
public long getAllocatedMB() {
return allocatedMB.value();
}
public long getAllocatedVirtualCores() {
public int getAllocatedVirtualCores() {
return allocatedVCores.value();
}
@ -580,7 +581,7 @@ public long getAvailableMB() {
return availableMB.value();
}
public long getAvailableVirtualCores() {
public int getAvailableVirtualCores() {
return availableVCores.value();
}
@ -588,7 +589,7 @@ public long getPendingMB() {
return pendingMB.value();
}
public long getPendingVirtualCores() {
public int getPendingVirtualCores() {
return pendingVCores.value();
}
@ -600,7 +601,7 @@ public long getReservedMB() {
return reservedMB.value();
}
public long getReservedVirtualCores() {
public int getReservedVirtualCores() {
return reservedVCores.value();
}

View File

@ -465,7 +465,7 @@ public synchronized void setHeadroom(Resource globalLimit) {
public synchronized Resource getHeadroom() {
// Corner case to deal with applications being slightly over-limit
if (resourceLimit.getMemorySize() < 0) {
resourceLimit.setMemory(0);
resourceLimit.setMemorySize(0);
}
return resourceLimit;

View File

@ -66,7 +66,7 @@ public Resource getHeadroom() {
}
// Corner case to deal with applications being slightly over-limit
if (headroom.getMemorySize() < 0) {
headroom.setMemory(0);
headroom.setMemorySize(0);
}
return headroom;
}

View File

@ -194,7 +194,7 @@ private void subtractResourcesOnBlacklistedNodes(
node.getUnallocatedResource());
}
if (availableResources.getMemorySize() < 0) {
availableResources.setMemory(0);
availableResources.setMemorySize(0);
}
if (availableResources.getVirtualCores() < 0) {
availableResources.setVirtualCores(0);
@ -460,7 +460,7 @@ public void resetPreemptedResources() {
}
public void clearPreemptedResources() {
preemptedResources.setMemory(0);
preemptedResources.setMemorySize(0);
preemptedResources.setVirtualCores(0);
}

View File

@ -496,15 +496,15 @@ public boolean canRunAppAM(Resource amResource) {
// maxAMResource
Resource maxResource = Resources.clone(getFairShare());
if (maxResource.getMemorySize() == 0) {
maxResource.setMemory(
maxResource.setMemorySize(
Math.min(scheduler.getRootQueueMetrics().getAvailableMB(),
getMaxShare().getMemorySize()));
}
if (maxResource.getVirtualCoresSize() == 0) {
if (maxResource.getVirtualCores() == 0) {
maxResource.setVirtualCores(Math.min(
scheduler.getRootQueueMetrics().getAvailableVirtualCores(),
getMaxShare().getVirtualCoresSize()));
getMaxShare().getVirtualCores()));
}
Resource maxAMResource = Resources.multiply(maxResource, maxAMShare);

View File

@ -266,10 +266,10 @@ private static long getResourceValue(Resource resource, ResourceType type) {
private static void setResourceValue(long val, Resource resource, ResourceType type) {
switch (type) {
case MEMORY:
resource.setMemory(val);
resource.setMemorySize(val);
break;
case CPU:
resource.setVirtualCores(val);
resource.setVirtualCores((int)val);
break;
default:
throw new IllegalArgumentException("Invalid resource");

View File

@ -28,7 +28,7 @@
@XmlAccessorType(XmlAccessType.FIELD)
public class ResourceInfo {
long memory;
long vCores;
int vCores;
public ResourceInfo() {
}
@ -42,7 +42,7 @@ public long getMemorySize() {
return memory;
}
public long getvCores() {
public int getvCores() {
return vCores;
}

View File

@ -45,7 +45,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Logger;
import org.junit.Assert;
public class MockAM {
@ -207,7 +206,7 @@ public ResourceRequest createResourceReq(String resource, int memory, int priori
pri.setPriority(priority);
req.setPriority(pri);
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(memory);
capability.setMemorySize(memory);
req.setCapability(capability);
if (labelExpression != null) {
req.setNodeLabelExpression(labelExpression);

View File

@ -82,19 +82,19 @@ public static List<RMNode> deactivatedNodes(int racks, int nodesPerRack,
public static Resource newResource(int mem) {
Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory(mem);
rs.setMemorySize(mem);
return rs;
}
public static Resource newUsedResource(Resource total) {
Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory((int)(Math.random() * total.getMemorySize()));
rs.setMemorySize((int)(Math.random() * total.getMemorySize()));
return rs;
}
public static Resource newAvailResource(Resource total, Resource used) {
Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory(total.getMemorySize() - used.getMemorySize());
rs.setMemorySize(total.getMemorySize() - used.getMemorySize());
return rs;
}

View File

@ -455,7 +455,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, String queue, String amLabel)
throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory);
resource.setMemorySize(masterMemory);
Priority priority = Priority.newInstance(0);
return submitApp(resource, name, user, acls, false, queue,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@ -508,7 +508,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers) throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory);
resource.setMemorySize(masterMemory);
return submitApp(resource, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
false, null, 0, null, true, Priority.newInstance(0));
@ -517,7 +517,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory);
resource.setMemorySize(masterMemory);
Priority priority = Priority.newInstance(0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
@ -532,7 +532,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId) throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory);
resource.setMemorySize(masterMemory);
Priority priority = Priority.newInstance(0);
return submitApp(resource, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
@ -542,7 +542,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
public RMApp submitApp(int masterMemory,
LogAggregationContext logAggregationContext) throws Exception {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(masterMemory);
resource.setMemorySize(masterMemory);
Priority priority = Priority.newInstance(0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,

View File

@ -826,21 +826,21 @@ public void testNodeRegistrationWithMinimumAllocations() throws Exception {
resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response1.getNodeAction());
capability.setMemory(2048);
capability.setMemorySize(2048);
capability.setVirtualCores(1);
req.setResource(capability);
RegisterNodeManagerResponse response2 =
resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response2.getNodeAction());
capability.setMemory(1024);
capability.setMemorySize(1024);
capability.setVirtualCores(4);
req.setResource(capability);
RegisterNodeManagerResponse response3 =
resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.SHUTDOWN,response3.getNodeAction());
capability.setMemory(2048);
capability.setMemorySize(2048);
capability.setVirtualCores(4);
req.setResource(capability);
RegisterNodeManagerResponse response4 =

View File

@ -1345,7 +1345,7 @@ public void testContainerCompleteMsgNotLostAfterAMFailedAndRMRestart() throws Ex
// submit app with keepContainersAcrossApplicationAttempts true
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(200);
resource.setMemorySize(200);
RMApp app0 = rm1.submitApp(resource, "", UserGroupInformation
.getCurrentUser().getShortUserName(), null, false, null,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true,

View File

@ -127,7 +127,7 @@ private ReservationAllocation createReservationAllocation(long startTime,
public Resource createResource(int memory, int vCores) {
Resource resource = new ResourcePBImpl();
resource.setMemory(memory);
resource.setMemorySize(memory);
resource.setVirtualCores(vCores);
return resource;
}

View File

@ -402,9 +402,9 @@ public static void checkApps(MetricsSource source, int submitted, int pending,
}
public static void checkResources(MetricsSource source, long allocatedMB,
long allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
long aggreReleasedCtnrs, long availableMB, long availableCores, long pendingMB,
long pendingCores, int pendingCtnrs, long reservedMB, long reservedCores,
int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
long aggreReleasedCtnrs, long availableMB, int availableCores, long pendingMB,
int pendingCores, int pendingCtnrs, long reservedMB, int reservedCores,
int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb);

View File

@ -1143,7 +1143,7 @@ public void testAppSubmissionWithPreviousToken() throws Exception{
// submit app1 with a token, set cancelTokenWhenComplete to false;
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(200);
resource.setMemorySize(200);
RMApp app1 = rm.submitApp(resource, "name", "user", null, false, null, 2,
credentials, null, true, false, false, null, 0, null, false, null);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
@ -1205,7 +1205,7 @@ public void testCancelWithMultipleAppSubmissions() throws Exception{
Assert.assertFalse(Renewer.cancelled);
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(200);
resource.setMemorySize(200);
RMApp app1 =
rm.submitApp(resource, "name", "user", null, false, null, 2, credentials,
null, true, false, false, null, 0, null, true, null);