diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java new file mode 100644 index 0000000000..3c0999ed5d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.impl; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsTag; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Utility class mainly for tests + */ +public class MetricsRecords { + + public static void assertTag(MetricsRecord record, String tagName, + String expectedValue) { + MetricsTag processIdTag = getFirstTagByName(record, + tagName); + assertNotNull(processIdTag); + assertEquals(expectedValue, processIdTag.value()); + } + + public static void assertMetric(MetricsRecord record, + String metricName, + Number expectedValue) { + AbstractMetric resourceLimitMetric = getFirstMetricByName( + record, metricName); + assertNotNull(resourceLimitMetric); + assertEquals(expectedValue, resourceLimitMetric.value()); + } + + private static MetricsTag getFirstTagByName(MetricsRecord record, String name) { + return Iterables.getFirst(Iterables.filter(record.tags(), + new MetricsTagPredicate(name)), null); + } + + private static AbstractMetric getFirstMetricByName( + MetricsRecord record, String name) { + return Iterables.getFirst( + Iterables.filter(record.metrics(), new AbstractMetricPredicate(name)), + null); + } + + private static class MetricsTagPredicate implements Predicate { + private String tagName; + + public MetricsTagPredicate(String tagName) { + + this.tagName = tagName; + } + + @Override + public boolean apply(MetricsTag input) { + return input.name().equals(tagName); + } + } + + private static class AbstractMetricPredicate + implements Predicate { + private String metricName; + + public AbstractMetricPredicate( + String metricName) { + this.metricName = metricName; + } + + @Override + public boolean apply(AbstractMetric input) { + return input.name().equals(metricName); + } + } +} diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3ece974ea5..de3572e6ef 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -228,6 +228,9 @@ Release 2.7.0 - UNRELEASED YARN-3085. Application summary should include the application type (Rohith via jlowe) + YARN-3022. Expose Container resource information from NodeManager for + monitoring (adhoot via ranter) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 3f6616a85a..57b0bdac2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -494,10 +494,11 @@ private void sendContainerMonitorStartEvent() { YarnConfiguration.NM_VMEM_PMEM_RATIO, YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); long vmemBytes = (long) (pmemRatio * pmemBytes); + int cpuVcores = getResource().getVirtualCores(); dispatcher.getEventHandler().handle( new ContainerStartMonitoringEvent(containerId, - vmemBytes, pmemBytes)); + vmemBytes, pmemBytes, cpuVcores)); } private void addDiagnostics(String... diags) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java index 12201dd3ac..7850688a6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java @@ -27,6 +27,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableStat; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -41,11 +42,28 @@ @Metrics(context="container") public class ContainerMetrics implements MetricsSource { + public static final String PMEM_LIMIT_METRIC_NAME = "pMemLimit"; + public static final String VMEM_LIMIT_METRIC_NAME = "vMemLimit"; + public static final String VCORE_LIMIT_METRIC_NAME = "vCoreLimit"; + public static final String PMEM_USAGE_METRIC_NAME = "pMemUsage"; + @Metric public MutableStat pMemMBsStat; + @Metric + public MutableGaugeInt pMemLimitMbs; + + @Metric + public MutableGaugeInt vMemLimitMbs; + + @Metric + public MutableGaugeInt cpuVcores; + static final MetricsInfo RECORD_INFO = - info("ContainerUsage", "Resource usage by container"); + info("ContainerResource", "Resource limit and usage by container"); + + public static final MetricsInfo PROCESSID_INFO = + info("ContainerPid", "Container Process Id"); final MetricsInfo recordInfo; final MetricsRegistry registry; @@ -76,7 +94,13 @@ public class ContainerMetrics implements MetricsSource { scheduleTimerTaskIfRequired(); this.pMemMBsStat = registry.newStat( - "pMem", "Physical memory stats", "Usage", "MBs", true); + PMEM_USAGE_METRIC_NAME, "Physical memory stats", "Usage", "MBs", true); + this.pMemLimitMbs = registry.newGauge( + PMEM_LIMIT_METRIC_NAME, "Physical memory limit in MBs", 0); + this.vMemLimitMbs = registry.newGauge( + VMEM_LIMIT_METRIC_NAME, "Virtual memory limit in MBs", 0); + this.cpuVcores = registry.newGauge( + VCORE_LIMIT_METRIC_NAME, "CPU limit in number of vcores", 0); } ContainerMetrics tag(MetricsInfo info, ContainerId containerId) { @@ -88,10 +112,6 @@ static String sourceName(ContainerId containerId) { return RECORD_INFO.name() + "_" + containerId.toString(); } - public static ContainerMetrics forContainer(ContainerId containerId) { - return forContainer(containerId, -1L); - } - public static ContainerMetrics forContainer( ContainerId containerId, long flushPeriodMs) { return forContainer( @@ -150,6 +170,16 @@ public void recordMemoryUsage(int memoryMBs) { this.pMemMBsStat.add(memoryMBs); } + public void recordProcessId(String processId) { + registry.tag(PROCESSID_INFO, processId); + } + + public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) { + this.vMemLimitMbs.set(vmemLimit); + this.pMemLimitMbs.set(pmemLimit); + this.cpuVcores.set(cpuVcores); + } + private synchronized void scheduleTimerTaskIfRequired() { if (flushPeriodMs > 0) { // Lazily initialize timer diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java index 407ada56b5..56e2d8eed5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java @@ -24,12 +24,14 @@ public class ContainerStartMonitoringEvent extends ContainersMonitorEvent { private final long vmemLimit; private final long pmemLimit; + private final int cpuVcores; public ContainerStartMonitoringEvent(ContainerId containerId, - long vmemLimit, long pmemLimit) { + long vmemLimit, long pmemLimit, int cpuVcores) { super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER); this.vmemLimit = vmemLimit; this.pmemLimit = pmemLimit; + this.cpuVcores = cpuVcores; } public long getVmemLimit() { @@ -40,4 +42,7 @@ public long getPmemLimit() { return this.pmemLimit; } + public int getCpuVcores() { + return this.cpuVcores; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 0ae432515b..2cecda6cf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -219,14 +219,17 @@ private static class ProcessTreeInfo { private ResourceCalculatorProcessTree pTree; private long vmemLimit; private long pmemLimit; + private int cpuVcores; public ProcessTreeInfo(ContainerId containerId, String pid, - ResourceCalculatorProcessTree pTree, long vmemLimit, long pmemLimit) { + ResourceCalculatorProcessTree pTree, long vmemLimit, long pmemLimit, + int cpuVcores) { this.containerId = containerId; this.pid = pid; this.pTree = pTree; this.vmemLimit = vmemLimit; this.pmemLimit = pmemLimit; + this.cpuVcores = cpuVcores; } public ContainerId getContainerId() { @@ -259,6 +262,14 @@ public long getVmemLimit() { public long getPmemLimit() { return this.pmemLimit; } + + /** + * Return the number of cpu vcores assigned + * @return + */ + public int getCpuVcores() { + return this.cpuVcores; + } } @@ -362,7 +373,8 @@ public void run() { synchronized (containersToBeRemoved) { for (ContainerId containerId : containersToBeRemoved) { if (containerMetricsEnabled) { - ContainerMetrics.forContainer(containerId).finished(); + ContainerMetrics.forContainer( + containerId, containerMetricsPeriodMs).finished(); } trackingContainers.remove(containerId); LOG.info("Stopping resource-monitoring for " + containerId); @@ -397,6 +409,17 @@ public void run() { ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf); ptInfo.setPid(pId); ptInfo.setProcessTree(pt); + + if (containerMetricsEnabled) { + ContainerMetrics usageMetrics = ContainerMetrics + .forContainer(containerId, containerMetricsPeriodMs); + int cpuVcores = ptInfo.getCpuVcores(); + final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20); + final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20); + usageMetrics.recordResourceLimit( + vmemLimit, pmemLimit, cpuVcores); + usageMetrics.recordProcessId(pId); + } } } // End of initializing any uninitialized processTrees @@ -576,7 +599,8 @@ public void handle(ContainersMonitorEvent monitoringEvent) { synchronized (this.containersToBeAdded) { ProcessTreeInfo processTreeInfo = new ProcessTreeInfo(containerId, null, null, - startEvent.getVmemLimit(), startEvent.getPmemLimit()); + startEvent.getVmemLimit(), startEvent.getPmemLimit(), + startEvent.getCpuVcores()); this.containersToBeAdded.put(containerId, processTreeInfo); } break; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java index 158e2a88ac..c628648608 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java @@ -18,19 +18,20 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; +import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.metrics2.impl.MetricsRecords; import org.apache.hadoop.yarn.api.records.ContainerId; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import java.util.Timer; - public class TestContainerMetrics { @Test @@ -71,4 +72,39 @@ public void testContainerMetricsFlow() throws InterruptedException { metrics.getMetrics(collector, true); assertEquals(ERR, 0, collector.getRecords().size()); } + + @Test + public void testContainerMetricsLimit() throws InterruptedException { + final String ERR = "Error in number of records"; + + MetricsSystem system = mock(MetricsSystem.class); + doReturn(this).when(system).register(anyString(), anyString(), any()); + + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + ContainerId containerId = mock(ContainerId.class); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); + + int anyPmemLimit = 1024; + int anyVmemLimit = 2048; + int anyVcores = 10; + String anyProcessId = "1234"; + + metrics.recordResourceLimit(anyVmemLimit, anyPmemLimit, anyVcores); + metrics.recordProcessId(anyProcessId); + + Thread.sleep(110); + metrics.getMetrics(collector, true); + assertEquals(ERR, 1, collector.getRecords().size()); + MetricsRecord record = collector.getRecords().get(0); + + MetricsRecords.assertTag(record, ContainerMetrics.PROCESSID_INFO.name(), + anyProcessId); + + MetricsRecords.assertMetric(record, ContainerMetrics + .PMEM_LIMIT_METRIC_NAME, anyPmemLimit); + MetricsRecords.assertMetric(record, ContainerMetrics.VMEM_LIMIT_METRIC_NAME, anyVmemLimit); + MetricsRecords.assertMetric(record, ContainerMetrics.VCORE_LIMIT_METRIC_NAME, anyVcores); + + collector.clear(); + } }