YARN-4766. NM should not aggregate logs older than the retention policy (haibochen via rkanter)

Robert Kanter 2016-05-25 10:25:44 -07:00
parent 9a31e5dfef
commit e07519b8db
10 changed files with 697 additions and 38 deletions

View File

@ -168,17 +168,31 @@ public static class LogValue {
private final Set<String> alreadyUploadedLogFiles;
private Set<String> allExistingFileMeta = new HashSet<String>();
private final boolean appFinished;
/**
* The retention context used to determine whether log files are older than
* the configured retention policy.
*/
private final LogRetentionContext logRetentionContext;
/**
* The set of log files that are older than the retention policy and will
* not be uploaded, but are ready for deletion.
*/
private final Set<File> obseleteRetentionLogFiles = new HashSet<File>();
// TODO Maybe add a version string here. Instead of changing the version of
// the entire k-v format
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user) {
this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
this(rootLogDirs, containerId, user, null, new HashSet<String>(),
null, true);
}
public LogValue(List<String> rootLogDirs, ContainerId containerId,
String user, LogAggregationContext logAggregationContext,
Set<String> alreadyUploadedLogFiles, boolean appFinished) {
Set<String> alreadyUploadedLogFiles,
LogRetentionContext retentionContext, boolean appFinished) {
this.rootLogDirs = new ArrayList<String>(rootLogDirs);
this.containerId = containerId;
this.user = user;
@ -188,9 +202,11 @@ public LogValue(List<String> rootLogDirs, ContainerId containerId,
this.logAggregationContext = logAggregationContext;
this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
this.appFinished = appFinished;
this.logRetentionContext = retentionContext;
}
private Set<File> getPendingLogFilesToUploadForThisContainer() {
@VisibleForTesting
public Set<File> getPendingLogFilesToUploadForThisContainer() {
Set<File> pendingUploadFiles = new HashSet<File>();
for (String rootLogDir : this.rootLogDirs) {
File appLogDir =
@ -297,6 +313,14 @@ private Set<File> getPendingLogFilesToUpload(File containerLogDir) {
this.allExistingFileMeta.add(getLogFileMetaData(logFile));
}
// If log files are older than the retention policy, do not upload them,
// but schedule them for deletion.
if (logRetentionContext != null && !logRetentionContext.shouldRetainLog()) {
obseleteRetentionLogFiles.addAll(candidates);
candidates.clear();
return candidates;
}
if (this.logAggregationContext != null && candidates.size() > 0) {
filterFiles(
this.appFinished ? this.logAggregationContext.getIncludePattern()
@ -318,6 +342,7 @@ public boolean apply(File next) {
});
candidates = Sets.newHashSet(mask);
}
return candidates;
}
@ -352,6 +377,14 @@ public Set<String> getCurrentUpLoadedFileMeta() {
return info;
}
public Set<Path> getObseleteRetentionLogFiles() {
Set<Path> path = new HashSet<Path>();
for(File file: this.obseleteRetentionLogFiles) {
path.add(new Path(file.getAbsolutePath()));
}
return path;
}
public Set<String> getAllExistingFilesMeta() {
return this.allExistingFileMeta;
}
@ -362,6 +395,39 @@ private String getLogFileMetaData(File file) {
}
}
/**
* A context for log retention to determine if files are older than
* the retention policy configured in YarnConfiguration.
*/
public static class LogRetentionContext {
/**
* The time used together with logRetentionMillis to determine the age of
* log files and whether they need to be uploaded.
*/
private final long logInitedTimeMillis;
/**
* The maximum age, in milliseconds, beyond which a log file will not be
* uploaded. -1 if disabled.
* See YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS for details.
*/
private final long logRetentionMillis;
public LogRetentionContext(long logInitedTimeMillis, long
logRetentionMillis) {
this.logInitedTimeMillis = logInitedTimeMillis;
this.logRetentionMillis = logRetentionMillis;
}
public boolean isDisabled() {
return logInitedTimeMillis < 0 || logRetentionMillis < 0;
}
public boolean shouldRetainLog() {
return isDisabled() ||
System.currentTimeMillis() - logInitedTimeMillis < logRetentionMillis;
}
}
/**
* The writer that writes out the aggregated logs.
*/
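For reference, a minimal usage sketch of the LogRetentionContext class introduced above. The wrapper class name, the timestamps and the one-hour retention value are made-up example inputs, not code from this commit; the sketch only assumes the patched AggregatedLogFormat is on the classpath.
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogRetentionContext;
public class LogRetentionContextSketch {
  public static void main(String[] args) {
    // Example inputs: log handling was inited two hours ago, the retention window is one hour.
    long logInitedTimeMillis = System.currentTimeMillis() - 2 * 60 * 60 * 1000L;
    long logRetentionMillis = 60 * 60 * 1000L;
    LogRetentionContext ctx =
        new LogRetentionContext(logInitedTimeMillis, logRetentionMillis);
    // Prints false: the logs are older than the retention window, so LogValue moves the
    // candidate files into obseleteRetentionLogFiles and only schedules them for deletion.
    System.out.println(ctx.shouldRetainLog());
    // Passing -1 for either value disables the check and keeps the old upload-everything behavior.
    System.out.println(new LogRetentionContext(-1, logRetentionMillis).isDisabled()); // true
  }
}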

View File

@ -329,7 +329,7 @@ private void recoverApplication(ContainerManagerApplicationProto p)
LOG.info("Recovering application " + appId);
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
creds, context);
creds, context, p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}

View File

@ -26,15 +26,22 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@ -47,13 +54,13 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import com.google.common.annotations.VisibleForTesting;
/**
@ -79,18 +86,35 @@ public class ApplicationImpl implements Application {
Map<ContainerId, Container> containers =
new HashMap<ContainerId, Container>();
public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId,
Credentials credentials, Context context) {
/**
* The timestamp when log handling was initialized for this application.
* Used to determine the age of application log files during log aggregation.
* When the log aggregation retention policy is enabled, log files older than
* the retention policy will not be uploaded but will be scheduled for deletion.
*/
private long applicationLogInitedTimestamp = -1;
private final NMStateStoreService appStateStore;
public ApplicationImpl(Dispatcher dispatcher, String user,
ApplicationId appId, Credentials credentials,
Context context, long recoveredLogInitedTime) {
this.dispatcher = dispatcher;
this.user = user;
this.appId = appId;
this.credentials = credentials;
this.aclsManager = context.getApplicationACLsManager();
this.context = context;
this.appStateStore = context.getNMStateStore();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
stateMachine = stateMachineFactory.make(this);
setAppLogInitedTimestamp(recoveredLogInitedTime);
}
public ApplicationImpl(Dispatcher dispatcher, String user,
ApplicationId appId, Credentials credentials, Context context) {
this(dispatcher, user, appId, credentials, context, -1);
}
@Override
@ -242,7 +266,7 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, app.applicationACLs,
app.logAggregationContext));
app.logAggregationContext, app.applicationLogInitedTimestamp));
}
}
@ -262,9 +286,57 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
app.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
app.setAppLogInitedTimestamp(event.getTimestamp());
try {
app.appStateStore.storeApplication(app.appId, buildAppProto(app));
} catch (Exception ex) {
LOG.warn("failed to update application state in state store", ex);
}
}
}
@VisibleForTesting
void setAppLogInitedTimestamp(long appLogInitedTimestamp) {
this.applicationLogInitedTimestamp = appLogInitedTimestamp;
}
static ContainerManagerApplicationProto buildAppProto(ApplicationImpl app)
throws IOException {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) app.appId).getProto());
builder.setUser(app.getUser());
if (app.logAggregationContext != null) {
builder.setLogAggregationContext((
(LogAggregationContextPBImpl)app.logAggregationContext).getProto());
}
builder.clearCredentials();
if (app.credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
app.credentials.writeTokenStorageToStream(dob);
builder.setCredentials(ByteString.copyFrom(dob.getData()));
}
builder.clearAcls();
if (app.applicationACLs != null) {
for (Map.Entry<ApplicationAccessType, String> acl : app
.applicationACLs.entrySet()) {
YarnProtos.ApplicationACLMapProto p = YarnProtos
.ApplicationACLMapProto.newBuilder()
.setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
.setAcl(acl.getValue())
.build();
builder.addAcls(p);
}
}
builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp);
return builder.build();
}
/**
* Handles the APPLICATION_LOG_HANDLING_FAILED event that occurs after
* {@link LogAggregationService} has failed to initialize the log

View File

@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
@ -131,6 +132,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
new HashMap<ContainerId, ContainerLogAggregator>();
private final ContainerLogAggregationPolicy logAggPolicy;
/**
* The value recovered from the state store, used to determine the age of
* application log files when log retention is enabled. Files older than the
* retention policy will not be uploaded but will be scheduled for cleanup.
* -1 if not recovered.
*/
private final long recoveredLogInitedTime;
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
@ -138,6 +147,19 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval) {
this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, rollingMonitorInterval, -1);
}
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval,
long recoveredLogInitedTime) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
@ -169,6 +191,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
|| this.logAggregationContext.getRolledLogsIncludePattern()
.isEmpty() ? false : true;
this.logAggPolicy = getLogAggPolicy(conf);
this.recoveredLogInitedTime = recoveredLogInitedTime;
}
private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
@ -283,9 +306,7 @@ private void uploadLogsForContainers(boolean appFinished) {
logAggregationTimes++;
try {
writer =
new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
this.userUgi);
writer = createLogWriter();
// Write ACLs once when the writer is created.
writer.writeApplicationACLs(appAcls);
writer.writeApplicationOwner(this.userUgi.getShortUserName());
@ -396,6 +417,11 @@ public Object run() throws Exception {
}
}
protected LogWriter createLogWriter() throws IOException {
return new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
this.userUgi);
}
private void sendLogAggregationReport(
LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
LogAggregationReport report =
@ -599,13 +625,21 @@ public synchronized void doLogAggregationOutOfBand() {
this.notifyAll();
}
private class ContainerLogAggregator {
class ContainerLogAggregator {
private final AggregatedLogFormat.LogRetentionContext retentionContext;
private final ContainerId containerId;
private Set<String> uploadedFileMeta =
new HashSet<String>();
private Set<String> uploadedFileMeta = new HashSet<String>();
public ContainerLogAggregator(ContainerId containerId) {
this.containerId = containerId;
this.retentionContext = getRetentionContext();
}
private AggregatedLogFormat.LogRetentionContext getRetentionContext() {
final long logRetentionSecs =
conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
return new AggregatedLogFormat.LogRetentionContext(
recoveredLogInitedTime, logRetentionSecs * 1000);
}
public Set<Path> doContainerLogAggregation(LogWriter writer,
@ -617,7 +651,7 @@ public Set<Path> doContainerLogAggregation(LogWriter writer,
final LogValue logValue =
new LogValue(dirsHandler.getLogDirsForRead(), containerId,
userUgi.getShortUserName(), logAggregationContext,
this.uploadedFileMeta, appFinished);
this.uploadedFileMeta, retentionContext, appFinished);
try {
writer.append(logKey, logValue);
} catch (Exception e) {
@ -638,7 +672,11 @@ public boolean apply(String next) {
});
this.uploadedFileMeta = Sets.newHashSet(mask);
return logValue.getCurrentUpLoadedFilesPath();
// Return both the files that were uploaded and the older-than-retention
// files that were scheduled for cleanup.
return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
logValue.getObseleteRetentionLogFiles());
}
}
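A minimal sketch of the configuration path that getRetentionContext() above exercises. The wrapper class name, the explicit one-week setting and the eight-day-old recovered timestamp are illustrative assumptions; the YarnConfiguration keys and the LogRetentionContext constructor are the ones used in this commit.
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogRetentionContext;
public class RetentionFromConfSketch {
  public static void main(String[] args) {
    // yarn.log-aggregation.retain-seconds is configured in seconds and converted to
    // milliseconds before being handed to LogRetentionContext.
    YarnConfiguration conf = new YarnConfiguration();
    conf.setLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 7 * 24 * 60 * 60); // one week, example value
    long retainSecs = conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
    // recoveredLogInitedTime stays -1 for applications that were not recovered from the
    // NM state store, which disables the check; here we pretend a timestamp from eight
    // days ago was recovered.
    long recoveredLogInitedTime = System.currentTimeMillis() - 8L * 24 * 60 * 60 * 1000;
    LogRetentionContext retention =
        new LogRetentionContext(recoveredLogInitedTime, retainSecs * 1000);
    // Prints false: files this old land in getObseleteRetentionLogFiles() and are handed to
    // the DeletionService rather than appended to the aggregated log.
    System.out.println(retention.shouldRetainLog());
  }
}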

View File

@ -357,12 +357,13 @@ public Object run() throws Exception {
@SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
LogAggregationContext logAggregationContext,
long recoveredLogInitedTime) {
ApplicationEvent eventResponse;
try {
verifyAndCreateRemoteLogDir(getConfig());
initAppAggregator(appId, user, credentials, appAcls,
logAggregationContext);
logAggregationContext, recoveredLogInitedTime);
eventResponse = new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
} catch (YarnRuntimeException e) {
@ -381,10 +382,10 @@ FileContext getLocalFileContext(Configuration conf) {
}
}
protected void initAppAggregator(final ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
LogAggregationContext logAggregationContext,
long recoveredLogInitedTime) {
// Get user's FileSystem credentials
final UserGroupInformation userUgi =
@ -399,7 +400,8 @@ protected void initAppAggregator(final ApplicationId appId, String user,
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
getRemoteNodeLogFileForApp(appId, user),
appAcls, logAggregationContext, this.context,
getLocalFileContext(getConfig()), this.rollingMonitorInterval);
getLocalFileContext(getConfig()), this.rollingMonitorInterval,
recoveredLogInitedTime);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnRuntimeException("Duplicate initApp for " + appId);
}
@ -501,7 +503,8 @@ public void handle(LogHandlerEvent event) {
initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
appStartEvent.getCredentials(),
appStartEvent.getApplicationAcls(),
appStartEvent.getLogAggregationContext());
appStartEvent.getLogAggregationContext(),
appStartEvent.getRecoveredAppLogInitedTime());
break;
case CONTAINER_FINISHED:
LogHandlerContainerFinishedEvent containerFinishEvent =

View File

@ -32,21 +32,36 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
private final Credentials credentials;
private final Map<ApplicationAccessType, String> appAcls;
private final LogAggregationContext logAggregationContext;
/**
* This value is set when the application is recovered from the state store.
* AppLogAggregatorImpl uses it, when the log retention policy is enabled,
* to decide whether old application log files still need to be uploaded.
* Files older than the retention policy will not be uploaded but will be
* scheduled for deletion.
*/
private final long recoveredAppLogInitedTime;
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls) {
this(appId, user, credentials, appAcls, null);
this(appId, user, credentials, appAcls, null, -1);
}
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
this(appId, user, credentials, appAcls, logAggregationContext, -1);
}
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, long appLogInitedTime) {
super(LogHandlerEventType.APPLICATION_STARTED);
this.applicationId = appId;
this.user = user;
this.credentials = credentials;
this.appAcls = appAcls;
this.logAggregationContext = logAggregationContext;
this.recoveredAppLogInitedTime = appLogInitedTime;
}
public ApplicationId getApplicationId() {
@ -68,4 +83,8 @@ public Map<ApplicationAccessType, String> getApplicationAcls() {
public LogAggregationContext getLogAggregationContext() {
return this.logAggregationContext;
}
public long getRecoveredAppLogInitedTime() {
return this.recoveredAppLogInitedTime;
}
}

View File

@ -30,6 +30,7 @@ message ContainerManagerApplicationProto {
optional bytes credentials = 3;
repeated ApplicationACLMapProto acls = 4;
optional LogAggregationContextProto log_aggregation_context = 5;
optional int64 appLogAggregationInitedTime = 6 [ default = -1 ];
}
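As a usage note, a small sketch of what the [ default = -1 ] declaration provides on recovery. The wrapper class name and the timestamp are made up; the builder and getter calls are the generated protobuf accessors already used by this commit, and the sketch assumes the message's other fields are optional as shown above.
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
public class AppProtoDefaultSketch {
  public static void main(String[] args) {
    // An application stored after this change carries the log-handling-inited timestamp.
    ContainerManagerApplicationProto stored = ContainerManagerApplicationProto.newBuilder()
        .setUser("someuser")                            // example value
        .setAppLogAggregationInitedTime(1464190000000L) // example timestamp in ms
        .build();
    System.out.println(stored.getAppLogAggregationInitedTime()); // 1464190000000
    // A proto written before this change has no field 6, so the declared default (-1) is
    // returned, which keeps the retention check disabled for such recovered applications.
    System.out.println(ContainerManagerApplicationProto.getDefaultInstance()
        .getAppLogAggregationInitedTime()); // -1
  }
}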
message DeletionServiceDeleteTaskProto {

View File

@ -18,14 +18,10 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.refEq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@ -41,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@ -61,12 +58,14 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
@ -295,6 +294,26 @@ protected ContainerTokenIdentifier waitForContainerTokenToExpire(
return identifier;
}
@Test
public void testApplicationOnAppLogHandlingInitedEvtShouldStoreLogInitedTime()
throws IOException {
WrappedApplication wa = new WrappedApplication(5, 314159265358979L,
"yak", 0);
wa.initApplication();
ArgumentCaptor<ContainerManagerApplicationProto> applicationProto =
ArgumentCaptor.forClass(ContainerManagerApplicationProto.class);
final long timestamp = wa.applicationLogInited();
verify(wa.stateStoreService).storeApplication(any(ApplicationId.class),
applicationProto.capture());
assertEquals(applicationProto.getValue().getAppLogAggregationInitedTime(),
timestamp);
}
@Test
@SuppressWarnings("unchecked")
public void testAppFinishedOnCompletedContainers() {
@ -484,7 +503,7 @@ private class WrappedApplication {
final Context context;
final Map<ContainerId, ContainerTokenIdentifier> containerTokenIdentifierMap;
final NMTokenSecretManagerInNM nmTokenSecretMgr;
final NMStateStoreService stateStoreService;
final ApplicationId appId;
final Application app;
@ -511,7 +530,7 @@ private class WrappedApplication {
dispatcher.register(LogHandlerEventType.class, logAggregationBus);
nmTokenSecretMgr = mock(NMTokenSecretManagerInNM.class);
stateStoreService = mock(NMStateStoreService.class);
context = mock(Context.class);
when(context.getContainerTokenSecretManager()).thenReturn(
@ -519,6 +538,7 @@ private class WrappedApplication {
when(context.getApplicationACLsManager()).thenReturn(
new ApplicationACLsManager(conf));
when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr);
when(context.getNMStateStore()).thenReturn(stateStoreService);
// Setting master key
MasterKey masterKey = new MasterKeyPBImpl();
@ -586,6 +606,13 @@ public void applicationInited() {
drainDispatcherEvents();
}
public long applicationLogInited() {
ApplicationEvent appEvt = new ApplicationEvent(app.getAppId(),
ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
app.handle(appEvt);
return appEvt.getTimestamp();
}
public void appFinished() {
app.handle(new ApplicationFinishEvent(appId,
"Finish Application"));

View File

@ -0,0 +1,436 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
/**
* Unit tests of AppLogAggregatorImpl class.
*/
public class TestAppLogAggregatorImpl {
private static final File LOCAL_LOG_DIR = new File("target",
TestAppLogAggregatorImpl.class.getName() + "-localLogDir");
private static final File REMOTE_LOG_FILE = new File("target",
TestAppLogAggregatorImpl.class.getName() + "-remoteLogFile");
@Before
public void setUp() throws IOException {
if(LOCAL_LOG_DIR.exists()) {
FileUtils.cleanDirectory(LOCAL_LOG_DIR);
}
if(REMOTE_LOG_FILE.exists()) {
FileUtils.cleanDirectory(REMOTE_LOG_FILE);
}
}
@After
public void cleanUp() throws IOException {
FileUtils.deleteDirectory(LOCAL_LOG_DIR);
FileUtils.deleteQuietly(REMOTE_LOG_FILE);
}
@Test
public void testAggregatorWithRetentionPolicyDisabledShouldUploadAllFiles()
throws Exception {
final ApplicationId applicationId =
ApplicationId.newInstance(System.currentTimeMillis(), 0);
final ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(applicationId, 0);
final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);
// create artificial log files
final File appLogDir = new File(LOCAL_LOG_DIR,
ConverterUtils.toString(applicationId));
final File containerLogDir = new File(appLogDir,
ConverterUtils.toString(containerId));
containerLogDir.mkdirs();
final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);
final long logRetentionSecs = 10000;
final long recoveredLogInitedTime = -1;
verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
applicationId, containerId, logRetentionSecs,
recoveredLogInitedTime, logFiles, logFiles);
}
@Test
public void testAggregatorWhenNoFileOlderThanRetentionPolicyShouldUploadAll()
throws IOException {
final ApplicationId applicationId =
ApplicationId.newInstance(System.currentTimeMillis(), 0);
final ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(applicationId, 0);
final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);
// create artificial log files
final File appLogDir = new File(LOCAL_LOG_DIR,
ConverterUtils.toString(applicationId));
final File containerLogDir = new File(appLogDir,
ConverterUtils.toString(containerId));
containerLogDir.mkdirs();
final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);
// set log retention period to 1 week.
final long logRententionSec = 7 * 24 * 60 * 60;
final long recoveredLogInitedTimeMillis =
System.currentTimeMillis() - 60 * 60 * 1000; // one hour ago, well within retention
verifyLogAggregationWithExpectedFiles2DeleteAndUpload(applicationId,
containerId, logRententionSec, recoveredLogInitedTimeMillis,
logFiles, logFiles);
}
@Test
public void testAggregatorWhenAllFilesOlderThanRetentionShouldUploadNone()
throws IOException {
final ApplicationId applicationId =
ApplicationId.newInstance(System.currentTimeMillis(), 0);
final ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(applicationId, 0);
final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);
// create artificial log files
final File appLogDir = new File(LOCAL_LOG_DIR,
ConverterUtils.toString(applicationId));
final File containerLogDir = new File(appLogDir,
ConverterUtils.toString(containerId));
containerLogDir.mkdirs();
final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);
final long week = 7 * 24 * 60 * 60;
final long recoveredLogInitedTimeMillis = System.currentTimeMillis() -
2 * week * 1000; // two weeks ago in milliseconds, older than the one-week retention
verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
applicationId, containerId, week, recoveredLogInitedTimeMillis,
logFiles, new HashSet<File>());
}
/**
* Create the given number of log files under the container log directory.
* @param containerLogDir the directory to create container log files
* @param numOfFiles the number of log files to create
* @return the set of log files created
*/
private static Set<File> createContainerLogFiles(File containerLogDir,
int numOfFiles) throws IOException {
assert(numOfFiles >= 0);
assert(containerLogDir.exists());
Set<File> containerLogFiles = new HashSet<>();
for(int i = 0; i < numOfFiles; i++) {
final File logFile = new File(containerLogDir, "logfile" + i);
logFile.createNewFile();
containerLogFiles.add(logFile);
}
return containerLogFiles;
}
/**
* Verify if the application log aggregator, configured with given log
* retention period and the recovered log initialization time of
* the application, uploads and deletes the set of log files as expected.
* @param appId application id
* @param containerId container id
* @param logRetentionSecs log retention period
* @param recoveredLogInitedTimeMillis recovered log initialization time
* @param expectedFilesToDelete the set of files expected to be deleted
* @param expectedFilesToUpload the set of files expected to be uploaded.
*/
public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
ApplicationId appId, ContainerId containerId, long logRetentionSecs,
long recoveredLogInitedTimeMillis, Set<File> expectedFilesToDelete,
Set<File> expectedFilesToUpload) throws IOException {
final Set<String> filesExpected2Delete = new HashSet<>();
for(File file: expectedFilesToDelete) {
filesExpected2Delete.add(file.getAbsolutePath());
}
final Set<String> filesExpected2Upload = new HashSet<>();
for(File file: expectedFilesToUpload) {
filesExpected2Upload.add(file.getAbsolutePath());
}
// deletion service with verification to check files to delete
DeletionService deletionServiceWithExpectedFiles =
createDeletionServiceWithExpectedFile2Delete(filesExpected2Delete);
final YarnConfiguration config = new YarnConfiguration();
config.setLong(
YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs);
final AppLogAggregatorInTest appLogAggregator =
createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
config, recoveredLogInitedTimeMillis,
deletionServiceWithExpectedFiles);
appLogAggregator.startContainerLogAggregation(
new ContainerLogContext(containerId, ContainerType.TASK, 0));
// set app finished flag first
appLogAggregator.finishLogAggregation();
appLogAggregator.run();
// verify uploaded files
ArgumentCaptor<LogValue> logValCaptor =
ArgumentCaptor.forClass(LogValue.class);
verify(appLogAggregator.logWriter).append(any(LogKey.class),
logValCaptor.capture());
Set<String> filesUploaded = new HashSet<>();
LogValue logValue = logValCaptor.getValue();
for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
filesUploaded.add(file.getAbsolutePath());
}
verifyFilesUploaded(filesUploaded, filesExpected);
}
private static void verifyFilesUploaded(Set<String> filesUploaded,
Set<String> filesExpected) {
final String errMsgPrefix = "The set of files uploaded is not the same " +
"as expected";
if (filesUploaded.size() != filesExpected.size()) {
fail(errMsgPrefix + ": actual size: " + filesUploaded.size() + " vs " +
"expected size: " + filesExpected.size());
}
for(String file: filesExpected) {
if(!filesUploaded.contains(file)) {
fail(errMsgPrefix + ": expecting " + file);
}
}
}
private static AppLogAggregatorInTest createAppLogAggregator(
ApplicationId applicationId, String rootLogDir,
YarnConfiguration config, long recoveredLogInitedTimeMillis,
DeletionService deletionServiceWithFilesToExpect)
throws IOException {
final Dispatcher dispatcher = createNullDispatcher();
final NodeId nodeId = NodeId.newInstance("localhost", 0);
final String userId = "AppLogAggregatorTest";
final UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(userId);
final LocalDirsHandlerService dirsService =
createLocalDirsHandlerService(config, rootLogDir);
final DeletionService deletionService = deletionServiceWithFilesToExpect;
final LogAggregationContext logAggregationContext = null;
final Map<ApplicationAccessType, String> appAcls = new HashMap<>();
final Context context = createContext(config);
final FileContext fakeLfs = mock(FileContext.class);
final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
return new AppLogAggregatorInTest(dispatcher, deletionService,
config, applicationId, ugi, nodeId, dirsService,
remoteLogDirForApp, appAcls, logAggregationContext,
context, fakeLfs, recoveredLogInitedTimeMillis);
}
/**
* Create a deletionService that verifies the paths of container log files
* passed to the delete method of DeletionService by AppLogAggregatorImpl.
* This approach is taken due to the lack of varargs captor support in the
* current Mockito version, 1.8.5 (support was added in 1.10.x).
**/
private static DeletionService createDeletionServiceWithExpectedFile2Delete(
final Set<String> expectedPathsForDeletion) {
DeletionService deletionServiceWithExpectedFiles = mock(DeletionService
.class);
// verify paths passed to first invocation of delete method against
// expected paths
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
Set<String> paths = new HashSet<>();
Object[] args = invocationOnMock.getArguments();
for(int i = 2; i < args.length; i++) {
Path path = (Path) args[i];
paths.add(path.toUri().getRawPath());
}
verifyFilesToDelete(expectedPathsForDeletion, paths);
return null;
}
}).doNothing().when(deletionServiceWithExpectedFiles).delete(
any(String.class), any(Path.class), Matchers.<Path>anyVararg());
return deletionServiceWithExpectedFiles;
}
private static void verifyFilesToDelete(Set<String> files2ToDelete,
Set<String> filesExpected) {
final String errMsgPrefix = "The set of paths for deletion is not the " +
"same as expected";
if(files2ToDelete.size() != filesExpected.size()) {
fail(errMsgPrefix + ": actual size: " + files2ToDelete.size() + " vs " +
"expected size: " + filesExpected.size());
}
for(String file: filesExpected) {
if(!files2ToDelete.contains(file)) {
fail(errMsgPrefix + ": expecting " + file);
}
}
}
private static Dispatcher createNullDispatcher() {
return new Dispatcher() {
@Override
public EventHandler getEventHandler() {
return new EventHandler() {
@Override
public void handle(Event event) {
// do nothing
}
};
}
@Override
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
// do nothing
}
};
}
private static LocalDirsHandlerService createLocalDirsHandlerService(
YarnConfiguration conf, final String rootLogDir) {
LocalDirsHandlerService dirsHandlerService = new LocalDirsHandlerService() {
@Override
public List<String> getLogDirsForRead() {
return new ArrayList<String>() {
{
add(rootLogDir);
}
};
}
@Override
public List<String> getLogDirsForCleanup() {
return new ArrayList<String>() {
{
add(rootLogDir);
}
};
}
};
dirsHandlerService.init(conf);
// appLogAggregator only calls LocalDirsHandlerServer for local directories
// so it is ok to not start the service.
return dirsHandlerService;
}
private static Context createContext(YarnConfiguration conf) {
return new NodeManager.NMContext(
new NMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInNM(),
null,
new ApplicationACLsManager(conf),
new NMNullStateStoreService(), false);
}
private static final class AppLogAggregatorInTest extends
AppLogAggregatorImpl {
final DeletionService deletionService;
final ApplicationId applicationId;
final LogWriter logWriter;
final ArgumentCaptor<LogValue> logValue;
public AppLogAggregatorInTest(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation ugi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long recoveredLogInitedTime) throws IOException {
super(dispatcher, deletionService, conf, appId, ugi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, recoveredLogInitedTime);
this.applicationId = appId;
this.deletionService = deletionService;
this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp);
this.logValue = ArgumentCaptor.forClass(LogValue.class);
}
@Override
protected LogWriter createLogWriter() {
return this.logWriter;
}
private LogWriter getSpiedLogWriter(Configuration conf,
UserGroupInformation ugi, Path remoteAppLogFile) throws IOException {
return spy(new LogWriter(conf, remoteAppLogFile, ugi));
}
}
}

View File

@ -20,10 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@ -712,7 +709,7 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {
doThrow(new YarnRuntimeException("KABOOM!"))
.when(logAggregationService).initAppAggregator(
eq(appId), eq(user), any(Credentials.class),
anyMap(), any(LogAggregationContext.class));
anyMap(), any(LogAggregationContext.class), anyLong());
LogAggregationContext contextWithAMAndFailed =
Records.newRecord(LogAggregationContext.class);
contextWithAMAndFailed.setLogAggregationPolicyClassName(