diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java index 7d6fc920df..8b2fb85ca1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java @@ -34,6 +34,15 @@ public class ContainerMetricsConstants { public static final String CREATED_IN_RM_EVENT_TYPE = "YARN_RM_CONTAINER_CREATED"; + // Event of this type will be emitted by NM. + public static final String PAUSED_EVENT_TYPE = "YARN_CONTAINER_PAUSED"; + + // Event of this type will be emitted by NM. + public static final String RESUMED_EVENT_TYPE = "YARN_CONTAINER_RESUMED"; + + // Event of this type will be emitted by NM. + public static final String KILLED_EVENT_TYPE = "YARN_CONTAINER_KILLED"; + // Event of this type will be emitted by NM. public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java index b2d9376939..ba574952f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java @@ -25,6 +25,9 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -252,6 +255,95 @@ private void publishContainerCreatedEvent(ContainerEvent event) { } } + @SuppressWarnings("unchecked") + private void publishContainerResumedEvent( + ContainerEvent event) { + if (publishNMContainerEvents) { + ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event; + ContainerId containerId = resumeEvent.getContainerID(); + ContainerEntity entity = createContainerEntity(containerId); + + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO, + resumeEvent.getDiagnostic()); + entity.setInfo(entityInfo); + + Container container = context.getContainers().get(containerId); + if (container != null) { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + long containerStartTime = container.getContainerStartTime(); + entity.addEvent(tEvent); + entity + .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); + } + } + } + + @SuppressWarnings("unchecked") + private void publishContainerPausedEvent( + ContainerEvent event) { + if (publishNMContainerEvents) { + ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event; + ContainerId containerId = pauseEvent.getContainerID(); + ContainerEntity entity = createContainerEntity(containerId); + + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO, + pauseEvent.getDiagnostic()); + entity.setInfo(entityInfo); + + Container container = context.getContainers().get(containerId); + if (container != null) { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + long containerStartTime = container.getContainerStartTime(); + entity.addEvent(tEvent); + entity + .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); + } + } + } + + @SuppressWarnings("unchecked") + private void publishContainerKilledEvent( + ContainerEvent event) { + if (publishNMContainerEvents) { + ContainerKillEvent killEvent = (ContainerKillEvent) event; + ContainerId containerId = killEvent.getContainerID(); + ContainerEntity entity = createContainerEntity(containerId); + + Map entityInfo = new HashMap(); + entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO, + killEvent.getDiagnostic()); + entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO, + killEvent.getContainerExitStatus()); + entity.setInfo(entityInfo); + + Container container = context.getContainers().get(containerId); + if (container != null) { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE); + tEvent.setTimestamp(event.getTimestamp()); + + long containerStartTime = container.getContainerStartTime(); + entity.addEvent(tEvent); + entity + .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); + dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity, + containerId.getApplicationAttemptId().getApplicationId())); + } + } + } + @SuppressWarnings("unchecked") private void publishContainerFinishedEvent(ContainerStatus containerStatus, long containerFinishTime, long containerStartTime) { @@ -384,7 +476,15 @@ public void publishContainerEvent(ContainerEvent event) { case INIT_CONTAINER: publishContainerCreatedEvent(event); break; - + case KILL_CONTAINER: + publishContainerKilledEvent(event); + break; + case PAUSE_CONTAINER: + publishContainerPausedEvent(event); + break; + case RESUME_CONTAINER: + publishContainerResumedEvent(event); + break; default: LOG.debug("{} is not a desired ContainerEvent which needs to be " + " published by NMTimelinePublisher", event.getType()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java index ae51f8598f..abd27ff14a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java @@ -25,7 +25,11 @@ import java.io.IOException; import java.util.Iterator; +import java.util.Map; import java.util.Map.Entry; +import java.util.NavigableSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -45,6 +50,10 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.junit.Assert; @@ -94,6 +103,19 @@ public void createTimelineClient(ApplicationId appId) { private Context createMockContext() { Context context = mock(Context.class); when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0)); + + ConcurrentMap containers = + new ConcurrentHashMap<>(); + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); + Container container = mock(Container.class); + when(container.getContainerStartTime()) + .thenReturn(System.currentTimeMillis()); + containers.putIfAbsent(cId, container); + when(context.getContainers()).thenReturn(containers); + return context; } @@ -145,6 +167,121 @@ private Context createMockContext() { cId.getContainerId()), entity.getIdPrefix()); } + @Test + public void testPublishContainerPausedEvent() { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); + + ContainerEvent containerEvent = + new ContainerPauseEvent(cId, "test pause"); + + publisher.createTimelineClient(appId); + publisher.publishContainerEvent(containerEvent); + publisher.stopTimelineClient(appId); + dispatcher.await(); + + ContainerEntity cEntity = new ContainerEntity(); + cEntity.setId(cId.toString()); + TimelineEntity[] lastPublishedEntities = + timelineClient.getLastPublishedEntities(); + + Assert.assertNotNull(lastPublishedEntities); + Assert.assertEquals(1, lastPublishedEntities.length); + TimelineEntity entity = lastPublishedEntities[0]; + Assert.assertEquals(cEntity, entity); + + NavigableSet events = entity.getEvents(); + Assert.assertEquals(1, events.size()); + Assert.assertEquals(ContainerMetricsConstants.PAUSED_EVENT_TYPE, + events.iterator().next().getId()); + + Map info = entity.getInfo(); + Assert.assertTrue( + info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertEquals("test pause", + info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + } + + @Test + public void testPublishContainerResumedEvent() { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); + + ContainerEvent containerEvent = + new ContainerResumeEvent(cId, "test resume"); + + publisher.createTimelineClient(appId); + publisher.publishContainerEvent(containerEvent); + publisher.stopTimelineClient(appId); + dispatcher.await(); + + ContainerEntity cEntity = new ContainerEntity(); + cEntity.setId(cId.toString()); + TimelineEntity[] lastPublishedEntities = + timelineClient.getLastPublishedEntities(); + + Assert.assertNotNull(lastPublishedEntities); + Assert.assertEquals(1, lastPublishedEntities.length); + TimelineEntity entity = lastPublishedEntities[0]; + Assert.assertEquals(cEntity, entity); + + NavigableSet events = entity.getEvents(); + Assert.assertEquals(1, events.size()); + Assert.assertEquals(ContainerMetricsConstants.RESUMED_EVENT_TYPE, + events.iterator().next().getId()); + + Map info = entity.getInfo(); + Assert.assertTrue( + info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertEquals("test resume", + info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + } + + @Test + public void testPublishContainerKilledEvent() { + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cId = ContainerId.newContainerId(appAttemptId, 1); + + ContainerEvent containerEvent = + new ContainerKillEvent(cId, 1, "test kill"); + + publisher.createTimelineClient(appId); + publisher.publishContainerEvent(containerEvent); + publisher.stopTimelineClient(appId); + dispatcher.await(); + + ContainerEntity cEntity = new ContainerEntity(); + cEntity.setId(cId.toString()); + TimelineEntity[] lastPublishedEntities = + timelineClient.getLastPublishedEntities(); + + Assert.assertNotNull(lastPublishedEntities); + Assert.assertEquals(1, lastPublishedEntities.length); + TimelineEntity entity = lastPublishedEntities[0]; + Assert.assertEquals(cEntity, entity); + + NavigableSet events = entity.getEvents(); + Assert.assertEquals(1, events.size()); + Assert.assertEquals(ContainerMetricsConstants.KILLED_EVENT_TYPE, + events.iterator().next().getId()); + + Map info = entity.getInfo(); + Assert.assertTrue( + info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertEquals("test kill", + info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO)); + Assert.assertTrue( + info.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO)); + Assert.assertEquals(1, + info.get(ContainerMetricsConstants.EXIT_STATUS_INFO)); + } + @Test public void testContainerResourceUsage() { ApplicationId appId = ApplicationId.newInstance(0, 1); publisher.createTimelineClient(appId);