YARN-3304. Cleaning up ResourceCalculatorProcessTree APIs for public use and removing inconsistencies in the default values. Contributed by Junping Du and Karthik Kambatla.

This commit is contained in:
Vinod Kumar Vavilapalli 2015-03-30 10:09:40 -07:00
parent e7ea2a8e8f
commit c358368f51
11 changed files with 187 additions and 154 deletions

View File

@ -171,7 +171,7 @@ static synchronized String getOutputName(int partition) {
skipRanges.skipRangeIterator(); skipRanges.skipRangeIterator();
private ResourceCalculatorProcessTree pTree; private ResourceCalculatorProcessTree pTree;
private long initCpuCumulativeTime = 0; private long initCpuCumulativeTime = ResourceCalculatorProcessTree.UNAVAILABLE;
protected JobConf conf; protected JobConf conf;
protected MapOutputFile mapOutputFile; protected MapOutputFile mapOutputFile;
@ -866,13 +866,25 @@ void updateResourceCounters() {
} }
pTree.updateProcessTree(); pTree.updateProcessTree();
long cpuTime = pTree.getCumulativeCpuTime(); long cpuTime = pTree.getCumulativeCpuTime();
long pMem = pTree.getCumulativeRssmem(); long pMem = pTree.getRssMemorySize();
long vMem = pTree.getCumulativeVmem(); long vMem = pTree.getVirtualMemorySize();
// Remove the CPU time consumed previously by JVM reuse // Remove the CPU time consumed previously by JVM reuse
cpuTime -= initCpuCumulativeTime; if (cpuTime != ResourceCalculatorProcessTree.UNAVAILABLE &&
counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime); initCpuCumulativeTime != ResourceCalculatorProcessTree.UNAVAILABLE) {
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem); cpuTime -= initCpuCumulativeTime;
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem); }
if (cpuTime != ResourceCalculatorProcessTree.UNAVAILABLE) {
counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
}
if (pMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
}
if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) {
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
}
} }
/** /**

View File

@ -847,6 +847,10 @@ Release 2.7.0 - UNRELEASED
YARN-2213. Change proxy-user cookie log in AmIpFilter to DEBUG. YARN-2213. Change proxy-user cookie log in AmIpFilter to DEBUG.
(Varun Saxena via xgong) (Varun Saxena via xgong)
YARN-3304. Cleaning up ResourceCalculatorProcessTree APIs for public use and
removing inconsistencies in the default values. (Junping Du and Karthik
Kambatla via vinodkv)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -26,7 +26,8 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class CpuTimeTracker { public class CpuTimeTracker {
public static final int UNAVAILABLE = -1; public static final int UNAVAILABLE =
ResourceCalculatorProcessTree.UNAVAILABLE;
final long MINIMUM_UPDATE_INTERVAL; final long MINIMUM_UPDATE_INTERVAL;
// CPU used time since system is on (ms) // CPU used time since system is on (ms)

View File

@ -140,7 +140,7 @@ public static MemInfo getMemInfoByName(String name) {
static private String deadPid = "-1"; static private String deadPid = "-1";
private String pid = deadPid; private String pid = deadPid;
static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*"); static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*");
private Long cpuTime = 0L; private long cpuTime = UNAVAILABLE;
protected Map<String, ProcessInfo> processTree = protected Map<String, ProcessInfo> processTree =
new HashMap<String, ProcessInfo>(); new HashMap<String, ProcessInfo>();
@ -340,66 +340,53 @@ public String getProcessTreeDump() {
return ret.toString(); return ret.toString();
} }
/**
* Get the cumulative virtual memory used by all the processes in the
* process-tree that are older than the passed in age.
*
* @param olderThanAge processes above this age are included in the
* memory addition
* @return cumulative virtual memory used by the process-tree in bytes,
* for processes older than this age.
*/
@Override @Override
public long getCumulativeVmem(int olderThanAge) { public long getVirtualMemorySize(int olderThanAge) {
long total = 0; long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) { if ((p != null) && (p.getAge() > olderThanAge)) {
if (total == UNAVAILABLE ) {
total = 0;
}
total += p.getVmem(); total += p.getVmem();
} }
} }
return total; return total;
} }
/**
* Get the cumulative resident set size (rss) memory used by all the processes
* in the process-tree that are older than the passed in age.
*
* @param olderThanAge processes above this age are included in the
* memory addition
* @return cumulative rss memory used by the process-tree in bytes,
* for processes older than this age. return 0 if it cannot be
* calculated
*/
@Override @Override
public long getCumulativeRssmem(int olderThanAge) { public long getRssMemorySize(int olderThanAge) {
if (PAGE_SIZE < 0) { if (PAGE_SIZE < 0) {
return 0; return UNAVAILABLE;
} }
if (smapsEnabled) { if (smapsEnabled) {
return getSmapBasedCumulativeRssmem(olderThanAge); return getSmapBasedRssMemorySize(olderThanAge);
} }
boolean isAvailable = false;
long totalPages = 0; long totalPages = 0;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) { if ((p != null) && (p.getAge() > olderThanAge)) {
totalPages += p.getRssmemPage(); totalPages += p.getRssmemPage();
isAvailable = true;
} }
} }
return totalPages * PAGE_SIZE; // convert # pages to byte return isAvailable ? totalPages * PAGE_SIZE : UNAVAILABLE; // convert # pages to byte
} }
/** /**
* Get the cumulative resident set size (RSS) memory used by all the processes * Get the resident set size (RSS) memory used by all the processes
* in the process-tree that are older than the passed in age. RSS is * in the process-tree that are older than the passed in age. RSS is
* calculated based on SMAP information. Skip mappings with "r--s", "r-xs" * calculated based on SMAP information. Skip mappings with "r--s", "r-xs"
* permissions to get real RSS usage of the process. * permissions to get real RSS usage of the process.
* *
* @param olderThanAge * @param olderThanAge
* processes above this age are included in the memory addition * processes above this age are included in the memory addition
* @return cumulative rss memory used by the process-tree in bytes, for * @return rss memory used by the process-tree in bytes, for
* processes older than this age. return 0 if it cannot be calculated * processes older than this age. return {@link #UNAVAILABLE} if it cannot
* be calculated.
*/ */
private long getSmapBasedCumulativeRssmem(int olderThanAge) { private long getSmapBasedRssMemorySize(int olderThanAge) {
long total = 0; long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.getAge() > olderThanAge)) { if ((p != null) && (p.getAge() > olderThanAge)) {
ProcessTreeSmapMemInfo procMemInfo = processSMAPTree.get(p.getPid()); ProcessTreeSmapMemInfo procMemInfo = processSMAPTree.get(p.getPid());
@ -412,6 +399,9 @@ private long getSmapBasedCumulativeRssmem(int olderThanAge) {
.equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION)) { .equalsIgnoreCase(READ_EXECUTE_WITH_SHARED_PERMISSION)) {
continue; continue;
} }
if (total == UNAVAILABLE){
total = 0;
}
total += total +=
Math.min(info.sharedDirty, info.pss) + info.privateDirty Math.min(info.sharedDirty, info.pss) + info.privateDirty
+ info.privateClean; + info.privateClean;
@ -429,30 +419,34 @@ private long getSmapBasedCumulativeRssmem(int olderThanAge) {
} }
} }
} }
total = (total * KB_TO_BYTES); // convert to bytes if (total > 0) {
total *= KB_TO_BYTES; // convert to bytes
}
LOG.info("SmapBasedCumulativeRssmem (bytes) : " + total); LOG.info("SmapBasedCumulativeRssmem (bytes) : " + total);
return total; // size return total; // size
} }
/**
* Get the CPU time in millisecond used by all the processes in the
* process-tree since the process-tree created
*
* @return cumulative CPU time in millisecond since the process-tree created
* return 0 if it cannot be calculated
*/
@Override @Override
public long getCumulativeCpuTime() { public long getCumulativeCpuTime() {
if (JIFFY_LENGTH_IN_MILLIS < 0) { if (JIFFY_LENGTH_IN_MILLIS < 0) {
return 0; return UNAVAILABLE;
} }
long incJiffies = 0; long incJiffies = 0;
boolean isAvailable = false;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if (p != null) { if (p != null) {
incJiffies += p.getDtime(); incJiffies += p.getDtime();
// data is available
isAvailable = true;
} }
} }
cpuTime += incJiffies * JIFFY_LENGTH_IN_MILLIS; if (isAvailable) {
// reset cpuTime to 0 instead of UNAVAILABLE
if (cpuTime == UNAVAILABLE) {
cpuTime = 0L;
}
cpuTime += incJiffies * JIFFY_LENGTH_IN_MILLIS;
}
return cpuTime; return cpuTime;
} }
@ -1031,8 +1025,8 @@ public static void main(String[] args) {
System.out.println("Cpu usage " + procfsBasedProcessTree System.out.println("Cpu usage " + procfsBasedProcessTree
.getCpuUsagePercent()); .getCpuUsagePercent());
System.out.println("Vmem usage in bytes " + procfsBasedProcessTree System.out.println("Vmem usage in bytes " + procfsBasedProcessTree
.getCumulativeVmem()); .getVirtualMemorySize());
System.out.println("Rss mem usage in bytes " + procfsBasedProcessTree System.out.println("Rss mem usage in bytes " + procfsBasedProcessTree
.getCumulativeRssmem()); .getRssMemorySize());
} }
} }

