YARN-9382 Publish container killed, paused and resumed events to ATSv2. Contributed by Abhishesk Modi.

This commit is contained in:
Vrushali C 2019-04-05 12:02:43 -07:00
parent 5750bb94ed
commit 27039a29ae
3 changed files with 247 additions and 1 deletions

View File

@ -34,6 +34,15 @@ public class ContainerMetricsConstants {
public static final String CREATED_IN_RM_EVENT_TYPE =
"YARN_RM_CONTAINER_CREATED";
// Event of this type will be emitted by NM.
public static final String PAUSED_EVENT_TYPE = "YARN_CONTAINER_PAUSED";
// Event of this type will be emitted by NM.
public static final String RESUMED_EVENT_TYPE = "YARN_CONTAINER_RESUMED";
// Event of this type will be emitted by NM.
public static final String KILLED_EVENT_TYPE = "YARN_CONTAINER_KILLED";
// Event of this type will be emitted by NM.
public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";

View File

@ -25,6 +25,9 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -252,6 +255,95 @@ private void publishContainerCreatedEvent(ContainerEvent event) {
}
}
@SuppressWarnings("unchecked")
private void publishContainerResumedEvent(
ContainerEvent event) {
if (publishNMContainerEvents) {
ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
ContainerId containerId = resumeEvent.getContainerID();
ContainerEntity entity = createContainerEntity(containerId);
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
resumeEvent.getDiagnostic());
entity.setInfo(entityInfo);
Container container = context.getContainers().get(containerId);
if (container != null) {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
entity
.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
}
}
@SuppressWarnings("unchecked")
private void publishContainerPausedEvent(
ContainerEvent event) {
if (publishNMContainerEvents) {
ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
ContainerId containerId = pauseEvent.getContainerID();
ContainerEntity entity = createContainerEntity(containerId);
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
pauseEvent.getDiagnostic());
entity.setInfo(entityInfo);
Container container = context.getContainers().get(containerId);
if (container != null) {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
entity
.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
}
}
@SuppressWarnings("unchecked")
private void publishContainerKilledEvent(
ContainerEvent event) {
if (publishNMContainerEvents) {
ContainerKillEvent killEvent = (ContainerKillEvent) event;
ContainerId containerId = killEvent.getContainerID();
ContainerEntity entity = createContainerEntity(containerId);
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
killEvent.getDiagnostic());
entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
killEvent.getContainerExitStatus());
entity.setInfo(entityInfo);
Container container = context.getContainers().get(containerId);
if (container != null) {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
entity
.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
}
}
@SuppressWarnings("unchecked")
private void publishContainerFinishedEvent(ContainerStatus containerStatus,
long containerFinishTime, long containerStartTime) {
@ -384,7 +476,15 @@ public void publishContainerEvent(ContainerEvent event) {
case INIT_CONTAINER:
publishContainerCreatedEvent(event);
break;
case KILL_CONTAINER:
publishContainerKilledEvent(event);
break;
case PAUSE_CONTAINER:
publishContainerPausedEvent(event);
break;
case RESUME_CONTAINER:
publishContainerResumedEvent(event);
break;
default:
LOG.debug("{} is not a desired ContainerEvent which needs to be "
+ " published by NMTimelinePublisher", event.getType());

View File

@ -25,7 +25,11 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -35,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -45,6 +50,10 @@
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerPauseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResumeEvent;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.junit.Assert;
@ -94,6 +103,19 @@ public void createTimelineClient(ApplicationId appId) {
private Context createMockContext() {
Context context = mock(Context.class);
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
ConcurrentMap<ContainerId, Container> containers =
new ConcurrentHashMap<>();
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
Container container = mock(Container.class);
when(container.getContainerStartTime())
.thenReturn(System.currentTimeMillis());
containers.putIfAbsent(cId, container);
when(context.getContainers()).thenReturn(containers);
return context;
}
@ -145,6 +167,121 @@ private Context createMockContext() {
cId.getContainerId()), entity.getIdPrefix());
}
@Test
public void testPublishContainerPausedEvent() {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
ContainerEvent containerEvent =
new ContainerPauseEvent(cId, "test pause");
publisher.createTimelineClient(appId);
publisher.publishContainerEvent(containerEvent);
publisher.stopTimelineClient(appId);
dispatcher.await();
ContainerEntity cEntity = new ContainerEntity();
cEntity.setId(cId.toString());
TimelineEntity[] lastPublishedEntities =
timelineClient.getLastPublishedEntities();
Assert.assertNotNull(lastPublishedEntities);
Assert.assertEquals(1, lastPublishedEntities.length);
TimelineEntity entity = lastPublishedEntities[0];
Assert.assertEquals(cEntity, entity);
NavigableSet<TimelineEvent> events = entity.getEvents();
Assert.assertEquals(1, events.size());
Assert.assertEquals(ContainerMetricsConstants.PAUSED_EVENT_TYPE,
events.iterator().next().getId());
Map<String, Object> info = entity.getInfo();
Assert.assertTrue(
info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO));
Assert.assertEquals("test pause",
info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
}
@Test
public void testPublishContainerResumedEvent() {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
ContainerEvent containerEvent =
new ContainerResumeEvent(cId, "test resume");
publisher.createTimelineClient(appId);
publisher.publishContainerEvent(containerEvent);
publisher.stopTimelineClient(appId);
dispatcher.await();
ContainerEntity cEntity = new ContainerEntity();
cEntity.setId(cId.toString());
TimelineEntity[] lastPublishedEntities =
timelineClient.getLastPublishedEntities();
Assert.assertNotNull(lastPublishedEntities);
Assert.assertEquals(1, lastPublishedEntities.length);
TimelineEntity entity = lastPublishedEntities[0];
Assert.assertEquals(cEntity, entity);
NavigableSet<TimelineEvent> events = entity.getEvents();
Assert.assertEquals(1, events.size());
Assert.assertEquals(ContainerMetricsConstants.RESUMED_EVENT_TYPE,
events.iterator().next().getId());
Map<String, Object> info = entity.getInfo();
Assert.assertTrue(
info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO));
Assert.assertEquals("test resume",
info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
}
@Test
public void testPublishContainerKilledEvent() {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
ContainerEvent containerEvent =
new ContainerKillEvent(cId, 1, "test kill");
publisher.createTimelineClient(appId);
publisher.publishContainerEvent(containerEvent);
publisher.stopTimelineClient(appId);
dispatcher.await();
ContainerEntity cEntity = new ContainerEntity();
cEntity.setId(cId.toString());
TimelineEntity[] lastPublishedEntities =
timelineClient.getLastPublishedEntities();
Assert.assertNotNull(lastPublishedEntities);
Assert.assertEquals(1, lastPublishedEntities.length);
TimelineEntity entity = lastPublishedEntities[0];
Assert.assertEquals(cEntity, entity);
NavigableSet<TimelineEvent> events = entity.getEvents();
Assert.assertEquals(1, events.size());
Assert.assertEquals(ContainerMetricsConstants.KILLED_EVENT_TYPE,
events.iterator().next().getId());
Map<String, Object> info = entity.getInfo();
Assert.assertTrue(
info.containsKey(ContainerMetricsConstants.DIAGNOSTICS_INFO));
Assert.assertEquals("test kill",
info.get(ContainerMetricsConstants.DIAGNOSTICS_INFO));
Assert.assertTrue(
info.containsKey(ContainerMetricsConstants.EXIT_STATUS_INFO));
Assert.assertEquals(1,
info.get(ContainerMetricsConstants.EXIT_STATUS_INFO));
}
@Test public void testContainerResourceUsage() {
ApplicationId appId = ApplicationId.newInstance(0, 1);
publisher.createTimelineClient(appId);