diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index de66e7525e..04d0fd14be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1226,6 +1226,16 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS =
"0.0.0.0:" + DEFAULT_NM_COLLECTOR_SERVICE_PORT;
+ /**
+ * The setting that controls whether YARN container events are published
+ * to the timeline service by the NM. This configuration setting is for
+ * ATS V2.
+ */
+ public static final String NM_PUBLISH_CONTAINER_EVENTS_ENABLED = NM_PREFIX
+ + "emit-container-events";
+ public static final boolean DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED =
+ true;
+
/** Interval in between cache cleanups.*/
public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
NM_PREFIX + "localizer.cache.cleanup.interval-ms";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 45894e9461..bfcbf4b6be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1190,6 +1190,14 @@
    <value>${yarn.nodemanager.hostname}:8048</value>
  </property>

+  <property>
+    <description>The setting that controls whether YARN container events are
+    published to the timeline service by the NM. This configuration setting
+    is for ATS V2.</description>
+    <name>yarn.nodemanager.emit-container-events</name>
+    <value>true</value>
+  </property>
+
  <property>
    <description>Interval in between cache cleanups.</description>
    <name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
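The shipped default is `true`, so container events keep flowing unless a deployment overrides the key, normally in yarn-site.xml. A hedged Java sketch of the equivalent programmatic override, illustrative only and using the literal key documented above:

```java
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Illustrative only: same effect as setting the property to false in
// yarn-site.xml before the NodeManager reads its configuration.
public class DisableNMContainerEvents {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean("yarn.nodemanager.emit-container-events", false);
    // Prints false: the override wins over the yarn-default.xml value.
    System.out.println(conf.getBoolean(
        "yarn.nodemanager.emit-container-events", true));
  }
}
```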
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index cbf3e5eb7f..e9bd9651d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,6 +89,8 @@ public class NMTimelinePublisher extends CompositeService {
private final Map<ApplicationId, TimelineV2Client> appToClientMap;
+ private boolean publishNMContainerEvents = true;
+
public NMTimelinePublisher(Context context) {
super(NMTimelinePublisher.class.getName());
this.context = context;
@@ -110,6 +113,10 @@ protected void serviceInit(Configuration conf) throws Exception {
if (webAppURLWithoutScheme.contains(":")) {
httpPort = webAppURLWithoutScheme.split(":")[1];
}
+
+ publishNMContainerEvents = conf.getBoolean(
+ YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
+ YarnConfiguration.DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED);
super.serviceInit(conf);
}
@@ -155,31 +162,148 @@ protected void handleNMTimelineEvent(NMTimelineEvent event) {
public void reportContainerResourceUsage(Container container, Long pmemUsage,
Float cpuUsagePercentPerCore) {
- if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
- cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
- ContainerEntity entity =
- createContainerEntity(container.getContainerId());
- long currentTimeMillis = System.currentTimeMillis();
- if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
- TimelineMetric memoryMetric = new TimelineMetric();
- memoryMetric.setId(ContainerMetric.MEMORY.toString());
- memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
- memoryMetric.addValue(currentTimeMillis, pmemUsage);
- entity.addMetric(memoryMetric);
- }
- if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
- TimelineMetric cpuMetric = new TimelineMetric();
- cpuMetric.setId(ContainerMetric.CPU.toString());
- // TODO: support average
- cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
- cpuMetric.addValue(currentTimeMillis,
- Math.round(cpuUsagePercentPerCore));
- entity.addMetric(cpuMetric);
+ if (publishNMContainerEvents) {
+ if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE
+ || cpuUsagePercentPerCore !=
+ ResourceCalculatorProcessTree.UNAVAILABLE) {
+ ContainerEntity entity =
+ createContainerEntity(container.getContainerId());
+ long currentTimeMillis = System.currentTimeMillis();
+ if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
+ TimelineMetric memoryMetric = new TimelineMetric();
+ memoryMetric.setId(ContainerMetric.MEMORY.toString());
+ memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+ memoryMetric.addValue(currentTimeMillis, pmemUsage);
+ entity.addMetric(memoryMetric);
+ }
+ if (cpuUsagePercentPerCore !=
+ ResourceCalculatorProcessTree.UNAVAILABLE) {
+ TimelineMetric cpuMetric = new TimelineMetric();
+ cpuMetric.setId(ContainerMetric.CPU.toString());
+ // TODO: support average
+ cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+ cpuMetric.addValue(currentTimeMillis,
+ Math.round(cpuUsagePercentPerCore));
+ entity.addMetric(cpuMetric);
+ }
+ entity.setIdPrefix(TimelineServiceHelper.
+ invertLong(container.getContainerStartTime()));
+ ApplicationId appId = container.getContainerId().
+ getApplicationAttemptId().getApplicationId();
+ try {
+ // no need to put it as part of publisher as timeline client
+ // already has Queuing concept
+ TimelineV2Client timelineClient = getTimelineClient(appId);
+ if (timelineClient != null) {
+ timelineClient.putEntitiesAsync(entity);
+ } else {
+ LOG.error("Seems like client has been removed before the container"
+ + " metric could be published for " +
+ container.getContainerId());
+ }
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to publish Container metrics for container " +
+ container.getContainerId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to publish Container metrics for container " +
+ container.getContainerId(), e);
+ }
+ } catch (YarnException e) {
+ LOG.error(
+ "Failed to publish Container metrics for container " +
+ container.getContainerId(), e.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to publish Container metrics for container " +
+ container.getContainerId(), e);
+ }
+ }
}
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void publishContainerCreatedEvent(ContainerEvent event) {
+ if (publishNMContainerEvents) {
+ ContainerId containerId = event.getContainerID();
+ ContainerEntity entity = createContainerEntity(containerId);
+ Container container = context.getContainers().get(containerId);
+ Resource resource = container.getResource();
+
+ Map<String, Object> entityInfo = new HashMap<String, Object>();
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
+ resource.getMemorySize());
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
+ resource.getVirtualCores());
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
+ nodeId.getHost());
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
+ nodeId.getPort());
+ entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
+ container.getPriority().toString());
+ entityInfo.put(
+ ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
+ httpAddress);
+ entity.setInfo(entityInfo);
+
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
+ tEvent.setTimestamp(event.getTimestamp());
+
+ long containerStartTime = container.getContainerStartTime();
+ entity.addEvent(tEvent);
+ entity.setCreatedTime(containerStartTime);
+ entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+ dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+ containerId.getApplicationAttemptId().getApplicationId()));
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void publishContainerFinishedEvent(ContainerStatus containerStatus,
+ long containerFinishTime, long containerStartTime) {
+ if (publishNMContainerEvents) {
+ ContainerId containerId = containerStatus.getContainerId();
+ TimelineEntity entity = createContainerEntity(containerId);
+
+ Map<String, Object> entityInfo = new HashMap<String, Object>();
+ entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
+ containerStatus.getDiagnostics());
+ entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
+ containerStatus.getExitStatus());
+ entityInfo.put(ContainerMetricsConstants.STATE_INFO,
+ ContainerState.COMPLETE.toString());
+ entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
+ containerFinishTime);
+ entity.setInfo(entityInfo);
+
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+ tEvent.setTimestamp(containerFinishTime);
+ entity.addEvent(tEvent);
+ entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
+
+ dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+ containerId.getApplicationAttemptId().getApplicationId()));
+ }
+ }
+
+ private void publishContainerLocalizationEvent(
+ ContainerLocalizationEvent event, String eventType) {
+ if (publishNMContainerEvents) {
+ Container container = event.getContainer();
+ ContainerId containerId = container.getContainerId();
+ TimelineEntity entity = createContainerEntity(containerId);
+
+ TimelineEvent tEvent = new TimelineEvent();
+ tEvent.setId(eventType);
+ tEvent.setTimestamp(event.getTimestamp());
+ entity.addEvent(tEvent);
entity.setIdPrefix(TimelineServiceHelper.
invertLong(container.getContainerStartTime()));
- ApplicationId appId = container.getContainerId().getApplicationAttemptId()
- .getApplicationId();
+
+ ApplicationId appId = container.getContainerId().
+ getApplicationAttemptId().getApplicationId();
try {
// no need to put it as part of publisher as timeline client already has
// Queuing concept
@@ -187,8 +311,8 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage,
if (timelineClient != null) {
timelineClient.putEntitiesAsync(entity);
} else {
- LOG.error("Seems like client has been removed before the container"
- + " metric could be published for " + container.getContainerId());
+ LOG.error("Seems like client has been removed before the event"
+ + " could be published for " + container.getContainerId());
}
} catch (IOException e) {
LOG.error("Failed to publish Container metrics for container "
@@ -208,110 +332,6 @@ public void reportContainerResourceUsage(Container container, Long pmemUsage,
}
}
- @SuppressWarnings("unchecked")
- private void publishContainerCreatedEvent(ContainerEvent event) {
- ContainerId containerId = event.getContainerID();
- ContainerEntity entity = createContainerEntity(containerId);
- Container container = context.getContainers().get(containerId);
- Resource resource = container.getResource();
-
- Map<String, Object> entityInfo = new HashMap<String, Object>();
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_INFO,
- resource.getMemorySize());
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_INFO,
- resource.getVirtualCores());
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_INFO,
- nodeId.getHost());
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_INFO,
- nodeId.getPort());
- entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_INFO,
- container.getPriority().toString());
- entityInfo.put(
- ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
- httpAddress);
- entity.setInfo(entityInfo);
-
- TimelineEvent tEvent = new TimelineEvent();
- tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
- tEvent.setTimestamp(event.getTimestamp());
-
- long containerStartTime = container.getContainerStartTime();
- entity.addEvent(tEvent);
- entity.setCreatedTime(containerStartTime);
- entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
- dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
- containerId.getApplicationAttemptId().getApplicationId()));
- }
-
- @SuppressWarnings("unchecked")
- private void publishContainerFinishedEvent(ContainerStatus containerStatus,
- long containerFinishTime, long containerStartTime) {
- ContainerId containerId = containerStatus.getContainerId();
- TimelineEntity entity = createContainerEntity(containerId);
-
- Map<String, Object> entityInfo = new HashMap<String, Object>();
- entityInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO,
- containerStatus.getDiagnostics());
- entityInfo.put(ContainerMetricsConstants.EXIT_STATUS_INFO,
- containerStatus.getExitStatus());
- entityInfo.put(ContainerMetricsConstants.STATE_INFO,
- ContainerState.COMPLETE.toString());
- entityInfo.put(ContainerMetricsConstants.CONTAINER_FINISHED_TIME,
- containerFinishTime);
- entity.setInfo(entityInfo);
-
- TimelineEvent tEvent = new TimelineEvent();
- tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
- tEvent.setTimestamp(containerFinishTime);
- entity.addEvent(tEvent);
- entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
-
- dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
- containerId.getApplicationAttemptId().getApplicationId()));
- }
-
- private void publishContainerLocalizationEvent(
- ContainerLocalizationEvent event, String eventType) {
- Container container = event.getContainer();
- ContainerId containerId = container.getContainerId();
- TimelineEntity entity = createContainerEntity(containerId);
-
- TimelineEvent tEvent = new TimelineEvent();
- tEvent.setId(eventType);
- tEvent.setTimestamp(event.getTimestamp());
- entity.addEvent(tEvent);
- entity.setIdPrefix(TimelineServiceHelper.
- invertLong(container.getContainerStartTime()));
-
- ApplicationId appId =
- container.getContainerId().getApplicationAttemptId().getApplicationId();
- try {
- // no need to put it as part of publisher as timeline client already has
- // Queuing concept
- TimelineV2Client timelineClient = getTimelineClient(appId);
- if (timelineClient != null) {
- timelineClient.putEntitiesAsync(entity);
- } else {
- LOG.error("Seems like client has been removed before the event could be"
- + " published for " + container.getContainerId());
- }
- } catch (IOException e) {
- LOG.error("Failed to publish Container metrics for container "
- + container.getContainerId());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to publish Container metrics for container "
- + container.getContainerId(), e);
- }
- } catch (YarnException e) {
- LOG.error("Failed to publish Container metrics for container "
- + container.getContainerId(), e.getMessage());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to publish Container metrics for container "
- + container.getContainerId(), e);
- }
- }
- }
-
private static ContainerEntity createContainerEntity(
ContainerId containerId) {
ContainerEntity entity = new ContainerEntity();
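The structural pattern of this change is: read the flag once in `serviceInit` and guard every publish path on `publishNMContainerEvents`. The sketch below shows that pattern in isolation; it is illustrative and not the NM code. The class is a stand-in, and it uses an early return where the patch wraps each method body in an `if` block, which is behaviourally equivalent.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Stand-in for the publisher: the real NMTimelinePublisher builds
// TimelineEntity objects and hands them to a TimelineV2Client; here the point
// is only the read-once / guard-every-path structure.
public class GuardedPublisherSketch {

  // Mirrors the field added to NMTimelinePublisher; defaults to publishing.
  private boolean publishNMContainerEvents = true;

  // Mirrors the read done once in serviceInit(Configuration).
  public void init(Configuration conf) {
    publishNMContainerEvents = conf.getBoolean(
        YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
        YarnConfiguration.DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED);
  }

  // Every publish path checks the flag first; when disabled, no timeline
  // entity is built and nothing reaches the timeline client.
  public void publishContainerEvent(String containerId, String eventType) {
    if (!publishNMContainerEvents) {
      return;
    }
    System.out.println("would publish " + eventType + " for " + containerId);
  }

  public static void main(String[] args) {
    GuardedPublisherSketch publisher = new GuardedPublisherSketch();
    publisher.init(new YarnConfiguration());
    publisher.publishContainerEvent("container_e01_0001_01_000001", "CREATED");
  }
}
```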
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
index 2585262547..cf9ede0805 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -67,6 +67,8 @@ public class TestNMTimelinePublisher {
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setLong(YarnConfiguration.ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS,
3000L);
+ conf.setBoolean(YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
+ true);
timelineClient = new DummyTimelineClient(null);
Context context = createMockContext();
dispatcher = new DrainDispatcher();
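A complementary check, not part of the patch, could assert that the flag round-trips through configuration at all. The JUnit 4 sketch below (class and test names are hypothetical) only exercises `YarnConfiguration`, so it makes no assumptions about the publisher's internals:

```java
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;

public class TestEmitContainerEventsFlag {

  @Test
  public void testDefaultIsEnabled() {
    // With nothing set, the default (true) applies.
    YarnConfiguration conf = new YarnConfiguration();
    assertTrue(conf.getBoolean(
        YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
        YarnConfiguration.DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED));
  }

  @Test
  public void testExplicitDisableIsHonoured() {
    // An explicit false must survive the same lookup serviceInit performs.
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(
        YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED, false);
    assertFalse(conf.getBoolean(
        YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED,
        YarnConfiguration.DEFAULT_NM_PUBLISH_CONTAINER_EVENTS_ENABLED));
  }
}
```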
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
index 96a92fcd06..73176ef3f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md
@@ -142,7 +142,7 @@ New configuration parameters that are introduced with v.2 are marked bold.
| **`yarn.timeline-service.timeline-client.number-of-async-entities-to-merge`** | Time line V2 client tries to merge these many number of async entities (if available) and then call the REST ATS V2 API to submit. Defaults to `10`. |
| **`yarn.timeline-service.hbase.coprocessor.app-final-value-retention-milliseconds`** | The setting that controls how long the final value of a metric of a completed app is retained before merging into the flow sum. Defaults to `259200000` (3 days). This should be set in the HBase cluster. |
| **`yarn.rm.system-metrics-publisher.emit-container-events`** | The setting that controls whether yarn container metrics is published to the timeline server or not by RM. This configuration setting is for ATS V2. Defaults to `false`. |
-
+| **`yarn.nodemanager.emit-container-events`** | The setting that controls whether YARN container events are published to the timeline service by the NM. This configuration setting is for ATS V2. Defaults to `true`. |
#### Security Configuration
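The two documented flags are independent: the pre-existing RM-side flag defaults to `false`, while the NM-side flag added here defaults to `true`, and only the NM publishes per-container resource-usage metrics. A hedged sketch of setting both explicitly (illustrative; in practice these would normally live in yarn-site.xml, and which combination makes sense depends on which container information a cluster needs):

```java
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Illustrative only: flips both documented flags away from their defaults.
public class ContainerEventSourceExample {
  public static void main(String[] args) {
    YarnConfiguration conf = new YarnConfiguration();
    // Pre-existing RM-side flag from the table above; defaults to false.
    conf.setBoolean(
        "yarn.rm.system-metrics-publisher.emit-container-events", true);
    // NM-side flag added by this patch; defaults to true.
    conf.setBoolean(
        YarnConfiguration.NM_PUBLISH_CONTAINER_EVENTS_ENABLED, false);
  }
}
```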