YARN-2984. Metrics for container's actual memory usage. (kasha)

This commit is contained in:
Karthik Kambatla 2015-01-17 05:44:04 +05:30
parent 60cbcff2f7
commit 84198564ba
6 changed files with 285 additions and 2 deletions

View File

@ -69,7 +69,8 @@ public Iterator<MetricsRecordBuilderImpl> iterator() {
return rbs.iterator(); return rbs.iterator();
} }
void clear() { rbs.clear(); } @InterfaceAudience.Private
public void clear() { rbs.clear(); }
MetricsCollectorImpl setRecordFilter(MetricsFilter rf) { MetricsCollectorImpl setRecordFilter(MetricsFilter rf) {
recordFilter = rf; recordFilter = rf;

View File

@ -195,6 +195,8 @@ Release 2.7.0 - UNRELEASED
YARN-2807. Option "--forceactive" not works as described in usage of YARN-2807. Option "--forceactive" not works as described in usage of
"yarn rmadmin -transitionToActive". (Masatake Iwasaki via xgong) "yarn rmadmin -transitionToActive". (Masatake Iwasaki via xgong)
YARN-2984. Metrics for container's actual memory usage. (kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -822,6 +822,20 @@ private static void addDeprecatedKeys() {
"container-monitor.procfs-tree.smaps-based-rss.enabled"; "container-monitor.procfs-tree.smaps-based-rss.enabled";
public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED = public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED =
false; false;
/** Enable/disable container metrics. */
@Private
public static final String NM_CONTAINER_METRICS_ENABLE =
NM_PREFIX + "container-metrics.enable";
@Private
public static final boolean DEFAULT_NM_CONTAINER_METRICS_ENABLE = true;
/** Container metrics flush period. -1 for flush on completion. */
@Private
public static final String NM_CONTAINER_METRICS_PERIOD_MS =
NM_PREFIX + "container-metrics.period-ms";
@Private
public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
/** Prefix for all node manager disk health checker configs. */ /** Prefix for all node manager disk health checker configs. */
private static final String NM_DISK_HEALTH_CHECK_PREFIX = private static final String NM_DISK_HEALTH_CHECK_PREFIX =

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableStat;
import org.apache.hadoop.yarn.api.records.ContainerId;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import static org.apache.hadoop.metrics2.lib.Interns.info;
@InterfaceAudience.Private
@Metrics(context="container")
public class ContainerMetrics implements MetricsSource {
@Metric
public MutableStat pMemMBsStat;
static final MetricsInfo RECORD_INFO =
info("ContainerUsage", "Resource usage by container");
final MetricsInfo recordInfo;
final MetricsRegistry registry;
final ContainerId containerId;
final MetricsSystem metricsSystem;
// Metrics publishing status
private long flushPeriodMs;
private boolean flushOnPeriod = false; // true if period elapsed
private boolean finished = false; // true if container finished
private boolean unregister = false; // unregister
private Timer timer; // lazily initialized
/**
* Simple metrics cache to help prevent re-registrations.
*/
protected final static Map<ContainerId, ContainerMetrics>
usageMetrics = new HashMap<>();
ContainerMetrics(
MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
this.recordInfo =
info(sourceName(containerId), RECORD_INFO.description());
this.registry = new MetricsRegistry(recordInfo);
this.metricsSystem = ms;
this.containerId = containerId;
this.flushPeriodMs = flushPeriodMs;
scheduleTimerTaskIfRequired();
this.pMemMBsStat = registry.newStat(
"pMem", "Physical memory stats", "Usage", "MBs", true);
}
ContainerMetrics tag(MetricsInfo info, ContainerId containerId) {
registry.tag(info, containerId.toString());
return this;
}
static String sourceName(ContainerId containerId) {
return RECORD_INFO.name() + "_" + containerId.toString();
}
public static ContainerMetrics forContainer(ContainerId containerId) {
return forContainer(containerId, -1L);
}
public static ContainerMetrics forContainer(
ContainerId containerId, long flushPeriodMs) {
return forContainer(
DefaultMetricsSystem.instance(), containerId, flushPeriodMs);
}
synchronized static ContainerMetrics forContainer(
MetricsSystem ms, ContainerId containerId, long flushPeriodMs) {
ContainerMetrics metrics = usageMetrics.get(containerId);
if (metrics == null) {
metrics = new ContainerMetrics(
ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId);
// Register with the MetricsSystems
if (ms != null) {
metrics =
ms.register(sourceName(containerId),
"Metrics for container: " + containerId, metrics);
}
usageMetrics.put(containerId, metrics);
}
return metrics;
}
@Override
public synchronized void getMetrics(MetricsCollector collector, boolean all) {
//Container goes through registered -> finished -> unregistered.
if (unregister) {
metricsSystem.unregisterSource(recordInfo.name());
usageMetrics.remove(containerId);
return;
}
if (finished || flushOnPeriod) {
registry.snapshot(collector.addRecord(registry.info()), all);
}
if (finished) {
this.unregister = true;
} else if (flushOnPeriod) {
flushOnPeriod = false;
scheduleTimerTaskIfRequired();
}
}
public synchronized void finished() {
this.finished = true;
if (timer != null) {
timer.cancel();
timer = null;
}
}
public void recordMemoryUsage(int memoryMBs) {
this.pMemMBsStat.add(memoryMBs);
}
private synchronized void scheduleTimerTaskIfRequired() {
if (flushPeriodMs > 0) {
// Lazily initialize timer
if (timer == null) {
this.timer = new Timer("Metrics flush checker", true);
}
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
synchronized (ContainerMetrics.this) {
if (!finished) {
flushOnPeriod = true;
}
}
}
};
timer.schedule(timerTask, flushPeriodMs);
}
}
}

