YARN-3022. Expose Container resource information from NodeManager for monitoring (adhoot via ranter)
This commit is contained in:
parent
80705e034b
commit
f7a77819a1
@ -0,0 +1,92 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.metrics2.impl;
|
||||||
|
|
||||||
|
import com.google.common.base.Predicate;
|
||||||
|
import com.google.common.collect.Iterables;
|
||||||
|
import org.apache.hadoop.metrics2.AbstractMetric;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsTag;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class mainly for tests
|
||||||
|
*/
|
||||||
|
public class MetricsRecords {
|
||||||
|
|
||||||
|
public static void assertTag(MetricsRecord record, String tagName,
|
||||||
|
String expectedValue) {
|
||||||
|
MetricsTag processIdTag = getFirstTagByName(record,
|
||||||
|
tagName);
|
||||||
|
assertNotNull(processIdTag);
|
||||||
|
assertEquals(expectedValue, processIdTag.value());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void assertMetric(MetricsRecord record,
|
||||||
|
String metricName,
|
||||||
|
Number expectedValue) {
|
||||||
|
AbstractMetric resourceLimitMetric = getFirstMetricByName(
|
||||||
|
record, metricName);
|
||||||
|
assertNotNull(resourceLimitMetric);
|
||||||
|
assertEquals(expectedValue, resourceLimitMetric.value());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MetricsTag getFirstTagByName(MetricsRecord record, String name) {
|
||||||
|
return Iterables.getFirst(Iterables.filter(record.tags(),
|
||||||
|
new MetricsTagPredicate(name)), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static AbstractMetric getFirstMetricByName(
|
||||||
|
MetricsRecord record, String name) {
|
||||||
|
return Iterables.getFirst(
|
||||||
|
Iterables.filter(record.metrics(), new AbstractMetricPredicate(name)),
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class MetricsTagPredicate implements Predicate<MetricsTag> {
|
||||||
|
private String tagName;
|
||||||
|
|
||||||
|
public MetricsTagPredicate(String tagName) {
|
||||||
|
|
||||||
|
this.tagName = tagName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean apply(MetricsTag input) {
|
||||||
|
return input.name().equals(tagName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class AbstractMetricPredicate
|
||||||
|
implements Predicate<AbstractMetric> {
|
||||||
|
private String metricName;
|
||||||
|
|
||||||
|
public AbstractMetricPredicate(
|
||||||
|
String metricName) {
|
||||||
|
this.metricName = metricName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean apply(AbstractMetric input) {
|
||||||
|
return input.name().equals(metricName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -228,6 +228,9 @@ Release 2.7.0 - UNRELEASED
|
|||||||
YARN-3085. Application summary should include the application type (Rohith
|
YARN-3085. Application summary should include the application type (Rohith
|
||||||
via jlowe)
|
via jlowe)
|
||||||
|
|
||||||
|
YARN-3022. Expose Container resource information from NodeManager for
|
||||||
|
monitoring (adhoot via ranter)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -494,10 +494,11 @@ private void sendContainerMonitorStartEvent() {
|
|||||||
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||||
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
||||||
long vmemBytes = (long) (pmemRatio * pmemBytes);
|
long vmemBytes = (long) (pmemRatio * pmemBytes);
|
||||||
|
int cpuVcores = getResource().getVirtualCores();
|
||||||
|
|
||||||
dispatcher.getEventHandler().handle(
|
dispatcher.getEventHandler().handle(
|
||||||
new ContainerStartMonitoringEvent(containerId,
|
new ContainerStartMonitoringEvent(containerId,
|
||||||
vmemBytes, pmemBytes));
|
vmemBytes, pmemBytes, cpuVcores));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addDiagnostics(String... diags) {
|
private void addDiagnostics(String... diags) {
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableStat;
|
import org.apache.hadoop.metrics2.lib.MutableStat;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
|
||||||
@ -41,11 +42,28 @@
|
|||||||
@Metrics(context="container")
|
@Metrics(context="container")
|
||||||
public class ContainerMetrics implements MetricsSource {
|
public class ContainerMetrics implements MetricsSource {
|
||||||
|
|
||||||
|
public static final String PMEM_LIMIT_METRIC_NAME = "pMemLimit";
|
||||||
|
public static final String VMEM_LIMIT_METRIC_NAME = "vMemLimit";
|
||||||
|
public static final String VCORE_LIMIT_METRIC_NAME = "vCoreLimit";
|
||||||
|
public static final String PMEM_USAGE_METRIC_NAME = "pMemUsage";
|
||||||
|
|
||||||
@Metric
|
@Metric
|
||||||
public MutableStat pMemMBsStat;
|
public MutableStat pMemMBsStat;
|
||||||
|
|
||||||
|
@Metric
|
||||||
|
public MutableGaugeInt pMemLimitMbs;
|
||||||
|
|
||||||
|
@Metric
|
||||||
|
public MutableGaugeInt vMemLimitMbs;
|
||||||
|
|
||||||
|
@Metric
|
||||||
|
public MutableGaugeInt cpuVcores;
|
||||||
|
|
||||||
static final MetricsInfo RECORD_INFO =
|
static final MetricsInfo RECORD_INFO =
|
||||||
info("ContainerUsage", "Resource usage by container");
|
info("ContainerResource", "Resource limit and usage by container");
|
||||||
|
|
||||||
|
public static final MetricsInfo PROCESSID_INFO =
|
||||||
|
info("ContainerPid", "Container Process Id");
|
||||||
|
|
||||||
final MetricsInfo recordInfo;
|
final MetricsInfo recordInfo;
|
||||||
final MetricsRegistry registry;
|
final MetricsRegistry registry;
|
||||||
@ -76,7 +94,13 @@ public class ContainerMetrics implements MetricsSource {
|
|||||||
scheduleTimerTaskIfRequired();
|
scheduleTimerTaskIfRequired();
|
||||||
|
|
||||||
this.pMemMBsStat = registry.newStat(
|
this.pMemMBsStat = registry.newStat(
|
||||||
"pMem", "Physical memory stats", "Usage", "MBs", true);
|
PMEM_USAGE_METRIC_NAME, "Physical memory stats", "Usage", "MBs", true);
|
||||||
|
this.pMemLimitMbs = registry.newGauge(
|
||||||
|
PMEM_LIMIT_METRIC_NAME, "Physical memory limit in MBs", 0);
|
||||||
|
this.vMemLimitMbs = registry.newGauge(
|
||||||
|
VMEM_LIMIT_METRIC_NAME, "Virtual memory limit in MBs", 0);
|
||||||
|
this.cpuVcores = registry.newGauge(
|
||||||
|
VCORE_LIMIT_METRIC_NAME, "CPU limit in number of vcores", 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
ContainerMetrics tag(MetricsInfo info, ContainerId containerId) {
|
ContainerMetrics tag(MetricsInfo info, ContainerId containerId) {
|
||||||
@ -88,10 +112,6 @@ static String sourceName(ContainerId containerId) {
|
|||||||
return RECORD_INFO.name() + "_" + containerId.toString();
|
return RECORD_INFO.name() + "_" + containerId.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ContainerMetrics forContainer(ContainerId containerId) {
|
|
||||||
return forContainer(containerId, -1L);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ContainerMetrics forContainer(
|
public static ContainerMetrics forContainer(
|
||||||
ContainerId containerId, long flushPeriodMs) {
|
ContainerId containerId, long flushPeriodMs) {
|
||||||
return forContainer(
|
return forContainer(
|
||||||
@ -150,6 +170,16 @@ public void recordMemoryUsage(int memoryMBs) {
|
|||||||
this.pMemMBsStat.add(memoryMBs);
|
this.pMemMBsStat.add(memoryMBs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void recordProcessId(String processId) {
|
||||||
|
registry.tag(PROCESSID_INFO, processId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) {
|
||||||
|
this.vMemLimitMbs.set(vmemLimit);
|
||||||
|
this.pMemLimitMbs.set(pmemLimit);
|
||||||
|
this.cpuVcores.set(cpuVcores);
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void scheduleTimerTaskIfRequired() {
|
private synchronized void scheduleTimerTaskIfRequired() {
|
||||||
if (flushPeriodMs > 0) {
|
if (flushPeriodMs > 0) {
|
||||||
// Lazily initialize timer
|
// Lazily initialize timer
|
||||||
|
@ -24,12 +24,14 @@ public class ContainerStartMonitoringEvent extends ContainersMonitorEvent {
|
|||||||
|
|
||||||
private final long vmemLimit;
|
private final long vmemLimit;
|
||||||
private final long pmemLimit;
|
private final long pmemLimit;
|
||||||
|
private final int cpuVcores;
|
||||||
|
|
||||||
public ContainerStartMonitoringEvent(ContainerId containerId,
|
public ContainerStartMonitoringEvent(ContainerId containerId,
|
||||||
long vmemLimit, long pmemLimit) {
|
long vmemLimit, long pmemLimit, int cpuVcores) {
|
||||||
super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER);
|
super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER);
|
||||||
this.vmemLimit = vmemLimit;
|
this.vmemLimit = vmemLimit;
|
||||||
this.pmemLimit = pmemLimit;
|
this.pmemLimit = pmemLimit;
|
||||||
|
this.cpuVcores = cpuVcores;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getVmemLimit() {
|
public long getVmemLimit() {
|
||||||
@ -40,4 +42,7 @@ public long getPmemLimit() {
|
|||||||
return this.pmemLimit;
|
return this.pmemLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getCpuVcores() {
|
||||||
|
return this.cpuVcores;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -219,14 +219,17 @@ private static class ProcessTreeInfo {
|
|||||||
private ResourceCalculatorProcessTree pTree;
|
private ResourceCalculatorProcessTree pTree;
|
||||||
private long vmemLimit;
|
private long vmemLimit;
|
||||||
private long pmemLimit;
|
private long pmemLimit;
|
||||||
|
private int cpuVcores;
|
||||||
|
|
||||||
public ProcessTreeInfo(ContainerId containerId, String pid,
|
public ProcessTreeInfo(ContainerId containerId, String pid,
|
||||||
ResourceCalculatorProcessTree pTree, long vmemLimit, long pmemLimit) {
|
ResourceCalculatorProcessTree pTree, long vmemLimit, long pmemLimit,
|
||||||
|
int cpuVcores) {
|
||||||
this.containerId = containerId;
|
this.containerId = containerId;
|
||||||
this.pid = pid;
|
this.pid = pid;
|
||||||
this.pTree = pTree;
|
this.pTree = pTree;
|
||||||
this.vmemLimit = vmemLimit;
|
this.vmemLimit = vmemLimit;
|
||||||
this.pmemLimit = pmemLimit;
|
this.pmemLimit = pmemLimit;
|
||||||
|
this.cpuVcores = cpuVcores;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ContainerId getContainerId() {
|
public ContainerId getContainerId() {
|
||||||
@ -259,6 +262,14 @@ public long getVmemLimit() {
|
|||||||
public long getPmemLimit() {
|
public long getPmemLimit() {
|
||||||
return this.pmemLimit;
|
return this.pmemLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the number of cpu vcores assigned
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public int getCpuVcores() {
|
||||||
|
return this.cpuVcores;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -362,7 +373,8 @@ public void run() {
|
|||||||
synchronized (containersToBeRemoved) {
|
synchronized (containersToBeRemoved) {
|
||||||
for (ContainerId containerId : containersToBeRemoved) {
|
for (ContainerId containerId : containersToBeRemoved) {
|
||||||
if (containerMetricsEnabled) {
|
if (containerMetricsEnabled) {
|
||||||
ContainerMetrics.forContainer(containerId).finished();
|
ContainerMetrics.forContainer(
|
||||||
|
containerId, containerMetricsPeriodMs).finished();
|
||||||
}
|
}
|
||||||
trackingContainers.remove(containerId);
|
trackingContainers.remove(containerId);
|
||||||
LOG.info("Stopping resource-monitoring for " + containerId);
|
LOG.info("Stopping resource-monitoring for " + containerId);
|
||||||
@ -397,6 +409,17 @@ public void run() {
|
|||||||
ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf);
|
ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf);
|
||||||
ptInfo.setPid(pId);
|
ptInfo.setPid(pId);
|
||||||
ptInfo.setProcessTree(pt);
|
ptInfo.setProcessTree(pt);
|
||||||
|
|
||||||
|
if (containerMetricsEnabled) {
|
||||||
|
ContainerMetrics usageMetrics = ContainerMetrics
|
||||||
|
.forContainer(containerId, containerMetricsPeriodMs);
|
||||||
|
int cpuVcores = ptInfo.getCpuVcores();
|
||||||
|
final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20);
|
||||||
|
final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20);
|
||||||
|
usageMetrics.recordResourceLimit(
|
||||||
|
vmemLimit, pmemLimit, cpuVcores);
|
||||||
|
usageMetrics.recordProcessId(pId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// End of initializing any uninitialized processTrees
|
// End of initializing any uninitialized processTrees
|
||||||
@ -576,7 +599,8 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
|
|||||||
synchronized (this.containersToBeAdded) {
|
synchronized (this.containersToBeAdded) {
|
||||||
ProcessTreeInfo processTreeInfo =
|
ProcessTreeInfo processTreeInfo =
|
||||||
new ProcessTreeInfo(containerId, null, null,
|
new ProcessTreeInfo(containerId, null, null,
|
||||||
startEvent.getVmemLimit(), startEvent.getPmemLimit());
|
startEvent.getVmemLimit(), startEvent.getPmemLimit(),
|
||||||
|
startEvent.getCpuVcores());
|
||||||
this.containersToBeAdded.put(containerId, processTreeInfo);
|
this.containersToBeAdded.put(containerId, processTreeInfo);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -18,19 +18,20 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
||||||
|
import org.apache.hadoop.metrics2.impl.MetricsRecords;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.util.Timer;
|
|
||||||
|
|
||||||
public class TestContainerMetrics {
|
public class TestContainerMetrics {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -71,4 +72,39 @@ public void testContainerMetricsFlow() throws InterruptedException {
|
|||||||
metrics.getMetrics(collector, true);
|
metrics.getMetrics(collector, true);
|
||||||
assertEquals(ERR, 0, collector.getRecords().size());
|
assertEquals(ERR, 0, collector.getRecords().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerMetricsLimit() throws InterruptedException {
|
||||||
|
final String ERR = "Error in number of records";
|
||||||
|
|
||||||
|
MetricsSystem system = mock(MetricsSystem.class);
|
||||||
|
doReturn(this).when(system).register(anyString(), anyString(), any());
|
||||||
|
|
||||||
|
MetricsCollectorImpl collector = new MetricsCollectorImpl();
|
||||||
|
ContainerId containerId = mock(ContainerId.class);
|
||||||
|
ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100);
|
||||||
|
|
||||||
|
int anyPmemLimit = 1024;
|
||||||
|
int anyVmemLimit = 2048;
|
||||||
|
int anyVcores = 10;
|
||||||
|
String anyProcessId = "1234";
|
||||||
|
|
||||||
|
metrics.recordResourceLimit(anyVmemLimit, anyPmemLimit, anyVcores);
|
||||||
|
metrics.recordProcessId(anyProcessId);
|
||||||
|
|
||||||
|
Thread.sleep(110);
|
||||||
|
metrics.getMetrics(collector, true);
|
||||||
|
assertEquals(ERR, 1, collector.getRecords().size());
|
||||||
|
MetricsRecord record = collector.getRecords().get(0);
|
||||||
|
|
||||||
|
MetricsRecords.assertTag(record, ContainerMetrics.PROCESSID_INFO.name(),
|
||||||
|
anyProcessId);
|
||||||
|
|
||||||
|
MetricsRecords.assertMetric(record, ContainerMetrics
|
||||||
|
.PMEM_LIMIT_METRIC_NAME, anyPmemLimit);
|
||||||
|
MetricsRecords.assertMetric(record, ContainerMetrics.VMEM_LIMIT_METRIC_NAME, anyVmemLimit);
|
||||||
|
MetricsRecords.assertMetric(record, ContainerMetrics.VCORE_LIMIT_METRIC_NAME, anyVcores);
|
||||||
|
|
||||||
|
collector.clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user