YARN-3619. ContainerMetrics unregisters during getMetrics and leads to ConcurrentModificationException. Contributed by Zhihai Xu

This commit is contained in:
Jason Lowe 2015-10-02 20:09:13 +00:00
parent 2ecb5f6d71
commit fdf02d1f26
7 changed files with 107 additions and 16 deletions

View File

@ -395,7 +395,8 @@ public synchronized void publishMetricsNow() {
* Sample all the sources for a snapshot of metrics/tags * Sample all the sources for a snapshot of metrics/tags
* @return the metrics buffer containing the snapshot * @return the metrics buffer containing the snapshot
*/ */
synchronized MetricsBuffer sampleMetrics() { @VisibleForTesting
public synchronized MetricsBuffer sampleMetrics() {
collector.clear(); collector.clear();
MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder(); MetricsBufferBuilder bufferBuilder = new MetricsBufferBuilder();

View File

@ -1008,6 +1008,9 @@ Release 2.7.2 - UNRELEASED
YARN-3727. For better error recovery, check if the directory exists before YARN-3727. For better error recovery, check if the directory exists before
using it for localization. (Zhihai Xu via jlowe) using it for localization. (Zhihai Xu via jlowe)
YARN-3619. ContainerMetrics unregisters during getMetrics and leads to
ConcurrentModificationException (Zhihai Xu via jlowe)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -1008,7 +1008,15 @@ private static void addDeprecatedKeys() {
NM_PREFIX + "container-metrics.period-ms"; NM_PREFIX + "container-metrics.period-ms";
@Private @Private
public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1; public static final int DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS = -1;
/** The delay time ms to unregister container metrics after completion. */
@Private
public static final String NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
NM_PREFIX + "container-metrics.unregister-delay-ms";
@Private
public static final int DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS =
10000;
/** Prefix for all node manager disk health checker configs. */ /** Prefix for all node manager disk health checker configs. */
private static final String NM_DISK_HEALTH_CHECK_PREFIX = private static final String NM_DISK_HEALTH_CHECK_PREFIX =
"yarn.nodemanager.disk-health-checker."; "yarn.nodemanager.disk-health-checker.";

View File

@ -1571,6 +1571,14 @@
<value>-1</value> <value>-1</value>
</property> </property>
<property>
<description>
The delay time ms to unregister container metrics after completion.
</description>
<name>yarn.nodemanager.container-metrics.unregister-delay-ms</name>
<value>10000</value>
</property>
<property> <property>
<description> <description>
Class used to calculate current container resource utilization. Class used to calculate current container resource utilization.

View File

@ -100,6 +100,7 @@ public class ContainerMetrics implements MetricsSource {
private boolean flushOnPeriod = false; // true if period elapsed private boolean flushOnPeriod = false; // true if period elapsed
private boolean finished = false; // true if container finished private boolean finished = false; // true if container finished
private boolean unregister = false; // unregister private boolean unregister = false; // unregister
private long unregisterDelayMs;
private Timer timer; // lazily initialized private Timer timer; // lazily initialized
/** /**
@ -107,15 +108,21 @@ public class ContainerMetrics implements MetricsSource {
*/ */
protected final static Map<ContainerId, ContainerMetrics> protected final static Map<ContainerId, ContainerMetrics>
usageMetrics = new HashMap<>(); usageMetrics = new HashMap<>();
// Create a timer to unregister container metrics,
// whose associated thread run as a daemon.
private final static Timer unregisterContainerMetricsTimer =
new Timer("Container metrics unregistration", true);
ContainerMetrics( ContainerMetrics(
MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
long delayMs) {
this.recordInfo = this.recordInfo =
info(sourceName(containerId), RECORD_INFO.description()); info(sourceName(containerId), RECORD_INFO.description());
this.registry = new MetricsRegistry(recordInfo); this.registry = new MetricsRegistry(recordInfo);
this.metricsSystem = ms; this.metricsSystem = ms;
this.containerId = containerId; this.containerId = containerId;
this.flushPeriodMs = flushPeriodMs; this.flushPeriodMs = flushPeriodMs;
this.unregisterDelayMs = delayMs < 0 ? 0 : delayMs;
scheduleTimerTaskIfRequired(); scheduleTimerTaskIfRequired();
this.pMemMBsStat = registry.newStat( this.pMemMBsStat = registry.newStat(
@ -148,17 +155,18 @@ static String sourceName(ContainerId containerId) {
} }
public static ContainerMetrics forContainer( public static ContainerMetrics forContainer(
ContainerId containerId, long flushPeriodMs) { ContainerId containerId, long flushPeriodMs, long delayMs) {
return forContainer( return forContainer(
DefaultMetricsSystem.instance(), containerId, flushPeriodMs); DefaultMetricsSystem.instance(), containerId, flushPeriodMs, delayMs);
} }
synchronized static ContainerMetrics forContainer( synchronized static ContainerMetrics forContainer(
MetricsSystem ms, ContainerId containerId, long flushPeriodMs) { MetricsSystem ms, ContainerId containerId, long flushPeriodMs,
long delayMs) {
ContainerMetrics metrics = usageMetrics.get(containerId); ContainerMetrics metrics = usageMetrics.get(containerId);
if (metrics == null) { if (metrics == null) {
metrics = new ContainerMetrics( metrics = new ContainerMetrics(ms, containerId, flushPeriodMs,
ms, containerId, flushPeriodMs).tag(RECORD_INFO, containerId); delayMs).tag(RECORD_INFO, containerId);
// Register with the MetricsSystems // Register with the MetricsSystems
if (ms != null) { if (ms != null) {
@ -172,12 +180,15 @@ synchronized static ContainerMetrics forContainer(
return metrics; return metrics;
} }
synchronized static void unregisterContainerMetrics(ContainerMetrics cm) {
cm.metricsSystem.unregisterSource(cm.recordInfo.name());
usageMetrics.remove(cm.containerId);
}
@Override @Override
public synchronized void getMetrics(MetricsCollector collector, boolean all) { public synchronized void getMetrics(MetricsCollector collector, boolean all) {
//Container goes through registered -> finished -> unregistered. //Container goes through registered -> finished -> unregistered.
if (unregister) { if (unregister) {
metricsSystem.unregisterSource(recordInfo.name());
usageMetrics.remove(containerId);
return; return;
} }
@ -199,6 +210,7 @@ public synchronized void finished() {
timer.cancel(); timer.cancel();
timer = null; timer = null;
} }
scheduleTimerTaskForUnregistration();
} }
public void recordMemoryUsage(int memoryMBs) { public void recordMemoryUsage(int memoryMBs) {
@ -252,4 +264,14 @@ public void run() {
timer.schedule(timerTask, flushPeriodMs); timer.schedule(timerTask, flushPeriodMs);
} }
} }
private void scheduleTimerTaskForUnregistration() {
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
ContainerMetrics.unregisterContainerMetrics(ContainerMetrics.this);
}
};
unregisterContainerMetricsTimer.schedule(timerTask, unregisterDelayMs);
}
} }

View File

@ -55,6 +55,7 @@ public class ContainersMonitorImpl extends AbstractService implements
private MonitoringThread monitoringThread; private MonitoringThread monitoringThread;
private boolean containerMetricsEnabled; private boolean containerMetricsEnabled;
private long containerMetricsPeriodMs; private long containerMetricsPeriodMs;
private long containerMetricsUnregisterDelayMs;
@VisibleForTesting @VisibleForTesting
final Map<ContainerId, ProcessTreeInfo> trackingContainers = final Map<ContainerId, ProcessTreeInfo> trackingContainers =
@ -126,6 +127,9 @@ protected void serviceInit(Configuration conf) throws Exception {
this.containerMetricsPeriodMs = this.containerMetricsPeriodMs =
conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS, conf.getLong(YarnConfiguration.NM_CONTAINER_METRICS_PERIOD_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS); YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_PERIOD_MS);
this.containerMetricsUnregisterDelayMs = conf.getLong(
YarnConfiguration.NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
long configuredPMemForContainers = long configuredPMemForContainers =
NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L; NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L;
@ -425,7 +429,8 @@ public void run() {
if (containerMetricsEnabled) { if (containerMetricsEnabled) {
ContainerMetrics usageMetrics = ContainerMetrics ContainerMetrics usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs); .forContainer(containerId, containerMetricsPeriodMs,
containerMetricsUnregisterDelayMs);
usageMetrics.recordProcessId(pId); usageMetrics.recordProcessId(pId);
} }
} }
@ -476,10 +481,12 @@ public void run() {
// Add usage to container metrics // Add usage to container metrics
if (containerMetricsEnabled) { if (containerMetricsEnabled) {
ContainerMetrics.forContainer( ContainerMetrics.forContainer(
containerId, containerMetricsPeriodMs).recordMemoryUsage( containerId, containerMetricsPeriodMs,
containerMetricsUnregisterDelayMs).recordMemoryUsage(
(int) (currentPmemUsage >> 20)); (int) (currentPmemUsage >> 20));
ContainerMetrics.forContainer( ContainerMetrics.forContainer(
containerId, containerMetricsPeriodMs).recordCpuUsage containerId, containerMetricsPeriodMs,
containerMetricsUnregisterDelayMs).recordCpuUsage
((int)cpuUsagePercentPerCore, milliVcoresUsed); ((int)cpuUsagePercentPerCore, milliVcoresUsed);
} }
@ -609,7 +616,8 @@ private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
ContainerId containerId = monitoringEvent.getContainerId(); ContainerId containerId = monitoringEvent.getContainerId();
ContainerMetrics usageMetrics = ContainerMetrics ContainerMetrics usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs); .forContainer(containerId, containerMetricsPeriodMs,
containerMetricsUnregisterDelayMs);
int vmemLimitMBs; int vmemLimitMBs;
int pmemLimitMBs; int pmemLimitMBs;

View File

@ -22,11 +22,15 @@
import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.metrics2.impl.MetricsRecords; import org.apache.hadoop.metrics2.impl.MetricsRecords;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@ -44,7 +48,8 @@ public void testContainerMetricsFlow() throws InterruptedException {
MetricsCollectorImpl collector = new MetricsCollectorImpl(); MetricsCollectorImpl collector = new MetricsCollectorImpl();
ContainerId containerId = mock(ContainerId.class); ContainerId containerId = mock(ContainerId.class);
ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
100, 1);
metrics.recordMemoryUsage(1024); metrics.recordMemoryUsage(1024);
metrics.getMetrics(collector, true); metrics.getMetrics(collector, true);
@ -82,7 +87,8 @@ public void testContainerMetricsLimit() throws InterruptedException {
MetricsCollectorImpl collector = new MetricsCollectorImpl(); MetricsCollectorImpl collector = new MetricsCollectorImpl();
ContainerId containerId = mock(ContainerId.class); ContainerId containerId = mock(ContainerId.class);
ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); ContainerMetrics metrics = ContainerMetrics.forContainer(containerId,
100, 1);
int anyPmemLimit = 1024; int anyPmemLimit = 1024;
int anyVmemLimit = 2048; int anyVmemLimit = 2048;
@ -117,4 +123,39 @@ public void testContainerMetricsLimit() throws InterruptedException {
collector.clear(); collector.clear();
} }
@Test
public void testContainerMetricsFinished() throws InterruptedException {
MetricsSystemImpl system = new MetricsSystemImpl();
system.init("test");
MetricsCollectorImpl collector = new MetricsCollectorImpl();
ApplicationId appId = ApplicationId.newInstance(1234, 3);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 4);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
ContainerMetrics metrics1 = ContainerMetrics.forContainer(system,
containerId1, 1, 0);
ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2);
ContainerMetrics metrics2 = ContainerMetrics.forContainer(system,
containerId2, 1, 0);
ContainerId containerId3 = ContainerId.newContainerId(appAttemptId, 3);
ContainerMetrics metrics3 = ContainerMetrics.forContainer(system,
containerId3, 1, 0);
metrics1.finished();
metrics2.finished();
system.sampleMetrics();
system.sampleMetrics();
Thread.sleep(100);
system.stop();
// verify metrics1 is unregistered
assertTrue(metrics1 != ContainerMetrics.forContainer(
system, containerId1, 1, 0));
// verify metrics2 is unregistered
assertTrue(metrics2 != ContainerMetrics.forContainer(
system, containerId2, 1, 0));
// verify metrics3 is still registered
assertTrue(metrics3 == ContainerMetrics.forContainer(
system, containerId3, 1, 0));
system.shutdown();
}
} }