YARN-4696. Improving EntityGroupFSTimelineStore on exception handling, test setup, and concurrency. (Steve Loughran via gtcarrera9)

This commit is contained in:
Li Lu 2016-03-10 10:51:55 -08:00
parent 318c9b68b0
commit d49cfb3504
10 changed files with 279 additions and 110 deletions

View File

@ -1747,6 +1747,12 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long
TIMELINE_SERVICE_CLIENT_INTERNAL_TIMERS_TTL_SECS_DEFAULT = 7 * 60;
// This is a temporary solution. The configuration will be deleted once we
// have a FileSystem API to check whether the append operation is supported.
public static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
= TIMELINE_SERVICE_PREFIX
+ "entity-file.fs-support-append";
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api;
import java.io.Flushable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -41,7 +42,8 @@
*/
@Public
@Evolving
public abstract class TimelineClient extends AbstractService {
public abstract class TimelineClient extends AbstractService implements
Flushable {
/**
* Create a timeline client. The current UGI when the user initialize the

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.Flushable;
import java.io.IOException;
import java.net.URI;
@ -78,12 +79,6 @@ public class FileSystemTimelineWriter extends TimelineWriter{
private static final Log LOG = LogFactory
.getLog(FileSystemTimelineWriter.class);
// This is temporary solution. The configuration will be deleted once we have
// the FileSystem API to check whether append operation is supported or not.
private static final String TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
= YarnConfiguration.TIMELINE_SERVICE_PREFIX
+ "entity-file.fs-support-append";
// App log directory must be readable by group so server can access logs
// and writable by group so it can be deleted by server
private static final short APP_LOG_DIR_PERMISSIONS = 0770;
@ -122,20 +117,10 @@ public FileSystemTimelineWriter(Configuration conf,
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
fs = FileSystem.newInstance(activePath.toUri(), fsConf);
String scheme = activePath.toUri().getScheme();
if (scheme == null) {
scheme = FileSystem.getDefaultUri(fsConf).getScheme();
}
if (scheme != null) {
String disableCacheName = String.format("fs.%s.impl.disable.cache",
scheme);
fsConf.setBoolean(disableCacheName, true);
}
fs = activePath.getFileSystem(fsConf);
if (!fs.exists(activePath)) {
throw new IOException(activePath + " does not exist");
throw new FileNotFoundException(activePath + " does not exist");
}
summaryEntityTypes = new HashSet<String>(
@ -168,7 +153,8 @@ public FileSystemTimelineWriter(Configuration conf,
timerTaskTTL);
this.isAppendSupported =
conf.getBoolean(TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
conf.getBoolean(
YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, true);
objMapper = createObjectMapper();
@ -181,7 +167,7 @@ public FileSystemTimelineWriter(Configuration conf,
+ "=" + cleanIntervalSecs + ", " +
YarnConfiguration.TIMELINE_SERVICE_CLIENT_FD_RETAIN_SECS
+ "=" + ttl + ", " +
TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND
+ "=" + isAppendSupported + ", " +
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR
+ "=" + activePath);
@ -195,6 +181,11 @@ public FileSystemTimelineWriter(Configuration conf,
}
}
/**
 * Builds a short description of this writer that includes the active path.
 *
 * @return the text "FileSystemTimelineWriter writing to " followed by the
 *         string form of {@code activePath}
 */
@Override
public String toString() {
  final StringBuilder description =
      new StringBuilder("FileSystemTimelineWriter writing to ");
  description.append(activePath);
  return description.toString();
}
@Override
public TimelinePutResponse putEntities(
ApplicationAttemptId appAttemptId, TimelineEntityGroupId groupId,
@ -263,9 +254,20 @@ public void putDomain(ApplicationAttemptId appAttemptId,
}
@Override
public void close() throws Exception {
if (this.logFDsCache != null) {
this.logFDsCache.close();
public synchronized void close() throws Exception {
if (logFDsCache != null) {
LOG.debug("Closing cache");
logFDsCache.flush();
logFDsCache.close();
logFDsCache = null;
}
}
/**
 * Flushes the log file descriptor cache, if one is currently held.
 *
 * <p>This method is synchronized so that it uses the same monitor as
 * {@code close()}, which flushes, closes and then clears
 * {@code logFDsCache}. Without the shared lock a concurrent
 * {@code close()} could null the field between the check and the call
 * below, producing a {@link NullPointerException}.
 *
 * @throws IOException if flushing the cache fails
 */
@Override
public synchronized void flush() throws IOException {
  if (logFDsCache != null) {
    LOG.debug("Flushing cache");
    logFDsCache.flush();
  }
}
@ -333,6 +335,9 @@ public void writeEntities(List<TimelineEntity> entities)
if (writerClosed()) {
prepareForWrite();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Writing entity list of size " + entities.size());
}
for (TimelineEntity entity : entities) {
getObjectMapper().writeValue(getJsonGenerator(), entity);
}

View File

@ -325,6 +325,13 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
/**
 * Forwards the flush request to the timeline writer when one is set;
 * otherwise does nothing.
 *
 * @throws IOException if the writer's flush fails
 */
@Override
public void flush() throws IOException {
  if (timelineWriter == null) {
    // no writer to flush
    return;
  }
  timelineWriter.flush();
}
@Override
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
@ -432,6 +439,12 @@ public Void run() throws Exception {
operateDelegationToken(cancelDTAction);
}
/**
 * Extends the superclass description with the timeline server URI and
 * the writer in use.
 *
 * @return the superclass string followed by the server URI and writer
 */
@Override
public String toString() {
  final String base = super.toString();
  return base
      + " with timeline server " + resURI
      + " and writer " + timelineWriter;
}
private Object operateDelegationToken(
final PrivilegedExceptionAction<?> action)
throws IOException, YarnException {

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.yarn.client.api.impl;
import java.io.Flushable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
@ -48,7 +50,7 @@
*/
@Private
@Unstable
public abstract class TimelineWriter {
public abstract class TimelineWriter implements Flushable {
private static final Log LOG = LogFactory
.getLog(TimelineWriter.class);
@ -68,6 +70,16 @@ public void close() throws Exception {
// DO NOTHING
}
/**
 * Base implementation of flush: performs no work.
 *
 * @throws IOException declared for the benefit of overriding
 *         implementations; never thrown here
 */
@Override
public void flush() throws IOException {
  // Intentionally empty.
}
/**
 * Describes this writer by the resource URI it posts to.
 *
 * @return the text "Timeline writer posting to " followed by the string
 *         form of {@code resURI}
 */
@Override
public String toString() {
  final String target = String.valueOf(resURI);
  return "Timeline writer posting to ".concat(target);
}
public TimelinePutResponse putEntities(
TimelineEntity... entities) throws IOException, YarnException {
TimelineEntities entitiesContainer = new TimelineEntities();
@ -104,19 +116,27 @@ public ClientResponse run() throws Exception {
}
});
} catch (UndeclaredThrowableException e) {
throw new IOException(e.getCause());
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
} else {
throw new IOException(cause);
}
} catch (InterruptedException ie) {
throw new IOException(ie);
throw (IOException)new InterruptedIOException().initCause(ie);
}
if (resp == null ||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
String msg =
"Failed to get the response from the timeline server.";
LOG.error(msg);
if (LOG.isDebugEnabled() && resp != null) {
String output = resp.getEntity(String.class);
LOG.debug("HTTP error code: " + resp.getStatus()
+ " Server response : \n" + output);
if (resp != null) {
msg += " HTTP error code: " + resp.getStatus();
if (LOG.isDebugEnabled()) {
String output = resp.getEntity(String.class);
LOG.debug("HTTP error code: " + resp.getStatus()
+ " Server response : \n" + output);
}
}
throw new YarnException(msg);
}
@ -128,10 +148,16 @@ public ClientResponse run() throws Exception {
public ClientResponse doPostingObject(Object object, String path) {
WebResource webResource = client.resource(resURI);
if (path == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("POST to " + resURI);
}
return webResource.accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, object);
} else if (path.equals("domain")) {
if (LOG.isDebugEnabled()) {
LOG.debug("PUT to " + resURI +"/" + path);
}
return webResource.path(path).accept(MediaType.APPLICATION_JSON)
.type(MediaType.APPLICATION_JSON)
.put(ClientResponse.class, object);

View File

@ -129,9 +129,9 @@ public TimelineEntities getEntities(
getUser(req));
} catch (NumberFormatException e) {
throw new BadRequestException(
"windowStart, windowEnd or limit is not a numeric value.");
"windowStart, windowEnd, fromTs or limit is not a numeric value: " + e);
} catch (IllegalArgumentException e) {
throw new BadRequestException("requested invalid field.");
throw new BadRequestException("requested invalid field: " + e);
} catch (Exception e) {
LOG.error("Error getting entities", e);
throw new WebApplicationException(e,
@ -160,8 +160,7 @@ public TimelineEntity getEntity(
parseFieldsStr(fields, ","),
getUser(req));
} catch (IllegalArgumentException e) {
throw new BadRequestException(
"requested invalid field.");
throw new BadRequestException(e);
} catch (Exception e) {
LOG.error("Error getting entity", e);
throw new WebApplicationException(e,
@ -201,8 +200,9 @@ public TimelineEvents getEvents(
parseLongStr(limit),
getUser(req));
} catch (NumberFormatException e) {
throw new BadRequestException(
"windowStart, windowEnd or limit is not a numeric value.");
throw (BadRequestException)new BadRequestException(
"windowStart, windowEnd or limit is not a numeric value.")
.initCause(e);
} catch (Exception e) {
LOG.error("Error getting entity timelines", e);
throw new WebApplicationException(e,

View File

@ -107,30 +107,30 @@ public synchronized TimelineStore refreshCache(TimelineEntityGroupId groupId,
store.init(config);
store.start();
}
TimelineDataManager tdm = new TimelineDataManager(store,
aclManager);
tdm.init(config);
tdm.start();
List<LogInfo> removeList = new ArrayList<LogInfo>();
for (LogInfo log : appLogs.getDetailLogs()) {
LOG.debug("Try refresh logs for {}", log.getFilename());
// Only refresh the log that matches the cache id
if (log.matchesGroupId(groupId)) {
Path appDirPath = appLogs.getAppDirPath();
if (fs.exists(log.getPath(appDirPath))) {
LOG.debug("Refresh logs for cache id {}", groupId);
log.parseForStore(tdm, appDirPath, appLogs.isDone(), jsonFactory,
objMapper, fs);
} else {
// The log may have been removed, remove the log
removeList.add(log);
LOG.info("File {} no longer exists, remove it from log list",
log.getPath(appDirPath));
List<LogInfo> removeList = new ArrayList<>();
try(TimelineDataManager tdm =
new TimelineDataManager(store, aclManager)) {
tdm.init(config);
tdm.start();
for (LogInfo log : appLogs.getDetailLogs()) {
LOG.debug("Try refresh logs for {}", log.getFilename());
// Only refresh the log that matches the cache id
if (log.matchesGroupId(groupId)) {
Path appDirPath = appLogs.getAppDirPath();
if (fs.exists(log.getPath(appDirPath))) {
LOG.debug("Refresh logs for cache id {}", groupId);
log.parseForStore(tdm, appDirPath, appLogs.isDone(),
jsonFactory, objMapper, fs);
} else {
// The log may have been removed, remove the log
removeList.add(log);
LOG.info("File {} no longer exists, removing it from log list",
log.getPath(appDirPath));
}
}
}
}
appLogs.getDetailLogs().removeAll(removeList);
tdm.close();
}
updateRefreshTimeToNow();
} else {

View File

@ -26,7 +26,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -55,6 +56,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -71,12 +73,13 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Plugin timeline storage to support timeline server v1.5 API. This storage
* uses a file system to store timeline entities in their groups.
*/
public class EntityGroupFSTimelineStore extends AbstractService
public class EntityGroupFSTimelineStore extends CompositeService
implements TimelineStore {
static final String DOMAIN_LOG_PREFIX = "domainlog-";
@ -110,6 +113,7 @@ public class EntityGroupFSTimelineStore extends AbstractService
private ConcurrentMap<ApplicationId, AppLogs> appIdLogMap =
new ConcurrentHashMap<ApplicationId, AppLogs>();
private ScheduledThreadPoolExecutor executor;
private AtomicBoolean stopExecutors = new AtomicBoolean(false);
private FileSystem fs;
private ObjectMapper objMapper;
private JsonFactory jsonFactory;
@ -128,7 +132,8 @@ public EntityGroupFSTimelineStore() {
@Override
protected void serviceInit(Configuration conf) throws Exception {
summaryStore = createSummaryStore();
summaryStore.init(conf);
addService(summaryStore);
long logRetainSecs = conf.getLong(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS,
YarnConfiguration
@ -170,17 +175,28 @@ protected boolean removeEldestEntry(
});
cacheIdPlugins = loadPlugIns(conf);
// Initialize yarn client for application status
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient = createAndInitYarnClient(conf);
// if non-null, hook its lifecycle up
addIfService(yarnClient);
activeRootPath = new Path(conf.get(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
doneRootPath = new Path(conf.get(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
fs = activeRootPath.getFileSystem(conf);
super.serviceInit(conf);
}
private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
throws RuntimeException {
Collection<String> pluginNames = conf.getStringCollection(
Collection<String> pluginNames = conf.getTrimmedStringCollection(
YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES);
List<TimelineEntityGroupPlugin> pluginList
= new LinkedList<TimelineEntityGroupPlugin>();
Exception caught = null;
for (final String name : pluginNames) {
LOG.debug("Trying to load plugin class {}", name);
TimelineEntityGroupPlugin cacheIdPlugin = null;
@ -191,10 +207,11 @@ private List<TimelineEntityGroupPlugin> loadPlugIns(Configuration conf)
clazz, conf);
} catch (Exception e) {
LOG.warn("Error loading plugin " + name, e);
caught = e;
}
if (cacheIdPlugin == null) {
throw new RuntimeException("No class defined for " + name);
throw new RuntimeException("No class defined for " + name, caught);
}
LOG.info("Load plugin class {}", cacheIdPlugin.getClass().getName());
pluginList.add(cacheIdPlugin);
@ -210,8 +227,9 @@ private TimelineStore createSummaryStore() {
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
LOG.info("Starting {}", getName());
yarnClient.start();
summaryStore.start();
Configuration conf = getConfig();
@ -219,16 +237,10 @@ protected void serviceStart() throws Exception {
aclManager.setTimelineStore(summaryStore);
summaryTdm = new TimelineDataManager(summaryStore, aclManager);
summaryTdm.init(conf);
summaryTdm.start();
activeRootPath = new Path(conf.get(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR_DEFAULT));
doneRootPath = new Path(conf.get(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
fs = activeRootPath.getFileSystem(conf);
addService(summaryTdm);
// start child services that aren't already started
super.serviceStart();
if (!fs.exists(activeRootPath)) {
fs.mkdirs(activeRootPath);
fs.setPermission(activeRootPath, ACTIVE_DIR_PERMISSION);
@ -257,7 +269,8 @@ protected void serviceStart() throws Exception {
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS,
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_THREADS_DEFAULT);
LOG.info("Scanning active directory every {} seconds", scanIntervalSecs);
LOG.info("Scanning active directory {} every {} seconds", activeRootPath,
scanIntervalSecs);
LOG.info("Cleaning logs every {} seconds", cleanerIntervalSecs);
executor = new ScheduledThreadPoolExecutor(numThreads,
@ -267,12 +280,12 @@ protected void serviceStart() throws Exception {
TimeUnit.SECONDS);
executor.scheduleAtFixedRate(new EntityLogCleaner(), cleanerIntervalSecs,
cleanerIntervalSecs, TimeUnit.SECONDS);
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping {}", getName());
stopExecutors.set(true);
if (executor != null) {
executor.shutdown();
if (executor.isTerminating()) {
@ -286,18 +299,9 @@ protected void serviceStop() throws Exception {
}
}
}
if (summaryTdm != null) {
summaryTdm.stop();
}
if (summaryStore != null) {
summaryStore.stop();
}
if (yarnClient != null) {
yarnClient.stop();
}
synchronized (cachedLogs) {
for (EntityCacheItem cacheItem : cachedLogs.values()) {
cacheItem.getStore().close();
ServiceOperations.stopQuietly(cacheItem.getStore());
}
}
super.serviceStop();
@ -305,17 +309,34 @@ protected void serviceStop() throws Exception {
@InterfaceAudience.Private
@VisibleForTesting
void scanActiveLogs() throws IOException {
RemoteIterator<FileStatus> iter = fs.listStatusIterator(activeRootPath);
int scanActiveLogs() throws IOException {
RemoteIterator<FileStatus> iter = list(activeRootPath);
int logsToScanCount = 0;
while (iter.hasNext()) {
FileStatus stat = iter.next();
ApplicationId appId = parseApplicationId(stat.getPath().getName());
String name = stat.getPath().getName();
ApplicationId appId = parseApplicationId(name);
if (appId != null) {
LOG.debug("scan logs for {} in {}", appId, stat.getPath());
logsToScanCount++;
AppLogs logs = getAndSetActiveLog(appId, stat.getPath());
executor.execute(new ActiveLogParser(logs));
} else {
LOG.debug("Unable to parse entry {}", name);
}
}
return logsToScanCount;
}
/**
 * List a directory through a {@link StoppableRemoteIterator}, so that
 * iteration ends early once this service's {@code stopExecutors} flag
 * has been set.
 *
 * @param path path to list
 * @return an iterator over the contents of the directory
 * @throws IOException if the underlying file system listing fails
 */
private RemoteIterator<FileStatus> list(Path path) throws IOException {
  final RemoteIterator<FileStatus> listing = fs.listStatusIterator(path);
  return new StoppableRemoteIterator(listing);
}
private AppLogs createAndPutAppLogsIfAbsent(ApplicationId appId,
@ -377,11 +398,11 @@ private AppLogs getAndSetAppLogs(ApplicationId applicationId)
*/
@InterfaceAudience.Private
@VisibleForTesting
static void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
void cleanLogs(Path dirpath, FileSystem fs, long retainMillis)
throws IOException {
long now = Time.now();
// Depth first search from root directory for all application log dirs
RemoteIterator<FileStatus> iter = fs.listStatusIterator(dirpath);
RemoteIterator<FileStatus> iter = list(dirpath);
while (iter.hasNext()) {
FileStatus stat = iter.next();
if (stat.isDirectory()) {
@ -456,7 +477,42 @@ private Path getDoneAppPath(ApplicationId appId) {
bucket1, bucket2, appId.toString()));
}
// This method has to be synchronized to control traffic to RM
/**
 * Factory for the YARN client: creates a new client and initializes it
 * with the supplied configuration. Tests may override or mock this
 * method; an override that returns null MUST also override
 * {@link #getAppState(ApplicationId)}.
 *
 * @param conf configuration used to initialize the client
 * @return the initialized yarn client (overrides may return null)
 */
@VisibleForTesting
protected YarnClient createAndInitYarnClient(Configuration conf) {
  final YarnClient created = YarnClient.createYarnClient();
  created.init(conf);
  return created;
}
/**
 * Look up the state of an application by delegating to the two-argument
 * {@code getAppState} overload with this store's {@code yarnClient}.
 *
 * @param appId application ID
 * @return the state, or {@link AppState#UNKNOWN} if it could not
 * be determined
 * @throws IOException on IO problems
 */
@VisibleForTesting
protected AppState getAppState(ApplicationId appId) throws IOException {
  final AppState state = getAppState(appId, yarnClient);
  return state;
}
/**
* Ask the RM for the state of the application.
* This method has to be synchronized to control traffic to RM
* @param appId application ID
* @param yarnClient
* @return the state or {@link AppState#UNKNOWN} if it could not
* be determined
* @throws IOException
*/
private static synchronized AppState getAppState(ApplicationId appId,
YarnClient yarnClient) throws IOException {
AppState appState = AppState.ACTIVE;
@ -474,9 +530,12 @@ private static synchronized AppState getAppState(ApplicationId appId,
return appState;
}
/**
* Application states,
*/
@InterfaceAudience.Private
@VisibleForTesting
enum AppState {
public enum AppState {
ACTIVE,
UNKNOWN,
COMPLETED
@ -526,7 +585,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm)
if (!isDone()) {
LOG.debug("Try to parse summary log for log {} in {}",
appId, appDirPath);
appState = EntityGroupFSTimelineStore.getAppState(appId, yarnClient);
appState = getAppState(appId);
long recentLogModTime = scanForLogs();
if (appState == AppState.UNKNOWN) {
if (Time.now() - recentLogModTime > unknownActiveMillis) {
@ -559,8 +618,7 @@ synchronized void parseSummaryLogs(TimelineDataManager tdm)
long scanForLogs() throws IOException {
LOG.debug("scanForLogs on {}", appDirPath);
long newestModTime = 0;
RemoteIterator<FileStatus> iterAttempt =
fs.listStatusIterator(appDirPath);
RemoteIterator<FileStatus> iterAttempt = list(appDirPath);
while (iterAttempt.hasNext()) {
FileStatus statAttempt = iterAttempt.next();
LOG.debug("scanForLogs on {}", statAttempt.getPath().getName());
@ -572,8 +630,7 @@ long scanForLogs() throws IOException {
continue;
}
String attemptDirName = statAttempt.getPath().getName();
RemoteIterator<FileStatus> iterCache
= fs.listStatusIterator(statAttempt.getPath());
RemoteIterator<FileStatus> iterCache = list(statAttempt.getPath());
while (iterCache.hasNext()) {
FileStatus statCache = iterCache.next();
if (!statCache.isFile()) {
@ -659,14 +716,34 @@ public synchronized void moveToDone() throws IOException {
}
}
/**
 * Unwrap an exception that was forwarded inside an
 * {@link UndeclaredThrowableException}.
 *
 * @param e exception to inspect
 * @return the cause of {@code e} when {@code e} is an
 * {@link UndeclaredThrowableException} with a non-null cause; otherwise
 * {@code e} itself
 */
private Throwable extract(Exception e) {
  // Anything that is not an UndeclaredThrowableException passes through.
  if (!(e instanceof UndeclaredThrowableException)) {
    return e;
  }
  final Throwable nested = e.getCause();
  return nested != null ? nested : e;
}
private class EntityLogScanner implements Runnable {
@Override
public void run() {
LOG.debug("Active scan starting");
try {
scanActiveLogs();
int scanned = scanActiveLogs();
LOG.debug("Scanned {} active applications", scanned);
} catch (Exception e) {
LOG.error("Error scanning active files", e);
Throwable t = extract(e);
if (t instanceof InterruptedException) {
LOG.info("File scanner interrupted");
} else {
LOG.error("Error scanning active files", t);
}
}
LOG.debug("Active scan complete");
}
@ -690,7 +767,12 @@ public void run() {
}
LOG.debug("End parsing summary logs. ");
} catch (Exception e) {
LOG.error("Error processing logs for " + appLogs.getAppId(), e);
Throwable t = extract(e);
if (t instanceof InterruptedException) {
LOG.info("Log parser interrupted");
} else {
LOG.error("Error processing logs for " + appLogs.getAppId(), t);
}
}
}
}
@ -702,7 +784,12 @@ public void run() {
try {
cleanLogs(doneRootPath, fs, logRetainMillis);
} catch (Exception e) {
LOG.error("Error cleaning files", e);
Throwable t = extract(e);
if (t instanceof InterruptedException) {
LOG.info("Cleaner interrupted");
} else {
LOG.error("Error cleaning files", e);
}
}
LOG.debug("Cleaner finished");
}
@ -892,4 +979,29 @@ public TimelinePutResponse put(TimelineEntities data) throws IOException {
public void put(TimelineDomain domain) throws IOException {
summaryStore.put(domain);
}
/**
 * A wrapping remote iterator whose {@link #hasNext()} reports false as soon
 * as {@link #stopExecutors} is true, without consulting the wrapped
 * iterator.
 *
 * Directory listing and scan loops built on this iterator therefore end
 * when the flag is set, without each loop having to test it explicitly.
 */
private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
  /** The iterator being wrapped. */
  private final RemoteIterator<FileStatus> source;

  public StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
    this.source = remote;
  }

  @Override
  public boolean hasNext() throws IOException {
    if (stopExecutors.get()) {
      // stop requested: report exhaustion without touching the source
      return false;
    }
    return source.hasNext();
  }

  @Override
  public FileStatus next() throws IOException {
    // next() is not gated by the stop flag; it delegates unconditionally
    return source.next();
  }
}
}

View File

@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
@ -103,7 +104,8 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath,
LOG.debug("Parsing for log dir {} on attempt {}", appDirPath,
attemptDirName);
Path logPath = getPath(appDirPath);
if (fs.exists(logPath)) {
FileStatus status = fs.getFileStatus(logPath);
if (status != null) {
long startTime = Time.monotonicNow();
try {
LOG.debug("Parsing {} at offset {}", logPath, offset);
@ -112,8 +114,11 @@ public void parseForStore(TimelineDataManager tdm, Path appDirPath,
LOG.info("Parsed {} entities from {} in {} msec",
count, logPath, Time.monotonicNow() - startTime);
} catch (RuntimeException e) {
if (e.getCause() instanceof JsonParseException) {
// If AppLogs cannot parse this log, it may be corrupted
// If AppLogs cannot parse this log, it may be corrupted or just empty
if (e.getCause() instanceof JsonParseException &&
(status.getLen() > 0 || offset > 0)) {
// log on parse problems if the file has been read in the past or
// is visibly non-empty
LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
}
}

View File

@ -116,14 +116,14 @@ public void setup() throws Exception {
EntityGroupPlugInForTest.class.getName());
}
store.init(config);
store.start();
store.setFs(fs);
store.start();
}
@After
public void tearDown() throws Exception {
fs.delete(TEST_APP_DIR_PATH, true);
store.stop();
fs.delete(TEST_APP_DIR_PATH, true);
}
@AfterClass
@ -222,7 +222,7 @@ public void testCleanLogs() throws Exception {
fs.mkdirs(dirPathEmpty);
// Should retain all logs after this run
EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
store.cleanLogs(TEST_DONE_DIR_PATH, fs, 10000);
assertTrue(fs.exists(irrelevantDirPath));
assertTrue(fs.exists(irrelevantFilePath));
assertTrue(fs.exists(filePath));
@ -239,7 +239,7 @@ public void testCleanLogs() throws Exception {
// Touch the third application by creating a new dir
fs.mkdirs(new Path(dirPathHold, "holdByMe"));
EntityGroupFSTimelineStore.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);
store.cleanLogs(TEST_DONE_DIR_PATH, fs, 1000);
// Verification after the second cleaner call
assertTrue(fs.exists(irrelevantDirPath));