View File

@ -23,19 +23,23 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
/** /**
* Interface class to obtain process resource usage * Interface class to obtain process resource usage
* * NOTE: This class should not be used by external users, but only by external
* developers to extend and include their own process-tree implementation,
* especially for platforms other than Linux and Windows.
*/ */
@Public @Public
@Evolving @Evolving
public abstract class ResourceCalculatorProcessTree extends Configured { public abstract class ResourceCalculatorProcessTree extends Configured {
static final Log LOG = LogFactory static final Log LOG = LogFactory
.getLog(ResourceCalculatorProcessTree.class); .getLog(ResourceCalculatorProcessTree.class);
public static final int UNAVAILABLE = -1;
/** /**
* Create process-tree instance with specified root process. * Create process-tree instance with specified root process.
@ -65,63 +69,64 @@ public ResourceCalculatorProcessTree(String root) {
public abstract String getProcessTreeDump(); public abstract String getProcessTreeDump();
/** /**
* Get the cumulative virtual memory used by all the processes in the * Get the virtual memory used by all the processes in the
* process-tree. * process-tree.
* *
* @return cumulative virtual memory used by the process-tree in bytes. * @return virtual memory used by the process-tree in bytes,
* {@link #UNAVAILABLE} if it cannot be calculated.
*/ */
public long getCumulativeVmem() { public long getVirtualMemorySize() {
return getCumulativeVmem(0); return getVirtualMemorySize(0);
} }
/** /**
* Get the cumulative resident set size (rss) memory used by all the processes * Get the resident set size (rss) memory used by all the processes
* in the process-tree. * in the process-tree.
* *
* @return cumulative rss memory used by the process-tree in bytes. return 0 * @return rss memory used by the process-tree in bytes,
* if it cannot be calculated * {@link #UNAVAILABLE} if it cannot be calculated.
*/ */
public long getCumulativeRssmem() { public long getRssMemorySize() {
return getCumulativeRssmem(0); return getRssMemorySize(0);
} }
/** /**
* Get the cumulative virtual memory used by all the processes in the * Get the virtual memory used by all the processes in the
* process-tree that are older than the passed in age. * process-tree that are older than the passed in age.
* *
* @param olderThanAge processes above this age are included in the * @param olderThanAge processes above this age are included in the
* memory addition * memory addition
* @return cumulative virtual memory used by the process-tree in bytes, * @return virtual memory used by the process-tree in bytes for
* for processes older than this age. return 0 if it cannot be * processes older than the specified age, {@link #UNAVAILABLE} if it
* calculated * cannot be calculated.
*/ */
public long getCumulativeVmem(int olderThanAge) { public long getVirtualMemorySize(int olderThanAge) {
return 0; return UNAVAILABLE;
} }
/** /**
* Get the cumulative resident set size (rss) memory used by all the processes * Get the resident set size (rss) memory used by all the processes
* in the process-tree that are older than the passed in age. * in the process-tree that are older than the passed in age.
* *
* @param olderThanAge processes above this age are included in the * @param olderThanAge processes above this age are included in the
* memory addition * memory addition
* @return cumulative rss memory used by the process-tree in bytes, * @return rss memory used by the process-tree in bytes for
* for processes older than this age. return 0 if it cannot be * processes older than specified age, {@link #UNAVAILABLE} if it cannot be
* calculated * calculated.
*/ */
public long getCumulativeRssmem(int olderThanAge) { public long getRssMemorySize(int olderThanAge) {
return 0; return UNAVAILABLE;
} }
/** /**
* Get the CPU time in millisecond used by all the processes in the * Get the CPU time in millisecond used by all the processes in the
* process-tree since the process-tree was created * process-tree since the process-tree was created
* *
* @return cumulative CPU time in millisecond since the process-tree created * @return cumulative CPU time in millisecond since the process-tree
* return 0 if it cannot be calculated * created, {@link #UNAVAILABLE} if it cannot be calculated.
*/ */
public long getCumulativeCpuTime() { public long getCumulativeCpuTime() {
return 0; return UNAVAILABLE;
} }
/** /**
@ -129,11 +134,11 @@ public long getCumulativeCpuTime() {
* average between samples as a ratio of overall CPU cycles similar to top. * average between samples as a ratio of overall CPU cycles similar to top.
* Thus, if 2 out of 4 cores are used this should return 200.0. * Thus, if 2 out of 4 cores are used this should return 200.0.
* *
* @return percentage CPU usage since the process-tree was created * @return percentage CPU usage since the process-tree was created,
* return {@link CpuTimeTracker#UNAVAILABLE} if it cannot be calculated * {@link #UNAVAILABLE} if it cannot be calculated.
*/ */
public float getCpuUsagePercent() { public float getCpuUsagePercent() {
return -1; return UNAVAILABLE;
} }
/** Verify that the tree process id is same as its process group id. /** Verify that the tree process id is same as its process group id.
@ -153,6 +158,7 @@ public float getCpuUsagePercent() {
* @return ResourceCalculatorProcessTree or null if ResourceCalculatorPluginTree * @return ResourceCalculatorProcessTree or null if ResourceCalculatorPluginTree
* is not available for this system. * is not available for this system.
*/ */
@Private
public static ResourceCalculatorProcessTree getResourceCalculatorProcessTree( public static ResourceCalculatorProcessTree getResourceCalculatorProcessTree(
String pid, Class<? extends ResourceCalculatorProcessTree> clazz, Configuration conf) { String pid, Class<? extends ResourceCalculatorProcessTree> clazz, Configuration conf) {

View File

@ -45,8 +45,8 @@ static class ProcessInfo {
} }
private String taskProcessId = null; private String taskProcessId = null;
private long cpuTimeMs = 0; private long cpuTimeMs = UNAVAILABLE;
private Map<String, ProcessInfo> processTree = private Map<String, ProcessInfo> processTree =
new HashMap<String, ProcessInfo>(); new HashMap<String, ProcessInfo>();
public static boolean isAvailable() { public static boolean isAvailable() {
@ -173,10 +173,13 @@ public String getProcessTreeDump() {
} }
@Override @Override
public long getCumulativeVmem(int olderThanAge) { public long getVirtualMemorySize(int olderThanAge) {
long total = 0; long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.age > olderThanAge)) { if ((p != null) && (p.age > olderThanAge)) {
if (total == UNAVAILABLE) {
total = 0;
}
total += p.vmem; total += p.vmem;
} }
} }
@ -184,10 +187,13 @@ public long getCumulativeVmem(int olderThanAge) {
} }
@Override @Override
public long getCumulativeRssmem(int olderThanAge) { public long getRssMemorySize(int olderThanAge) {
long total = 0; long total = UNAVAILABLE;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) && (p.age > olderThanAge)) { if ((p != null) && (p.age > olderThanAge)) {
if (total == UNAVAILABLE) {
total = 0;
}
total += p.workingSet; total += p.workingSet;
} }
} }
@ -197,6 +203,9 @@ public long getCumulativeRssmem(int olderThanAge) {
@Override @Override
public long getCumulativeCpuTime() { public long getCumulativeCpuTime() {
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if (cpuTimeMs == UNAVAILABLE) {
cpuTimeMs = 0;
}
cpuTimeMs += p.cpuTimeMsDelta; cpuTimeMs += p.cpuTimeMsDelta;
} }
return cpuTimeMs; return cpuTimeMs;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.util; package org.apache.hadoop.yarn.util;
import static org.apache.hadoop.yarn.util.ProcfsBasedProcessTree.KB_TO_BYTES; import static org.apache.hadoop.yarn.util.ProcfsBasedProcessTree.KB_TO_BYTES;
import static org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree.UNAVAILABLE;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
@ -226,8 +227,8 @@ public void testProcessTree() throws Exception {
p.updateProcessTree(); p.updateProcessTree();
Assert.assertFalse("ProcessTree must have been gone", isAlive(pid)); Assert.assertFalse("ProcessTree must have been gone", isAlive(pid));
Assert.assertTrue( Assert.assertTrue(
"Cumulative vmem for the gone-process is " + p.getCumulativeVmem() "vmem for the gone-process is " + p.getVirtualMemorySize()
+ " . It should be zero.", p.getCumulativeVmem() == 0); + " . It should be zero.", p.getVirtualMemorySize() == 0);
Assert.assertTrue(p.toString().equals("[ ]")); Assert.assertTrue(p.toString().equals("[ ]"));
} }
@ -429,16 +430,16 @@ public void testCpuAndMemoryForProcessTree() throws IOException {
// build the process tree. // build the process tree.
processTree.updateProcessTree(); processTree.updateProcessTree();
// verify cumulative memory // verify virtual memory
Assert.assertEquals("Cumulative virtual memory does not match", 600000L, Assert.assertEquals("Virtual memory does not match", 600000L,
processTree.getCumulativeVmem()); processTree.getVirtualMemorySize());
// verify rss memory // verify rss memory
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals("Cumulative rss memory does not match", cumuRssMem, Assert.assertEquals("rss memory does not match", cumuRssMem,
processTree.getCumulativeRssmem()); processTree.getRssMemorySize());
// verify cumulative cpu time // verify cumulative cpu time
long cumuCpuTime = long cumuCpuTime =
@ -456,8 +457,8 @@ public void testCpuAndMemoryForProcessTree() throws IOException {
setSmapsInProceTree(processTree, true); setSmapsInProceTree(processTree, true);
// RSS=Min(shared_dirty,PSS)+PrivateClean+PrivateDirty (exclude r-xs, // RSS=Min(shared_dirty,PSS)+PrivateClean+PrivateDirty (exclude r-xs,
// r--s) // r--s)
Assert.assertEquals("Cumulative rss memory does not match", Assert.assertEquals("rss memory does not match",
(100 * KB_TO_BYTES * 3), processTree.getCumulativeRssmem()); (100 * KB_TO_BYTES * 3), processTree.getRssMemorySize());
// test the cpu time again to see if it cumulates // test the cpu time again to see if it cumulates
procInfos[0] = procInfos[0] =
@ -563,9 +564,9 @@ private void testMemForOlderProcesses(boolean smapEnabled) throws IOException {
new SystemClock()); new SystemClock());
setSmapsInProceTree(processTree, smapEnabled); setSmapsInProceTree(processTree, smapEnabled);
// verify cumulative memory // verify virtual memory
Assert.assertEquals("Cumulative memory does not match", 700000L, Assert.assertEquals("Cumulative memory does not match", 700000L,
processTree.getCumulativeVmem()); processTree.getVirtualMemorySize());
// write one more process as child of 100. // write one more process as child of 100.
String[] newPids = { "500" }; String[] newPids = { "500" };
setupPidDirs(procfsRootDir, newPids); setupPidDirs(procfsRootDir, newPids);
@ -581,34 +582,34 @@ private void testMemForOlderProcesses(boolean smapEnabled) throws IOException {
// check memory includes the new process. // check memory includes the new process.
processTree.updateProcessTree(); processTree.updateProcessTree();
Assert.assertEquals("Cumulative vmem does not include new process", Assert.assertEquals("vmem does not include new process",
1200000L, processTree.getCumulativeVmem()); 1200000L, processTree.getVirtualMemorySize());
if (!smapEnabled) { if (!smapEnabled) {
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals("Cumulative rssmem does not include new process", Assert.assertEquals("rssmem does not include new process",
cumuRssMem, processTree.getCumulativeRssmem()); cumuRssMem, processTree.getRssMemorySize());
} else { } else {
Assert.assertEquals("Cumulative rssmem does not include new process", Assert.assertEquals("rssmem does not include new process",
100 * KB_TO_BYTES * 4, processTree.getCumulativeRssmem()); 100 * KB_TO_BYTES * 4, processTree.getRssMemorySize());
} }
// however processes older than 1 iteration will retain the older value // however processes older than 1 iteration will retain the older value
Assert.assertEquals( Assert.assertEquals(
"Cumulative vmem shouldn't have included new process", 700000L, "vmem shouldn't have included new process", 700000L,
processTree.getCumulativeVmem(1)); processTree.getVirtualMemorySize(1));
if (!smapEnabled) { if (!smapEnabled) {
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new process", cumuRssMem, "rssmem shouldn't have included new process", cumuRssMem,
processTree.getCumulativeRssmem(1)); processTree.getRssMemorySize(1));
} else { } else {
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new process", "rssmem shouldn't have included new process",
100 * KB_TO_BYTES * 3, processTree.getCumulativeRssmem(1)); 100 * KB_TO_BYTES * 3, processTree.getRssMemorySize(1));
} }
// one more process // one more process
@ -629,49 +630,49 @@ private void testMemForOlderProcesses(boolean smapEnabled) throws IOException {
// processes older than 2 iterations should be same as before. // processes older than 2 iterations should be same as before.
Assert.assertEquals( Assert.assertEquals(
"Cumulative vmem shouldn't have included new processes", 700000L, "vmem shouldn't have included new processes", 700000L,
processTree.getCumulativeVmem(2)); processTree.getVirtualMemorySize(2));
if (!smapEnabled) { if (!smapEnabled) {
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new processes", "rssmem shouldn't have included new processes",
cumuRssMem, processTree.getCumulativeRssmem(2)); cumuRssMem, processTree.getRssMemorySize(2));
} else { } else {
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new processes", "rssmem shouldn't have included new processes",
100 * KB_TO_BYTES * 3, processTree.getCumulativeRssmem(2)); 100 * KB_TO_BYTES * 3, processTree.getRssMemorySize(2));
} }
// processes older than 1 iteration should not include new process, // processes older than 1 iteration should not include new process,
// but include process 500 // but include process 500
Assert.assertEquals( Assert.assertEquals(
"Cumulative vmem shouldn't have included new processes", 1200000L, "vmem shouldn't have included new processes", 1200000L,
processTree.getCumulativeVmem(1)); processTree.getVirtualMemorySize(1));
if (!smapEnabled) { if (!smapEnabled) {
long cumuRssMem = long cumuRssMem =
ProcfsBasedProcessTree.PAGE_SIZE > 0 ProcfsBasedProcessTree.PAGE_SIZE > 0
? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; ? 1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new processes", "rssmem shouldn't have included new processes",
cumuRssMem, processTree.getCumulativeRssmem(1)); cumuRssMem, processTree.getRssMemorySize(1));
} else { } else {
Assert.assertEquals( Assert.assertEquals(
"Cumulative rssmem shouldn't have included new processes", "rssmem shouldn't have included new processes",
100 * KB_TO_BYTES * 4, processTree.getCumulativeRssmem(1)); 100 * KB_TO_BYTES * 4, processTree.getRssMemorySize(1));
} }
// no processes older than 3 iterations, this should be 0 // no processes older than 3 iterations
Assert.assertEquals( Assert.assertEquals(
"Getting non-zero vmem for processes older than 3 iterations", 0L, "Getting non-zero vmem for processes older than 3 iterations",
processTree.getCumulativeVmem(3)); UNAVAILABLE, processTree.getVirtualMemorySize(3));
Assert.assertEquals( Assert.assertEquals(
"Getting non-zero rssmem for processes older than 3 iterations", 0L, "Getting non-zero rssmem for processes older than 3 iterations",
processTree.getCumulativeRssmem(3)); UNAVAILABLE, processTree.getRssMemorySize(3));
Assert.assertEquals( Assert.assertEquals(
"Getting non-zero rssmem for processes older than 3 iterations", 0L, "Getting non-zero rssmem for processes older than 3 iterations",
processTree.getCumulativeRssmem(3)); UNAVAILABLE, processTree.getRssMemorySize(3));
} finally { } finally {
FileUtil.fullyDelete(procfsRootDir); FileUtil.fullyDelete(procfsRootDir);
} }

