YARN-4844. Add getMemorySize/getVirtualCoresSize to o.a.h.y.api.records.Resource. Contributed by Wangda Tan.

This commit is contained in:
Varun Vasudev 2016-05-29 20:51:02 +05:30
parent f5ff05cc8a
commit 42f90ab885
139 changed files with 1597 additions and 1574 deletions

View File

@ -689,9 +689,9 @@ public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
NormalizedResourceEvent normalizedResourceEvent =
(NormalizedResourceEvent) event;
if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
summary.setResourcesPerMap(normalizedResourceEvent.getMemory());
summary.setResourcesPerMap((int) normalizedResourceEvent.getMemory());
} else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) {
summary.setResourcesPerReduce(normalizedResourceEvent.getMemory());
summary.setResourcesPerReduce((int) normalizedResourceEvent.getMemory());
}
break;
case JOB_INITED:

View File

@ -1425,7 +1425,7 @@ private static void updateMillisCounters(JobCounterUpdateEvent jce,
}
long duration = (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
Resource allocatedResource = taskAttempt.container.getResource();
int mbAllocated = allocatedResource.getMemory();
int mbAllocated = (int) allocatedResource.getMemorySize();
int vcoresAllocated = allocatedResource.getVirtualCores();
int minSlotMemSize = taskAttempt.conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,

View File

@ -371,10 +371,10 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
.getMemory())));
.getMemorySize())));
LOG.info("mapResourceRequest:" + mapResourceRequest);
if (mapResourceRequest.getMemory() > supportedMaxContainerCapability
.getMemory()
if (mapResourceRequest.getMemorySize() > supportedMaxContainerCapability
.getMemorySize()
|| mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability
.getVirtualCores()) {
String diagMsg =
@ -388,7 +388,7 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
}
}
// set the resources
reqEvent.getCapability().setMemory(mapResourceRequest.getMemory());
reqEvent.getCapability().setMemory(mapResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
mapResourceRequest.getVirtualCores());
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
@ -398,10 +398,10 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceRequest.getMemory())));
reduceResourceRequest.getMemorySize())));
LOG.info("reduceResourceRequest:" + reduceResourceRequest);
if (reduceResourceRequest.getMemory() > supportedMaxContainerCapability
.getMemory()
if (reduceResourceRequest.getMemorySize() > supportedMaxContainerCapability
.getMemorySize()
|| reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability
.getVirtualCores()) {
String diagMsg =
@ -416,7 +416,7 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
}
}
// set the resources
reqEvent.getCapability().setMemory(reduceResourceRequest.getMemory());
reqEvent.getCapability().setMemory(reduceResourceRequest.getMemorySize());
reqEvent.getCapability().setVirtualCores(
reduceResourceRequest.getVirtualCores());
if (reqEvent.getEarlierAttemptFailed()) {

View File

@ -20,43 +20,42 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.Records;
import java.util.EnumSet;
public class ResourceCalculatorUtils {
public static int divideAndCeil(int a, int b) {
public static int divideAndCeil(long a, long b) {
if (b == 0) {
return 0;
}
return (a + (b - 1)) / b;
return (int) ((a + (b - 1)) / b);
}
public static int computeAvailableContainers(Resource available,
Resource required, EnumSet<SchedulerResourceTypes> resourceTypes) {
if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
return Math.min(
calculateRatioOrMaxValue(available.getMemory(), required.getMemory()),
calculateRatioOrMaxValue(available.getMemorySize(), required.getMemorySize()),
calculateRatioOrMaxValue(available.getVirtualCores(), required
.getVirtualCores()));
}
return calculateRatioOrMaxValue(
available.getMemory(), required.getMemory());
available.getMemorySize(), required.getMemorySize());
}
public static int divideAndCeilContainers(Resource required, Resource factor,
EnumSet<SchedulerResourceTypes> resourceTypes) {
if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
return Math.max(divideAndCeil(required.getMemory(), factor.getMemory()),
return Math.max(divideAndCeil(required.getMemorySize(), factor.getMemorySize()),
divideAndCeil(required.getVirtualCores(), factor.getVirtualCores()));
}
return divideAndCeil(required.getMemory(), factor.getMemory());
return divideAndCeil(required.getMemorySize(), factor.getMemorySize());
}
private static int calculateRatioOrMaxValue(int numerator, int denominator) {
private static int calculateRatioOrMaxValue(long numerator, long denominator) {
if (denominator == 0) {
return Integer.MAX_VALUE;
}
return numerator / denominator;
return (int) (numerator / denominator);
}
}

View File

@ -157,7 +157,7 @@ public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
int pendingPreemptionRam = 0;
int pendingPreemptionCores = 0;
for (Resource r : pendingFlexiblePreemptions.values()) {
pendingPreemptionRam += r.getMemory();
pendingPreemptionRam += r.getMemorySize();
pendingPreemptionCores += r.getVirtualCores();
}
@ -171,8 +171,8 @@ public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
LOG.info("ResourceRequest:" + reqRsrc);
int reqCont = reqRsrc.getNumContainers();
int reqMem = reqRsrc.getCapability().getMemory();
int totalMemoryToRelease = reqCont * reqMem;
long reqMem = reqRsrc.getCapability().getMemorySize();
long totalMemoryToRelease = reqCont * reqMem;
int reqCores = reqRsrc.getCapability().getVirtualCores();
int totalCoresToRelease = reqCont * reqCores;
@ -204,7 +204,7 @@ public int compare(final Container o1, final Container o2) {
break;
}
TaskAttemptId reduceId = ctxt.getTaskAttempt(cont.getId());
int cMem = cont.getResource().getMemory();
int cMem = (int) cont.getResource().getMemorySize();
int cCores = cont.getResource().getVirtualCores();
if (!toBePreempted.contains(reduceId)) {

View File

@ -103,7 +103,7 @@ public void setup() {
assignedContainers.entrySet()) {
System.out.println("cont:" + ent.getKey().getContainerId() +
" type:" + ent.getValue().getTaskId().getTaskType() +
" res:" + contToResourceMap.get(ent.getKey()).getMemory() + "MB" );
" res:" + contToResourceMap.get(ent.getKey()).getMemorySize() + "MB" );
}
}
@ -180,8 +180,8 @@ public List<Container> getContainers(TaskType t) {
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(mActxt);
int supposedMemPreemption = pM.getContract().getResourceRequest()
.get(0).getResourceRequest().getCapability().getMemory()
int supposedMemPreemption = (int) pM.getContract().getResourceRequest()
.get(0).getResourceRequest().getCapability().getMemorySize()
* pM.getContract().getResourceRequest().get(0).getResourceRequest()
.getNumContainers();
@ -240,11 +240,11 @@ private List<TaskAttemptId> validatePreemption(PreemptionMessage pM,
}
// preempt enough
assert (effectivelyPreempted.getMemory() >= supposedMemPreemption)
: " preempted: " + effectivelyPreempted.getMemory();
assert (effectivelyPreempted.getMemorySize() >= supposedMemPreemption)
: " preempted: " + effectivelyPreempted.getMemorySize();
// preempt not too much enough
assert effectivelyPreempted.getMemory() <= supposedMemPreemption + minAlloc;
assert effectivelyPreempted.getMemorySize() <= supposedMemPreemption + minAlloc;
return preempting;
}
@ -261,8 +261,8 @@ private PreemptionMessage generatePreemptionMessage(
Resources.addTo(tot,
resPerCont.get(c));
}
int numCont = (int) Math.ceil(tot.getMemory() /
(double) minimumAllocation.getMemory());
int numCont = (int) Math.ceil(tot.getMemorySize() /
(double) minimumAllocation.getMemorySize());
ResourceRequest rr = ResourceRequest.newInstance(
Priority.newInstance(0), ResourceRequest.ANY,
minimumAllocation, numCont);

View File

@ -1794,7 +1794,7 @@ private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
int appAttemptId = 3;
MRAppMetrics metrics = mock(MRAppMetrics.class);
Resource minContainerRequirements = mock(Resource.class);
when(minContainerRequirements.getMemory()).thenReturn(1000);
when(minContainerRequirements.getMemorySize()).thenReturn(1000L);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
AppContext appContext = mock(AppContext.class);

View File

