diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 61d1d72f56..ff4b493807 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1747,6 +1747,12 @@ public static boolean isAclEnabled(Configuration conf) { public static final long TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60; + // This is temporary solution. The configuration will be deleted once we have + // the FileSystem API to check whether append operation is supported or not. + public static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND + = TIMELINE_SERVICE_PREFIX + + "entity-file.fs-support-append"; + // mark app-history related configs @Private as application history is going // to be integrated into the timeline service @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index 258b9f5c32..09298b5991 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.client.api; +import java.io.Flushable; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -41,7 +42,8 @@ */ @Public @Evolving -public abstract class TimelineClient extends AbstractService { +public abstract class TimelineClient extends 
AbstractService implements + Flushable { /** * Create a timeline client. The current UGI when the user initialize the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java index aa1f1f8f1e..9e719b7326 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/FileSystemTimelineWriter.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.Closeable; +import java.io.FileNotFoundException; import java.io.Flushable; import java.io.IOException; import java.net.URI; @@ -78,12 +79,6 @@ public class FileSystemTimelineWriter extends TimelineWriter{ private static final Log LOG = LogFactory .getLog(FileSystemTimelineWriter.class); - // This is temporary solution. The configuration will be deleted once we have - // the FileSystem API to check whether append operation is supported or not. 
- private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND - = YarnConfiguration.TIMELINE_SERVICE_PREFIX - + "entity-file.fs-support-append"; - // App log directory must be readable by group so server can access logs // and writable by group so it can be deleted by server private static final short APP_LOG_DIR_PERMISSIONS = 0770; @@ -122,20 +117,10 @@ public FileSystemTimelineWriter(Configuration conf, .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, YarnConfiguration .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT)); + fs = FileSystem.newInstance(activePath.toUri(), fsConf); - String scheme = activePath.toUri().getScheme(); - if (scheme == null) { - scheme = FileSystem.getDefaultUri(fsConf).getScheme(); - } - if (scheme != null) { - String disableCacheName = String.format("fs.%s.impl.disable.cache", - scheme); - fsConf.setBoolean(disableCacheName, true); - } - - fs = activePath.getFileSystem(fsConf); if (!fs.exists(activePath)) { - throw new IOException(activePath + " does not exist"); + throw new FileNotFoundException(activePath + " does not exist"); } summaryEntityTypes = new HashSet( @@ -168,7 +153,8 @@ public FileSystemTimelineWriter(Configuration conf, timerTaskTTL); this.isAppendSupported = - conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true); + conf.getBoolean( + YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true); objMapper = createObjectMapper(); @@ -181,7 +167,7 @@ public FileSystemTimelineWriter(Configuration conf, + "=" + cleanIntervalSecs + ", " + YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS + "=" + ttl + ", " + - TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND + YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND + "=" + isAppendSupported + ", " + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR + "=" + activePath); @@ -195,6 +181,11 @@ public FileSystemTimelineWriter(Configuration conf, } } + @Override + public String toString() { + return 
"FileSystemTimelineWriter writing to " + activePath; + } + @Override public TimelinePutResponse putEntities( ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId, @@ -263,9 +254,20 @@ public void putDomain(ApplicationAttemptId appAttemptId, } @Override - public void close() throws Exception { - if (this.logFDsCache != null) { - this.logFDsCache.close(); + public synchronized void close() throws Exception { + if (logFDsCache != null) { + LOG.debug("Closing cache"); + logFDsCache.flush(); + logFDsCache.close(); + logFDsCache = null; + } + } + + @Override + public void flush() throws IOException { + if (logFDsCache != null) { + LOG.debug("Flushing cache"); + logFDsCache.flush(); } } @@ -333,6 +335,9 @@ public void writeEntities(List entities) if (writerClosed()) { prepareForWrite(); } + if (LOG.isDebugEnabled()) { + LOG.debug("Writing entity list of size " + entities.size()); + } for (TimelineEntity entity : entities) { getObjectMapper().writeValue(getJsonGenerator(), entity); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 195a6617f9..ef4622972f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -325,6 +325,13 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + @Override + public void flush() throws IOException { + if (timelineWriter != null) { + timelineWriter.flush(); + } + } + @Override public TimelinePutResponse putEntities( TimelineEntity... 
entities) throws IOException, YarnException { @@ -432,6 +439,12 @@ public Void run() throws Exception { operateDelegationToken(cancelDTAction); } + @Override + public String toString() { + return super.toString() + " with timeline server " + resURI + + " and writer " + timelineWriter; + } + private Object operateDelegationToken( final PrivilegedExceptionAction action) throws IOException, YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java index c616e63936..9590f4a84a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineWriter.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.client.api.impl; +import java.io.Flushable; import java.io.IOException; +import java.io.InterruptedIOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.URI; import java.security.PrivilegedExceptionAction; @@ -48,7 +50,7 @@ */ @Private @Unstable -public abstract class TimelineWriter { +public abstract class TimelineWriter implements Flushable { private static final Log LOG = LogFactory .getLog(TimelineWriter.class); @@ -68,6 +70,16 @@ public void close() throws Exception { // DO NOTHING } + @Override + public void flush() throws IOException { + // DO NOTHING + } + + @Override + public String toString() { + return "Timeline writer posting to " + resURI; + } + public TimelinePutResponse putEntities( TimelineEntity... 
entities) throws IOException, YarnException { TimelineEntities entitiesContainer = new TimelineEntities(); @@ -104,19 +116,27 @@ public ClientResponse run() throws Exception { } }); } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException)cause; + } else { + throw new IOException(cause); + } } catch (InterruptedException ie) { - throw new IOException(ie); + throw (IOException)new InterruptedIOException().initCause(ie); } if (resp == null || resp.getClientResponseStatus() != ClientResponse.Status.OK) { String msg = "Failed to get the response from the timeline server."; LOG.error(msg); - if (LOG.isDebugEnabled() && resp != null) { - String output = resp.getEntity(String.class); - LOG.debug("HTTP error code: " + resp.getStatus() - + " Server response : \n" + output); + if (resp != null) { + msg += " HTTP error code: " + resp.getStatus(); + if (LOG.isDebugEnabled()) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response : \n" + output); + } } throw new YarnException(msg); } @@ -128,10 +148,16 @@ public ClientResponse run() throws Exception { public ClientResponse doPostingObject(Object object, String path) { WebResource webResource = client.resource(resURI); if (path == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("POST to " + resURI); + } return webResource.accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .post(ClientResponse.class, object); } else if (path.equals("domain")) { + if (LOG.isDebugEnabled()) { + LOG.debug("PUT to " + resURI +"/" + path); + } return webResource.path(path).accept(MediaType.APPLICATION_JSON) .type(MediaType.APPLICATION_JSON) .put(ClientResponse.class, object); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java index eb47ef241e..e1e684b1e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java @@ -129,9 +129,9 @@ public TimelineEntities getEntities( getUser(req)); } catch (NumberFormatException e) { throw new BadRequestException( - "windowStart, windowEnd or limit is not a numeric value."); + "windowStart, windowEnd, fromTs or limit is not a numeric value: " + e); } catch (IllegalArgumentException e) { - throw new BadRequestException("requested invalid field."); + throw new BadRequestException("requested invalid field: " + e); } catch (Exception e) { LOG.error("Error getting entities", e); throw new WebApplicationException(e, @@ -160,8 +160,7 @@ public TimelineEntity getEntity( parseFieldsStr(fields, ","), getUser(req)); } catch (IllegalArgumentException e) { - throw new BadRequestException( - "requested invalid field."); + throw new BadRequestException(e); } catch (Exception e) { LOG.error("Error getting entity", e); throw new WebApplicationException(e, @@ -201,8 +200,9 @@ public TimelineEvents getEvents( parseLongStr(limit), getUser(req)); } catch (NumberFormatException e) { - throw new BadRequestException( - "windowStart, windowEnd or limit is not a numeric value."); + throw (BadRequestException)new BadRequestException( + "windowStart, windowEnd or limit is not a numeric value.") + 
.initCause(e); } catch (Exception e) { LOG.error("Error getting entity timelines", e); throw new WebApplicationException(e, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java index efbf994bbb..7eec7c393c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityCacheItem.java @@ -107,30 +107,30 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId, store.init(config); store.start(); } - TimelineDataManager tdm = new TimelineDataManager(store, - aclManager); - tdm.init(config); - tdm.start(); - List removeList = new ArrayList(); - for (LogInfo log : appLogs.getDetailLogs()) { - LOG.debug("Try refresh logs for {}", log.getFilename()); - // Only refresh the log that matches the cache id - if (log.matchesGroupId(groupId)) { - Path appDirPath = appLogs.getAppDirPath(); - if (fs.exists(log.getPath(appDirPath))) { - LOG.debug("Refresh logs for cache id {}", groupId); - log.parseForStore(tdm, appDirPath, appLogs.isDone(), jsonFactory, - objMapper, fs); - } else { - // The log may have been removed, remove the log - removeList.add(log); - LOG.info("File {} no longer exists, remove it from log list", - log.getPath(appDirPath)); + List removeList = new ArrayList<>(); + try(TimelineDataManager tdm = + new TimelineDataManager(store, aclManager)) { + tdm.init(config); + tdm.start(); + for (LogInfo log : appLogs.getDetailLogs()) { + LOG.debug("Try refresh logs for {}", 
log.getFilename()); + // Only refresh the log that matches the cache id + if (log.matchesGroupId(groupId)) { + Path appDirPath = appLogs.getAppDirPath(); + if (fs.exists(log.getPath(appDirPath))) { + LOG.debug("Refresh logs for cache id {}", groupId); + log.parseForStore(tdm, appDirPath, appLogs.isDone(), + jsonFactory, objMapper, fs); + } else { + // The log may have been removed, remove the log + removeList.add(log); + LOG.info("File {} no longer exists, removing it from log list", + log.getPath(appDirPath)); + } } } } appLogs.getDetailLogs().removeAll(removeList); - tdm.close(); } updateRefreshTimeToNow(); } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index b1fbd132d3..34a20720bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -26,7 +26,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import 
java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -71,12 +73,13 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Plugin timeline storage to support timeline server v1.5 API. This storage * uses a file system to store timeline entities in their groups. */ -public class EntityGroupFSTimelineStore extends AbstractService +public class EntityGroupFSTimelineStore extends CompositeService implements TimelineStore { static final String DOMAIN_LOG_PREFIX = "domainlog-"; @@ -110,6 +113,7 @@ public class EntityGroupFSTimelineStore extends AbstractService private ConcurrentMap appIdLogMap = new ConcurrentHashMap(); private ScheduledThreadPoolExecutor executor; + private AtomicBoolean stopExecutors = new AtomicBoolean(false); private FileSystem fs; private ObjectMapper objMapper; private JsonFactory jsonFactory; @@ -128,7 +132,8 @@ public EntityGroupFSTimelineStore() { @Override protected void serviceInit(Configuration conf) throws Exception { summaryStore = createSummaryStore(); - summaryStore.init(conf); + addService(summaryStore); + long logRetainSecs = conf.getLong( YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS, YarnConfiguration @@ -170,17 +175,28 @@ protected boolean removeEldestEntry( }); cacheIdPlugins = loadPlugIns(conf); // Initialize yarn client for application status - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); + yarnClient = createAndInitYarnClient(conf); + // if non-null, hook its lifecycle up + addIfService(yarnClient); + activeRootPath = new Path(conf.get( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, + YarnConfiguration + .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT)); + doneRootPath = new Path(conf.get( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, + YarnConfiguration + 
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT)); + fs = activeRootPath.getFileSystem(conf); super.serviceInit(conf); } private List loadPlugIns(Configuration conf) throws RuntimeException { - Collection pluginNames = conf.getStringCollection( + Collection pluginNames = conf.getTrimmedStringCollection( YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES); List pluginList = new LinkedList(); + Exception caught = null; for (final String name : pluginNames) { LOG.debug("Trying to load plugin class {}", name); TimelineEntityGroupPlugin cacheIdPlugin = null; @@ -191,10 +207,11 @@ private List loadPlugIns(Configuration conf) clazz, conf); } catch (Exception e) { LOG.warn("Error loading plugin " + name, e); + caught = e; } if (cacheIdPlugin == null) { - throw new RuntimeException("No class defined for " + name); + throw new RuntimeException("No class defined for " + name, caught); } LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName()); pluginList.add(cacheIdPlugin); @@ -210,8 +227,9 @@ private TimelineStore createSummaryStore() { @Override protected void serviceStart() throws Exception { + + super.serviceStart(); LOG.info("Starting {}", getName()); - yarnClient.start(); summaryStore.start(); Configuration conf = getConfig(); @@ -219,16 +237,10 @@ protected void serviceStart() throws Exception { aclManager.setTimelineStore(summaryStore); summaryTdm = new TimelineDataManager(summaryStore, aclManager); summaryTdm.init(conf); - summaryTdm.start(); - activeRootPath = new Path(conf.get( - YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, - YarnConfiguration - .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT)); - doneRootPath = new Path(conf.get( - YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, - YarnConfiguration - .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT)); - fs = activeRootPath.getFileSystem(conf); + addService(summaryTdm); + // start child services that aren't already started + 
super.serviceStart(); + if (!fs.exists(activeRootPath)) { fs.mkdirs(activeRootPath); fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION); @@ -257,7 +269,8 @@ protected void serviceStart() throws Exception { YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS, YarnConfiguration .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT); - LOG.info("Scanning active directory every {} seconds", scanIntervalSecs); + LOG.info("Scanning active directory {} every {} seconds", activeRootPath, + scanIntervalSecs); LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs); executor = new ScheduledThreadPoolExecutor(numThreads, @@ -267,12 +280,12 @@ protected void serviceStart() throws Exception { TimeUnit.SECONDS); executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs, cleanerIntervalSecs, TimeUnit.SECONDS); - super.serviceStart(); } @Override protected void serviceStop() throws Exception { LOG.info("Stopping {}", getName()); + stopExecutors.set(true); if (executor != null) { executor.shutdown(); if (executor.isTerminating()) { @@ -286,18 +299,9 @@ protected void serviceStop() throws Exception { } } } - if (summaryTdm != null) { - summaryTdm.stop(); - } - if (summaryStore != null) { - summaryStore.stop(); - } - if (yarnClient != null) { - yarnClient.stop(); - } synchronized (cachedLogs) { for (EntityCacheItem cacheItem : cachedLogs.values()) { - cacheItem.getStore().close(); + ServiceOperations.stopQuietly(cacheItem.getStore()); } } super.serviceStop(); @@ -305,17 +309,34 @@ protected void serviceStop() throws Exception { @InterfaceAudience.Private @VisibleForTesting - void scanActiveLogs() throws IOException { - RemoteIterator iter = fs.listStatusIterator(activeRootPath); + int scanActiveLogs() throws IOException { + RemoteIterator iter = list(activeRootPath); + int logsToScanCount = 0; while (iter.hasNext()) { FileStatus stat = iter.next(); - ApplicationId appId = parseApplicationId(stat.getPath().getName()); + String name = 
stat.getPath().getName(); + ApplicationId appId = parseApplicationId(name); if (appId != null) { LOG.debug("scan logs for {} in {}", appId, stat.getPath()); + logsToScanCount++; AppLogs logs = getAndSetActiveLog(appId, stat.getPath()); executor.execute(new ActiveLogParser(logs)); + } else { + LOG.debug("Unable to parse entry {}", name); } } + return logsToScanCount; + } + + /** + * List a directory, returning an iterator which will fail fast if this + * service has been stopped + * @param path path to list + * @return an iterator over the contents of the directory + * @throws IOException + */ + private RemoteIterator list(Path path) throws IOException { + return new StoppableRemoteIterator(fs.listStatusIterator(path)); } private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId, @@ -377,11 +398,11 @@ private AppLogs getAndSetAppLogs(ApplicationId applicationId) */ @InterfaceAudience.Private @VisibleForTesting - static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis) + void cleanLogs(Path dirpath, FileSystem fs, long retainMillis) throws IOException { long now = Time.now(); // Depth first search from root directory for all application log dirs - RemoteIterator iter = fs.listStatusIterator(dirpath); + RemoteIterator iter = list(dirpath); while (iter.hasNext()) { FileStatus stat = iter.next(); if (stat.isDirectory()) { @@ -456,7 +477,42 @@ private Path getDoneAppPath(ApplicationId appId) { bucket1, bucket2, appId.toString())); } - // This method has to be synchronized to control traffic to RM + /** + * Create and initialize the YARN Client. Tests may override/mock this. + * If they return null, then {@link #getAppState(ApplicationId)} MUST + * also be overridden + * @param conf configuration + * @return the yarn client, or null. 
+ * + */ + @VisibleForTesting + protected YarnClient createAndInitYarnClient(Configuration conf) { + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + return client; + } + + /** + * Get the application state. + * @param appId application ID + * @return the state or {@link AppState#UNKNOWN} if it could not + * be determined + * @throws IOException on IO problems + */ + @VisibleForTesting + protected AppState getAppState(ApplicationId appId) throws IOException { + return getAppState(appId, yarnClient); + } + + /** + * Ask the RM for the state of the application. + * This method has to be synchronized to control traffic to RM + * @param appId application ID + * @param yarnClient + * @return the state or {@link AppState#UNKNOWN} if it could not + * be determined + * @throws IOException + */ private static synchronized AppState getAppState(ApplicationId appId, YarnClient yarnClient) throws IOException { AppState appState = AppState.ACTIVE; @@ -474,9 +530,12 @@ private static synchronized AppState getAppState(ApplicationId appId, return appState; } + /** + * Application states, + */ @InterfaceAudience.Private @VisibleForTesting - enum AppState { + public enum AppState { ACTIVE, UNKNOWN, COMPLETED @@ -526,7 +585,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm) if (!isDone()) { LOG.debug("Try to parse summary log for log {} in {}", appId, appDirPath); - appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient); + appState = getAppState(appId); long recentLogModTime = scanForLogs(); if (appState == AppState.UNKNOWN) { if (Time.now() - recentLogModTime > unknownActiveMillis) { @@ -559,8 +618,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm) long scanForLogs() throws IOException { LOG.debug("scanForLogs on {}", appDirPath); long newestModTime = 0; - RemoteIterator iterAttempt = - fs.listStatusIterator(appDirPath); + RemoteIterator iterAttempt = list(appDirPath); while (iterAttempt.hasNext()) { FileStatus 
statAttempt = iterAttempt.next(); LOG.debug("scanForLogs on {}", statAttempt.getPath().getName()); @@ -572,8 +630,7 @@ long scanForLogs() throws IOException { continue; } String attemptDirName = statAttempt.getPath().getName(); - RemoteIterator iterCache - = fs.listStatusIterator(statAttempt.getPath()); + RemoteIterator iterCache = list(statAttempt.getPath()); while (iterCache.hasNext()) { FileStatus statCache = iterCache.next(); if (!statCache.isFile()) { @@ -659,14 +716,34 @@ public synchronized void moveToDone() throws IOException { } } + /** + * Extract any nested throwable forwarded from IPC operations. + * @param e exception + * @return either the exception passed an an argument, or any nested + * exception which was wrapped inside an {@link UndeclaredThrowableException} + */ + private Throwable extract(Exception e) { + Throwable t = e; + if (e instanceof UndeclaredThrowableException && e.getCause() != null) { + t = e.getCause(); + } + return t; + } + private class EntityLogScanner implements Runnable { @Override public void run() { LOG.debug("Active scan starting"); try { - scanActiveLogs(); + int scanned = scanActiveLogs(); + LOG.debug("Scanned {} active applications", scanned); } catch (Exception e) { - LOG.error("Error scanning active files", e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("File scanner interrupted"); + } else { + LOG.error("Error scanning active files", t); + } } LOG.debug("Active scan complete"); } @@ -690,7 +767,12 @@ public void run() { } LOG.debug("End parsing summary logs. 
"); } catch (Exception e) { - LOG.error("Error processing logs for " + appLogs.getAppId(), e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("Log parser interrupted"); + } else { + LOG.error("Error processing logs for " + appLogs.getAppId(), t); + } } } } @@ -702,7 +784,12 @@ public void run() { try { cleanLogs(doneRootPath, fs, logRetainMillis); } catch (Exception e) { - LOG.error("Error cleaning files", e); + Throwable t = extract(e); + if (t instanceof InterruptedException) { + LOG.info("Cleaner interrupted"); + } else { + LOG.error("Error cleaning files", e); + } } LOG.debug("Cleaner finished"); } @@ -892,4 +979,29 @@ public TimelinePutResponse put(TimelineEntities data) throws IOException { public void put(TimelineDomain domain) throws IOException { summaryStore.put(domain); } + + /** + * This is a special remote iterator whose {@link #hasNext()} method + * returns false if {@link #stopExecutors} is true. + * + * This provides an implicit shutdown of all iterative file list and scan + * operations without needing to implement it in the while loops themselves. 
+ */ + private class StoppableRemoteIterator implements RemoteIterator { + private final RemoteIterator remote; + + public StoppableRemoteIterator(RemoteIterator remote) { + this.remote = remote; + } + + @Override + public boolean hasNext() throws IOException { + return !stopExecutors.get() && remote.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + return remote.next(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java index 4caed8dffd..bc8017564f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; @@ -103,7 +104,8 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath, LOG.debug("Parsing for log dir {} on attempt {}", appDirPath, attemptDirName); Path logPath = getPath(appDirPath); - if (fs.exists(logPath)) { + FileStatus status = fs.getFileStatus(logPath); + if (status != null) { long startTime = Time.monotonicNow(); try { LOG.debug("Parsing {} at offset {}", logPath, offset); @@ -112,8 +114,11 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath, LOG.info("Parsed {} entities from {} in {} msec", count, 
logPath, Time.monotonicNow() - startTime); } catch (RuntimeException e) { - if (e.getCause() instanceof JsonParseException) { - // If AppLogs cannot parse this log, it may be corrupted + // If AppLogs cannot parse this log, it may be corrupted or just empty + if (e.getCause() instanceof JsonParseException && + (status.getLen() > 0 || offset > 0)) { + // log on parse problems if the file has been read in the past or + // is visibly non-empty LOG.info("Log {} appears to be corrupted. Skip. ", logPath); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java index e43b705b03..3e5bc06afa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -116,14 +116,14 @@ public void setup() throws Exception { EntityGroupPlugInForTest.class.getName()); } store.init(config); - store.start(); store.setFs(fs); + store.start(); } @After public void tearDown() throws Exception { - fs.delete(TEST_APP_DIR_PATH, true); store.stop(); + fs.delete(TEST_APP_DIR_PATH, true); } @AfterClass @@ -222,7 +222,7 @@ public void testCleanLogs() throws Exception { fs.mkdirs(dirPathEmpty); // Should retain all logs after this run - EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000); + store.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000); assertTrue(fs.exists(irrelevantDirPath)); assertTrue(fs.exists(irrelevantFilePath)); 
assertTrue(fs.exists(filePath)); @@ -239,7 +239,7 @@ public void testCleanLogs() throws Exception { // Touch the third application by creating a new dir fs.mkdirs(new Path(dirPathHold, "holdByMe")); - EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000); + store.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000); // Verification after the second cleaner call assertTrue(fs.exists(irrelevantDirPath));