View File

@ -51,6 +51,8 @@ public class ContainersMonitorImpl extends AbstractService implements
private long monitoringInterval; private long monitoringInterval;
private MonitoringThread monitoringThread; private MonitoringThread monitoringThread;
private boolean containerMetricsEnabled;
private long containerMetricsPeriodMs;
final List<ContainerId> containersToBeRemoved; final List<ContainerId> containersToBeRemoved;
final Map<ContainerId, ProcessTreeInfo> containersToBeAdded; final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
@ -106,6 +108,13 @@ protected void serviceInit(Configuration conf) throws Exception {
LOG.info(" Using ResourceCalculatorProcessTree : " LOG.info(" Using ResourceCalculatorProcessTree : "
+ this.processTreeClass); + this.processTreeClass);
this.containerMetricsEnabled =
conf.getBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_ENABLE);
this.containerMetricsPeriodMs =
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
long configuredPMemForContainers = conf.getLong( long configuredPMemForContainers = conf.getLong(
YarnConfiguration.NM_PMEM_MB, YarnConfiguration.NM_PMEM_MB,
YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l; YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
@ -352,6 +361,9 @@ public void run() {
// Remove finished containers // Remove finished containers
synchronized (containersToBeRemoved) { synchronized (containersToBeRemoved) {
for (ContainerId containerId : containersToBeRemoved) { for (ContainerId containerId : containersToBeRemoved) {
if (containerMetricsEnabled) {
ContainerMetrics.forContainer(containerId).finished();
}
trackingContainers.remove(containerId); trackingContainers.remove(containerId);
LOG.info("Stopping resource-monitoring for " + containerId); LOG.info("Stopping resource-monitoring for " + containerId);
} }
@ -408,7 +420,15 @@ public void run() {
LOG.info(String.format( LOG.info(String.format(
"Memory usage of ProcessTree %s for container-id %s: ", "Memory usage of ProcessTree %s for container-id %s: ",
pId, containerId.toString()) + pId, containerId.toString()) +
formatUsageString(currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit)); formatUsageString(
currentVmemUsage, vmemLimit, currentPmemUsage, pmemLimit));
// Add usage to container metrics
if (containerMetricsEnabled) {
ContainerMetrics.forContainer(
containerId, containerMetricsPeriodMs).recordMemoryUsage(
(int) (currentPmemUsage >> 20));
}
boolean isMemoryOverLimit = false; boolean isMemoryOverLimit = false;
String msg = ""; String msg = "";

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.util.Timer;
public class TestContainerMetrics {
@Test
public void testContainerMetricsFlow() throws InterruptedException {
final String ERR = "Error in number of records";
// Create a dummy MetricsSystem
MetricsSystem system = mock(MetricsSystem.class);
doReturn(this).when(system).register(anyString(), anyString(), any());
MetricsCollectorImpl collector = new MetricsCollectorImpl();
ContainerId containerId = mock(ContainerId.class);
ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
metrics.recordMemoryUsage(1024);
metrics.getMetrics(collector, true);
assertEquals(ERR, 0, collector.getRecords().size());
Thread.sleep(110);
metrics.getMetrics(collector, true);
assertEquals(ERR, 1, collector.getRecords().size());
collector.clear();
Thread.sleep(110);
metrics.getMetrics(collector, true);
assertEquals(ERR, 1, collector.getRecords().size());
collector.clear();
metrics.finished();
metrics.getMetrics(collector, true);
assertEquals(ERR, 1, collector.getRecords().size());
collector.clear();
metrics.getMetrics(collector, true);
assertEquals(ERR, 0, collector.getRecords().size());
Thread.sleep(110);
metrics.getMetrics(collector, true);
assertEquals(ERR, 0, collector.getRecords().size());
}
}