diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ddc1af9907..6f26dd966b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1967,6 +1967,15 @@ public class YarnConfiguration extends Configuration {
public static final String TIMELINE_SERVICE_READER_CLASS =
TIMELINE_SERVICE_PREFIX + "reader.class";
+ /** The setting that controls how often the timeline collector flushes the
+ * timeline writer.
+ */
+ public static final String TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS =
+ TIMELINE_SERVICE_PREFIX + "writer.flush-interval-seconds";
+
+ public static final int
+ DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
+
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 46a661f951..b2f1a936aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -796,7 +796,15 @@
yarn.system-metrics-publisher.enabled
false
-
+
+
+ The setting that controls whether yarn container metrics is
+ published to the timeline server or not by RM. This configuration setting is
+ for ATS V2.
+ yarn.rm.system-metrics-publisher.emit-container-events
+ false
+
+
Number of worker threads that send the yarn system metrics
@@ -2197,6 +2205,13 @@
420
+
+ The setting that controls how often the timeline collector
+ flushes the timeline writer.
+ yarn.timeline-service.writer.flush-interval-seconds
+ 60
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 23ad4f4f9b..e9f2085d70 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -19,6 +19,13 @@
package org.apache.hadoop.yarn.server.timelineservice.collector;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -32,9 +39,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
/**
* Class that manages adding and removing collectors and their lifecycle. It
@@ -47,7 +52,10 @@ public abstract class TimelineCollectorManager extends AbstractService {
private static final Log LOG =
LogFactory.getLog(TimelineCollectorManager.class);
- protected TimelineWriter writer;
+ private TimelineWriter writer;
+ private ScheduledExecutorService writerFlusher;
+ private int flushInterval;
+ private boolean writerFlusherRunning;
@Override
public void serviceInit(Configuration conf) throws Exception {
@@ -56,6 +64,12 @@ public abstract class TimelineCollectorManager extends AbstractService {
FileSystemTimelineWriterImpl.class,
TimelineWriter.class), conf);
writer.init(conf);
+ // create a single dedicated thread for flushing the writer on a periodic
+ // basis
+ writerFlusher = Executors.newSingleThreadScheduledExecutor();
+ flushInterval = conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS);
super.serviceInit(conf);
}
@@ -65,6 +79,10 @@ public abstract class TimelineCollectorManager extends AbstractService {
if (writer != null) {
writer.start();
}
+ // schedule the flush task
+ writerFlusher.scheduleAtFixedRate(new WriterFlushTask(writer),
+ flushInterval, flushInterval, TimeUnit.SECONDS);
+ writerFlusherRunning = true;
}
// access to this map is synchronized with the map itself
@@ -161,9 +179,48 @@ public abstract class TimelineCollectorManager extends AbstractService {
c.serviceStop();
}
}
+ // stop the flusher first
+ if (writerFlusher != null) {
+ writerFlusher.shutdown();
+ writerFlusherRunning = false;
+ if (!writerFlusher.awaitTermination(30, TimeUnit.SECONDS)) {
+ // in reality it should be ample time for the flusher task to finish
+ // even if it times out, writers may be able to handle closing in this
+ // situation fine
+ // proceed to close the writer
+ LOG.warn("failed to stop the flusher task in time. " +
+ "will still proceed to close the writer.");
+ }
+ }
if (writer != null) {
writer.close();
}
super.serviceStop();
}
+
+ @VisibleForTesting
+ boolean writerFlusherRunning() {
+ return writerFlusherRunning;
+ }
+
+ /**
+ * Task that invokes the flush operation on the timeline writer.
+ */
+ private static class WriterFlushTask implements Runnable {
+ private final TimelineWriter writer;
+
+ public WriterFlushTask(TimelineWriter writer) {
+ this.writer = writer;
+ }
+
+ public void run() {
+ try {
+ writer.flush();
+ } catch (Throwable th) {
+ // we need to handle all exceptions or subsequent execution may be
+ // suppressed
+ LOG.error("exception during timeline writer flush!", th);
+ }
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 34a6b7c2f9..b22b39f45c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -127,6 +127,11 @@ public class FileSystemTimelineWriterImpl extends AbstractService
mkdirs(outputRoot, ENTITIES_DIR);
}
+ @Override
+ public void flush() throws IOException {
+ // no op
+ }
+
private static String mkdirs(String... dirStrs) throws IOException {
StringBuilder path = new StringBuilder();
for (String dirStr : dirStrs) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index e48ca60262..876ad6ab34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -214,6 +214,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
return null;
}
+ @Override
+ public void flush() throws IOException {
+ // flush all buffered mutators
+ entityTable.flush();
+ }
+
/**
* close the hbase connections The close APIs perform flushing and release any
* resources held
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
index 5b4442ceb8..381ff17aec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
@@ -187,6 +187,11 @@ public class PhoenixTimelineWriterImpl extends AbstractService
}
+ @Override
+ public void flush() throws IOException {
+ // currently no-op
+ }
+
// Utility functions
@Private
@VisibleForTesting
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
index 494e8ad5f1..50136de766 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -70,4 +70,13 @@ public interface TimelineWriter extends Service {
*/
TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException;
+
+ /**
+ * Flushes the data to the backend storage. Whatever may be buffered will be
+ * written to the storage when the method returns. This may be a potentially
+ * time-consuming operation, and should be used judiciously.
+ *
+ * @throws IOException
+ */
+ void flush() throws IOException;
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
index 87343fd637..0d69fbc5c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
@@ -66,6 +66,11 @@ public class TestNMTimelineCollectorManager {
}
}
+ @Test
+ public void testStartingWriterFlusher() throws Exception {
+ assertTrue(collectorManager.writerFlusherRunning());
+ }
+
@Test
public void testStartWebApp() throws Exception {
assertNotNull(collectorManager.getRestServerBindAddress());