YARN-5776. Checkstyle: MonitoringThread.Run method length is too long (miklos.szegedi@cloudera.com via rkanter)
This commit is contained in:
parent
dd4ed6a587
commit
9449519a25
@ -48,10 +48,14 @@
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Monitors containers collecting resource usage and preempting the container
|
||||
* if it exceeds its limits.
|
||||
*/
|
||||
public class ContainersMonitorImpl extends AbstractService implements
|
||||
ContainersMonitor {
|
||||
|
||||
final static Log LOG = LogFactory
|
||||
private final static Log LOG = LogFactory
|
||||
.getLog(ContainersMonitorImpl.class);
|
||||
|
||||
private long monitoringInterval;
|
||||
@ -66,7 +70,7 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||
|
||||
private final ContainerExecutor containerExecutor;
|
||||
private final Dispatcher eventDispatcher;
|
||||
protected final Context context;
|
||||
private final Context context;
|
||||
private ResourceCalculatorPlugin resourceCalculatorPlugin;
|
||||
private Configuration conf;
|
||||
private static float vmemRatio;
|
||||
@ -84,15 +88,18 @@ public class ContainersMonitorImpl extends AbstractService implements
|
||||
private static final long UNKNOWN_MEMORY_LIMIT = -1L;
|
||||
private int nodeCpuPercentageForYARN;
|
||||
|
||||
/**
|
||||
* Type of container metric.
|
||||
*/
|
||||
@Private
|
||||
public static enum ContainerMetric {
|
||||
public enum ContainerMetric {
|
||||
CPU, MEMORY
|
||||
}
|
||||
|
||||
private ResourceUtilization containersUtilization;
|
||||
// Tracks the aggregated allocation of the currently allocated containers
|
||||
// when queuing of containers at the NMs is enabled.
|
||||
private ResourceUtilization containersAllocation;
|
||||
private final ResourceUtilization containersAllocation;
|
||||
|
||||
private volatile boolean stopped = false;
|
||||
|
||||
@ -111,44 +118,47 @@ public ContainersMonitorImpl(ContainerExecutor exec,
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
protected void serviceInit(Configuration myConf) throws Exception {
|
||||
this.conf = myConf;
|
||||
this.monitoringInterval =
|
||||
conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
|
||||
conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
|
||||
this.conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
|
||||
this.conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS));
|
||||
|
||||
Class<? extends ResourceCalculatorPlugin> clazz =
|
||||
conf.getClass(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
|
||||
conf.getClass(
|
||||
this.conf.getClass(YarnConfiguration
|
||||
.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
|
||||
this.conf.getClass(
|
||||
YarnConfiguration.NM_MON_RESOURCE_CALCULATOR, null,
|
||||
ResourceCalculatorPlugin.class),
|
||||
ResourceCalculatorPlugin.class);
|
||||
this.resourceCalculatorPlugin =
|
||||
ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, conf);
|
||||
ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, this.conf);
|
||||
LOG.info(" Using ResourceCalculatorPlugin : "
|
||||
+ this.resourceCalculatorPlugin);
|
||||
processTreeClass = conf.getClass(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null,
|
||||
processTreeClass = this.conf.getClass(
|
||||
YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, null,
|
||||
ResourceCalculatorProcessTree.class);
|
||||
this.conf = conf;
|
||||
LOG.info(" Using ResourceCalculatorProcessTree : "
|
||||
+ this.processTreeClass);
|
||||
|
||||
this.containerMetricsEnabled =
|
||||
conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
|
||||
this.conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
|
||||
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);
|
||||
this.containerMetricsPeriodMs =
|
||||
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
|
||||
this.conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
|
||||
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
|
||||
this.containerMetricsUnregisterDelayMs = conf.getLong(
|
||||
this.containerMetricsUnregisterDelayMs = this.conf.getLong(
|
||||
YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
|
||||
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
|
||||
|
||||
long configuredPMemForContainers =
|
||||
NodeManagerHardwareUtils.getContainerMemoryMB(
|
||||
this.resourceCalculatorPlugin, conf) * 1024 * 1024L;
|
||||
this.resourceCalculatorPlugin, this.conf) * 1024 * 1024L;
|
||||
|
||||
long configuredVCoresForContainers =
|
||||
NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, conf);
|
||||
NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin,
|
||||
this.conf);
|
||||
|
||||
// Setting these irrespective of whether checks are enabled. Required in
|
||||
// the UI.
|
||||
@ -157,16 +167,18 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.maxVCoresAllottedForContainers = configuredVCoresForContainers;
|
||||
|
||||
// ///////// Virtual memory configuration //////
|
||||
vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||
vmemRatio = this.conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
||||
Preconditions.checkArgument(vmemRatio > 0.99f,
|
||||
YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
|
||||
this.maxVmemAllottedForContainers =
|
||||
(long) (vmemRatio * configuredPMemForContainers);
|
||||
|
||||
pmemCheckEnabled = conf.getBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED,
|
||||
pmemCheckEnabled = this.conf.getBoolean(
|
||||
YarnConfiguration.NM_PMEM_CHECK_ENABLED,
|
||||
YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED);
|
||||
vmemCheckEnabled = conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED,
|
||||
vmemCheckEnabled = this.conf.getBoolean(
|
||||
YarnConfiguration.NM_VMEM_CHECK_ENABLED,
|
||||
YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED);
|
||||
LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
|
||||
LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
|
||||
@ -175,7 +187,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled);
|
||||
|
||||
nodeCpuPercentageForYARN =
|
||||
NodeManagerHardwareUtils.getNodeCpuPercentage(conf);
|
||||
NodeManagerHardwareUtils.getNodeCpuPercentage(this.conf);
|
||||
|
||||
if (pmemCheckEnabled) {
|
||||
// Logging if actual pmem cannot be determined.
|
||||
@ -201,7 +213,7 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
1) + "). Thrashing might happen.");
|
||||
}
|
||||
}
|
||||
super.serviceInit(conf);
|
||||
super.serviceInit(this.conf);
|
||||
}
|
||||
|
||||
private boolean isContainerMonitorEnabled() {
|
||||
@ -241,12 +253,15 @@ protected void serviceStop() throws Exception {
|
||||
try {
|
||||
this.monitoringThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
;
|
||||
LOG.info("ContainersMonitorImpl monitoring thread interrupted");
|
||||
}
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Encapsulates resource requirements of a process and its tree.
|
||||
*/
|
||||
public static class ProcessTreeInfo {
|
||||
private ContainerId containerId;
|
||||
private String pid;
|
||||
@ -278,49 +293,49 @@ public void setPid(String pid) {
|
||||
this.pid = pid;
|
||||
}
|
||||
|
||||
public ResourceCalculatorProcessTree getProcessTree() {
|
||||
ResourceCalculatorProcessTree getProcessTree() {
|
||||
return this.pTree;
|
||||
}
|
||||
|
||||
public void setProcessTree(ResourceCalculatorProcessTree pTree) {
|
||||
this.pTree = pTree;
|
||||
void setProcessTree(ResourceCalculatorProcessTree mypTree) {
|
||||
this.pTree = mypTree;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Virtual memory limit for the process tree in bytes
|
||||
*/
|
||||
public synchronized long getVmemLimit() {
|
||||
synchronized long getVmemLimit() {
|
||||
return this.vmemLimit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Physical memory limit for the process tree in bytes
|
||||
*/
|
||||
public synchronized long getPmemLimit() {
|
||||
synchronized long getPmemLimit() {
|
||||
return this.pmemLimit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Number of cpu vcores assigned
|
||||
*/
|
||||
public synchronized int getCpuVcores() {
|
||||
synchronized int getCpuVcores() {
|
||||
return this.cpuVcores;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set resource limit for enforcement
|
||||
* @param pmemLimit
|
||||
* Set resource limit for enforcement.
|
||||
* @param myPmemLimit
|
||||
* Physical memory limit for the process tree in bytes
|
||||
* @param vmemLimit
|
||||
* @param myVmemLimit
|
||||
* Virtual memory limit for the process tree in bytes
|
||||
* @param cpuVcores
|
||||
* @param myCpuVcores
|
||||
* Number of cpu vcores assigned
|
||||
*/
|
||||
public synchronized void setResourceLimit(
|
||||
long pmemLimit, long vmemLimit, int cpuVcores) {
|
||||
this.pmemLimit = pmemLimit;
|
||||
this.vmemLimit = vmemLimit;
|
||||
this.cpuVcores = cpuVcores;
|
||||
synchronized void setResourceLimit(
|
||||
long myPmemLimit, long myVmemLimit, int myCpuVcores) {
|
||||
this.pmemLimit = myPmemLimit;
|
||||
this.vmemLimit = myVmemLimit;
|
||||
this.cpuVcores = myCpuVcores;
|
||||
}
|
||||
}
|
||||
|
||||
@ -354,7 +369,7 @@ public synchronized void setResourceLimit(
|
||||
* or if processes in the tree, older than this thread's monitoring
|
||||
* interval, exceed the memory limit. False, otherwise.
|
||||
*/
|
||||
boolean isProcessTreeOverLimit(String containerId,
|
||||
private boolean isProcessTreeOverLimit(String containerId,
|
||||
long currentMemUsage,
|
||||
long curMemUsageOfAgedProcesses,
|
||||
long vmemLimit) {
|
||||
@ -388,7 +403,7 @@ boolean isProcessTreeOverLimit(ResourceCalculatorProcessTree pTree,
|
||||
}
|
||||
|
||||
private class MonitoringThread extends Thread {
|
||||
public MonitoringThread() {
|
||||
MonitoringThread() {
|
||||
super("Container Monitor");
|
||||
}
|
||||
|
||||
@ -425,43 +440,8 @@ public void run() {
|
||||
try {
|
||||
String pId = ptInfo.getPID();
|
||||
|
||||
// Initialize any uninitialized processTrees
|
||||
if (pId == null) {
|
||||
// get pid from ContainerId
|
||||
pId = containerExecutor.getProcessId(ptInfo.getContainerId());
|
||||
if (pId != null) {
|
||||
// pId will be null, either if the container is not spawned yet
|
||||
// or if the container's pid is removed from ContainerExecutor
|
||||
LOG.debug("Tracking ProcessTree " + pId
|
||||
+ " for the first time");
|
||||
|
||||
ResourceCalculatorProcessTree pt =
|
||||
ResourceCalculatorProcessTree.
|
||||
getResourceCalculatorProcessTree(
|
||||
pId, processTreeClass, conf);
|
||||
ptInfo.setPid(pId);
|
||||
ptInfo.setProcessTree(pt);
|
||||
|
||||
if (containerMetricsEnabled) {
|
||||
ContainerMetrics usageMetrics = ContainerMetrics
|
||||
.forContainer(containerId, containerMetricsPeriodMs,
|
||||
containerMetricsUnregisterDelayMs);
|
||||
usageMetrics.recordProcessId(pId);
|
||||
}
|
||||
Container container = context.getContainers().get(containerId);
|
||||
String[] ipAndHost = containerExecutor.getIpAndHost(container);
|
||||
if (ipAndHost != null && ipAndHost[0] != null
|
||||
&& ipAndHost[1] != null) {
|
||||
container.setIpAndHost(ipAndHost);
|
||||
LOG.info(containerId + "'s ip = " + ipAndHost[0]
|
||||
+ ", and hostname = " + ipAndHost[1]);
|
||||
} else {
|
||||
LOG.info("Can not get both ip and hostname: " + Arrays
|
||||
.toString(ipAndHost));
|
||||
}
|
||||
}
|
||||
}
|
||||
// End of initializing any uninitialized processTrees
|
||||
// Initialize uninitialized process trees
|
||||
initializeProcessTrees(entry);
|
||||
|
||||
if (pId == null || !isResourceCalculatorAvailable()) {
|
||||
continue; // processTree cannot be tracked
|
||||
@ -487,74 +467,11 @@ public void run() {
|
||||
continue;
|
||||
}
|
||||
|
||||
float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore /
|
||||
resourceCalculatorPlugin.getNumProcessors();
|
||||
recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage,
|
||||
currentPmemUsage, trackedContainersUtilization);
|
||||
|
||||
// Multiply by 1000 to avoid losing data when converting to int
|
||||
int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000
|
||||
* maxVCoresAllottedForContainers /nodeCpuPercentageForYARN);
|
||||
// as processes begin with an age 1, we want to see if there
|
||||
// are processes more than 1 iteration old.
|
||||
long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
|
||||
long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1);
|
||||
long vmemLimit = ptInfo.getVmemLimit();
|
||||
long pmemLimit = ptInfo.getPmemLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format(
|
||||
"Memory usage of ProcessTree %s for container-id %s: ",
|
||||
pId, containerId.toString()) +
|
||||
formatUsageString(
|
||||
currentVmemUsage, vmemLimit,
|
||||
currentPmemUsage, pmemLimit));
|
||||
}
|
||||
|
||||
// Add resource utilization for this container
|
||||
trackedContainersUtilization.addTo(
|
||||
(int) (currentPmemUsage >> 20),
|
||||
(int) (currentVmemUsage >> 20),
|
||||
milliVcoresUsed / 1000.0f);
|
||||
|
||||
// Add usage to container metrics
|
||||
if (containerMetricsEnabled) {
|
||||
ContainerMetrics.forContainer(
|
||||
containerId, containerMetricsPeriodMs,
|
||||
containerMetricsUnregisterDelayMs).recordMemoryUsage(
|
||||
(int) (currentPmemUsage >> 20));
|
||||
ContainerMetrics.forContainer(
|
||||
containerId, containerMetricsPeriodMs,
|
||||
containerMetricsUnregisterDelayMs).recordCpuUsage
|
||||
((int)cpuUsagePercentPerCore, milliVcoresUsed);
|
||||
}
|
||||
|
||||
boolean isMemoryOverLimit = false;
|
||||
String msg = "";
|
||||
int containerExitStatus = ContainerExitStatus.INVALID;
|
||||
if (isVmemCheckEnabled()
|
||||
&& isProcessTreeOverLimit(containerId.toString(),
|
||||
currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
|
||||
// Container (the root process) is still alive and overflowing
|
||||
// memory.
|
||||
// Dump the process-tree and then clean it up.
|
||||
msg = formatErrorMessage("virtual",
|
||||
currentVmemUsage, vmemLimit,
|
||||
currentPmemUsage, pmemLimit,
|
||||
pId, containerId, pTree);
|
||||
isMemoryOverLimit = true;
|
||||
containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM;
|
||||
} else if (isPmemCheckEnabled()
|
||||
&& isProcessTreeOverLimit(containerId.toString(),
|
||||
currentPmemUsage, curRssMemUsageOfAgedProcesses,
|
||||
pmemLimit)) {
|
||||
// Container (the root process) is still alive and overflowing
|
||||
// memory.
|
||||
// Dump the process-tree and then clean it up.
|
||||
msg = formatErrorMessage("physical",
|
||||
currentVmemUsage, vmemLimit,
|
||||
currentPmemUsage, pmemLimit,
|
||||
pId, containerId, pTree);
|
||||
isMemoryOverLimit = true;
|
||||
containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
|
||||
}
|
||||
checkLimit(containerId, pId, pTree, ptInfo,
|
||||
currentVmemUsage, currentPmemUsage);
|
||||
|
||||
// Accounting the total memory in usage for all containers
|
||||
vmemUsageByAllContainers += currentVmemUsage;
|
||||
@ -563,32 +480,8 @@ && isProcessTreeOverLimit(containerId.toString(),
|
||||
cpuUsagePercentPerCoreByAllContainers += cpuUsagePercentPerCore;
|
||||
cpuUsageTotalCoresByAllContainers += cpuUsagePercentPerCore;
|
||||
|
||||
if (isMemoryOverLimit) {
|
||||
// Virtual or physical memory over limit. Fail the container and
|
||||
// remove
|
||||
// the corresponding process tree
|
||||
LOG.warn(msg);
|
||||
// warn if not a leader
|
||||
if (!pTree.checkPidPgrpidForMatch()) {
|
||||
LOG.error("Killed container process with PID " + pId
|
||||
+ " but it is not a process group leader.");
|
||||
}
|
||||
// kill the container
|
||||
eventDispatcher.getEventHandler().handle(
|
||||
new ContainerKillEvent(containerId,
|
||||
containerExitStatus, msg));
|
||||
trackingContainers.remove(containerId);
|
||||
LOG.info("Removed ProcessTree with root " + pId);
|
||||
}
|
||||
|
||||
ContainerImpl container =
|
||||
(ContainerImpl) context.getContainers().get(containerId);
|
||||
NMTimelinePublisher nmMetricsPublisher =
|
||||
container.getNMTimelinePublisher();
|
||||
if (nmMetricsPublisher != null) {
|
||||
nmMetricsPublisher.reportContainerResourceUsage(container,
|
||||
currentPmemUsage, cpuUsagePercentPerCore);
|
||||
}
|
||||
reportResourceUsage(containerId, currentPmemUsage,
|
||||
cpuUsagePercentPerCore);
|
||||
} catch (Exception e) {
|
||||
// Log the exception and proceed to the next container.
|
||||
LOG.warn("Uncaught exception in ContainersMonitorImpl "
|
||||
@ -617,21 +510,226 @@ && isProcessTreeOverLimit(containerId.toString(),
|
||||
}
|
||||
}
|
||||
|
||||
private String formatErrorMessage(String memTypeExceeded,
|
||||
long currentVmemUsage, long vmemLimit,
|
||||
long currentPmemUsage, long pmemLimit,
|
||||
String pId, ContainerId containerId, ResourceCalculatorProcessTree pTree) {
|
||||
return
|
||||
String.format("Container [pid=%s,containerID=%s] is running beyond %s memory limits. ",
|
||||
pId, containerId, memTypeExceeded) +
|
||||
"Current usage: " +
|
||||
/**
|
||||
* Initialize any uninitialized processTrees.
|
||||
* @param entry process tree entry to fill in
|
||||
*/
|
||||
private void initializeProcessTrees(
|
||||
Entry<ContainerId, ProcessTreeInfo> entry) {
|
||||
ContainerId containerId = entry.getKey();
|
||||
ProcessTreeInfo ptInfo = entry.getValue();
|
||||
String pId = ptInfo.getPID();
|
||||
|
||||
// Initialize any uninitialized processTrees
|
||||
if (pId == null) {
|
||||
// get pid from ContainerId
|
||||
pId = containerExecutor.getProcessId(ptInfo.getContainerId());
|
||||
if (pId != null) {
|
||||
// pId will be null, either if the container is not spawned yet
|
||||
// or if the container's pid is removed from ContainerExecutor
|
||||
LOG.debug("Tracking ProcessTree " + pId
|
||||
+ " for the first time");
|
||||
|
||||
ResourceCalculatorProcessTree pt =
|
||||
ResourceCalculatorProcessTree.
|
||||
getResourceCalculatorProcessTree(
|
||||
pId, processTreeClass, conf);
|
||||
ptInfo.setPid(pId);
|
||||
ptInfo.setProcessTree(pt);
|
||||
|
||||
if (containerMetricsEnabled) {
|
||||
ContainerMetrics usageMetrics = ContainerMetrics
|
||||
.forContainer(containerId, containerMetricsPeriodMs,
|
||||
containerMetricsUnregisterDelayMs);
|
||||
usageMetrics.recordProcessId(pId);
|
||||
}
|
||||
|
||||
Container container = context.getContainers().get(containerId);
|
||||
String[] ipAndHost = containerExecutor.getIpAndHost(container);
|
||||
if (ipAndHost != null && ipAndHost[0] != null
|
||||
&& ipAndHost[1] != null) {
|
||||
container.setIpAndHost(ipAndHost);
|
||||
LOG.info(containerId + "'s ip = " + ipAndHost[0]
|
||||
+ ", and hostname = " + ipAndHost[1]);
|
||||
} else {
|
||||
LOG.info("Can not get both ip and hostname: " + Arrays
|
||||
.toString(ipAndHost));
|
||||
}
|
||||
}
|
||||
}
|
||||
// End of initializing any uninitialized processTrees
|
||||
}
|
||||
|
||||
/**
|
||||
* Record usage metrics.
|
||||
* @param containerId container id
|
||||
* @param pId process id
|
||||
* @param pTree valid process tree entry with CPU measurement
|
||||
* @param ptInfo process tree info with limit information
|
||||
* @param currentVmemUsage virtual memory measurement
|
||||
* @param currentPmemUsage physical memory measurement
|
||||
* @param trackedContainersUtilization utilization tracker to update
|
||||
*/
|
||||
private void recordUsage(ContainerId containerId, String pId,
|
||||
ResourceCalculatorProcessTree pTree,
|
||||
ProcessTreeInfo ptInfo,
|
||||
long currentVmemUsage, long currentPmemUsage,
|
||||
ResourceUtilization trackedContainersUtilization) {
|
||||
float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
|
||||
float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore /
|
||||
resourceCalculatorPlugin.getNumProcessors();
|
||||
|
||||
// Multiply by 1000 to avoid losing data when converting to int
|
||||
int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000
|
||||
* maxVCoresAllottedForContainers /nodeCpuPercentageForYARN);
|
||||
long vmemLimit = ptInfo.getVmemLimit();
|
||||
long pmemLimit = ptInfo.getPmemLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(String.format(
|
||||
"Memory usage of ProcessTree %s for container-id %s: ",
|
||||
pId, containerId.toString()) +
|
||||
formatUsageString(
|
||||
currentVmemUsage, vmemLimit,
|
||||
currentPmemUsage, pmemLimit));
|
||||
}
|
||||
|
||||
// Add resource utilization for this container
|
||||
trackedContainersUtilization.addTo(
|
||||
(int) (currentPmemUsage >> 20),
|
||||
(int) (currentVmemUsage >> 20),
|
||||
milliVcoresUsed / 1000.0f);
|
||||
|
||||
// Add usage to container metrics
|
||||
if (containerMetricsEnabled) {
|
||||
ContainerMetrics.forContainer(
|
||||
containerId, containerMetricsPeriodMs,
|
||||
containerMetricsUnregisterDelayMs).recordMemoryUsage(
|
||||
(int) (currentPmemUsage >> 20));
|
||||
ContainerMetrics.forContainer(
|
||||
containerId, containerMetricsPeriodMs,
|
||||
containerMetricsUnregisterDelayMs).recordCpuUsage((int)
|
||||
cpuUsagePercentPerCore, milliVcoresUsed);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check resource limits and take actions if the limit is exceeded.
|
||||
* @param containerId container id
|
||||
* @param pId process id
|
||||
* @param pTree valid process tree entry with CPU measurement
|
||||
* @param ptInfo process tree info with limit information
|
||||
* @param currentVmemUsage virtual memory measurement
|
||||
* @param currentPmemUsage physical memory measurement
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void checkLimit(ContainerId containerId, String pId,
|
||||
ResourceCalculatorProcessTree pTree,
|
||||
ProcessTreeInfo ptInfo,
|
||||
long currentVmemUsage,
|
||||
long currentPmemUsage) {
|
||||
boolean isMemoryOverLimit = false;
|
||||
long vmemLimit = ptInfo.getVmemLimit();
|
||||
long pmemLimit = ptInfo.getPmemLimit();
|
||||
// as processes begin with an age 1, we want to see if there
|
||||
// are processes more than 1 iteration old.
|
||||
long curMemUsageOfAgedProcesses = pTree.getVirtualMemorySize(1);
|
||||
long curRssMemUsageOfAgedProcesses = pTree.getRssMemorySize(1);
|
||||
String msg = "";
|
||||
int containerExitStatus = ContainerExitStatus.INVALID;
|
||||
if (isVmemCheckEnabled()
|
||||
&& isProcessTreeOverLimit(containerId.toString(),
|
||||
currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
|
||||
// Container (the root process) is still alive and overflowing
|
||||
// memory.
|
||||
// Dump the process-tree and then clean it up.
|
||||
msg = formatErrorMessage("virtual",
|
||||
formatUsageString(currentVmemUsage, vmemLimit,
|
||||
currentPmemUsage, pmemLimit) +
|
||||
currentPmemUsage, pmemLimit),
|
||||
pId, containerId, pTree);
|
||||
isMemoryOverLimit = true;
|
||||
containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM;
|
||||
} else if (isPmemCheckEnabled()
|
||||
&& isProcessTreeOverLimit(containerId.toString(),
|
||||
currentPmemUsage, curRssMemUsageOfAgedProcesses,
|
||||
pmemLimit)) {
|
||||
// Container (the root process) is still alive and overflowing
|
||||
// memory.
|
||||
// Dump the process-tree and then clean it up.
|
||||
msg = formatErrorMessage("physical",
|
||||
formatUsageString(currentVmemUsage, vmemLimit,
|
||||
currentPmemUsage, pmemLimit),
|
||||
pId, containerId, pTree);
|
||||
isMemoryOverLimit = true;
|
||||
containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
|
||||
}
|
||||
|
||||
if (isMemoryOverLimit) {
|
||||
// Virtual or physical memory over limit. Fail the container and
|
||||
// remove
|
||||
// the corresponding process tree
|
||||
LOG.warn(msg);
|
||||
// warn if not a leader
|
||||
if (!pTree.checkPidPgrpidForMatch()) {
|
||||
LOG.error("Killed container process with PID " + pId
|
||||
+ " but it is not a process group leader.");
|
||||
}
|
||||
// kill the container
|
||||
eventDispatcher.getEventHandler().handle(
|
||||
new ContainerKillEvent(containerId,
|
||||
containerExitStatus, msg));
|
||||
trackingContainers.remove(containerId);
|
||||
LOG.info("Removed ProcessTree with root " + pId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Report usage metrics to the timeline service.
|
||||
* @param containerId container id
|
||||
* @param currentPmemUsage physical memory measurement
|
||||
* @param cpuUsagePercentPerCore CPU usage
|
||||
*/
|
||||
private void reportResourceUsage(ContainerId containerId,
|
||||
long currentPmemUsage, float cpuUsagePercentPerCore) {
|
||||
ContainerImpl container =
|
||||
(ContainerImpl) context.getContainers().get(containerId);
|
||||
NMTimelinePublisher nmMetricsPublisher =
|
||||
container.getNMTimelinePublisher();
|
||||
if (nmMetricsPublisher != null) {
|
||||
nmMetricsPublisher.reportContainerResourceUsage(container,
|
||||
currentPmemUsage, cpuUsagePercentPerCore);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Format string when memory limit has been exceeded.
|
||||
* @param memTypeExceeded type of memory
|
||||
* @param usageString general memory usage information string
|
||||
* @param pId process id
|
||||
* @param containerId container id
|
||||
* @param pTree process tree to dump full resource utilization graph
|
||||
* @return formatted resource usage information
|
||||
*/
|
||||
private String formatErrorMessage(String memTypeExceeded,
|
||||
String usageString, String pId, ContainerId containerId,
|
||||
ResourceCalculatorProcessTree pTree) {
|
||||
return
|
||||
String.format("Container [pid=%s,containerID=%s] is " +
|
||||
"running beyond %s memory limits. ",
|
||||
pId, containerId, memTypeExceeded) +
|
||||
"Current usage: " + usageString +
|
||||
". Killing container.\n" +
|
||||
"Dump of the process-tree for " + containerId + " :\n" +
|
||||
pTree.getProcessTreeDump();
|
||||
}
|
||||
|
||||
/**
|
||||
* Format memory usage string for reporting.
|
||||
* @param currentVmemUsage virtual memory usage
|
||||
* @param vmemLimit virtual memory limit
|
||||
* @param currentPmemUsage physical memory usage
|
||||
* @param pmemLimit physical memory limit
|
||||
* @return formatted memory information
|
||||
*/
|
||||
private String formatUsageString(long currentVmemUsage, long vmemLimit,
|
||||
long currentPmemUsage, long pmemLimit) {
|
||||
return String.format("%sB of %sB physical memory used; " +
|
||||
@ -746,7 +844,7 @@ public ResourceUtilization getContainersUtilization() {
|
||||
return this.containersUtilization;
|
||||
}
|
||||
|
||||
public void setContainersUtilization(ResourceUtilization utilization) {
|
||||
private void setContainersUtilization(ResourceUtilization utilization) {
|
||||
this.containersUtilization = utilization;
|
||||
}
|
||||
|
||||
@ -858,7 +956,7 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
|
||||
}
|
||||
}
|
||||
|
||||
protected void onChangeMonitoringContainerResource(
|
||||
private void onChangeMonitoringContainerResource(
|
||||
ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
|
||||
ChangeMonitoringContainerResourceEvent changeEvent =
|
||||
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
|
||||
@ -878,14 +976,14 @@ protected void onChangeMonitoringContainerResource(
|
||||
changeContainerResource(containerId, changeEvent.getResource());
|
||||
}
|
||||
|
||||
protected void onStopMonitoringContainer(
|
||||
private void onStopMonitoringContainer(
|
||||
ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
|
||||
LOG.info("Stopping resource-monitoring for " + containerId);
|
||||
updateContainerMetrics(monitoringEvent);
|
||||
trackingContainers.remove(containerId);
|
||||
}
|
||||
|
||||
protected void onStartMonitoringContainer(
|
||||
private void onStartMonitoringContainer(
|
||||
ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
|
||||
ContainerStartMonitoringEvent startEvent =
|
||||
(ContainerStartMonitoringEvent) monitoringEvent;
|
||||
|
Loading…
Reference in New Issue
Block a user