diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java index 61b92dddd4..646aa6dfdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java @@ -168,17 +168,31 @@ public static class LogValue { private final Set alreadyUploadedLogFiles; private Set allExistingFileMeta = new HashSet(); private final boolean appFinished; + + /** + * The retention context to determine if log files are older than + * the retention policy configured. + */ + private final LogRetentionContext logRetentionContext; + /** + * The set of log files that are older than retention policy that will + * not be uploaded but ready for deletion. + */ + private final Set obseleteRetentionLogFiles = new HashSet(); + // TODO Maybe add a version string here. Instead of changing the version of // the entire k-v format public LogValue(List rootLogDirs, ContainerId containerId, String user) { - this(rootLogDirs, containerId, user, null, new HashSet(), true); + this(rootLogDirs, containerId, user, null, new HashSet(), + null, true); } public LogValue(List rootLogDirs, ContainerId containerId, String user, LogAggregationContext logAggregationContext, - Set alreadyUploadedLogFiles, boolean appFinished) { + Set alreadyUploadedLogFiles, + LogRetentionContext retentionContext, boolean appFinished) { this.rootLogDirs = new ArrayList(rootLogDirs); this.containerId = containerId; this.user = user; @@ -188,9 +202,11 @@ public LogValue(List rootLogDirs, ContainerId containerId, this.logAggregationContext = logAggregationContext; this.alreadyUploadedLogFiles = alreadyUploadedLogFiles; this.appFinished = appFinished; + this.logRetentionContext = retentionContext; } - private Set getPendingLogFilesToUploadForThisContainer() { + @VisibleForTesting + public Set getPendingLogFilesToUploadForThisContainer() { Set pendingUploadFiles = new HashSet(); for (String rootLogDir : this.rootLogDirs) { File appLogDir = @@ -297,6 +313,14 @@ private Set getPendingLogFilesToUpload(File containerLogDir) { this.allExistingFileMeta.add(getLogFileMetaData(logFile)); } + // if log files are older than retention policy, do not upload them. + // but schedule them for deletion. + if(logRetentionContext != null && !logRetentionContext.shouldRetainLog()){ + obseleteRetentionLogFiles.addAll(candidates); + candidates.clear(); + return candidates; + } + if (this.logAggregationContext != null && candidates.size() > 0) { filterFiles( this.appFinished ? this.logAggregationContext.getIncludePattern() @@ -318,6 +342,7 @@ public boolean apply(File next) { }); candidates = Sets.newHashSet(mask); } + return candidates; } @@ -352,6 +377,14 @@ public Set getCurrentUpLoadedFileMeta() { return info; } + public Set getObseleteRetentionLogFiles() { + Set path = new HashSet(); + for(File file: this.obseleteRetentionLogFiles) { + path.add(new Path(file.getAbsolutePath())); + } + return path; + } + public Set getAllExistingFilesMeta() { return this.allExistingFileMeta; } @@ -362,6 +395,39 @@ private String getLogFileMetaData(File file) { } } + /** + * A context for log retention to determine if files are older than + * the retention policy configured in YarnConfiguration. + */ + public static class LogRetentionContext { + /** + * The time used with logRetentionMillis, to determine ages of + * log files and if files need to be uploaded. + */ + private final long logInitedTimeMillis; + /** + * The numbers of milli seconds since a log file is created to determine + * if we should upload it. -1 if disabled. + * see YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS for details. + */ + private final long logRetentionMillis; + + public LogRetentionContext(long logInitedTimeMillis, long + logRetentionMillis) { + this.logInitedTimeMillis = logInitedTimeMillis; + this.logRetentionMillis = logRetentionMillis; + } + + public boolean isDisabled() { + return logInitedTimeMillis < 0 || logRetentionMillis < 0; + } + + public boolean shouldRetainLog() { + return isDisabled() || + System.currentTimeMillis() - logInitedTimeMillis < logRetentionMillis; + } + } + /** * The writer that writes out the aggregated logs. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index e1c4131e58..d7800a8dd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -329,7 +329,7 @@ private void recoverApplication(ContainerManagerApplicationProto p) LOG.info("Recovering application " + appId); ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, - creds, context); + creds, context, p.getAppLogAggregationInitedTime()); context.getApplications().put(appId, app); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index efa258a467..c179dadbe7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -26,15 +26,22 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; @@ -47,13 +54,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; - import com.google.common.annotations.VisibleForTesting; /** @@ -79,18 +86,35 @@ public class ApplicationImpl implements Application { Map containers = new HashMap(); - public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId, - Credentials credentials, Context context) { + /** + * The timestamp when the log aggregation has started for this application. + * Used to determine the age of application log files during log aggregation. + * When logAggregationRentention policy is enabled, log files older than + * the retention policy will not be uploaded but scheduled for deletion. + */ + private long applicationLogInitedTimestamp = -1; + private final NMStateStoreService appStateStore; + + public ApplicationImpl(Dispatcher dispatcher, String user, + ApplicationId appId, Credentials credentials, + Context context, long recoveredLogInitedTime) { this.dispatcher = dispatcher; this.user = user; this.appId = appId; this.credentials = credentials; this.aclsManager = context.getApplicationACLsManager(); this.context = context; + this.appStateStore = context.getNMStateStore(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); stateMachine = stateMachineFactory.make(this); + setAppLogInitedTimestamp(recoveredLogInitedTime); + } + + public ApplicationImpl(Dispatcher dispatcher, String user, + ApplicationId appId, Credentials credentials, Context context) { + this(dispatcher, user, appId, credentials, context, -1); } @Override @@ -242,7 +266,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.dispatcher.getEventHandler().handle( new LogHandlerAppStartedEvent(app.appId, app.user, app.credentials, app.applicationACLs, - app.logAggregationContext)); + app.logAggregationContext, app.applicationLogInitedTimestamp)); } } @@ -262,9 +286,57 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.dispatcher.getEventHandler().handle( new ApplicationLocalizationEvent( LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + app.setAppLogInitedTimestamp(event.getTimestamp()); + try { + app.appStateStore.storeApplication(app.appId, buildAppProto(app)); + } catch (Exception ex) { + LOG.warn("failed to update application state in state store", ex); + } } } + @VisibleForTesting + void setAppLogInitedTimestamp(long appLogInitedTimestamp) { + this.applicationLogInitedTimestamp = appLogInitedTimestamp; + } + + static ContainerManagerApplicationProto buildAppProto(ApplicationImpl app) + throws IOException { + ContainerManagerApplicationProto.Builder builder = + ContainerManagerApplicationProto.newBuilder(); + builder.setId(((ApplicationIdPBImpl) app.appId).getProto()); + builder.setUser(app.getUser()); + + if (app.logAggregationContext != null) { + builder.setLogAggregationContext(( + (LogAggregationContextPBImpl)app.logAggregationContext).getProto()); + } + + builder.clearCredentials(); + if (app.credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + app.credentials.writeTokenStorageToStream(dob); + builder.setCredentials(ByteString.copyFrom(dob.getData())); + } + + builder.clearAcls(); + if (app.applicationACLs != null) { + for (Map.Entry acl : app + .applicationACLs.entrySet()) { + YarnProtos.ApplicationACLMapProto p = YarnProtos + .ApplicationACLMapProto.newBuilder() + .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey())) + .setAcl(acl.getValue()) + .build(); + builder.addAcls(p); + } + } + + builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp); + + return builder.build(); + } + /** * Handles the APPLICATION_LOG_HANDLING_FAILED event that occurs after * {@link LogAggregationService} has failed to initialize the log diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index ac43ecf5a6..ba7836a040 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; @@ -131,6 +132,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator { new HashMap(); private final ContainerLogAggregationPolicy logAggPolicy; + + /** + * The value recovered from state store to determine the age of application + * log files if log retention is enabled. Files older than retention policy + * will not be uploaded but scheduled for cleaning up. -1 if not recovered. + */ + private final long recoveredLogInitedTime; + public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, @@ -138,6 +147,19 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, Map appAcls, LogAggregationContext logAggregationContext, Context context, FileContext lfs, long rollingMonitorInterval) { + this(dispatcher, deletionService, conf, appId, userUgi, nodeId, + dirsHandler, remoteNodeLogFileForApp, appAcls, + logAggregationContext, context, lfs, rollingMonitorInterval, -1); + } + + public AppLogAggregatorImpl(Dispatcher dispatcher, + DeletionService deletionService, Configuration conf, + ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, + LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, + Map appAcls, + LogAggregationContext logAggregationContext, Context context, + FileContext lfs, long rollingMonitorInterval, + long recoveredLogInitedTime) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; @@ -169,6 +191,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, || this.logAggregationContext.getRolledLogsIncludePattern() .isEmpty() ? false : true; this.logAggPolicy = getLogAggPolicy(conf); + this.recoveredLogInitedTime = recoveredLogInitedTime; } private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { @@ -283,9 +306,7 @@ private void uploadLogsForContainers(boolean appFinished) { logAggregationTimes++; try { - writer = - new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, - this.userUgi); + writer = createLogWriter(); // Write ACLs once when the writer is created. writer.writeApplicationACLs(appAcls); writer.writeApplicationOwner(this.userUgi.getShortUserName()); @@ -396,6 +417,11 @@ public Object run() throws Exception { } } + protected LogWriter createLogWriter() throws IOException { + return new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, + this.userUgi); + } + private void sendLogAggregationReport( LogAggregationStatus logAggregationStatus, String diagnosticMessage) { LogAggregationReport report = @@ -599,13 +625,21 @@ public synchronized void doLogAggregationOutOfBand() { this.notifyAll(); } - private class ContainerLogAggregator { + class ContainerLogAggregator { + private final AggregatedLogFormat.LogRetentionContext retentionContext; private final ContainerId containerId; - private Set uploadedFileMeta = - new HashSet(); - + private Set uploadedFileMeta = new HashSet(); public ContainerLogAggregator(ContainerId containerId) { this.containerId = containerId; + this.retentionContext = getRetentionContext(); + } + + private AggregatedLogFormat.LogRetentionContext getRetentionContext() { + final long logRetentionSecs = + conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS); + return new AggregatedLogFormat.LogRetentionContext( + recoveredLogInitedTime, logRetentionSecs * 1000); } public Set doContainerLogAggregation(LogWriter writer, @@ -617,7 +651,7 @@ public Set doContainerLogAggregation(LogWriter writer, final LogValue logValue = new LogValue(dirsHandler.getLogDirsForRead(), containerId, userUgi.getShortUserName(), logAggregationContext, - this.uploadedFileMeta, appFinished); + this.uploadedFileMeta, retentionContext, appFinished); try { writer.append(logKey, logValue); } catch (Exception e) { @@ -638,7 +672,11 @@ public boolean apply(String next) { }); this.uploadedFileMeta = Sets.newHashSet(mask); - return logValue.getCurrentUpLoadedFilesPath(); + + // need to return files uploaded or older-than-retention clean up. + return Sets.union(logValue.getCurrentUpLoadedFilesPath(), + logValue.getObseleteRetentionLogFiles()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index afc75c1af7..a4ae643a13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -357,12 +357,13 @@ public Object run() throws Exception { @SuppressWarnings("unchecked") private void initApp(final ApplicationId appId, String user, Credentials credentials, Map appAcls, - LogAggregationContext logAggregationContext) { + LogAggregationContext logAggregationContext, + long recoveredLogInitedTime) { ApplicationEvent eventResponse; try { verifyAndCreateRemoteLogDir(getConfig()); initAppAggregator(appId, user, credentials, appAcls, - logAggregationContext); + logAggregationContext, recoveredLogInitedTime); eventResponse = new ApplicationEvent(appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); } catch (YarnRuntimeException e) { @@ -381,10 +382,10 @@ FileContext getLocalFileContext(Configuration conf) { } } - protected void initAppAggregator(final ApplicationId appId, String user, Credentials credentials, Map appAcls, - LogAggregationContext logAggregationContext) { + LogAggregationContext logAggregationContext, + long recoveredLogInitedTime) { // Get user's FileSystem credentials final UserGroupInformation userUgi = @@ -399,7 +400,8 @@ protected void initAppAggregator(final ApplicationId appId, String user, getConfig(), appId, userUgi, this.nodeId, dirsHandler, getRemoteNodeLogFileForApp(appId, user), appAcls, logAggregationContext, this.context, - getLocalFileContext(getConfig()), this.rollingMonitorInterval); + getLocalFileContext(getConfig()), this.rollingMonitorInterval, + recoveredLogInitedTime); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } @@ -501,7 +503,8 @@ public void handle(LogHandlerEvent event) { initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(), appStartEvent.getCredentials(), appStartEvent.getApplicationAcls(), - appStartEvent.getLogAggregationContext()); + appStartEvent.getLogAggregationContext(), + appStartEvent.getRecoveredAppLogInitedTime()); break; case CONTAINER_FINISHED: LogHandlerContainerFinishedEvent containerFinishEvent = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java index d3ff771c15..bf6af1708f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java @@ -32,21 +32,36 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent { private final Credentials credentials; private final Map appAcls; private final LogAggregationContext logAggregationContext; + /** + * The value will be set when the application is recovered from state store. + * We use this value in AppLogAggregatorImpl to determine, if log retention + * policy is enabled, if we need to upload old application log files. Files + * older than retention policy will not be uploaded but scheduled for + * deletion. + */ + private final long recoveredAppLogInitedTime; public LogHandlerAppStartedEvent(ApplicationId appId, String user, Credentials credentials, Map appAcls) { - this(appId, user, credentials, appAcls, null); + this(appId, user, credentials, appAcls, null, -1); } public LogHandlerAppStartedEvent(ApplicationId appId, String user, Credentials credentials, Map appAcls, LogAggregationContext logAggregationContext) { + this(appId, user, credentials, appAcls, logAggregationContext, -1); + } + + public LogHandlerAppStartedEvent(ApplicationId appId, String user, + Credentials credentials, Map appAcls, + LogAggregationContext logAggregationContext, long appLogInitedTime) { super(LogHandlerEventType.APPLICATION_STARTED); this.applicationId = appId; this.user = user; this.credentials = credentials; this.appAcls = appAcls; this.logAggregationContext = logAggregationContext; + this.recoveredAppLogInitedTime = appLogInitedTime; } public ApplicationId getApplicationId() { @@ -68,4 +83,8 @@ public Map getApplicationAcls() { public LogAggregationContext getLogAggregationContext() { return this.logAggregationContext; } + + public long getRecoveredAppLogInitedTime() { + return this.recoveredAppLogInitedTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index ade8c1ac86..0dfa20e465 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -30,6 +30,7 @@ message ContainerManagerApplicationProto { optional bytes credentials = 3; repeated ApplicationACLMapProto acls = 4; optional LogAggregationContextProto log_aggregation_context = 5; + optional int64 appLogAggregationInitedTime = 6 [ default = -1 ]; } message DeletionServiceDeleteTaskProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 370a20734f..157ba97cfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -18,14 +18,10 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.refEq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -41,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -61,12 +58,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; @@ -295,6 +294,26 @@ protected ContainerTokenIdentifier waitForContainerTokenToExpire( return identifier; } + @Test + public void testApplicationOnAppLogHandlingInitedEvtShouldStoreLogInitedTime() + throws IOException { + WrappedApplication wa = new WrappedApplication(5, 314159265358979L, + "yak", 0); + wa.initApplication(); + + ArgumentCaptor applicationProto = + ArgumentCaptor.forClass(ContainerManagerApplicationProto.class); + + final long timestamp = wa.applicationLogInited(); + + verify(wa.stateStoreService).storeApplication(any(ApplicationId.class), + applicationProto.capture()); + + assertEquals(applicationProto.getValue().getAppLogAggregationInitedTime() + , timestamp); + } + + @Test @SuppressWarnings("unchecked") public void testAppFinishedOnCompletedContainers() { @@ -484,7 +503,7 @@ private class WrappedApplication { final Context context; final Map containerTokenIdentifierMap; final NMTokenSecretManagerInNM nmTokenSecretMgr; - + final NMStateStoreService stateStoreService; final ApplicationId appId; final Application app; @@ -511,7 +530,7 @@ private class WrappedApplication { dispatcher.register(LogHandlerEventType.class, logAggregationBus); nmTokenSecretMgr = mock(NMTokenSecretManagerInNM.class); - + stateStoreService = mock(NMStateStoreService.class); context = mock(Context.class); when(context.getContainerTokenSecretManager()).thenReturn( @@ -519,6 +538,7 @@ private class WrappedApplication { when(context.getApplicationACLsManager()).thenReturn( new ApplicationACLsManager(conf)); when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr); + when(context.getNMStateStore()).thenReturn(stateStoreService); // Setting master key MasterKey masterKey = new MasterKeyPBImpl(); @@ -586,6 +606,13 @@ public void applicationInited() { drainDispatcherEvents(); } + public long applicationLogInited() { + ApplicationEvent appEvt = new ApplicationEvent(app.getAppId(), + ApplicationEventType.APPLICATION_LOG_HANDLING_INITED); + app.handle(appEvt); + return appEvt.getTimestamp(); + } + public void appFinished() { app.handle(new ApplicationFinishEvent(appId, "Finish Application")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java new file mode 100644 index 0000000000..0127923b0d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java @@ -0,0 +1,436 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; +import org.apache.hadoop.yarn.server.api.ContainerLogContext; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +/** + * Unit tests of AppLogAggregatorImpl class. + */ +public class TestAppLogAggregatorImpl { + + private static final File LOCAL_LOG_DIR = new File("target", + TestAppLogAggregatorImpl.class.getName() + "-localLogDir"); + private static final File REMOTE_LOG_FILE = new File("target", + TestAppLogAggregatorImpl.class.getName() + "-remoteLogFile"); + + @Before + public void setUp() throws IOException { + if(LOCAL_LOG_DIR.exists()) { + FileUtils.cleanDirectory(LOCAL_LOG_DIR); + } + if(REMOTE_LOG_FILE.exists()) { + FileUtils.cleanDirectory(REMOTE_LOG_FILE); + } + } + + @After + public void cleanUp() throws IOException { + FileUtils.deleteDirectory(LOCAL_LOG_DIR); + FileUtils.deleteQuietly(REMOTE_LOG_FILE); + } + + @Test + public void testAggregatorWithRetentionPolicyDisabledShouldUploadAllFiles() + throws Exception { + final ApplicationId applicationId = + ApplicationId.newInstance(System.currentTimeMillis(), 0); + final ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + final ContainerId containerId = ContainerId.newContainerId(attemptId, 0); + + // create artificial log files + final File appLogDir = new File(LOCAL_LOG_DIR, + ConverterUtils.toString(applicationId)); + final File containerLogDir = new File(appLogDir, + ConverterUtils.toString(containerId)); + containerLogDir.mkdirs(); + final Set logFiles = createContainerLogFiles(containerLogDir, 3); + + final long logRetentionSecs = 10000; + final long recoveredLogInitedTime = -1; + + verifyLogAggregationWithExpectedFiles2DeleteAndUpload( + applicationId, containerId, logRetentionSecs, + recoveredLogInitedTime, logFiles, logFiles); + } + + @Test + public void testAggregatorWhenNoFileOlderThanRetentionPolicyShouldUploadAll() + throws IOException { + + final ApplicationId applicationId = + ApplicationId.newInstance(System.currentTimeMillis(), 0); + final ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + final ContainerId containerId = ContainerId.newContainerId(attemptId, 0); + + // create artificial log files + final File appLogDir = new File(LOCAL_LOG_DIR, + ConverterUtils.toString(applicationId)); + final File containerLogDir = new File(appLogDir, + ConverterUtils.toString(containerId)); + containerLogDir.mkdirs(); + final Set logFiles = createContainerLogFiles(containerLogDir, 3); + + // set log retention period to 1 week. + final long logRententionSec = 7 * 24 * 60 * 60; + final long recoveredLogInitedTimeMillis = + System.currentTimeMillis() - 60*60; + + verifyLogAggregationWithExpectedFiles2DeleteAndUpload(applicationId, + containerId, logRententionSec, recoveredLogInitedTimeMillis, + logFiles, new HashSet()); + } + + @Test + public void testAggregatorWhenAllFilesOlderThanRetentionShouldUploadNone() + throws IOException { + + final ApplicationId applicationId = + ApplicationId.newInstance(System.currentTimeMillis(), 0); + final ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(applicationId, 0); + final ContainerId containerId = ContainerId.newContainerId(attemptId, 0); + + // create artificial log files + final File appLogDir = new File(LOCAL_LOG_DIR, + ConverterUtils.toString(applicationId)); + final File containerLogDir = new File(appLogDir, + ConverterUtils.toString(containerId)); + containerLogDir.mkdirs(); + final Set logFiles = createContainerLogFiles(containerLogDir, 3); + + + final long week = 7 * 24 * 60 * 60; + final long recoveredLogInitedTimeMillis = System.currentTimeMillis() - + 2*week; + verifyLogAggregationWithExpectedFiles2DeleteAndUpload( + applicationId, containerId, week, recoveredLogInitedTimeMillis, + logFiles, new HashSet()); + } + + /** + * Create the given number of log files under the container log directory. + * @param containerLogDir the directory to create container log files + * @param numOfFiles the number of log files to create + * @return the set of log files created + */ + private static Set createContainerLogFiles(File containerLogDir, + int numOfFiles) throws IOException { + assert(numOfFiles >= 0); + assert(containerLogDir.exists()); + + Set containerLogFiles = new HashSet<>(); + for(int i = 0; i < numOfFiles; i++) { + final File logFile = new File(containerLogDir, "logfile" + i); + logFile.createNewFile(); + containerLogFiles.add(logFile); + } + return containerLogFiles; + } + + /** + * Verify if the application log aggregator, configured with given log + * retention period and the recovered log initialization time of + * the application, uploads and deletes the set of log files as expected. + * @param appId application id + * @param containerId container id + * @param logRetentionSecs log retention period + * @param recoveredLogInitedTimeMillis recovered log initialization time + * @param expectedFilesToDelete the set of files expected to be deleted + * @param expectedFilesToUpload the set of files expected to be uploaded. + */ + public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload( + ApplicationId appId, ContainerId containerId, long logRetentionSecs, + long recoveredLogInitedTimeMillis, Set expectedFilesToDelete, + Set expectedFilesToUpload) throws IOException { + + final Set filesExpected2Delete = new HashSet<>(); + for(File file: expectedFilesToDelete) { + filesExpected2Delete.add(file.getAbsolutePath()); + } + final Set filesExpected2Upload = new HashSet<>(); + for(File file: expectedFilesToUpload) { + filesExpected2Upload.add(file.getAbsolutePath()); + } + + // deletion service with verification to check files to delete + DeletionService deletionServiceWithExpectedFiles = + createDeletionServiceWithExpectedFile2Delete(filesExpected2Delete); + + final YarnConfiguration config = new YarnConfiguration(); + config.setLong( + YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs); + + final AppLogAggregatorInTest appLogAggregator = + createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(), + config, recoveredLogInitedTimeMillis, + deletionServiceWithExpectedFiles); + appLogAggregator.startContainerLogAggregation( + new ContainerLogContext(containerId, ContainerType.TASK, 0)); + // set app finished flag first + appLogAggregator.finishLogAggregation(); + appLogAggregator.run(); + + // verify uploaded files + ArgumentCaptor logValCaptor = + ArgumentCaptor.forClass(LogValue.class); + verify(appLogAggregator.logWriter).append(any(LogKey.class), + logValCaptor.capture()); + Set filesUploaded = new HashSet<>(); + LogValue logValue = logValCaptor.getValue(); + for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) { + filesUploaded.add(file.getAbsolutePath()); + } + verifyFilesUploaded(filesUploaded , filesExpected2Upload); + } + + + private static void verifyFilesUploaded(Set filesUploaded, + Set filesExpected) { + final String errMsgPrefix = "The set of files uploaded are not the same " + + "as expected"; + if(filesUploaded.size() != filesUploaded.size()) { + fail(errMsgPrefix + ": actual size: " + filesUploaded.size() + " vs " + + "expected size: " + filesExpected.size()); + } + for(String file: filesExpected) { + if(!filesUploaded.contains(file)) { + fail(errMsgPrefix + ": expecting " + file); + } + } + } + + private static AppLogAggregatorInTest createAppLogAggregator( + ApplicationId applicationId, String rootLogDir, + YarnConfiguration config, long recoveredLogInitedTimeMillis, + DeletionService deletionServiceWithFilesToExpect) + throws IOException { + + final Dispatcher dispatcher = createNullDispatcher(); + final NodeId nodeId = NodeId.newInstance("localhost", 0); + final String userId = "AppLogAggregatorTest"; + final UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(userId); + final LocalDirsHandlerService dirsService = + createLocalDirsHandlerService(config, rootLogDir); + final DeletionService deletionService = deletionServiceWithFilesToExpect; + final LogAggregationContext logAggregationContext = null; + final Map appAcls = new HashMap<>(); + + final Context context = createContext(config); + final FileContext fakeLfs = mock(FileContext.class); + final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath()); + + return new AppLogAggregatorInTest(dispatcher, deletionService, + config, applicationId, ugi, nodeId, dirsService, + remoteLogDirForApp, appAcls, logAggregationContext, + context, fakeLfs, recoveredLogInitedTimeMillis); + } + + /** + * Create a deletionService that verifies the paths of container log files + * passed to the delete method of DeletionService by AppLogAggregatorImpl. + * This approach is taken due to lack of support of varargs captor in the + * current mockito version 1.8.5 (The support is added in 1.10.x). + **/ + private static DeletionService createDeletionServiceWithExpectedFile2Delete( + final Set expectedPathsForDeletion) { + DeletionService deletionServiceWithExpectedFiles = mock(DeletionService + .class); + // verify paths passed to first invocation of delete method against + // expected paths + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + Set paths = new HashSet<>(); + Object[] args = invocationOnMock.getArguments(); + for(int i = 2; i < args.length; i++) { + Path path = (Path) args[i]; + paths.add(path.toUri().getRawPath()); + } + verifyFilesToDelete(expectedPathsForDeletion, paths); + return null; + } + }).doNothing().when(deletionServiceWithExpectedFiles).delete( + any(String.class), any(Path.class), Matchers.anyVararg()); + + return deletionServiceWithExpectedFiles; + } + + private static void verifyFilesToDelete(Set files2ToDelete, + Set filesExpected) { + final String errMsgPrefix = "The set of paths for deletion are not the " + + "same as expected"; + if(files2ToDelete.size() != filesExpected.size()) { + fail(errMsgPrefix + ": actual size: " + files2ToDelete.size() + " vs " + + "expected size: " + filesExpected.size()); + } + for(String file: filesExpected) { + if(!files2ToDelete.contains(file)) { + fail(errMsgPrefix + ": expecting " + file); + } + } + } + + private static Dispatcher createNullDispatcher() { + return new Dispatcher() { + @Override + public EventHandler getEventHandler() { + return new EventHandler() { + @Override + public void handle(Event event) { + // do nothing + } + }; + } + + @Override + public void register(Class eventType, + EventHandler handler) { + // do nothing + } + }; + } + + private static LocalDirsHandlerService createLocalDirsHandlerService( + YarnConfiguration conf, final String rootLogDir) { + LocalDirsHandlerService dirsHandlerService = new LocalDirsHandlerService() { + @Override + public List getLogDirsForRead() { + return new ArrayList() { + { + add(rootLogDir); + } + }; + } + @Override + public List getLogDirsForCleanup() { + return new ArrayList() { + { + add(rootLogDir); + } + }; + } + }; + + dirsHandlerService.init(conf); + // appLogAggregator only calls LocalDirsHandlerServer for local directories + // so it is ok to not start the service. + return dirsHandlerService; + } + + private static Context createContext(YarnConfiguration conf) { + return new NodeManager.NMContext( + new NMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInNM(), + null, + new ApplicationACLsManager(conf), + new NMNullStateStoreService(), false); + } + + private static final class AppLogAggregatorInTest extends + AppLogAggregatorImpl { + + final DeletionService deletionService; + final ApplicationId applicationId; + final LogWriter logWriter; + final ArgumentCaptor logValue; + + public AppLogAggregatorInTest(Dispatcher dispatcher, + DeletionService deletionService, Configuration conf, + ApplicationId appId, UserGroupInformation ugi, NodeId nodeId, + LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, + Map appAcls, + LogAggregationContext logAggregationContext, Context context, + FileContext lfs, long recoveredLogInitedTime) throws IOException { + super(dispatcher, deletionService, conf, appId, ugi, nodeId, + dirsHandler, remoteNodeLogFileForApp, appAcls, + logAggregationContext, context, lfs, recoveredLogInitedTime); + this.applicationId = appId; + this.deletionService = deletionService; + + this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp); + this.logValue = ArgumentCaptor.forClass(LogValue.class); + } + + @Override + protected LogWriter createLogWriter() { + return this.logWriter; + } + + private LogWriter getSpiedLogWriter(Configuration conf, + UserGroupInformation ugi, Path remoteAppLogFile) throws IOException { + return spy(new LogWriter(conf, remoteAppLogFile, ugi)); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index fa9a0b50a7..3961e1ac51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -20,10 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyMap; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.*; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -712,7 +709,7 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception { doThrow(new YarnRuntimeException("KABOOM!")) .when(logAggregationService).initAppAggregator( eq(appId), eq(user), any(Credentials.class), - anyMap(), any(LogAggregationContext.class)); + anyMap(), any(LogAggregationContext.class), anyLong()); LogAggregationContext contextWithAMAndFailed = Records.newRecord(LogAggregationContext.class); contextWithAMAndFailed.setLogAggregationPolicyClassName(