diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineHealth.java index d592167b86..3462441161 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineHealth.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelineHealth.java @@ -39,12 +39,12 @@ public class TimelineHealth { * Timline health status. * * RUNNING - Service is up and running - * READER_CONNECTION_FAULURE - isConnectionAlive() of reader implementation + * CONNECTION_FAILURE - isConnectionAlive() of reader / writer implementation * reported an error */ public enum TimelineHealthStatus { RUNNING, - READER_CONNECTION_FAILURE + CONNECTION_FAILURE } private TimelineHealthStatus healthStatus; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java index 8de3b8645a..7d8098e40d 100755 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineReaderImpl.java @@ -108,7 +108,7 @@ public class DocumentStoreTimelineReaderImpl ""); } else { return new
TimelineHealth( - TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE, + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, "Timeline store reader not initialized."); } } @@ -131,4 +131,4 @@ public class DocumentStoreTimelineReaderImpl } return timelineEntities; } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java index 572d888b52..0ea70f9dcf 100755 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-documentstore/src/main/java/org/apache/hadoop/yarn/server/timelineservice/documentstore/DocumentStoreTimelineWriterImpl.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.*; import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.documentstore.lib.DocumentStoreVendor; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack; @@ -151,6 +152,11 @@ public class DocumentStoreTimelineWriterImpl extends AbstractService return null; } + @Override + public TimelineHealth 
getHealthStatus() { + return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, ""); + } + private void appendSubAppUserIfExists(TimelineCollectorContext context, String subApplicationUser) { String userId = context.getUserId(); @@ -282,4 +288,4 @@ public class DocumentStoreTimelineWriterImpl extends AbstractService @Override public void flush() { } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index f3592d2924..b6e1d76902 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -115,7 +115,7 @@ public class HBaseTimelineReaderImpl ""); } catch (IOException e){ return new TimelineHealth( - TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE, + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, "HBase connection is down"); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index dda004d7ec..7233dab345 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; @@ -604,6 +605,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements return null; } + @Override + public TimelineHealth getHealthStatus() { + try { + storageMonitor.checkStorageIsUp(); + return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, + ""); + } catch (IOException e){ + return new TimelineHealth( + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, + "HBase connection is down"); + } + } + /* * (non-Javadoc) * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 0c54ed03f9..1c1fe118b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -71,6 +72,9 @@ public abstract class TimelineCollector extends CompositeService { private volatile boolean isStopped = false; + private int maxWriteRetries; + private long writeRetryInterval; + public TimelineCollector(String name) { super(name); } @@ -86,6 +90,13 @@ public abstract class TimelineCollector extends CompositeService { new ArrayBlockingQueue<>(capacity)); pool.setRejectedExecutionHandler( new ThreadPoolExecutor.DiscardOldestPolicy()); + + maxWriteRetries = + conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES); + writeRetryInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS); } @Override @@ -153,18 +164,54 @@ public abstract class TimelineCollector extends 
CompositeService { UserGroupInformation callerUgi) throws IOException { LOG.debug("putEntities(entities={}, callerUgi={})", entities, callerUgi); - TimelineWriteResponse response; - // synchronize on the writer object so that no other threads can - // flush the writer buffer concurrently and swallow any exception - // caused by the timeline enitites that are being put here. - synchronized (writer) { - response = writeTimelineEntities(entities, callerUgi); - flushBufferedTimelineEntities(); + TimelineWriteResponse response = null; + try { + boolean isStorageUp = checkRetryWithSleep(); + if (isStorageUp) { + // synchronize on the writer object so that no other threads can + // flush the writer buffer concurrently and swallow any exception + // caused by the timeline enitites that are being put here. + synchronized (writer) { + response = writeTimelineEntities(entities, callerUgi); + flushBufferedTimelineEntities(); + } + } else { + String msg = String.format("Failed to putEntities(" + + "entities=%s, callerUgi=%s) as Timeline Storage is Down", + entities, callerUgi); + throw new IOException(msg); + } + } catch (InterruptedException ex) { + String msg = String.format("Interrupted while retrying to putEntities(" + + "entities=%s, callerUgi=%s)", entities, callerUgi); + throw new IOException(msg); } return response; } + + private boolean checkRetryWithSleep() throws InterruptedException { + int retries = maxWriteRetries; + while (retries > 0) { + TimelineHealth timelineHealth = writer.getHealthStatus(); + if (timelineHealth.getHealthStatus().equals( + TimelineHealth.TimelineHealthStatus.RUNNING)) { + return true; + } else { + try { + Thread.sleep(writeRetryInterval); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw ex; + } + retries--; + } + } + return false; + } + + /** * Add or update an domain. If the domain already exists, only the owner * and the admin can update it. 
@@ -179,11 +226,25 @@ public abstract class TimelineCollector extends CompositeService { UserGroupInformation callerUgi) throws IOException { LOG.debug("putDomain(domain={}, callerUgi={})", domain, callerUgi); - TimelineWriteResponse response; - synchronized (writer) { - final TimelineCollectorContext context = getTimelineEntityContext(); - response = writer.write(context, domain); - flushBufferedTimelineEntities(); + TimelineWriteResponse response = null; + try { + boolean isStorageUp = checkRetryWithSleep(); + if (isStorageUp) { + synchronized (writer) { + final TimelineCollectorContext context = getTimelineEntityContext(); + response = writer.write(context, domain); + flushBufferedTimelineEntities(); + } + } else { + String msg = String.format("Failed to putDomain(" + + "domain=%s, callerUgi=%s) as Timeline Storage is Down", + domain, callerUgi); + throw new IOException(msg); + } + } catch (InterruptedException ex) { + String msg = String.format("Interrupted while retrying to putDomain(" + + "domain=%s, callerUgi=%s)", domain, callerUgi); + throw new IOException(msg); } return response; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java index 3dd7396cfc..197fa5a407 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java @@ -454,7 +454,7 @@ public class FileSystemTimelineReaderImpl extends 
AbstractService fs.exists(rootPath); } catch (IOException e) { return new TimelineHealth( - TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE, + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, e.getMessage() ); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index d5c70a0607..d407636cb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -194,6 +195,20 @@ public class FileSystemTimelineWriterImpl extends AbstractService // no op } + @Override + public TimelineHealth getHealthStatus() { + try { + fs.exists(rootPath); + } catch (IOException e) { + return new TimelineHealth( + TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, + e.getMessage() + ); + } + return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, + ""); + } + private void mkdirs(Path... 
paths) throws IOException, InterruptedException { for (Path path: paths) { if (!existsWithRetries(path)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/NoOpTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/NoOpTimelineWriterImpl.java index 48b334800f..fd31209693 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/NoOpTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/NoOpTimelineWriterImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -77,4 +78,10 @@ public class NoOpTimelineWriterImpl extends AbstractService implements public void flush() throws IOException { LOG.debug("NoOpTimelineWriter is configured. Ignoring flush call"); } + + @Override + public TimelineHealth getHealthStatus() { + return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, + "NoOpTimelineWriter is configured. 
"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java index 08cfc8becb..ccc7491037 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -95,4 +96,13 @@ public interface TimelineWriter extends Service { * entities to the backend storage. */ void flush() throws IOException; + + /** + * Check if writer connection is working properly. + * + * @return True if writer connection works as expected, false otherwise. 
+ */ + TimelineHealth getHealthStatus(); + + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java index 96636f8bf8..09a1e6cb3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.timelineservice.collector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Sets; +import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -155,7 +156,17 @@ public class TestTimelineCollector { @Test public void testPutEntity() throws IOException { TimelineWriter writer = mock(TimelineWriter.class); + TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth. 
+ TimelineHealthStatus.RUNNING, ""); + when(writer.getHealthStatus()).thenReturn(timelineHealth); + + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + 500L); + TimelineCollector collector = new TimelineCollectorForTest(writer); + collector.init(conf); TimelineEntities entities = generateTestEntities(1, 1); collector.putEntities( @@ -166,6 +177,36 @@ public class TestTimelineCollector { verify(writer, times(1)).flush(); } + + @Test + public void testPutEntityWithStorageDown() throws IOException { + TimelineWriter writer = mock(TimelineWriter.class); + TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth. + TimelineHealthStatus.CONNECTION_FAILURE, ""); + when(writer.getHealthStatus()).thenReturn(timelineHealth); + + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + 500L); + + TimelineCollector collector = new TimelineCollectorForTest(writer); + collector.init(conf); + + TimelineEntities entities = generateTestEntities(1, 1); + boolean exceptionCaught = false; + try { + collector.putEntities(entities, UserGroupInformation. + createRemoteUser("test-user")); + } catch (Exception e) { + if (e.getMessage().contains("Failed to putEntities")) { + exceptionCaught = true; + } + } + assertTrue("TimelineCollector putEntity failed to " + + "handle storage down", exceptionCaught); + } + /** * Test TimelineCollector's interaction with TimelineWriter upon * putEntityAsync() calls. @@ -222,7 +263,17 @@ public class TestTimelineCollector { */ @Test public void testPutDomain() throws IOException { TimelineWriter writer = mock(TimelineWriter.class); + TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth. 
+ TimelineHealthStatus.RUNNING, ""); + when(writer.getHealthStatus()).thenReturn(timelineHealth); + + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + 500L); + TimelineCollector collector = new TimelineCollectorForTest(writer); + collector.init(conf); TimelineDomain domain = generateDomain("id", "desc", "owner", "reader1,reader2", "writer", 0L, @@ -287,8 +338,19 @@ public class TestTimelineCollector { 1L, ApplicationId.newInstance(ts, 1).toString()); } }; - collector.init(new Configuration()); - collector.setWriter(mock(TimelineWriter.class)); + + TimelineWriter writer = mock(TimelineWriter.class); + TimelineHealth timelineHealth = new TimelineHealth(TimelineHealth. + TimelineHealthStatus.RUNNING, ""); + when(writer.getHealthStatus()).thenReturn(timelineHealth); + + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, 5); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + 500L); + + collector.init(conf); + collector.setWriter(writer); // Put 5 entities with different metric values. TimelineEntities entities = new TimelineEntities();