@ -33,12 +33,6 @@
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -47,11 +41,9 @@
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@ -94,7 +86,6 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
@ -313,7 +304,7 @@ public void verifyMillisCounters(Resource containerResource,
Assert.assertEquals(rta.getLaunchTime(), 10);
Counters counters = job.getAllCounters();
int memoryMb = containerResource.getMemory();
int memoryMb = (int) containerResource.getMemorySize();
int vcores = containerResource.getVirtualCores();
Assert.assertEquals((int) Math.ceil((float) memoryMb / minContainerSize),
counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue());
@ -577,7 +568,7 @@ public void testContainerCleanedWhileRunning() throws Exception {
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
@ -635,7 +626,7 @@ public void testContainerCleanedWhileCommitting() throws Exception {
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
@ -699,7 +690,7 @@ public void testDoubleTooManyFetchFailure() throws Exception {
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
@ -769,7 +760,7 @@ public void testAppDiognosticEventOnUnassignedTask() throws Exception {
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
@ -826,7 +817,7 @@ public void testTooManyFetchFailureAfterKill() throws Exception {
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl =
@ -894,7 +885,7 @@ public void testAppDiognosticEventOnNewTask() throws Exception {
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
@ -1054,7 +1045,7 @@ public void testContainerKillAfterAssigned() throws Exception {
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(),
@ -1108,7 +1099,7 @@ public void testContainerKillWhileRunning() throws Exception {
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(),
@ -1165,7 +1156,7 @@ public void testContainerKillWhileCommitPending() throws Exception {
ClusterInfo clusterInfo = mock(ClusterInfo.class);
Resource resource = mock(Resource.class);
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
when(resource.getMemory()).thenReturn(1024);
when(resource.getMemorySize()).thenReturn(1024L);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(),

View File

@ -201,7 +201,7 @@ public void testAllocatedContainerResourceIsNotNull() {
Container container = containerAssignedCaptor.getValue().getContainer();
Resource containerResource = container.getResource();
Assert.assertNotNull(containerResource);
Assert.assertEquals(containerResource.getMemory(), 0);
Assert.assertEquals(containerResource.getMemorySize(), 0);
Assert.assertEquals(containerResource.getVirtualCores(), 0);
}

View File

@ -1772,7 +1772,7 @@ public synchronized Allocation allocate(
when(excessC.getId()).thenReturn(containerId);
when(excessC.getPriority()).thenReturn(RMContainerAllocator.PRIORITY_REDUCE);
Resource mockR = mock(Resource.class);
when(mockR.getMemory()).thenReturn(2048);
when(mockR.getMemorySize()).thenReturn(2048L);
when(excessC.getResource()).thenReturn(mockR);
NodeId nId = mock(NodeId.class);
when(nId.getHost()).thenReturn("local");

View File

@ -47,8 +47,8 @@ public void testComputeAvailableContainers() throws Exception {
Integer.MAX_VALUE,
expectedNumberOfContainersForCPU);
Resource zeroCpuResource = Resource.newInstance(nonZeroResource.getMemory(),
0);
Resource zeroCpuResource = Resource.newInstance(
nonZeroResource.getMemorySize(), 0);
verifyDifferentResourceTypes(clusterAvailableResources, zeroCpuResource,
expectedNumberOfContainersForMemory,

View File

@ -521,13 +521,13 @@ public static JobStatus fromYarn(ApplicationReport application,
application.getApplicationResourceUsageReport();
if (resourceUsageReport != null) {
jobStatus.setNeededMem(
resourceUsageReport.getNeededResources().getMemory());
resourceUsageReport.getNeededResources().getMemorySize());
jobStatus.setNumReservedSlots(
resourceUsageReport.getNumReservedContainers());
jobStatus.setNumUsedSlots(resourceUsageReport.getNumUsedContainers());
jobStatus.setReservedMem(
resourceUsageReport.getReservedResources().getMemory());
jobStatus.setUsedMem(resourceUsageReport.getUsedResources().getMemory());
resourceUsageReport.getReservedResources().getMemorySize());
jobStatus.setUsedMem(resourceUsageReport.getUsedResources().getMemorySize());
}
return jobStatus;
}

View File

@ -95,9 +95,9 @@ public int getValue() {
private String trackingUrl ="";
private int numUsedSlots;
private int numReservedSlots;
private int usedMem;
private int reservedMem;
private int neededMem;
private long usedMem;
private long reservedMem;
private long neededMem;
private boolean isUber;
/**
@ -580,42 +580,42 @@ public void setNumReservedSlots(int n) {
/**
* @return the used memory
*/
public int getUsedMem() {
public long getUsedMem() {
return usedMem;
}
/**
* @param m the used memory
*/
public void setUsedMem(int m) {
public void setUsedMem(long m) {
this.usedMem = m;
}
/**
* @return the reserved memory
*/
public int getReservedMem() {
public long getReservedMem() {
return reservedMem;
}
/**
* @param r the reserved memory
*/
public void setReservedMem(int r) {
public void setReservedMem(long r) {
this.reservedMem = r;
}
/**
* @return the needed memory
*/
public int getNeededMem() {
public long getNeededMem() {
return neededMem;
}
/**
* @param n the needed memory
*/
public void setNeededMem(int n) {
public void setNeededMem(long n) {
this.neededMem = n;
}

View File

@ -28,7 +28,7 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NormalizedResourceEvent implements HistoryEvent {
private int memory;
private long memory;
private TaskType taskType;
/**
@ -36,7 +36,7 @@ public class NormalizedResourceEvent implements HistoryEvent {
* @param taskType the tasktype of the request.
* @param memory the normalized memory requirements.
*/
public NormalizedResourceEvent(TaskType taskType, int memory) {
public NormalizedResourceEvent(TaskType taskType, long memory) {
this.memory = memory;
this.taskType = taskType;
}
@ -53,7 +53,7 @@ public TaskType getTaskType() {
* the normalized memory
* @return the normalized memory
*/
public int getMemory() {
public long getMemory() {
return this.memory;
}

View File

@ -786,9 +786,9 @@ public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
for (JobStatus job : jobs) {
int numUsedSlots = job.getNumUsedSlots();
int numReservedSlots = job.getNumReservedSlots();
int usedMem = job.getUsedMem();
int rsvdMem = job.getReservedMem();
int neededMem = job.getNeededMem();
long usedMem = job.getUsedMem();
long rsvdMem = job.getReservedMem();
long neededMem = job.getNeededMem();
int jobNameLength = job.getJobName().length();
writer.printf(dataPattern, job.getJobID().toString(),
job.getJobName().substring(0, jobNameLength > 20 ? 20 : jobNameLength),

View File

@ -170,9 +170,9 @@ public void testShowJob() throws Exception {
when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL);
when(mockJobStatus.getNumUsedSlots()).thenReturn(1);
when(mockJobStatus.getNumReservedSlots()).thenReturn(1);
when(mockJobStatus.getUsedMem()).thenReturn(1024);
when(mockJobStatus.getReservedMem()).thenReturn(512);
when(mockJobStatus.getNeededMem()).thenReturn(2048);
when(mockJobStatus.getUsedMem()).thenReturn(1024L);
when(mockJobStatus.getReservedMem()).thenReturn(512L);
when(mockJobStatus.getNeededMem()).thenReturn(2048L);
when(mockJobStatus.getSchedulingInfo()).thenReturn("NA");
Job mockJob = mock(Job.class);

View File

@ -67,10 +67,10 @@ public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) {
FairScheduler fair = (FairScheduler) scheduler;
final FSAppAttempt app = fair.getSchedulerApp(appAttemptId);
metrics.register("variable.app." + oldAppId + ".demand.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return app.getDemand().getMemory();
public Long getValue() {
return app.getDemand().getMemorySize();
}
}
);
@ -83,10 +83,10 @@ public Integer getValue() {
}
);
metrics.register("variable.app." + oldAppId + ".usage.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return app.getResourceUsage().getMemory();
public Long getValue() {
return app.getResourceUsage().getMemorySize();
}
}
);
@ -99,26 +99,26 @@ public Integer getValue() {
}
);
metrics.register("variable.app." + oldAppId + ".minshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return app.getMinShare().getMemory();
public Long getValue() {
return app.getMinShare().getMemorySize();
}
}
);
metrics.register("variable.app." + oldAppId + ".minshare.vcores",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return app.getMinShare().getMemory();
public Long getValue() {
return app.getMinShare().getMemorySize();
}
}
);
metrics.register("variable.app." + oldAppId + ".maxshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return Math.min(app.getMaxShare().getMemory(), totalMemoryMB);
public Long getValue() {
return Math.min(app.getMaxShare().getMemorySize(), totalMemoryMB);
}
}
);
@ -154,10 +154,10 @@ public void trackQueue(String queueName) {
FairScheduler fair = (FairScheduler) scheduler;
final FSQueue queue = fair.getQueueManager().getQueue(queueName);
metrics.register("variable.queue." + queueName + ".demand.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return queue.getDemand().getMemory();
public Long getValue() {
return queue.getDemand().getMemorySize();
}
}
);
@ -170,10 +170,10 @@ public Integer getValue() {
}
);
metrics.register("variable.queue." + queueName + ".usage.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return queue.getResourceUsage().getMemory();
public Long getValue() {
return queue.getResourceUsage().getMemorySize();
}
}
);
@ -186,10 +186,10 @@ public Integer getValue() {
}
);
metrics.register("variable.queue." + queueName + ".minshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return queue.getMinShare().getMemory();
public Long getValue() {
return queue.getMinShare().getMemorySize();
}
}
);
@ -202,9 +202,9 @@ public Integer getValue() {
}
);
metrics.register("variable.queue." + queueName + ".maxshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if (! maxReset &&
SLSRunner.simulateInfoMap.containsKey("Number of nodes") &&
SLSRunner.simulateInfoMap.containsKey("Node memory (MB)") &&
@ -221,7 +221,7 @@ public Integer getValue() {
maxReset = false;
}
return Math.min(queue.getMaxShare().getMemory(), totalMemoryMB);
return Math.min(queue.getMaxShare().getMemorySize(), totalMemoryMB);
}
}
);
@ -234,10 +234,10 @@ public Integer getValue() {
}
);
metrics.register("variable.queue." + queueName + ".fairshare.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
return queue.getFairShare().getMemory();
public Long getValue() {
return queue.getFairShare().getMemorySize();
}
}
);

View File

@ -266,7 +266,7 @@ public void handle(SchedulerEvent schedulerEvent) {
// should have one container which is AM container
RMContainer rmc = app.getLiveContainers().iterator().next();
updateQueueMetrics(queue,
rmc.getContainer().getResource().getMemory(),
rmc.getContainer().getResource().getMemorySize(),
rmc.getContainer().getResource().getVirtualCores());
}
}
@ -322,7 +322,7 @@ private void updateQueueWithNodeUpdate(
if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
for (RMContainer rmc : app.getLiveContainers()) {
if (rmc.getContainerId() == containerId) {
releasedMemory += rmc.getContainer().getResource().getMemory();
releasedMemory += rmc.getContainer().getResource().getMemorySize();
releasedVCores += rmc.getContainer()
.getResource().getVirtualCores();
break;
@ -331,7 +331,7 @@ private void updateQueueWithNodeUpdate(
} else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
if (preemptionContainerMap.containsKey(containerId)) {
Resource preResource = preemptionContainerMap.get(containerId);
releasedMemory += preResource.getMemory();
releasedMemory += preResource.getMemorySize();
releasedVCores += preResource.getVirtualCores();
preemptionContainerMap.remove(containerId);
}
@ -422,9 +422,9 @@ private void updateQueueWithAllocateRequest(Allocation allocation,
"counter.queue." + queueName + ".pending.cores",
"counter.queue." + queueName + ".allocated.memory",
"counter.queue." + queueName + ".allocated.cores"};
int values[] = new int[]{pendingResource.getMemory(),
long values[] = new long[]{pendingResource.getMemorySize(),
pendingResource.getVirtualCores(),
allocatedResource.getMemory(), allocatedResource.getVirtualCores()};
allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()};
for (int i = names.length - 1; i >= 0; i --) {
if (! counterMap.containsKey(names[i])) {
metrics.counter(names[i]);
@ -530,11 +530,11 @@ public Long getValue() {
private void registerClusterResourceMetrics() {
metrics.register("variable.cluster.allocated.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAllocatedMB();
}
@ -542,11 +542,11 @@ public Integer getValue() {
}
);
metrics.register("variable.cluster.allocated.vcores",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
}
@ -554,11 +554,11 @@ public Integer getValue() {
}
);
metrics.register("variable.cluster.available.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAvailableMB();
}
@ -566,11 +566,11 @@ public Integer getValue() {
}
);
metrics.register("variable.cluster.available.vcores",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
}
@ -748,7 +748,7 @@ public void addAMRuntime(ApplicationId appId,
}
private void updateQueueMetrics(String queue,
int releasedMemory, int releasedVCores) {
long releasedMemory, int releasedVCores) {
// update queue counters
SortedMap<String, Counter> counterMap = metrics.getCounters();
if (releasedMemory != 0) {

View File

@ -240,7 +240,7 @@ public void handle(SchedulerEvent schedulerEvent) {
// should have one container which is AM container
RMContainer rmc = app.getLiveContainers().iterator().next();
updateQueueMetrics(queue,
rmc.getContainer().getResource().getMemory(),
rmc.getContainer().getResource().getMemorySize(),
rmc.getContainer().getResource().getVirtualCores());
}
}
@ -299,7 +299,7 @@ private void updateQueueWithNodeUpdate(
if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
for (RMContainer rmc : app.getLiveContainers()) {
if (rmc.getContainerId() == containerId) {
releasedMemory += rmc.getContainer().getResource().getMemory();
releasedMemory += rmc.getContainer().getResource().getMemorySize();
releasedVCores += rmc.getContainer()
.getResource().getVirtualCores();
break;
@ -308,7 +308,7 @@ private void updateQueueWithNodeUpdate(
} else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
if (preemptionContainerMap.containsKey(containerId)) {
Resource preResource = preemptionContainerMap.get(containerId);
releasedMemory += preResource.getMemory();
releasedMemory += preResource.getMemorySize();
releasedVCores += preResource.getVirtualCores();
preemptionContainerMap.remove(containerId);
}
@ -399,9 +399,9 @@ private void updateQueueWithAllocateRequest(Allocation allocation,
"counter.queue." + queueName + ".pending.cores",
"counter.queue." + queueName + ".allocated.memory",
"counter.queue." + queueName + ".allocated.cores"};
int values[] = new int[]{pendingResource.getMemory(),
long values[] = new long[]{pendingResource.getMemorySize(),
pendingResource.getVirtualCores(),
allocatedResource.getMemory(), allocatedResource.getVirtualCores()};
allocatedResource.getMemorySize(), allocatedResource.getVirtualCores()};
for (int i = names.length - 1; i >= 0; i --) {
if (! counterMap.containsKey(names[i])) {
metrics.counter(names[i]);
@ -507,11 +507,11 @@ public Long getValue() {
private void registerClusterResourceMetrics() {
metrics.register("variable.cluster.allocated.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if( getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return getRootQueueMetrics().getAllocatedMB();
}
@ -519,11 +519,11 @@ public Integer getValue() {
}
);
metrics.register("variable.cluster.allocated.vcores",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return getRootQueueMetrics().getAllocatedVirtualCores();
}
@ -531,11 +531,11 @@ public Integer getValue() {
}
);
metrics.register("variable.cluster.available.memory",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(getRootQueueMetrics() == null) {
return 0;
return 0L;
} else {
return getRootQueueMetrics().getAvailableMB();
}
@ -543,11 +543,11 @@ public Integer getValue() {
}
);
metrics.register("variable.cluster.available.vcores",
new Gauge<Integer>() {
new Gauge<Long>() {
@Override
public Integer getValue() {
public Long getValue() {
if(getRootQueueMetrics() == null) {
return 0;
return 0l;
} else {
return getRootQueueMetrics().getAvailableVirtualCores();
}
@ -726,7 +726,7 @@ public void addAMRuntime(ApplicationId appId,
}
private void updateQueueMetrics(String queue,
int releasedMemory, int releasedVCores) {
long releasedMemory, int releasedVCores) {
// update queue counters
SortedMap<String, Counter> counterMap = metrics.getCounters();
if (releasedMemory != 0) {

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@ -53,7 +55,7 @@ public abstract class Resource implements Comparable<Resource> {
@Public
@Stable
public static Resource newInstance(int memory, int vCores) {
public static Resource newInstance(long memory, long vCores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory);
resource.setVirtualCores(vCores);
@ -61,20 +63,31 @@ public static Resource newInstance(int memory, int vCores) {
}
/**
* This method is DEPRECATED:
* Use {@link Resource#getMemorySize()} instead
*
* Get <em>memory</em> of the resource.
* @return <em>memory</em> of the resource
*/
@Public
@Stable
@Deprecated
public abstract int getMemory();
/**
* Get <em>memory</em> of the resource.
* @return <em>memory</em> of the resource
*/
@Private
@Unstable
public abstract long getMemorySize();
/**
* Set <em>memory</em> of the resource.
* @param memory <em>memory</em> of the resource
*/
@Public
@Stable
public abstract void setMemory(int memory);
public abstract void setMemory(long memory);
/**
@ -91,6 +104,10 @@ public static Resource newInstance(int memory, int vCores) {
@Evolving
public abstract int getVirtualCores();
@Public
@Unstable
public abstract long getVirtualCoresSize();
/**
* Set <em>number of virtual cpu cores</em> of the resource.
*
@ -103,13 +120,14 @@ public static Resource newInstance(int memory, int vCores) {
*/
@Public
@Evolving
public abstract void setVirtualCores(int vCores);
public abstract void setVirtualCores(long vCores);
@Override
public int hashCode() {
final int prime = 263167;
int result = 3571;
result = 939769357 + getMemory(); // prime * result = 939769357 initially
int result = (int) (939769357
+ getMemorySize()); // prime * result = 939769357 initially
result = prime * result + getVirtualCores();
return result;
}
@ -123,7 +141,7 @@ public boolean equals(Object obj) {
if (!(obj instanceof Resource))
return false;
Resource other = (Resource) obj;
if (getMemory() != other.getMemory() ||
if (getMemorySize() != other.getMemorySize() ||
getVirtualCores() != other.getVirtualCores()) {
return false;
}
@ -132,6 +150,6 @@ public boolean equals(Object obj) {
@Override
public String toString() {
return "<memory:" + getMemory() + ", vCores:" + getVirtualCores() + ">";
return "<memory:" + getMemorySize() + ", vCores:" + getVirtualCores() + ">";
}
}

View File

@ -54,8 +54,8 @@ message ContainerIdProto {
}
message ResourceProto {
optional int32 memory = 1;
optional int32 virtual_cores = 2;
optional int64 memory = 1;
optional int64 virtual_cores = 2;
}
message ResourceUtilizationProto {

View File

@ -224,7 +224,7 @@ public static enum DSEntity {
@VisibleForTesting
protected int numTotalContainers = 1;
// Memory to request for the container on which the shell command will run
private int containerMemory = 10;
private long containerMemory = 10;
// VirtualCores to request for the container on which the shell command will run
private int containerVirtualCores = 1;
// Priority of the request
@ -631,7 +631,7 @@ public void run() throws YarnException, IOException, InterruptedException {
appMasterTrackingUrl);
// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
long maxMem = response.getMaximumResourceCapability().getMemorySize();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
@ -861,7 +861,7 @@ public void onContainersAllocated(List<Container> allocatedContainers) {
+ ":" + allocatedContainer.getNodeId().getPort()
+ ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ ", containerResourceMemory"
+ allocatedContainer.getResource().getMemory()
+ allocatedContainer.getResource().getMemorySize()
+ ", containerResourceVirtualCores"
+ allocatedContainer.getResource().getVirtualCores());
// + ", containerToken"

View File

@ -127,7 +127,7 @@ public class Client {
// Queue for App master
private String amQueue = "";
// Amt. of memory resource to request for to run the App Master
private int amMemory = 10;
private long amMemory = 10;
// Amt. of virtual core resource to request for to run the App Master
private int amVCores = 1;
@ -520,7 +520,7 @@ public boolean run() throws IOException, YarnException {
// the required resources from the RM for the app master
// Memory ask has to be a multiple of min and less than max.
// Dump out information about cluster capability as seen by the resource manager
int maxMem = appResponse.getMaximumResourceCapability().getMemory();
long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
// A resource ask cannot exceed the max.

View File

@ -122,10 +122,10 @@ class ResourceRequestInfo {
class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
@Override
public int compare(Resource arg0, Resource arg1) {
int mem0 = arg0.getMemory();
int mem1 = arg1.getMemory();
int cpu0 = arg0.getVirtualCores();
int cpu1 = arg1.getVirtualCores();
long mem0 = arg0.getMemorySize();
long mem1 = arg1.getMemorySize();
long cpu0 = arg0.getVirtualCores();
long cpu1 = arg1.getVirtualCores();
if(mem0 == mem1) {
if(cpu0 == cpu1) {
return 0;
@ -143,10 +143,10 @@ public int compare(Resource arg0, Resource arg1) {
}
static boolean canFit(Resource arg0, Resource arg1) {
int mem0 = arg0.getMemory();
int mem1 = arg1.getMemory();
int cpu0 = arg0.getVirtualCores();
int cpu1 = arg1.getVirtualCores();
long mem0 = arg0.getMemorySize();
long mem1 = arg1.getMemorySize();
long cpu0 = arg0.getVirtualCores();
long cpu1 = arg1.getVirtualCores();
return (mem0 <= mem1 && cpu0 <= cpu1);
}

View File

@ -275,9 +275,9 @@ private void printNodeStatus(String nodeIdStr) throws YarnException,
nodeReportStr.println(nodeReport.getNumContainers());
nodeReportStr.print("\tMemory-Used : ");
nodeReportStr.println((nodeReport.getUsed() == null) ? "0MB"
: (nodeReport.getUsed().getMemory() + "MB"));
: (nodeReport.getUsed().getMemorySize() + "MB"));
nodeReportStr.print("\tMemory-Capacity : ");
nodeReportStr.println(nodeReport.getCapability().getMemory() + "MB");
nodeReportStr.println(nodeReport.getCapability().getMemorySize() + "MB");
nodeReportStr.print("\tCPU-Used : ");
nodeReportStr.println((nodeReport.getUsed() == null) ? "0 vcores"
: (nodeReport.getUsed().getVirtualCores() + " vcores"));

View File

@ -158,7 +158,7 @@ private static class ApplicationInformation {
displayStringsMap.put(Columns.VCORES, String.valueOf(usedVirtualCores));
usedMemory =
appReport.getApplicationResourceUsageReport().getUsedResources()
.getMemory() / 1024;
.getMemorySize() / 1024;
displayStringsMap.put(Columns.MEM, String.valueOf(usedMemory) + "G");
reservedVirtualCores =
appReport.getApplicationResourceUsageReport().getReservedResources()
@ -167,7 +167,7 @@ private static class ApplicationInformation {
String.valueOf(reservedVirtualCores));
reservedMemory =
appReport.getApplicationResourceUsageReport().getReservedResources()
.getMemory() / 1024;
.getMemorySize() / 1024;
displayStringsMap.put(Columns.RMEM, String.valueOf(reservedMemory) + "G");
attempts = appReport.getCurrentApplicationAttemptId().getAttemptId();
nodes = 0;

View File

@ -1209,7 +1209,7 @@ private MiniYARNCluster setupMiniYARNCluster() {
for (attempts = 10; attempts > 0; attempts--) {
if (cluster.getResourceManager().getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
.getMemory() > 6000) {
.getMemorySize() > 6000) {
break;
}
try {

View File

@ -54,38 +54,48 @@ private void maybeInitBuilder() {
viaProto = false;
}
@Override
@SuppressWarnings("deprecation")
public int getMemory() {
ResourceProtoOrBuilder p = viaProto ? proto : builder;
return (p.getMemory());
return (int) getMemorySize();
}
@Override
public void setMemory(int memory) {
public long getMemorySize() {
ResourceProtoOrBuilder p = viaProto ? proto : builder;
return p.getMemory();
}
@Override
public void setMemory(long memory) {
maybeInitBuilder();
builder.setMemory((memory));
builder.setMemory(memory);
}
@Override
public int getVirtualCores() {
ResourceProtoOrBuilder p = viaProto ? proto : builder;
return (p.getVirtualCores());
return (int) getVirtualCoresSize();
}
@Override
public void setVirtualCores(int vCores) {
public long getVirtualCoresSize() {
ResourceProtoOrBuilder p = viaProto ? proto : builder;
return p.getVirtualCores();
}
@Override
public void setVirtualCores(long vCores) {
maybeInitBuilder();
builder.setVirtualCores((vCores));
builder.setVirtualCores(vCores);
}
@Override
public int compareTo(Resource other) {
int diff = this.getMemory() - other.getMemory();
long diff = this.getMemorySize() - other.getMemorySize();
if (diff == 0) {
diff = this.getVirtualCores() - other.getVirtualCores();
}
return diff;
return diff == 0 ? 0 : (diff > 0 ? 1 : -1);
}

View File

@ -28,13 +28,13 @@ public class DefaultResourceCalculator extends ResourceCalculator {
@Override
public int compare(Resource unused, Resource lhs, Resource rhs) {
// Only consider memory
return lhs.getMemory() - rhs.getMemory();
return Long.compare(lhs.getMemorySize(), rhs.getMemorySize());
}
@Override
public int computeAvailableContainers(Resource available, Resource required) {
public long computeAvailableContainers(Resource available, Resource required) {
// Only consider memory
return available.getMemory() / required.getMemory();
return available.getMemorySize() / required.getMemorySize();
}
@Override
@ -44,7 +44,7 @@ public float divide(Resource unused,
}
public boolean isInvalidDivisor(Resource r) {
if (r.getMemory() == 0.0f) {
if (r.getMemorySize() == 0.0f) {
return true;
}
return false;
@ -52,23 +52,23 @@ public boolean isInvalidDivisor(Resource r) {
@Override
public float ratio(Resource a, Resource b) {
return (float)a.getMemory() / b.getMemory();
return (float)a.getMemorySize() / b.getMemorySize();
}
@Override
public Resource divideAndCeil(Resource numerator, int denominator) {
public Resource divideAndCeil(Resource numerator, long denominator) {
return Resources.createResource(
divideAndCeil(numerator.getMemory(), denominator));
divideAndCeil(numerator.getMemorySize(), denominator));
}
@Override
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) {
int normalizedMemory = Math.min(
long normalizedMemory = Math.min(
roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()),
stepFactor.getMemory()),
maximumResource.getMemory());
Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
stepFactor.getMemorySize()),
maximumResource.getMemorySize());
return Resources.createResource(normalizedMemory);
}
@ -81,22 +81,22 @@ public Resource normalize(Resource r, Resource minimumResource,
@Override
public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource(
roundUp(r.getMemory(), stepFactor.getMemory())
roundUp(r.getMemorySize(), stepFactor.getMemorySize())
);
}
@Override
public Resource roundDown(Resource r, Resource stepFactor) {
return Resources.createResource(
roundDown(r.getMemory(), stepFactor.getMemory()));
roundDown(r.getMemorySize(), stepFactor.getMemorySize()));
}
@Override
public Resource multiplyAndNormalizeUp(Resource r, double by,
Resource stepFactor) {
return Resources.createResource(
roundUp((int)(r.getMemory() * by + 0.5), stepFactor.getMemory())
);
roundUp((long) (r.getMemorySize() * by + 0.5),
stepFactor.getMemorySize()));
}
@Override
@ -104,8 +104,8 @@ public Resource multiplyAndNormalizeDown(Resource r, double by,
Resource stepFactor) {
return Resources.createResource(
roundDown(
(int)(r.getMemory() * by),
stepFactor.getMemory()
(long)(r.getMemorySize() * by),
stepFactor.getMemorySize()
)
);
}
@ -113,6 +113,6 @@ public Resource multiplyAndNormalizeDown(Resource r, double by,
@Override
public boolean fitsIn(Resource cluster,
Resource smaller, Resource bigger) {
return smaller.getMemory() <= bigger.getMemory();
return smaller.getMemorySize() <= bigger.getMemorySize();
}
}

View File

@ -54,15 +54,15 @@ public int compare(Resource clusterResource, Resource lhs, Resource rhs) {
}
if (isInvalidDivisor(clusterResource)) {
if ((lhs.getMemory() < rhs.getMemory() && lhs.getVirtualCores() > rhs
if ((lhs.getMemorySize() < rhs.getMemorySize() && lhs.getVirtualCores() > rhs
.getVirtualCores())
|| (lhs.getMemory() > rhs.getMemory() && lhs.getVirtualCores() < rhs
|| (lhs.getMemorySize() > rhs.getMemorySize() && lhs.getVirtualCores() < rhs
.getVirtualCores())) {
return 0;
} else if (lhs.getMemory() > rhs.getMemory()
} else if (lhs.getMemorySize() > rhs.getMemorySize()
|| lhs.getVirtualCores() > rhs.getVirtualCores()) {
return 1;
} else if (lhs.getMemory() < rhs.getMemory()
} else if (lhs.getMemorySize() < rhs.getMemorySize()
|| lhs.getVirtualCores() < rhs.getVirtualCores()) {
return -1;
}
@ -100,20 +100,20 @@ protected float getResourceAsValue(
// Just use 'dominant' resource
return (dominant) ?
Math.max(
(float)resource.getMemory() / clusterResource.getMemory(),
(float)resource.getMemorySize() / clusterResource.getMemorySize(),
(float)resource.getVirtualCores() / clusterResource.getVirtualCores()
)
:
Math.min(
(float)resource.getMemory() / clusterResource.getMemory(),
(float)resource.getMemorySize() / clusterResource.getMemorySize(),
(float)resource.getVirtualCores() / clusterResource.getVirtualCores()
);
}
@Override
public int computeAvailableContainers(Resource available, Resource required) {
public long computeAvailableContainers(Resource available, Resource required) {
return Math.min(
available.getMemory() / required.getMemory(),
available.getMemorySize() / required.getMemorySize(),
available.getVirtualCores() / required.getVirtualCores());
}
@ -127,7 +127,7 @@ public float divide(Resource clusterResource,
@Override
public boolean isInvalidDivisor(Resource r) {
if (r.getMemory() == 0.0f || r.getVirtualCores() == 0.0f) {
if (r.getMemorySize() == 0.0f || r.getVirtualCores() == 0.0f) {
return true;
}
return false;
@ -136,15 +136,15 @@ public boolean isInvalidDivisor(Resource r) {
@Override
public float ratio(Resource a, Resource b) {
return Math.max(
(float)a.getMemory()/b.getMemory(),
(float)a.getMemorySize()/b.getMemorySize(),
(float)a.getVirtualCores()/b.getVirtualCores()
);
}
@Override
public Resource divideAndCeil(Resource numerator, int denominator) {
public Resource divideAndCeil(Resource numerator, long denominator) {
return Resources.createResource(
divideAndCeil(numerator.getMemory(), denominator),
divideAndCeil(numerator.getMemorySize(), denominator),
divideAndCeil(numerator.getVirtualCores(), denominator)
);
}
@ -152,12 +152,12 @@ public Resource divideAndCeil(Resource numerator, int denominator) {
@Override
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) {
int normalizedMemory = Math.min(
long normalizedMemory = Math.min(
roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()),
stepFactor.getMemory()),
maximumResource.getMemory());
int normalizedCores = Math.min(
Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
stepFactor.getMemorySize()),
maximumResource.getMemorySize());
long normalizedCores = Math.min(
roundUp(
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
stepFactor.getVirtualCores()),
@ -169,7 +169,7 @@ public Resource normalize(Resource r, Resource minimumResource,
@Override
public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource(
roundUp(r.getMemory(), stepFactor.getMemory()),
roundUp(r.getMemorySize(), stepFactor.getMemorySize()),
roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
);
}
@ -177,7 +177,7 @@ public Resource roundUp(Resource r, Resource stepFactor) {
@Override
public Resource roundDown(Resource r, Resource stepFactor) {
return Resources.createResource(
roundDown(r.getMemory(), stepFactor.getMemory()),
roundDown(r.getMemorySize(), stepFactor.getMemorySize()),
roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())
);
}
@ -187,7 +187,7 @@ public Resource multiplyAndNormalizeUp(Resource r, double by,
Resource stepFactor) {
return Resources.createResource(
roundUp(
(int)Math.ceil(r.getMemory() * by), stepFactor.getMemory()),
(int)Math.ceil(r.getMemorySize() * by), stepFactor.getMemorySize()),
roundUp(
(int)Math.ceil(r.getVirtualCores() * by),
stepFactor.getVirtualCores())
@ -199,8 +199,8 @@ public Resource multiplyAndNormalizeDown(Resource r, double by,
Resource stepFactor) {
return Resources.createResource(
roundDown(
(int)(r.getMemory() * by),
stepFactor.getMemory()
(int)(r.getMemorySize() * by),
stepFactor.getMemorySize()
),
roundDown(
(int)(r.getVirtualCores() * by),
@ -212,7 +212,7 @@ public Resource multiplyAndNormalizeDown(Resource r, double by,
@Override
public boolean fitsIn(Resource cluster,
Resource smaller, Resource bigger) {
return smaller.getMemory() <= bigger.getMemory()
return smaller.getMemorySize() <= bigger.getMemorySize()
&& smaller.getVirtualCores() <= bigger.getVirtualCores();
}
}

View File

@ -31,18 +31,18 @@ public abstract class ResourceCalculator {
public abstract int
compare(Resource clusterResource, Resource lhs, Resource rhs);
public static int divideAndCeil(int a, int b) {
public static long divideAndCeil(long a, long b) {
if (b == 0) {
return 0;
}
return (a + (b - 1)) / b;
}
public static int roundUp(int a, int b) {
public static long roundUp(long a, long b) {
return divideAndCeil(a, b) * b;
}
public static int roundDown(int a, int b) {
public static long roundDown(long a, long b) {
return (a / b) * b;
}
@ -54,7 +54,7 @@ public static int roundDown(int a, int b) {
* @param required required resources
* @return number of containers which can be allocated
*/
public abstract int computeAvailableContainers(
public abstract long computeAvailableContainers(
Resource available, Resource required);
/**
@ -169,7 +169,7 @@ public abstract float divide(
* @param denominator denominator
* @return resultant resource
*/
public abstract Resource divideAndCeil(Resource numerator, int denominator);
public abstract Resource divideAndCeil(Resource numerator, long denominator);
/**
* Check if a smaller resource can be contained by bigger resource.

View File

@ -31,12 +31,18 @@ public class Resources {
private static final Resource NONE = new Resource() {
@Override
@SuppressWarnings("deprecation")
public int getMemory() {
return 0;
}
@Override
public void setMemory(int memory) {
public long getMemorySize() {
return 0;
}
@Override
public void setMemory(long memory) {
throw new RuntimeException("NONE cannot be modified!");
}
@ -46,17 +52,22 @@ public int getVirtualCores() {
}
@Override
public void setVirtualCores(int cores) {
public long getVirtualCoresSize() {
return 0;
}
@Override
public void setVirtualCores(long cores) {
throw new RuntimeException("NONE cannot be modified!");
}
@Override
public int compareTo(Resource o) {
int diff = 0 - o.getMemory();
long diff = 0 - o.getMemorySize();
if (diff == 0) {
diff = 0 - o.getVirtualCores();
}
return diff;
return Long.signum(diff);
}
};
@ -64,12 +75,18 @@ public int compareTo(Resource o) {
private static final Resource UNBOUNDED = new Resource() {
@Override
@SuppressWarnings("deprecation")
public int getMemory() {
return Integer.MAX_VALUE;
}
@Override
public void setMemory(int memory) {
public long getMemorySize() {
return Long.MAX_VALUE;
}
@Override
public void setMemory(long memory) {
throw new RuntimeException("UNBOUNDED cannot be modified!");
}
@ -79,26 +96,31 @@ public int getVirtualCores() {
}
@Override
public void setVirtualCores(int cores) {
public long getVirtualCoresSize() {
return Long.MAX_VALUE;
}
@Override
public void setVirtualCores(long cores) {
throw new RuntimeException("UNBOUNDED cannot be modified!");
}
@Override
public int compareTo(Resource o) {
int diff = Integer.MAX_VALUE - o.getMemory();
long diff = Long.MAX_VALUE - o.getMemorySize();
if (diff == 0) {
diff = Integer.MAX_VALUE - o.getVirtualCores();
diff = Long.MAX_VALUE - o.getVirtualCoresSize();
}
return diff;
return Long.signum(diff);
}
};
public static Resource createResource(int memory) {
public static Resource createResource(long memory) {
return createResource(memory, (memory > 0) ? 1 : 0);
}
public static Resource createResource(int memory, int cores) {
public static Resource createResource(long memory, long cores) {
Resource resource = Records.newRecord(Resource.class);
resource.setMemory(memory);
resource.setVirtualCores(cores);
@ -114,11 +136,11 @@ public static Resource unbounded() {
}
public static Resource clone(Resource res) {
return createResource(res.getMemory(), res.getVirtualCores());
return createResource(res.getMemorySize(), res.getVirtualCores());
}
public static Resource addTo(Resource lhs, Resource rhs) {
lhs.setMemory(lhs.getMemory() + rhs.getMemory());
lhs.setMemory(lhs.getMemorySize() + rhs.getMemorySize());
lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores());
return lhs;
}
@ -128,7 +150,7 @@ public static Resource add(Resource lhs, Resource rhs) {
}
public static Resource subtractFrom(Resource lhs, Resource rhs) {
lhs.setMemory(lhs.getMemory() - rhs.getMemory());
lhs.setMemory(lhs.getMemorySize() - rhs.getMemorySize());
lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores());
return lhs;
}
@ -142,7 +164,7 @@ public static Resource negate(Resource resource) {
}
public static Resource multiplyTo(Resource lhs, double by) {
lhs.setMemory((int)(lhs.getMemory() * by));
lhs.setMemory((int)(lhs.getMemorySize() * by));
lhs.setVirtualCores((int)(lhs.getVirtualCores() * by));
return lhs;
}
@ -157,7 +179,7 @@ public static Resource multiply(Resource lhs, double by) {
*/
public static Resource multiplyAndAddTo(
Resource lhs, Resource rhs, double by) {
lhs.setMemory(lhs.getMemory() + (int)(rhs.getMemory() * by));
lhs.setMemory(lhs.getMemorySize() + (int)(rhs.getMemorySize() * by));
lhs.setVirtualCores(lhs.getVirtualCores()
+ (int)(rhs.getVirtualCores() * by));
return lhs;
@ -175,7 +197,7 @@ public static Resource multiplyAndNormalizeDown(
public static Resource multiplyAndRoundDown(Resource lhs, double by) {
Resource out = clone(lhs);
out.setMemory((int)(lhs.getMemory() * by));
out.setMemory((int)(lhs.getMemorySize() * by));
out.setVirtualCores((int)(lhs.getVirtualCores() * by));
return out;
}
@ -264,7 +286,7 @@ public static Resource max(
}
public static boolean fitsIn(Resource smaller, Resource bigger) {
return smaller.getMemory() <= bigger.getMemory() &&
return smaller.getMemorySize() <= bigger.getMemorySize() &&
smaller.getVirtualCores() <= bigger.getVirtualCores();
}
@ -274,12 +296,12 @@ public static boolean fitsIn(ResourceCalculator rc, Resource cluster,
}
public static Resource componentwiseMin(Resource lhs, Resource rhs) {
return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
return createResource(Math.min(lhs.getMemorySize(), rhs.getMemorySize()),
Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
}
public static Resource componentwiseMax(Resource lhs, Resource rhs) {
return createResource(Math.max(lhs.getMemory(), rhs.getMemory()),
return createResource(Math.max(lhs.getMemorySize(), rhs.getMemorySize()),
Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
}
}

View File

@ -24,18 +24,18 @@
public class TestResources {
public Resource createResource(int memory, int vCores) {
public Resource createResource(long memory, long vCores) {
return Resource.newInstance(memory, vCores);
}
@Test(timeout=1000)
public void testCompareToWithUnboundedResource() {
assertTrue(Resources.unbounded().compareTo(
createResource(Integer.MAX_VALUE, Integer.MAX_VALUE)) == 0);
createResource(Long.MAX_VALUE, Long.MAX_VALUE)) == 0);
assertTrue(Resources.unbounded().compareTo(
createResource(Integer.MAX_VALUE, 0)) > 0);
createResource(Long.MAX_VALUE, 0)) > 0);
assertTrue(Resources.unbounded().compareTo(
createResource(0, Integer.MAX_VALUE)) > 0);
createResource(0, Long.MAX_VALUE)) > 0);
}
@Test(timeout=1000)

View File

@ -442,7 +442,7 @@ public static ApplicationResourceUsageReport newApplicationResourceUsageReport(
return report;
}
public static Resource newResource(int memory, int vCores) {
public static Resource newResource(long memory, long vCores) {
Resource resource = recordFactory.newRecordInstance(Resource.class);
resource.setMemory(memory);
resource.setVirtualCores(vCores);

View File

@ -59,8 +59,8 @@ public class AppInfo {
protected long elapsedTime;
protected String applicationTags;
protected int priority;
private int allocatedCpuVcores;
private int allocatedMemoryMB;
private long allocatedCpuVcores;
private long allocatedMemoryMB;
protected boolean unmanagedApplication;
private String appNodeLabelExpression;
private String amNodeLabelExpression;
@ -100,7 +100,7 @@ public AppInfo(ApplicationReport app) {
allocatedCpuVcores = app.getApplicationResourceUsageReport()
.getUsedResources().getVirtualCores();
allocatedMemoryMB = app.getApplicationResourceUsageReport()
.getUsedResources().getMemory();
.getUsedResources().getMemorySize();
}
}
progress = app.getProgress() * 100; // in percent
@ -152,11 +152,11 @@ public int getRunningContainers() {
return runningContainers;
}
public int getAllocatedCpuVcores() {
public long getAllocatedCpuVcores() {
return allocatedCpuVcores;
}
public int getAllocatedMemoryMB() {
public long getAllocatedMemoryMB() {
return allocatedMemoryMB;
}

View File

@ -36,8 +36,8 @@
public class ContainerInfo {
protected String containerId;
protected int allocatedMB;
protected int allocatedVCores;
protected long allocatedMB;
protected long allocatedVCores;
protected String assignedNodeId;
protected int priority;
protected long startedTime;
@ -57,7 +57,7 @@ public ContainerInfo() {
public ContainerInfo(ContainerReport container) {
containerId = container.getContainerId().toString();
if (container.getAllocatedResource() != null) {
allocatedMB = container.getAllocatedResource().getMemory();
allocatedMB = container.getAllocatedResource().getMemorySize();
allocatedVCores = container.getAllocatedResource().getVirtualCores();
}
if (container.getAssignedNode() != null) {
@ -79,11 +79,11 @@ public String getContainerId() {
return containerId;
}
public int getAllocatedMB() {
public long getAllocatedMB() {
return allocatedMB;
}
public int getAllocatedVCores() {
public long getAllocatedVCores() {
return allocatedVCores;
}

View File

@ -180,7 +180,7 @@ public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() {
assertEquals(1, copy.getContainersToDecrease().get(0)
.getId().getContainerId());
assertEquals(1024, copy.getContainersToDecrease().get(1)
.getResource().getMemory());
.getResource().getMemorySize());
}
/**
@ -201,7 +201,7 @@ public void testRegisterNodeManagerRequestPBImpl() {
assertEquals(8080, copy.getHttpPort());
assertEquals(9090, copy.getNodeId().getPort());
assertEquals(10000, copy.getResource().getMemory());
assertEquals(10000, copy.getResource().getMemorySize());
assertEquals(2, copy.getResource().getVirtualCores());
}
@ -273,7 +273,7 @@ public void testNodeStatusPBImpl() {
assertEquals(1, copy.getIncreasedContainers().get(0)
.getId().getContainerId());
assertEquals(4096, copy.getIncreasedContainers().get(1)
.getResource().getMemory());
.getResource().getMemorySize());
}
@Test

View File

@ -405,7 +405,7 @@ protected String[] getRunCommand(String command, String groupId,
.getBoolean(
YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
memory = resource.getMemory();
memory = (int) resource.getMemorySize();
}
if (conf.getBoolean(

View File

@ -215,7 +215,7 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
if (recoveredCapability != null
&& !this.resource.equals(recoveredCapability)) {
// resource capability had been updated before NM was down
this.resource = Resource.newInstance(recoveredCapability.getMemory(),
this.resource = Resource.newInstance(recoveredCapability.getMemorySize(),
recoveredCapability.getVirtualCores());
}
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
@ -611,7 +611,7 @@ private void sendContainerMonitorStartEvent() {
long launchDuration = clock.getTime() - containerLaunchStartTime;
metrics.addContainerLaunchDuration(launchDuration);
long pmemBytes = getResource().getMemory() * 1024 * 1024L;
long pmemBytes = getResource().getMemorySize() * 1024 * 1024L;
float pmemRatio = daemonConf.getFloat(
YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);

View File

@ -115,8 +115,8 @@ public List<PrivilegedOperation> preStart(Container container)
String cgroupId = container.getContainerId().toString();
//memory is in MB
long containerSoftLimit =
(long) (container.getResource().getMemory() * this.softLimit);
long containerHardLimit = container.getResource().getMemory();
(long) (container.getResource().getMemorySize() * this.softLimit);
long containerHardLimit = container.getResource().getMemorySize();
cGroupsHandler.createCGroup(MEMORY, cgroupId);
try {
cGroupsHandler.updateCGroupParam(MEMORY, cgroupId,

View File

@ -646,7 +646,7 @@ private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
Resource resource = changeEvent.getResource();
pmemLimitMBs = resource.getMemory();
pmemLimitMBs = (int) resource.getMemorySize();
vmemLimitMBs = (int) (pmemLimitMBs * vmemRatio);
cpuVcores = resource.getVirtualCores();
usageMetrics.recordResourceLimit(
@ -822,7 +822,7 @@ protected void onChangeMonitoringContainerResource(
}
LOG.info("Changing resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
long pmemLimit = changeEvent.getResource().getMemorySize() * 1024L * 1024L;
long vmemLimit = (long) (pmemLimit * vmemRatio);
int cpuVcores = changeEvent.getResource().getVirtualCores();
processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);

View File

@ -600,7 +600,7 @@ protected ProcessTreeInfo getPti() {
private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId,
Resource resource, Configuration conf) {
long pmemBytes = resource.getMemory() * 1024 * 1024L;
long pmemBytes = resource.getMemorySize() * 1024 * 1024L;
float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
long vmemBytes = (long) (pmemRatio * pmemBytes);

View File

@ -115,9 +115,9 @@ public void endRunningContainer() {
public void allocateContainer(Resource res) {
allocatedContainers.incr();
allocatedMB = allocatedMB + res.getMemory();
allocatedMB = allocatedMB + res.getMemorySize();
allocatedGB.set((int)Math.ceil(allocatedMB/1024d));
availableMB = availableMB - res.getMemory();
availableMB = availableMB - res.getMemorySize();
availableGB.set((int)Math.floor(availableMB/1024d));
allocatedVCores.incr(res.getVirtualCores());
availableVCores.decr(res.getVirtualCores());
@ -125,16 +125,16 @@ public void allocateContainer(Resource res) {
public void releaseContainer(Resource res) {
allocatedContainers.decr();
allocatedMB = allocatedMB - res.getMemory();
allocatedMB = allocatedMB - res.getMemorySize();
allocatedGB.set((int)Math.ceil(allocatedMB/1024d));
availableMB = availableMB + res.getMemory();
availableMB = availableMB + res.getMemorySize();
availableGB.set((int)Math.floor(availableMB/1024d));
allocatedVCores.decr(res.getVirtualCores());
availableVCores.incr(res.getVirtualCores());
}
public void changeContainer(Resource before, Resource now) {
int deltaMB = now.getMemory() - before.getMemory();
long deltaMB = now.getMemorySize() - before.getMemorySize();
int deltaVCores = now.getVirtualCores() - before.getVirtualCores();
allocatedMB = allocatedMB + deltaMB;
allocatedGB.set((int)Math.ceil(allocatedMB/1024d));
@ -145,7 +145,7 @@ public void changeContainer(Resource before, Resource now) {
}
public void addResource(Resource res) {
availableMB = availableMB + res.getMemory();
availableMB = availableMB + res.getMemorySize();
availableGB.incr((int)Math.floor(availableMB/1024d));
availableVCores.incr(res.getVirtualCores());
}

View File

@ -81,7 +81,7 @@ public ContainerInfo(final Context nmContext, final Container container,
this.user = container.getUser();
Resource res = container.getResource();
if (res != null) {
this.totalMemoryNeededMB = res.getMemory();
this.totalMemoryNeededMB = res.getMemorySize();
this.totalVCoresNeeded = res.getVirtualCores();
}
this.containerLogsShortLink = ujoin("containerlogs", this.id,

View File

@ -190,7 +190,7 @@ public RegisterNodeManagerResponse registerNodeManager(
InetSocketAddress expected = NetUtils.getConnectAddress(
conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
Assert.assertEquals(NetUtils.getHostPortString(expected), nodeId.toString());
Assert.assertEquals(5 * 1024, resource.getMemory());
Assert.assertEquals(5 * 1024, resource.getMemorySize());
registeredNodes.add(nodeId);
RegisterNodeManagerResponse response = recordFactory
@ -918,7 +918,7 @@ public RegisterNodeManagerResponse registerNodeManager(
conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
Assert.assertEquals(NetUtils.getHostPortString(expected),
nodeId.toString());
Assert.assertEquals(5 * 1024, resource.getMemory());
Assert.assertEquals(5 * 1024, resource.getMemorySize());
registeredNodes.add(nodeId);
RegisterNodeManagerResponse response = recordFactory

View File

@ -228,7 +228,7 @@ public void testContainerKillOnMemoryOverflow() throws IOException,
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
Resource r = BuilderUtils.newResource(8 * 1024 * 1024, 1);
Resource r = BuilderUtils.newResource(0, 0);
ContainerTokenIdentifier containerIdentifier =
new ContainerTokenIdentifier(cId, context.getNodeId().toString(), user,
r, System.currentTimeMillis() + 120000, 123, DUMMY_RM_IDENTIFIER,

View File

@ -94,9 +94,9 @@ public String getLiveNodeManagers() {
ni.getNodeManagerVersion());
if(report != null) {
info.put("NumContainers", report.getNumContainers());
info.put("UsedMemoryMB", report.getUsedResource().getMemory());
info.put("UsedMemoryMB", report.getUsedResource().getMemorySize());
info.put("AvailableMemoryMB",
report.getAvailableResource().getMemory());
report.getAvailableResource().getMemorySize());
}
nodesInfo.add(info);

View File

@ -142,7 +142,7 @@ public static void checkSchedContainerChangeRequest(
// example, you cannot request target resource of a <10G, 10> container to
// <20G, 8>
if (increase) {
if (originalResource.getMemory() > targetResource.getMemory()
if (originalResource.getMemorySize() > targetResource.getMemorySize()
|| originalResource.getVirtualCores() > targetResource
.getVirtualCores()) {
String msg =
@ -153,7 +153,7 @@ public static void checkSchedContainerChangeRequest(
throw new InvalidResourceRequestException(msg);
}
} else {
if (originalResource.getMemory() < targetResource.getMemory()
if (originalResource.getMemorySize() < targetResource.getMemorySize()
|| originalResource.getVirtualCores() < targetResource
.getVirtualCores()) {
String msg =
@ -243,15 +243,15 @@ private static void validateIncreaseDecreaseRequest(RMContext rmContext,
return;
}
for (ContainerResourceChangeRequest request : requests) {
if (request.getCapability().getMemory() < 0
|| request.getCapability().getMemory() > maximumAllocation
.getMemory()) {
if (request.getCapability().getMemorySize() < 0
|| request.getCapability().getMemorySize() > maximumAllocation
.getMemorySize()) {
throw new InvalidResourceRequestException("Invalid "
+ (increase ? "increase" : "decrease") + " request"
+ ", requested memory < 0"
+ ", or requested memory > max configured" + ", requestedMemory="
+ request.getCapability().getMemory() + ", maxMemory="
+ maximumAllocation.getMemory());
+ request.getCapability().getMemorySize() + ", maxMemory="
+ maximumAllocation.getMemorySize());
}
if (request.getCapability().getVirtualCores() < 0
|| request.getCapability().getVirtualCores() > maximumAllocation

View File

@ -362,7 +362,7 @@ public RegisterNodeManagerResponse registerNodeManager(
}
// Check if this node has minimum allocations
if (capability.getMemory() < minAllocMb
if (capability.getMemorySize() < minAllocMb
|| capability.getVirtualCores() < minAllocVcores) {
String message =
"NodeManager from " + host

View File

@ -480,7 +480,7 @@ private void publishContainerCreatedEvent(ContainerCreatedEvent event) {
TimelineEntity entity = createContainerEntity(event.getContainerId());
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
event.getAllocatedResource().getMemory());
event.getAllocatedResource().getMemorySize());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
event.getAllocatedResource().getVirtualCores());
entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,

View File

@ -258,17 +258,17 @@ public void deductActuallyToBePreempted(ResourceCalculator rc,
void appendLogString(StringBuilder sb) {
sb.append(queueName).append(", ")
.append(current.getMemory()).append(", ")
.append(current.getMemorySize()).append(", ")
.append(current.getVirtualCores()).append(", ")
.append(pending.getMemory()).append(", ")
.append(pending.getMemorySize()).append(", ")
.append(pending.getVirtualCores()).append(", ")
.append(getGuaranteed().getMemory()).append(", ")
.append(getGuaranteed().getMemorySize()).append(", ")
.append(getGuaranteed().getVirtualCores()).append(", ")
.append(idealAssigned.getMemory()).append(", ")
.append(idealAssigned.getMemorySize()).append(", ")
.append(idealAssigned.getVirtualCores()).append(", ")
.append(toBePreempted.getMemory()).append(", ")
.append(toBePreempted.getMemorySize()).append(", ")
.append(toBePreempted.getVirtualCores() ).append(", ")
.append(actuallyToBePreempted.getMemory()).append(", ")
.append(actuallyToBePreempted.getMemorySize()).append(", ")
.append(actuallyToBePreempted.getVirtualCores());
}

View File

@ -160,7 +160,7 @@ public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) {
}
Resource capToAssign = res.getResourcesAtTime(now);
float targetCapacity = 0f;
if (planResources.getMemory() > 0
if (planResources.getMemorySize() > 0
&& planResources.getVirtualCores() > 0) {
if (shouldResize) {
capToAssign =

View File

@ -246,7 +246,7 @@ private static class IntegralResource {
long vcores;
public IntegralResource(Resource resource) {
this.memory = resource.getMemory();
this.memory = resource.getMemorySize();
this.vcores = resource.getVirtualCores();
}
@ -256,12 +256,12 @@ public IntegralResource(long mem, long vcores) {
}
public void add(Resource r) {
memory += r.getMemory();
memory += r.getMemorySize();
vcores += r.getVirtualCores();
}
public void subtract(Resource r) {
memory -= r.getMemory();
memory -= r.getMemorySize();
vcores -= r.getVirtualCores();
}

View File

@ -106,7 +106,7 @@ private static ReservationDefinitionProto convertToProtoFormat(
public static ResourceProto convertToProtoFormat(Resource e) {
return YarnProtos.ResourceProto.newBuilder()
.setMemory(e.getMemory())
.setMemory(e.getMemorySize())
.setVirtualCores(e.getVirtualCores())
.build();
}

View File

@ -88,7 +88,7 @@ public long setEarliestStartTime(Plan plan,
// Weight = total memory consumption of stage
protected double calcWeight(ReservationRequest stage) {
return (stage.getDuration() * stage.getCapability().getMemory())
return (stage.getDuration() * stage.getCapability().getMemorySize())
* (stage.getNumContainers());
}

View File

@ -712,7 +712,7 @@ private static void updateAttemptMetrics(RMContainerImpl container) {
}
long usedMillis = container.finishTime - container.creationTime;
long memorySeconds = resource.getMemory()
long memorySeconds = resource.getMemorySize()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
long vcoreSeconds = resource.getVirtualCores()
* usedMillis / DateUtils.MILLIS_PER_SECOND;

View File

@ -56,8 +56,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
private Resource staleClusterCapacity = null;
// Max allocation
private int maxNodeMemory = -1;
private int maxNodeVCores = -1;
private long maxNodeMemory = -1;
private long maxNodeVCores = -1;
private Resource configuredMaxAllocation;
private boolean forceConfiguredMaxAllocation = true;
private long configuredMaxAllocationWaitTime;
@ -211,7 +211,7 @@ public Resource getMaxAllowedAllocation() {
}
return Resources.createResource(
Math.min(configuredMaxAllocation.getMemory(), maxNodeMemory),
Math.min(configuredMaxAllocation.getMemorySize(), maxNodeMemory),
Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores)
);
} finally {
@ -224,7 +224,7 @@ private void updateMaxResources(SchedulerNode node, boolean add) {
writeLock.lock();
try {
if (add) { // added node
int nodeMemory = totalResource.getMemory();
long nodeMemory = totalResource.getMemorySize();
if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory;
}
@ -233,7 +233,7 @@ private void updateMaxResources(SchedulerNode node, boolean add) {
maxNodeVCores = nodeVCores;
}
} else { // removed node
if (maxNodeMemory == totalResource.getMemory()) {
if (maxNodeMemory == totalResource.getMemorySize()) {
maxNodeMemory = -1;
}
if (maxNodeVCores == totalResource.getVirtualCores()) {

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -59,8 +60,8 @@ public class QueueMetrics implements MetricsSource {
@Metric("# of apps killed") MutableCounterInt appsKilled;
@Metric("# of apps failed") MutableCounterInt appsFailed;
@Metric("Allocated memory in MB") MutableGaugeInt allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
@Metric("Allocated memory in MB") MutableGaugeLong allocatedMB;
@Metric("Allocated CPU in virtual cores") MutableGaugeLong allocatedVCores;
@Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
@Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
@Metric("Aggregate # of allocated node-local containers")
@ -70,13 +71,13 @@ public class QueueMetrics implements MetricsSource {
@Metric("Aggregate # of allocated off-switch containers")
MutableCounterLong aggregateOffSwitchContainersAllocated;
@Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
@Metric("Available memory in MB") MutableGaugeInt availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
@Metric("Pending memory allocation in MB") MutableGaugeInt pendingMB;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores;
@Metric("Available memory in MB") MutableGaugeLong availableMB;
@Metric("Available CPU in virtual cores") MutableGaugeLong availableVCores;
@Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB;
@Metric("Pending CPU allocation in virtual cores") MutableGaugeLong pendingVCores;
@Metric("# of pending containers") MutableGaugeInt pendingContainers;
@Metric("# of reserved memory in MB") MutableGaugeInt reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
@Metric("# of reserved memory in MB") MutableGaugeLong reservedMB;
@Metric("Reserved CPU in virtual cores") MutableGaugeLong reservedVCores;
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;
@Metric("# of active users") MutableGaugeInt activeUsers;
@Metric("# of active applications") MutableGaugeInt activeApplications;
@ -325,7 +326,7 @@ public void moveAppTo(AppSchedulingInfo app) {
* @param limit resource limit
*/
public void setAvailableResourcesToQueue(Resource limit) {
availableMB.set(limit.getMemory());
availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores());
}
@ -362,8 +363,8 @@ public void incrPendingResources(String user, int containers, Resource res) {
private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemory() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCoresSize() * containers);
}
public void decrPendingResources(String user, int containers, Resource res) {
@ -379,8 +380,8 @@ public void decrPendingResources(String user, int containers, Resource res) {
private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemory() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCoresSize() * containers);
}
public void incrNodeTypeAggregations(String user, NodeType type) {
@ -407,8 +408,8 @@ public void allocateResources(String user, int containers, Resource res,
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemory() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCoresSize() * containers);
if (decrPending) {
_decrPendingResources(containers, res);
}
@ -428,10 +429,10 @@ public void allocateResources(String user, int containers, Resource res,
* @param res
*/
public void allocateResources(String user, Resource res) {
allocatedMB.incr(res.getMemory());
allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores());
pendingMB.decr(res.getMemory());
pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
@ -446,8 +447,8 @@ public void allocateResources(String user, Resource res) {
public void releaseResources(String user, int containers, Resource res) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemory() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCoresSize() * containers);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, containers, res);
@ -464,7 +465,7 @@ public void releaseResources(String user, int containers, Resource res) {
* @param res
*/
public void releaseResources(String user, Resource res) {
allocatedMB.decr(res.getMemory());
allocatedMB.decr(res.getMemorySize());
allocatedVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -477,7 +478,7 @@ public void releaseResources(String user, Resource res) {
public void reserveResource(String user, Resource res) {
reservedContainers.incr();
reservedMB.incr(res.getMemory());
reservedMB.incr(res.getMemorySize());
reservedVCores.incr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -490,7 +491,7 @@ public void reserveResource(String user, Resource res) {
public void unreserveResource(String user, Resource res) {
reservedContainers.decr();
reservedMB.decr(res.getMemory());
reservedMB.decr(res.getMemorySize());
reservedVCores.decr(res.getVirtualCores());
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
@ -563,11 +564,11 @@ public Resource getAllocatedResources() {
return BuilderUtils.newResource(allocatedMB.value(), allocatedVCores.value());
}
public int getAllocatedMB() {
public long getAllocatedMB() {
return allocatedMB.value();
}
public int getAllocatedVirtualCores() {
public long getAllocatedVirtualCores() {
return allocatedVCores.value();
}
@ -575,19 +576,19 @@ public int getAllocatedContainers() {
return allocatedContainers.value();
}
public int getAvailableMB() {
public long getAvailableMB() {
return availableMB.value();
}
public int getAvailableVirtualCores() {
public long getAvailableVirtualCores() {
return availableVCores.value();
}
public int getPendingMB() {
public long getPendingMB() {
return pendingMB.value();
}
public int getPendingVirtualCores() {
public long getPendingVirtualCores() {
return pendingVCores.value();
}
@ -595,11 +596,11 @@ public int getPendingContainers() {
return pendingContainers.value();
}
public int getReservedMB() {
public long getReservedMB() {
return reservedMB.value();
}
public int getReservedVirtualCores() {
public long getReservedVirtualCores() {
return reservedVCores.value();
}

View File

@ -444,7 +444,7 @@ public synchronized void setHeadroom(Resource globalLimit) {
*/
public synchronized Resource getHeadroom() {
// Corner case to deal with applications being slightly over-limit
if (resourceLimit.getMemory() < 0) {
if (resourceLimit.getMemorySize() < 0) {
resourceLimit.setMemory(0);
}
@ -480,7 +480,7 @@ public synchronized void showRequests() {
if (requests != null) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " headRoom=" + getHeadroom() + " currentConsumption="
+ attemptResourceUsage.getUsed().getMemory());
+ attemptResourceUsage.getUsed().getMemorySize());
for (ResourceRequest request : requests.values()) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " request=" + request);
@ -682,7 +682,7 @@ synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
for (RMContainer rmContainer : this.liveContainers.values()) {
long usedMillis = currentTimeMillis - rmContainer.getCreationTime();
Resource resource = rmContainer.getContainer().getResource();
memorySeconds += resource.getMemory() * usedMillis /
memorySeconds += resource.getMemorySize() * usedMillis /
DateUtils.MILLIS_PER_SECOND;
vcoreSeconds += resource.getVirtualCores() * usedMillis
/ DateUtils.MILLIS_PER_SECOND;

View File

@ -274,13 +274,13 @@ public static void normalizeAndvalidateRequest(ResourceRequest resReq,
private static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
throws InvalidResourceRequestException {
if (resReq.getCapability().getMemory() < 0 ||
resReq.getCapability().getMemory() > maximumResource.getMemory()) {
if (resReq.getCapability().getMemorySize() < 0 ||
resReq.getCapability().getMemorySize() > maximumResource.getMemorySize()) {
throw new InvalidResourceRequestException("Invalid resource request"
+ ", requested memory < 0"
+ ", or requested memory > max configured"
+ ", requestedMemory=" + resReq.getCapability().getMemory()
+ ", maxMemory=" + maximumResource.getMemory());
+ ", requestedMemory=" + resReq.getCapability().getMemorySize()
+ ", maxMemory=" + maximumResource.getMemorySize());
}
if (resReq.getCapability().getVirtualCores() < 0 ||
resReq.getCapability().getVirtualCores() >

View File

@ -23,7 +23,7 @@
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -32,37 +32,37 @@
public class CSQueueMetrics extends QueueMetrics {
@Metric("AM memory limit in MB")
MutableGaugeInt AMResourceLimitMB;
MutableGaugeLong AMResourceLimitMB;
@Metric("AM CPU limit in virtual cores")
MutableGaugeInt AMResourceLimitVCores;
MutableGaugeLong AMResourceLimitVCores;
@Metric("Used AM memory limit in MB")
MutableGaugeInt usedAMResourceMB;
MutableGaugeLong usedAMResourceMB;
@Metric("Used AM CPU limit in virtual cores")
MutableGaugeInt usedAMResourceVCores;
MutableGaugeLong usedAMResourceVCores;
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
super(ms, queueName, parent, enableUserMetrics, conf);
}
public int getAMResourceLimitMB() {
public long getAMResourceLimitMB() {
return AMResourceLimitMB.value();
}
public int getAMResourceLimitVCores() {
public long getAMResourceLimitVCores() {
return AMResourceLimitVCores.value();
}
public int getUsedAMResourceMB() {
public long getUsedAMResourceMB() {
return usedAMResourceMB.value();
}
public int getUsedAMResourceVCores() {
public long getUsedAMResourceVCores() {
return usedAMResourceVCores.value();
}
public void setAMResouceLimit(Resource res) {
AMResourceLimitMB.set(res.getMemory());
AMResourceLimitMB.set(res.getMemorySize());
AMResourceLimitVCores.set(res.getVirtualCores());
}
@ -74,7 +74,7 @@ public void setAMResouceLimitForUser(String user, Resource res) {
}
public void incAMUsed(String user, Resource res) {
usedAMResourceMB.incr(res.getMemory());
usedAMResourceMB.incr(res.getMemorySize());
usedAMResourceVCores.incr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
@ -83,7 +83,7 @@ public void incAMUsed(String user, Resource res) {
}
public void decAMUsed(String user, Resource res) {
usedAMResourceMB.decr(res.getMemory());
usedAMResourceMB.decr(res.getMemorySize());
usedAMResourceVCores.decr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {

View File

@ -65,7 +65,7 @@ public Resource getHeadroom() {
}
}
// Corner case to deal with applications being slightly over-limit
if (headroom.getMemory() < 0) {
if (headroom.getMemorySize() < 0) {
headroom.setMemory(0);
}
return headroom;

View File

@ -677,7 +677,7 @@ public Resource getMaximumAllocation() {
*/
public Resource getMaximumAllocationPerQueue(String queue) {
String queuePrefix = getQueuePrefix(queue);
int maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
long maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
(int)UNDEFINED);
int maxAllocationVcoresPerQueue = getInt(
queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
@ -690,7 +690,7 @@ public Resource getMaximumAllocationPerQueue(String queue) {
Resource clusterMax = getMaximumAllocation();
if (maxAllocationMbPerQueue == (int)UNDEFINED) {
LOG.info("max alloc mb per queue for " + queue + " is undefined");
maxAllocationMbPerQueue = clusterMax.getMemory();
maxAllocationMbPerQueue = clusterMax.getMemorySize();
}
if (maxAllocationVcoresPerQueue == (int)UNDEFINED) {
LOG.info("max alloc vcore per queue for " + queue + " is undefined");
@ -698,7 +698,7 @@ public Resource getMaximumAllocationPerQueue(String queue) {
}
Resource result = Resources.createResource(maxAllocationMbPerQueue,
maxAllocationVcoresPerQueue);
if (maxAllocationMbPerQueue > clusterMax.getMemory()
if (maxAllocationMbPerQueue > clusterMax.getMemorySize()
|| maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
throw new IllegalArgumentException(
"Queue maximum allocation cannot be larger than the cluster setting"

View File

@ -449,7 +449,7 @@ public synchronized void reinitialize(
// since we have already told running AM's the size
Resource oldMax = getMaximumAllocation();
Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
if (newMax.getMemory() < oldMax.getMemory()
if (newMax.getMemorySize() < oldMax.getMemorySize()
|| newMax.getVirtualCores() < oldMax.getVirtualCores()) {
throw new IOException(
"Trying to reinitialize "

View File

@ -442,7 +442,7 @@ private ContainerAllocation assignContainer(Resource clusterResource,
priority, capability);
// Can we allocate a container on this node?
int availableContainers =
long availableContainers =
rc.computeAvailableContainers(available, capability);
// How much need to unreserve equals to:

View File

@ -193,7 +193,7 @@ private void subtractResourcesOnBlacklistedNodes(
Resources.subtractFrom(availableResources,
node.getUnallocatedResource());
}
if (availableResources.getMemory() < 0) {
if (availableResources.getMemorySize() < 0) {
availableResources.setMemory(0);
}
if (availableResources.getVirtualCores() < 0) {

View File

@ -128,7 +128,7 @@ public void updatePreemptionVariables() {
public Resource getDemand() {
readLock.lock();
try {
return Resource.newInstance(demand.getMemory(), demand.getVirtualCores());
return Resource.newInstance(demand.getMemorySize(), demand.getVirtualCores());
} finally {
readLock.unlock();
}

View File

@ -135,18 +135,18 @@ public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(getQueueName());
if (scheduler.getClusterResource().getMemory() == 0) {
if (scheduler.getClusterResource().getMemorySize() == 0) {
queueInfo.setCapacity(0.0f);
} else {
queueInfo.setCapacity((float) getFairShare().getMemory() /
scheduler.getClusterResource().getMemory());
queueInfo.setCapacity((float) getFairShare().getMemorySize() /
scheduler.getClusterResource().getMemorySize());
}
if (getFairShare().getMemory() == 0) {
if (getFairShare().getMemorySize() == 0) {
queueInfo.setCurrentCapacity(0.0f);
} else {
queueInfo.setCurrentCapacity((float) getResourceUsage().getMemory() /
getFairShare().getMemory());
queueInfo.setCurrentCapacity((float) getResourceUsage().getMemorySize() /
getFairShare().getMemorySize());
}
ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -31,14 +32,14 @@
@Metrics(context="yarn")
public class FSQueueMetrics extends QueueMetrics {
@Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;
@Metric("Fair share of CPU in vcores") MutableGaugeInt fairShareVCores;
@Metric("Steady fair share of memory in MB") MutableGaugeInt steadyFairShareMB;
@Metric("Steady fair share of CPU in vcores") MutableGaugeInt steadyFairShareVCores;
@Metric("Minimum share of memory in MB") MutableGaugeInt minShareMB;
@Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores;
@Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB;
@Metric("Maximum share of CPU in vcores") MutableGaugeInt maxShareVCores;
@Metric("Fair share of memory in MB") MutableGaugeLong fairShareMB;
@Metric("Fair share of CPU in vcores") MutableGaugeLong fairShareVCores;
@Metric("Steady fair share of memory in MB") MutableGaugeLong steadyFairShareMB;
@Metric("Steady fair share of CPU in vcores") MutableGaugeLong steadyFairShareVCores;
@Metric("Minimum share of memory in MB") MutableGaugeLong minShareMB;
@Metric("Minimum share of CPU in vcores") MutableGaugeLong minShareVCores;
@Metric("Maximum share of memory in MB") MutableGaugeLong maxShareMB;
@Metric("Maximum share of CPU in vcores") MutableGaugeLong maxShareVCores;
@Metric("Maximum number of applications") MutableGaugeInt maxApps;
private String schedulingPolicy;
@ -49,54 +50,54 @@ public class FSQueueMetrics extends QueueMetrics {
}
public void setFairShare(Resource resource) {
fairShareMB.set(resource.getMemory());
fairShareMB.set(resource.getMemorySize());
fairShareVCores.set(resource.getVirtualCores());
}
public int getFairShareMB() {
public long getFairShareMB() {
return fairShareMB.value();
}
public int getFairShareVirtualCores() {
public long getFairShareVirtualCores() {
return fairShareVCores.value();
}
public void setSteadyFairShare(Resource resource) {
steadyFairShareMB.set(resource.getMemory());
steadyFairShareMB.set(resource.getMemorySize());
steadyFairShareVCores.set(resource.getVirtualCores());
}
public int getSteadyFairShareMB() {
public long getSteadyFairShareMB() {
return steadyFairShareMB.value();
}
public int getSteadyFairShareVCores() {
public long getSteadyFairShareVCores() {
return steadyFairShareVCores.value();
}
public void setMinShare(Resource resource) {
minShareMB.set(resource.getMemory());
minShareMB.set(resource.getMemorySize());
minShareVCores.set(resource.getVirtualCores());
}
public int getMinShareMB() {
public long getMinShareMB() {
return minShareMB.value();
}
public int getMinShareVirtualCores() {
public long getMinShareVirtualCores() {
return minShareVCores.value();
}
public void setMaxShare(Resource resource) {
maxShareMB.set(resource.getMemory());
maxShareMB.set(resource.getMemorySize());
maxShareVCores.set(resource.getVirtualCores());
}
public int getMaxShareMB() {
public long getMaxShareMB() {
return maxShareMB.value();
}
public int getMaxShareVirtualCores() {
public long getMaxShareVirtualCores() {
return maxShareVCores.value();
}

View File

@ -472,7 +472,7 @@ protected void preemptResources(Resource toPreempt) {
}
private boolean isResourceGreaterThanNone(Resource toPreempt) {
return (toPreempt.getMemory() > 0) || (toPreempt.getVirtualCores() > 0);
return (toPreempt.getMemorySize() > 0) || (toPreempt.getVirtualCores() > 0);
}
protected void warnOrKillContainer(RMContainer container) {
@ -559,7 +559,7 @@ public synchronized ResourceWeights getAppWeight(FSAppAttempt app) {
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on current memory demand
weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
weight = Math.log1p(app.getDemand().getMemorySize()) / Math.log(2);
}
weight *= app.getPriority().getPriority();
if (weightAdjuster != null) {
@ -1214,7 +1214,7 @@ private boolean shouldAttemptPreemption() {
if (preemptionEnabled) {
Resource clusterResource = getClusterResource();
return (preemptionUtilizationThreshold < Math.max(
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemorySize(),
(float) rootMetrics.getAllocatedVirtualCores() /
clusterResource.getVirtualCores()));
}

View File

@ -124,15 +124,15 @@ private static void computeSharesInternal(
// have met all Schedulables' max shares.
int totalMaxShare = 0;
for (Schedulable sched : schedulables) {
int maxShare = getResourceValue(sched.getMaxShare(), type);
totalMaxShare = (int) Math.min((long)maxShare + (long)totalMaxShare,
long maxShare = getResourceValue(sched.getMaxShare(), type);
totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare,
Integer.MAX_VALUE);
if (totalMaxShare == Integer.MAX_VALUE) {
break;
}
}
int totalResource = Math.max((getResourceValue(totalResources, type) -
long totalResource = Math.max((getResourceValue(totalResources, type) -
takenResources), 0);
totalResource = Math.min(totalMaxShare, totalResource);
@ -207,7 +207,7 @@ private static int handleFixedFairShares(
int totalResource = 0;
for (Schedulable sched : schedulables) {
int fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
if (fixedShare < 0) {
nonFixedSchedulables.add(sched);
} else {
@ -229,7 +229,7 @@ private static int handleFixedFairShares(
* The fairshare is fixed if either the maxShare is 0, weight is 0,
* or the Schedulable is not active for instantaneous fairshare.
*/
private static int getFairShareIfFixed(Schedulable sched,
private static long getFairShareIfFixed(Schedulable sched,
boolean isSteadyShare, ResourceType type) {
// Check if maxShare is 0
@ -245,17 +245,17 @@ private static int getFairShareIfFixed(Schedulable sched,
// Check if weight is 0
if (sched.getWeights().getWeight(type) <= 0) {
int minShare = getResourceValue(sched.getMinShare(), type);
long minShare = getResourceValue(sched.getMinShare(), type);
return (minShare <= 0) ? 0 : minShare;
}
return -1;
}
private static int getResourceValue(Resource resource, ResourceType type) {
private static long getResourceValue(Resource resource, ResourceType type) {
switch (type) {
case MEMORY:
return resource.getMemory();
return resource.getMemorySize();
case CPU:
return resource.getVirtualCores();
default:
@ -263,7 +263,7 @@ private static int getResourceValue(Resource resource, ResourceType type) {
}
}
private static void setResourceValue(int val, Resource resource, ResourceType type) {
private static void setResourceValue(long val, Resource resource, ResourceType type) {
switch (type) {
case MEMORY:
resource.setMemory(val);

View File

@ -101,13 +101,13 @@ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMRes
@Override
public Resource getHeadroom(Resource queueFairShare, Resource queueUsage,
Resource maxAvailable) {
int queueAvailableMemory =
Math.max(queueFairShare.getMemory() - queueUsage.getMemory(), 0);
long queueAvailableMemory =
Math.max(queueFairShare.getMemorySize() - queueUsage.getMemorySize(), 0);
int queueAvailableCPU =
Math.max(queueFairShare.getVirtualCores() - queueUsage
.getVirtualCores(), 0);
Resource headroom = Resources.createResource(
Math.min(maxAvailable.getMemory(), queueAvailableMemory),
Math.min(maxAvailable.getMemorySize(), queueAvailableMemory),
Math.min(maxAvailable.getVirtualCores(),
queueAvailableCPU));
return headroom;
@ -180,8 +180,8 @@ public int compare(Schedulable s1, Schedulable s2) {
*/
void calculateShares(Resource resource, Resource pool,
ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) {
shares.setWeight(MEMORY, (float)resource.getMemory() /
(pool.getMemory() * weights.getWeight(MEMORY)));
shares.setWeight(MEMORY, (float)resource.getMemorySize() /
(pool.getMemorySize() * weights.getWeight(MEMORY)));
shares.setWeight(CPU, (float)resource.getVirtualCores() /
(pool.getVirtualCores() * weights.getWeight(CPU)));
// sort order vector by resource share

View File

@ -82,13 +82,13 @@ public int compare(Schedulable s1, Schedulable s2) {
s1.getResourceUsage(), minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s2.getResourceUsage(), minShare2);
minShareRatio1 = (double) s1.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
useToWeightRatio1 = s1.getResourceUsage().getMemory() /
minShareRatio1 = (double) s1.getResourceUsage().getMemorySize()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
minShareRatio2 = (double) s2.getResourceUsage().getMemorySize()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
useToWeightRatio1 = s1.getResourceUsage().getMemorySize() /
s1.getWeights().getWeight(ResourceType.MEMORY);
useToWeightRatio2 = s2.getResourceUsage().getMemory() /
useToWeightRatio2 = s2.getResourceUsage().getMemorySize() /
s2.getWeights().getWeight(ResourceType.MEMORY);
int res = 0;
if (s1Needy && !s2Needy)
@ -124,10 +124,10 @@ public ResourceCalculator getResourceCalculator() {
@Override
public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource maxAvailable) {
int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0);
long queueAvailableMemory = Math.max(
queueFairShare.getMemorySize() - queueUsage.getMemorySize(), 0);
Resource headroom = Resources.createResource(
Math.min(maxAvailable.getMemory(), queueAvailableMemory),
Math.min(maxAvailable.getMemorySize(), queueAvailableMemory),
maxAvailable.getVirtualCores());
return headroom;
}
@ -152,7 +152,7 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return usage.getMemory() > maxAMResource.getMemory();
return usage.getMemorySize() > maxAMResource.getMemorySize();
}
@Override

View File

@ -115,16 +115,16 @@ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
@Override
public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
return usage.getMemory() > maxAMResource.getMemory();
return usage.getMemorySize() > maxAMResource.getMemorySize();
}
@Override
public Resource getHeadroom(Resource queueFairShare,
Resource queueUsage, Resource maxAvailable) {
int queueAvailableMemory = Math.max(
queueFairShare.getMemory() - queueUsage.getMemory(), 0);
long queueAvailableMemory = Math.max(
queueFairShare.getMemorySize() - queueUsage.getMemorySize(), 0);
Resource headroom = Resources.createResource(
Math.min(maxAvailable.getMemory(), queueAvailableMemory),
Math.min(maxAvailable.getMemorySize(), queueAvailableMemory),
maxAvailable.getVirtualCores());
return headroom;
}

View File

@ -143,11 +143,11 @@ public QueueInfo getQueueInfo(
queueInfo.setQueueName(DEFAULT_QUEUE.getQueueName());
queueInfo.setCapacity(1.0f);
Resource clusterResource = getClusterResource();
if (clusterResource.getMemory() == 0) {
if (clusterResource.getMemorySize() == 0) {
queueInfo.setCurrentCapacity(0.0f);
} else {
queueInfo.setCurrentCapacity((float) usedResource.getMemory()
/ clusterResource.getMemory());
queueInfo.setCurrentCapacity((float) usedResource.getMemorySize()
/ clusterResource.getMemorySize());
}
queueInfo.setMaximumCapacity(1.0f);
queueInfo.setChildQueues(new ArrayList<QueueInfo>());
@ -697,8 +697,9 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application
Resource capability = request.getCapability();
// TODO: A buggy application with this zero would crash the scheduler.
int availableContainers = node.getUnallocatedResource().getMemory() /
capability.getMemory();
int availableContainers =
(int) (node.getUnallocatedResource().getMemorySize() /
capability.getMemorySize());
int assignedContainers =
Math.min(assignableContainers, availableContainers);

View File

@ -23,7 +23,6 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
/**
@ -67,10 +66,10 @@ public FairOrderingPolicy() {
private double getMagnitude(SchedulableEntity r) {
double mag = r.getSchedulingResourceUsage().getCachedUsed(
CommonNodeLabelsManager.ANY).getMemory();
CommonNodeLabelsManager.ANY).getMemorySize();
if (sizeBasedWeight) {
double weight = Math.log1p(r.getSchedulingResourceUsage().getCachedDemand(
CommonNodeLabelsManager.ANY).getMemory()) / Math.log(2);
CommonNodeLabelsManager.ANY).getMemorySize()) / Math.log(2);
mag = mag / weight;
}
return mag;

View File

@ -54,8 +54,8 @@ static class QueueInfoBlock extends HtmlBlock {
@Override public void render(Block html) {
info("\'" + sinfo.getQueueName() + "\' Queue Status").
_("Queue State:" , sinfo.getState()).
_("Minimum Queue Memory Capacity:" , Integer.toString(sinfo.getMinQueueMemoryCapacity())).
_("Maximum Queue Memory Capacity:" , Integer.toString(sinfo.getMaxQueueMemoryCapacity())).
_("Minimum Queue Memory Capacity:" , Long.toString(sinfo.getMinQueueMemoryCapacity())).
_("Maximum Queue Memory Capacity:" , Long.toString(sinfo.getMaxQueueMemoryCapacity())).
_("Number of Nodes:" , Integer.toString(sinfo.getNumNodes())).
_("Used Node Capacity:" , Integer.toString(sinfo.getUsedNodeCapacity())).
_("Available Node Capacity:" , Integer.toString(sinfo.getAvailNodeCapacity())).

View File

@ -115,7 +115,7 @@ public FairSchedulerAppsBlock(ResourceManager rm, ViewContext ctx,
AppInfo appInfo = new AppInfo(rm, app, true, WebAppUtils.getHttpSchemePrefix(conf));
String percent = StringUtils.format("%.1f", appInfo.getProgress());
ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
int fairShare = fsinfo.getAppFairShare(attemptId);
long fairShare = fsinfo.getAppFairShare(attemptId);
if (fairShare == FairSchedulerInfo.INVALID_FAIR_SHARE) {
// FairScheduler#applications don't have the entry. Skip it.
continue;

View File

@ -1514,14 +1514,14 @@ protected Resource createAppSubmissionContextResource(
String msg = "Requested more cores than configured max";
throw new BadRequestException(msg);
}
if (newApp.getResource().getMemory() > rm.getConfig().getInt(
if (newApp.getResource().getMemorySize() > rm.getConfig().getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)) {
String msg = "Requested more memory than configured max";
throw new BadRequestException(msg);
}
Resource r =
Resource.newInstance(newApp.getResource().getMemory(), newApp
Resource.newInstance(newApp.getResource().getMemorySize(), newApp
.getResource().getvCores());
return r;
}
@ -2012,7 +2012,7 @@ private ReservationSubmissionRequest createReservationSubmissionRequest(
.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability =
Resource.newInstance(rInfo.getMemory(), rInfo.getvCores());
Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();
@ -2125,7 +2125,7 @@ private ReservationUpdateRequest createReservationUpdateRequest(
.getReservationRequest()) {
ResourceInfo rInfo = resReqInfo.getCapability();
Resource capability =
Resource.newInstance(rInfo.getMemory(), rInfo.getvCores());
Resource.newInstance(rInfo.getMemorySize(), rInfo.getvCores());
int numContainers = resReqInfo.getNumContainers();
int minConcurrency = resReqInfo.getMinConcurrency();
long duration = resReqInfo.getDuration();

View File

@ -82,8 +82,8 @@ public class AppInfo {
protected long elapsedTime;
protected String amContainerLogs;
protected String amHostHttpAddress;
protected int allocatedMB;
protected int allocatedVCores;
protected long allocatedMB;
protected long allocatedVCores;
protected int runningContainers;
protected long memorySeconds;
protected long vcoreSeconds;
@ -91,8 +91,8 @@ public class AppInfo {
protected float clusterUsagePercentage;
// preemption info fields
protected int preemptedResourceMB;
protected int preemptedResourceVCores;
protected long preemptedResourceMB;
protected long preemptedResourceVCores;
protected int numNonAMContainerPreempted;
protected int numAMContainerPreempted;
@ -174,7 +174,7 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess,
.getApplicationResourceUsageReport();
if (resourceReport != null) {
Resource usedResources = resourceReport.getUsedResources();
allocatedMB = usedResources.getMemory();
allocatedMB = usedResources.getMemorySize();
allocatedVCores = usedResources.getVirtualCores();
runningContainers = resourceReport.getNumUsedContainers();
queueUsagePercentage = resourceReport.getQueueUsagePercentage();
@ -190,7 +190,7 @@ public AppInfo(ResourceManager rm, RMApp app, Boolean hasAccess,
numAMContainerPreempted =
appMetrics.getNumAMContainersPreempted();
preemptedResourceMB =
appMetrics.getResourcePreempted().getMemory();
appMetrics.getResourcePreempted().getMemorySize();
numNonAMContainerPreempted =
appMetrics.getNumNonAMContainersPreempted();
preemptedResourceVCores =
@ -302,19 +302,19 @@ public int getRunningContainers() {
return this.runningContainers;
}
public int getAllocatedMB() {
public long getAllocatedMB() {
return this.allocatedMB;
}
public int getAllocatedVCores() {
public long getAllocatedVCores() {
return this.allocatedVCores;
}
public int getPreemptedMB() {
public long getPreemptedMB() {
return preemptedResourceMB;
}
public int getPreemptedVCores() {
public long getPreemptedVCores() {
return preemptedResourceVCores;
}

View File

@ -54,10 +54,10 @@ public FairSchedulerInfo(FairScheduler fs) {
* <code>FairSchedulerInfo#INVALID_FAIR_SHARE</code> if the scheduler does
* not know about this application attempt.
*/
public int getAppFairShare(ApplicationAttemptId appAttemptId) {
public long getAppFairShare(ApplicationAttemptId appAttemptId) {
FSAppAttempt fsAppAttempt = scheduler.getSchedulerApp(appAttemptId);
return fsAppAttempt == null ?
INVALID_FAIR_SHARE : fsAppAttempt.getFairShare().getMemory();
INVALID_FAIR_SHARE : fsAppAttempt.getFairShare().getMemorySize();
}
public FairSchedulerQueueInfo getRootQueueInfo() {

View File

@ -83,8 +83,8 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
usedResources = new ResourceInfo(queue.getResourceUsage());
demandResources = new ResourceInfo(queue.getDemand());
fractionMemUsed = (float)usedResources.getMemory() /
clusterResources.getMemory();
fractionMemUsed = (float)usedResources.getMemorySize() /
clusterResources.getMemorySize();
steadyFairResources = new ResourceInfo(queue.getSteadyFairShare());
fairResources = new ResourceInfo(queue.getFairShare());
@ -95,11 +95,11 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
scheduler.getClusterResource()));
fractionMemSteadyFairShare =
(float)steadyFairResources.getMemory() / clusterResources.getMemory();
fractionMemFairShare = (float) fairResources.getMemory()
/ clusterResources.getMemory();
fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();
fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory();
(float)steadyFairResources.getMemorySize() / clusterResources.getMemorySize();
fractionMemFairShare = (float) fairResources.getMemorySize()
/ clusterResources.getMemorySize();
fractionMemMinShare = (float)minResources.getMemorySize() / clusterResources.getMemorySize();
fractionMemMaxShare = (float)maxResources.getMemorySize() / clusterResources.getMemorySize();
maxApps = allocConf.getQueueMaxApps(queueName);

View File

@ -40,8 +40,8 @@ public class FifoSchedulerInfo extends SchedulerInfo {
protected float capacity;
protected float usedCapacity;
protected QueueState qstate;
protected int minQueueMemoryCapacity;
protected int maxQueueMemoryCapacity;
protected long minQueueMemoryCapacity;
protected long maxQueueMemoryCapacity;
protected int numNodes;
protected int usedNodeCapacity;
protected int availNodeCapacity;
@ -67,8 +67,8 @@ public FifoSchedulerInfo(final ResourceManager rm) {
this.usedCapacity = qInfo.getCurrentCapacity();
this.capacity = qInfo.getCapacity();
this.minQueueMemoryCapacity = fs.getMinimumResourceCapability().getMemory();
this.maxQueueMemoryCapacity = fs.getMaximumResourceCapability().getMemory();
this.minQueueMemoryCapacity = fs.getMinimumResourceCapability().getMemorySize();
this.maxQueueMemoryCapacity = fs.getMaximumResourceCapability().getMemorySize();
this.qstate = qInfo.getQueueState();
this.numNodes = rmContext.getRMNodes().size();
@ -79,9 +79,9 @@ public FifoSchedulerInfo(final ResourceManager rm) {
for (RMNode ni : rmContext.getRMNodes().values()) {
SchedulerNodeReport report = fs.getNodeReport(ni.getNodeID());
this.usedNodeCapacity += report.getUsedResource().getMemory();
this.availNodeCapacity += report.getAvailableResource().getMemory();
this.totalNodeCapacity += ni.getTotalCapability().getMemory();
this.usedNodeCapacity += report.getUsedResource().getMemorySize();
this.availNodeCapacity += report.getAvailableResource().getMemorySize();
this.totalNodeCapacity += ni.getTotalCapability().getMemorySize();
this.numContainers += fs.getNodeReport(ni.getNodeID()).getNumContainers();
}
}
@ -114,11 +114,11 @@ public String getQueueName() {
return this.qName;
}
public int getMinQueueMemoryCapacity() {
public long getMinQueueMemoryCapacity() {
return this.minQueueMemoryCapacity;
}
public int getMaxQueueMemoryCapacity() {
public long getMaxQueueMemoryCapacity() {
return this.maxQueueMemoryCapacity;
}

View File

@ -63,8 +63,8 @@ public NodeInfo(RMNode ni, ResourceScheduler sched) {
this.availMemoryMB = 0;
if (report != null) {
this.numContainers = report.getNumContainers();
this.usedMemoryMB = report.getUsedResource().getMemory();
this.availMemoryMB = report.getAvailableResource().getMemory();
this.usedMemoryMB = report.getUsedResource().getMemorySize();
this.availMemoryMB = report.getAvailableResource().getMemorySize();
this.usedVirtualCores = report.getUsedResource().getVirtualCores();
this.availableVirtualCores = report.getAvailableResource().getVirtualCores();
}

View File

@ -27,22 +27,22 @@
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class ResourceInfo {
int memory;
int vCores;
long memory;
long vCores;
public ResourceInfo() {
}
public ResourceInfo(Resource res) {
memory = res.getMemory();
memory = res.getMemorySize();
vCores = res.getVirtualCores();
}
public int getMemory() {
public long getMemorySize() {
return memory;
}
public int getvCores() {
public long getvCores() {
return vCores;
}

View File

@ -194,7 +194,7 @@ public synchronized void addResourceRequestSpec(
Resource currentSpec = requestSpec.put(priority, capability);
if (currentSpec != null) {
throw new IllegalStateException("Resource spec already exists for " +
"priority " + priority.getPriority() + " - " + currentSpec.getMemory());
"priority " + priority.getPriority() + " - " + currentSpec.getMemorySize());
}
}

View File

@ -50,7 +50,7 @@ public class MockNM {
private int responseId;
private NodeId nodeId;
private int memory;
private long memory;
private int vCores;
private ResourceTrackerService resourceTracker;
private int httpPort = 2;
@ -144,7 +144,7 @@ public RegisterNodeManagerResponse registerNode(
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
Resource newResource = registrationResponse.getResource();
if (newResource != null) {
memory = newResource.getMemory();
memory = (int) newResource.getMemorySize();
vCores = newResource.getVirtualCores();
}
return registrationResponse;
@ -219,14 +219,14 @@ public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
Resource newResource = heartbeatResponse.getResource();
if (newResource != null) {
memory = newResource.getMemory();
memory = newResource.getMemorySize();
vCores = newResource.getVirtualCores();
}
return heartbeatResponse;
}
public int getMemory() {
public long getMemory() {
return memory;
}

View File

@ -89,13 +89,13 @@ public static Resource newResource(int mem) {
public static Resource newUsedResource(Resource total) {
Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory((int)(Math.random() * total.getMemory()));
rs.setMemory((int)(Math.random() * total.getMemorySize()));
return rs;
}
public static Resource newAvailResource(Resource total, Resource used) {
Resource rs = recordFactory.newRecordInstance(Resource.class);
rs.setMemory(total.getMemory() - used.getMemory());
rs.setMemory(total.getMemorySize() - used.getMemorySize());
return rs;
}

View File

@ -213,12 +213,12 @@ synchronized public StartContainersResponse startContainers(
synchronized public void checkResourceUsage() {
LOG.info("Checking resource usage for " + containerManagerAddress);
Assert.assertEquals(available.getMemory(),
Assert.assertEquals(available.getMemorySize(),
resourceManager.getResourceScheduler().getNodeReport(
this.nodeId).getAvailableResource().getMemory());
Assert.assertEquals(used.getMemory(),
this.nodeId).getAvailableResource().getMemorySize());
Assert.assertEquals(used.getMemorySize(),
resourceManager.getResourceScheduler().getNodeReport(
this.nodeId).getUsedResource().getMemory());
this.nodeId).getUsedResource().getMemorySize());
}
@Override

View File

@ -488,7 +488,7 @@ private void registerNode(String host, int memory, int vCores) throws
private boolean checkCapacity(Collection<Plan> plans) {
for (Plan plan : plans) {
if (plan.getTotalCapacity().getMemory() > 0) {
if (plan.getTotalCapacity().getMemorySize() > 0) {
return true;
}
}

View File

@ -387,11 +387,11 @@ private void verifyEnemyAppReport(ApplicationReport appReport) {
Assert.assertEquals("Enemy should not see app reserved containers",
-1, usageReport.getNumReservedContainers());
Assert.assertEquals("Enemy should not see app used resources",
-1, usageReport.getUsedResources().getMemory());
-1, usageReport.getUsedResources().getMemorySize());
Assert.assertEquals("Enemy should not see app reserved resources",
-1, usageReport.getReservedResources().getMemory());
-1, usageReport.getReservedResources().getMemorySize());
Assert.assertEquals("Enemy should not see app needed resources",
-1, usageReport.getNeededResources().getMemory());
-1, usageReport.getNeededResources().getMemorySize());
}
private void verifyInvalidQueueWithAcl() throws Exception {

View File

@ -534,7 +534,7 @@ public void testProcessingNMContainerStatusesOnNMRestart() throws Exception {
// 4. Verify Memory Usage by cluster, it should be 3072. AM memory +
// requested memory. 1024 + 2048=3072
ResourceScheduler rs = rm1.getRMContext().getScheduler();
int allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
long allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
Assert.assertEquals(amMemory + containerMemory, allocatedMB);
// 5. Re-register NM by sending completed container status

View File

@ -420,7 +420,7 @@ private AggregateAppResourceUsage calculateContainerResourceMetrics(
Resource resource = rmContainer.getContainer().getResource();
long usedMillis =
rmContainer.getFinishTime() - rmContainer.getCreationTime();
long memorySeconds = resource.getMemory()
long memorySeconds = resource.getMemorySize()
* usedMillis / DateUtils.MILLIS_PER_SECOND;
long vcoreSeconds = resource.getVirtualCores()
* usedMillis / DateUtils.MILLIS_PER_SECOND;

View File

@ -161,7 +161,7 @@ public Configuration getYarnConfiguration() {
Assert.assertEquals(4,
dsRegResp.getMaxAllocatableCapabilty().getVirtualCores());
Assert.assertEquals(1024,
dsRegResp.getMinAllocatableCapabilty().getMemory());
dsRegResp.getMinAllocatableCapabilty().getMemorySize());
Assert.assertEquals(2,
dsRegResp.getIncrAllocatableCapabilty().getVirtualCores());

View File

@ -21,7 +21,6 @@
import org.junit.Before;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.util.ArrayList;
@ -45,7 +44,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -61,8 +59,6 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -79,8 +75,6 @@
import org.apache.log4j.Logger;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRM extends ParameterizedSchedulerTestBase {
@ -112,7 +106,7 @@ public void testGetNewAppId() throws Exception {
GetNewApplicationResponse resp = rm.getNewAppId();
assert (resp.getApplicationId().getId() != 0);
assert (resp.getMaximumResourceCapability().getMemory() > 0);
assert (resp.getMaximumResourceCapability().getMemorySize() > 0);
rm.stop();
}

View File

@ -711,7 +711,7 @@ public void innerTestHAWithRMHostName(boolean includeBindHost) {
}
private void verifyClusterMetrics(int activeNodes, int appsSubmitted,
int appsPending, int containersPending, int availableMB,
int appsPending, int containersPending, long availableMB,
int activeApplications) throws Exception {
int timeoutSecs = 0;
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
@ -742,7 +742,7 @@ private void verifyClusterMetrics(int activeNodes, int appsSubmitted,
assertTrue(message, isAllMetricAssertionDone);
}
private void assertMetric(String metricName, int expected, int actual) {
private void assertMetric(String metricName, long expected, long actual) {
assertEquals("Incorrect value for metric " + metricName, expected, actual);
}

View File

@ -31,7 +31,6 @@
import java.util.List;
import java.util.Random;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -50,7 +49,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
.AllocationExpirationInfo;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@ -869,13 +867,13 @@ public void testReconnectWithNewPortOnDecommissioningNode() {
public void testResourceUpdateOnRunningNode() {
RMNodeImpl node = getRunningNode();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.RUNNING, node.getState());
@ -893,13 +891,13 @@ public void testDecommissioningOnRunningNode(){
public void testResourceUpdateOnNewNode() {
RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4));
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.NEW, node.getState());
@ -913,13 +911,13 @@ public void testResourceUpdateOnRebootedNode() {
int initialUnHealthy = cm.getUnhealthyNMs();
int initialDecommissioning = cm.getNumDecommissioningNMs();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), ResourceOption
.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.REBOOTED, node.getState());
@ -994,16 +992,16 @@ public void testContainerExpire() throws Exception {
public void testResourceUpdateOnDecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource originalCapacity = node.getOriginalTotalCapability();
assertEquals("Memory resource is not match.", originalCapacity.getMemory(), oldCapacity.getMemory());
assertEquals("Memory resource is not match.", originalCapacity.getMemorySize(), oldCapacity.getMemorySize());
assertEquals("CPU resource is not match.", originalCapacity.getVirtualCores(), oldCapacity.getVirtualCores());
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("Memory resource is not match.", newCapacity.getMemorySize(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
@ -1016,7 +1014,7 @@ public void testResourceUpdateOnDecommissioningNode() {
public void testResourceUpdateOnRecommissioningNode() {
RMNodeImpl node = getDecommissioningNode();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("Memory resource is not match.", oldCapacity.getMemorySize(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeEvent(node.getNodeID(),
RMNodeEventType.RECOMMISSION));

View File

@ -193,7 +193,7 @@ private void addNodeCapacityToPlan(MockRM rm, int memory, int vCores) {
.synchronizePlan(ReservationSystemTestUtil.reservationQ, false);
if (rm.getRMContext().getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
.getMemory() > 0) {
.getMemorySize() > 0) {
break;
}
LOG.info("Waiting for node capacity to be added to plan");

View File

@ -1058,7 +1058,7 @@ public void handle(SchedulerEvent event) {
rm.drainEvents();
RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
Assert.assertEquals(3, rmNode.getHttpPort());
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemory());
Assert.assertEquals(5120, rmNode.getTotalCapability().getMemorySize());
Assert.assertEquals(5120 + 15360, metrics.getAvailableMB());
}

View File

@ -87,7 +87,6 @@
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -417,15 +416,15 @@ private void checkCSQueue(MockRM rm,
// ************* check Queue metrics ************
QueueMetrics queueMetrics = queue.getMetrics();
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
availableResources.getVirtualCores(), usedResource.getMemory(),
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemorySize(),
availableResources.getVirtualCores(), usedResource.getMemorySize(),
usedResource.getVirtualCores());
// ************ check user metrics ***********
QueueMetrics userMetrics =
queueMetrics.getUserMetrics(app.getUser());
assertMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
availableResources.getVirtualCores(), usedResource.getMemory(),
assertMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemorySize(),
availableResources.getVirtualCores(), usedResource.getMemorySize(),
usedResource.getVirtualCores());
}
@ -485,8 +484,8 @@ private void checkFSQueue(ResourceManager rm,
// ************ check queue metrics ****************
QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
availableResources.getVirtualCores(), usedResources.getMemory(),
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemorySize(),
availableResources.getVirtualCores(), usedResources.getMemorySize(),
usedResources.getVirtualCores());
}
@ -697,8 +696,8 @@ public void testCapacitySchedulerRecovery() throws Exception {
q1UsedResource, 4);
QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
assertMetrics(queue1Metrics, 2, 0, 2, 0, 4,
q1availableResources.getMemory(),
q1availableResources.getVirtualCores(), q1UsedResource.getMemory(),
q1availableResources.getMemorySize(),
q1availableResources.getVirtualCores(), q1UsedResource.getMemorySize(),
q1UsedResource.getVirtualCores());
// assert queue B state.
@ -708,8 +707,8 @@ public void testCapacitySchedulerRecovery() throws Exception {
q2UsedResource, 2);
QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
assertMetrics(queue2Metrics, 1, 0, 1, 0, 2,
q2availableResources.getMemory(),
q2availableResources.getVirtualCores(), q2UsedResource.getMemory(),
q2availableResources.getMemorySize(),
q2availableResources.getVirtualCores(), q2UsedResource.getMemorySize(),
q2UsedResource.getVirtualCores());
// assert parent queue state.
@ -718,8 +717,8 @@ public void testCapacitySchedulerRecovery() throws Exception {
checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
(float) 6 / 16);
assertMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
totalAvailableResource.getMemory(),
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
totalAvailableResource.getMemorySize(),
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemorySize(),
totalUsedResource.getVirtualCores());
}
@ -1137,8 +1136,8 @@ public Boolean get() {
private void assertMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,
int allocatedContainers, int availableMB, int availableVirtualCores,
int allocatedMB, int allocatedVirtualCores) {
int allocatedContainers, long availableMB, long availableVirtualCores,
long allocatedMB, long allocatedVirtualCores) {
assertEquals(appsSubmitted, qm.getAppsSubmitted());
assertEquals(appsPending, qm.getAppsPending());
assertEquals(appsRunning, qm.getAppsRunning());

Some files were not shown because too many files have changed in this diff Show More