View File

@ -41,11 +41,11 @@ public String getProcessTreeDump() {
return "Empty tree for testing"; return "Empty tree for testing";
} }
public long getCumulativeRssmem(int age) { public long getRssMemorySize(int age) {
return 0; return 0;
} }
public long getCumulativeVmem(int age) { public long getVirtualMemorySize(int age) {
return 0; return 0;
} }

View File

@ -53,26 +53,26 @@ public void tree() {
WindowsBasedProcessTreeTester pTree = new WindowsBasedProcessTreeTester("-1"); WindowsBasedProcessTreeTester pTree = new WindowsBasedProcessTreeTester("-1");
pTree.infoStr = "3524,1024,1024,500\r\n2844,1024,1024,500\r\n"; pTree.infoStr = "3524,1024,1024,500\r\n2844,1024,1024,500\r\n";
pTree.updateProcessTree(); pTree.updateProcessTree();
assertTrue(pTree.getCumulativeVmem() == 2048); assertTrue(pTree.getVirtualMemorySize() == 2048);
assertTrue(pTree.getCumulativeVmem(0) == 2048); assertTrue(pTree.getVirtualMemorySize(0) == 2048);
assertTrue(pTree.getCumulativeRssmem() == 2048); assertTrue(pTree.getRssMemorySize() == 2048);
assertTrue(pTree.getCumulativeRssmem(0) == 2048); assertTrue(pTree.getRssMemorySize(0) == 2048);
assertTrue(pTree.getCumulativeCpuTime() == 1000); assertTrue(pTree.getCumulativeCpuTime() == 1000);
pTree.infoStr = "3524,1024,1024,1000\r\n2844,1024,1024,1000\r\n1234,1024,1024,1000\r\n"; pTree.infoStr = "3524,1024,1024,1000\r\n2844,1024,1024,1000\r\n1234,1024,1024,1000\r\n";
pTree.updateProcessTree(); pTree.updateProcessTree();
assertTrue(pTree.getCumulativeVmem() == 3072); assertTrue(pTree.getVirtualMemorySize() == 3072);
assertTrue(pTree.getCumulativeVmem(1) == 2048); assertTrue(pTree.getVirtualMemorySize(1) == 2048);
assertTrue(pTree.getCumulativeRssmem() == 3072); assertTrue(pTree.getRssMemorySize() == 3072);
assertTrue(pTree.getCumulativeRssmem(1) == 2048); assertTrue(pTree.getRssMemorySize(1) == 2048);
assertTrue(pTree.getCumulativeCpuTime() == 3000); assertTrue(pTree.getCumulativeCpuTime() == 3000);
pTree.infoStr = "3524,1024,1024,1500\r\n2844,1024,1024,1500\r\n"; pTree.infoStr = "3524,1024,1024,1500\r\n2844,1024,1024,1500\r\n";
pTree.updateProcessTree(); pTree.updateProcessTree();
assertTrue(pTree.getCumulativeVmem() == 2048); assertTrue(pTree.getVirtualMemorySize() == 2048);
assertTrue(pTree.getCumulativeVmem(2) == 2048); assertTrue(pTree.getVirtualMemorySize(2) == 2048);
assertTrue(pTree.getCumulativeRssmem() == 2048); assertTrue(pTree.getRssMemorySize() == 2048);
assertTrue(pTree.getCumulativeRssmem(2) == 2048); assertTrue(pTree.getRssMemorySize(2) == 2048);
assertTrue(pTree.getCumulativeCpuTime() == 4000); assertTrue(pTree.getCumulativeCpuTime() == 4000);
} }
} }

