diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index c233e72312..6df7211248 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -22,7 +22,6 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -32,11 +31,10 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.api.records.ResourceUtilization; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -47,6 +45,7 @@ import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; public class ContainersMonitorImpl extends AbstractService implements @@ -576,8 +575,8 @@ && isProcessTreeOverLimit(containerId.toString(), NMTimelinePublisher nmMetricsPublisher = container.getNMTimelinePublisher(); if (nmMetricsPublisher != null) { - nmMetricsPublisher.reportContainerResourceUsage(container, pId, - currentPmemUsage, cpuUsageTotalCoresPercentage); + nmMetricsPublisher.reportContainerResourceUsage(container, + currentPmemUsage, cpuUsagePercentPerCore); } } catch (Exception e) { // Log the exception and proceed to the next container. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index 684feaa287..70b7e8dd66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -113,29 +113,28 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) { } @SuppressWarnings("unchecked") - public void reportContainerResourceUsage(Container container, String pId, - Long pmemUsage, Float cpuUsageTotalCoresPercentage) { + public void reportContainerResourceUsage(Container container, Long pmemUsage, + Float cpuUsagePercentPerCore) { if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE || - cpuUsageTotalCoresPercentage != - ResourceCalculatorProcessTree.UNAVAILABLE) { + cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) { ContainerEntity entity = createContainerEntity(container.getContainerId()); long currentTimeMillis = System.currentTimeMillis(); if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) { TimelineMetric memoryMetric = new TimelineMetric(); - memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId); + memoryMetric.setId(ContainerMetric.MEMORY.toString()); memoryMetric.addValue(currentTimeMillis, pmemUsage); entity.addMetric(memoryMetric); } - if (cpuUsageTotalCoresPercentage != - ResourceCalculatorProcessTree.UNAVAILABLE) { + if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) { TimelineMetric cpuMetric = new TimelineMetric(); - cpuMetric.setId(ContainerMetric.CPU.toString() + pId); - cpuMetric.addValue(currentTimeMillis, cpuUsageTotalCoresPercentage); + cpuMetric.setId(ContainerMetric.CPU.toString()); + cpuMetric.addValue(currentTimeMillis, + Math.round(cpuUsagePercentPerCore)); entity.addMetric(cpuMetric); } - dispatcher.getEventHandler().handle( - new TimelinePublishEvent(entity, container.getContainerId() + dispatcher.getEventHandler() + .handle(new TimelinePublishEvent(entity, container.getContainerId() .getApplicationAttemptId().getApplicationId())); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java new file mode 100644 index 0000000000..830ed6b504 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -0,0 +1,157 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.timelineservice; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; +import org.junit.Assert; +import org.junit.Test; + +public class TestNMTimelinePublisher { + private static final String MEMORY_ID = "MEMORY"; + private static final String CPU_ID = "CPU"; + + @Test + public void testContainerResourceUsage() { + Context context = mock(Context.class); + @SuppressWarnings("unchecked") + ConcurrentMap map = mock(ConcurrentMap.class); + Application aApp = mock(Application.class); + when(map.get(any(ApplicationId.class))).thenReturn(aApp); + DummyTimelineClient timelineClient = new DummyTimelineClient(); + when(aApp.getTimelineClient()).thenReturn(timelineClient); + when(context.getApplications()).thenReturn(map); + when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); + when(context.getHttpPort()).thenReturn(0); + NMTimelinePublisher publisher = new NMTimelinePublisher(context); + publisher.init(new Configuration()); + publisher.start(); + Container aContainer = mock(Container.class); + when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), + 0L)); + publisher.reportContainerResourceUsage(aContainer, 1024L, 8F); + verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8); + timelineClient.reset(); + + publisher.reportContainerResourceUsage(aContainer, 1024L, 0.8F); + verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 1); + timelineClient.reset(); + + publisher.reportContainerResourceUsage(aContainer, 1024L, 0.49F); + verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 0); + timelineClient.reset(); + + publisher.reportContainerResourceUsage(aContainer, 1024L, + (float) ResourceCalculatorProcessTree.UNAVAILABLE); + verifyPublishedResourceUsageMetrics(timelineClient, 1024L, + ResourceCalculatorProcessTree.UNAVAILABLE); + publisher.stop(); + } + + private void verifyPublishedResourceUsageMetrics( + DummyTimelineClient timelineClient, long memoryUsage, int cpuUsage) { + TimelineEntity[] entities = null; + for (int i = 0; i < 10; i++) { + entities = timelineClient.getLastPublishedEntities(); + if (entities != null) { + break; + } + try { + Thread.sleep(150L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + int numberOfResourceMetrics = 0; + numberOfResourceMetrics += + (memoryUsage == ResourceCalculatorProcessTree.UNAVAILABLE) ? 0 : 1; + numberOfResourceMetrics += + (cpuUsage == ResourceCalculatorProcessTree.UNAVAILABLE) ? 0 : 1; + assertNotNull("entities are expected to be published", entities); + assertEquals("Expected number of metrics notpublished", + numberOfResourceMetrics, entities[0].getMetrics().size()); + Iterator metrics = entities[0].getMetrics().iterator(); + while (metrics.hasNext()) { + TimelineMetric metric = metrics.next(); + Iterator> entrySet; + switch (metric.getId()) { + case CPU_ID: + if (cpuUsage == ResourceCalculatorProcessTree.UNAVAILABLE) { + Assert.fail("Not Expecting CPU Metric to be published"); + } + entrySet = metric.getValues().entrySet().iterator(); + assertEquals("CPU usage metric not matching", cpuUsage, + entrySet.next().getValue()); + break; + case MEMORY_ID: + if (memoryUsage == ResourceCalculatorProcessTree.UNAVAILABLE) { + Assert.fail("Not Expecting Memory Metric to be published"); + } + entrySet = metric.getValues().entrySet().iterator(); + assertEquals("Memory usage metric not matching", memoryUsage, + entrySet.next().getValue()); + break; + default: + Assert.fail("Invalid Resource Usage metric"); + break; + } + } + } + + protected static class DummyTimelineClient extends TimelineClientImpl { + private TimelineEntity[] lastPublishedEntities; + + @Override + public void putEntities(TimelineEntity... entities) + throws IOException, YarnException { + this.lastPublishedEntities = entities; + } + + public TimelineEntity[] getLastPublishedEntities() { + return lastPublishedEntities; + } + + public void reset() { + lastPublishedEntities = null; + } + } +}