View File

@ -188,13 +188,19 @@ public synchronized void finished() {
} }
public void recordMemoryUsage(int memoryMBs) { public void recordMemoryUsage(int memoryMBs) {
this.pMemMBsStat.add(memoryMBs); if (memoryMBs >= 0) {
this.pMemMBsStat.add(memoryMBs);
}
} }
public void recordCpuUsage( public void recordCpuUsage(
int totalPhysicalCpuPercent, int milliVcoresUsed) { int totalPhysicalCpuPercent, int milliVcoresUsed) {
this.cpuCoreUsagePercent.add(totalPhysicalCpuPercent); if (totalPhysicalCpuPercent >=0) {
this.milliVcoresUsed.add(milliVcoresUsed); this.cpuCoreUsagePercent.add(totalPhysicalCpuPercent);
}
if (milliVcoresUsed >= 0) {
this.milliVcoresUsed.add(milliVcoresUsed);
}
} }
public void recordProcessId(String processId) { public void recordProcessId(String processId) {

View File

@ -333,10 +333,10 @@ boolean isProcessTreeOverLimit(String containerId,
// method provided just for easy testing purposes // method provided just for easy testing purposes
boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree, boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree,
String containerId, long limit) { String containerId, long limit) {
long currentMemUsage = pTree.getCumulativeVmem(); long currentMemUsage = pTree.getVirtualMemorySize();
// as processes begin with an age 1, we want to see if there are processes // as processes begin with an age 1, we want to see if there are processes
// more than 1 iteration old. // more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1); long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
return isProcessTreeOverLimit(containerId, currentMemUsage, return isProcessTreeOverLimit(containerId, currentMemUsage,
curMemUsageOfAgedProcesses, limit); curMemUsageOfAgedProcesses, limit);
} }
@ -437,8 +437,8 @@ public void run() {
+ " ContainerId = " + containerId); + " ContainerId = " + containerId);
ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree(); ResourceCalculatorProcessTree pTree = ptInfo.getProcessTree();
pTree.updateProcessTree(); // update process-tree pTree.updateProcessTree(); // update process-tree
long currentVmemUsage = pTree.getCumulativeVmem(); long currentVmemUsage = pTree.getVirtualMemorySize();
long currentPmemUsage = pTree.getCumulativeRssmem(); long currentPmemUsage = pTree.getRssMemorySize();
// if machine has 6 cores and 3 are used, // if machine has 6 cores and 3 are used,
// cpuUsagePercentPerCore should be 300% and // cpuUsagePercentPerCore should be 300% and
// cpuUsageTotalCoresPercentage should be 50% // cpuUsageTotalCoresPercentage should be 50%
@ -451,8 +451,8 @@ public void run() {
* maxVCoresAllottedForContainers /nodeCpuPercentageForYARN); * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN);
// as processes begin with an age 1, we want to see if there // as processes begin with an age 1, we want to see if there
// are processes more than 1 iteration old. // are processes more than 1 iteration old.
long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1); long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
long curRssMemUsageOfAgedProcesses = pTree.getCumulativeRssmem(1); long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1);
long vmemLimit = ptInfo.getVmemLimit(); long vmemLimit = ptInfo.getVmemLimit();
long pmemLimit = ptInfo.getPmemLimit(); long pmemLimit = ptInfo.getPmemLimit();
LOG.info(String.format( LOG.info(String.format(