YARN-8418. App local logs could leaked if log aggregation fails to initialize for the app. (Bibin A Chundatt via wangda)

Change-Id: I29a23ca4b219b48c92e7975cd44cddb8b0e04104
This commit is contained in:
Wangda Tan 2018-07-31 12:07:51 -07:00
parent 8aa93a575e
commit 4b540bbfcf
13 changed files with 187 additions and 28 deletions

View File

@ -43,11 +43,14 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
@ -365,6 +368,10 @@ public Object run() throws Exception {
}
});
} catch (Exception e) {
if (e instanceof RemoteException) {
throw new YarnRuntimeException(((RemoteException) e)
.unwrapRemoteException(SecretManager.InvalidToken.class));
}
throw new YarnRuntimeException(e);
}
}

View File

@ -1135,6 +1135,7 @@ public void run() {
if (systemCredentials != null && !systemCredentials.isEmpty()) {
((NMContext) context).setSystemCrendentialsForApps(
parseCredentials(systemCredentials));
context.getContainerManager().handleCredentialUpdate();
}
List<org.apache.hadoop.yarn.api.records.Container>
containersToUpdate = response.getContainersToUpdate();

View File

@ -44,4 +44,5 @@ public interface ContainerManager extends ServiceStateChangeListener,
ContainerScheduler getContainerScheduler();
void handleCredentialUpdate();
}

View File

@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -170,7 +171,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -214,6 +214,7 @@ private enum ReInitOp {
protected final AsyncDispatcher dispatcher;
private final DeletionService deletionService;
private LogHandler logHandler;
private boolean serviceStopped = false;
private final ReadLock readLock;
private final WriteLock writeLock;
@ -292,7 +293,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
@Override
public void serviceInit(Configuration conf) throws Exception {
LogHandler logHandler =
logHandler =
createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
@ -1904,4 +1905,12 @@ private void internalSignalToContainer(SignalContainerRequest request,
public ContainerScheduler getContainerScheduler() {
return this.containerScheduler;
}
@Override
public void handleCredentialUpdate() {
Set<ApplicationId> invalidApps = logHandler.getInvalidTokenApps();
if (!invalidApps.isEmpty()) {
dispatcher.getEventHandler().handle(new LogHandlerTokenUpdatedEvent());
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
public interface AppLogAggregator extends Runnable {
@ -29,4 +31,10 @@ public interface AppLogAggregator extends Runnable {
void finishLogAggregation();
void disableLogAggregation();
void enableLogAggregation();
boolean isAggregationEnabled();
UserGroupInformation updateCredentials(Credentials cred);
}

View File

@ -561,6 +561,16 @@ public void disableLogAggregation() {
this.logAggregationDisabled = true;
}
@Override
public void enableLogAggregation() {
this.logAggregationDisabled = false;
}
@Override
public boolean isAggregationEnabled() {
return !logAggregationDisabled;
}
@Private
@VisibleForTesting
// This is only used for testing.
@ -643,6 +653,11 @@ public UserGroupInformation getUgi() {
return this.userUgi;
}
public UserGroupInformation updateCredentials(Credentials cred) {
this.userUgi.addCredentials(cred);
return userUgi;
}
@Private
@VisibleForTesting
public int getLogAggregationTimes() {

View File

@ -20,10 +20,14 @@
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.security.token.SecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,6 +62,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -83,6 +88,9 @@ public class LogAggregationService extends AbstractService implements
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
// Holds applications whose aggregation is disable due to invalid Token
private final Set<ApplicationId> invalidTokenApps;
@VisibleForTesting
ExecutorService threadPool;
@ -95,6 +103,7 @@ public LogAggregationService(Dispatcher dispatcher, Context context,
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
this.invalidTokenApps = ConcurrentHashMap.newKeySet();
}
protected void serviceInit(Configuration conf) throws Exception {
@ -224,8 +233,8 @@ protected void initAppAggregator(final ApplicationId appId, String user,
userUgi.addCredentials(credentials);
}
LogAggregationFileController logAggregationFileController
= getLogAggregationFileController(getConfig());
LogAggregationFileController logAggregationFileController =
getLogAggregationFileController(getConfig());
logAggregationFileController.verifyAndCreateRemoteLogDir();
// New application
final AppLogAggregator appLogAggregator =
@ -245,14 +254,16 @@ protected void initAppAggregator(final ApplicationId appId, String user,
logAggregationFileController.createAppDir(user, appId, userUgi);
} catch (Exception e) {
appLogAggregator.disableLogAggregation();
// add to disabled aggregators if due to InvalidToken
if (e.getCause() instanceof SecretManager.InvalidToken) {
invalidTokenApps.add(appId);
}
if (!(e instanceof YarnRuntimeException)) {
appDirException = new YarnRuntimeException(e);
} else {
appDirException = (YarnRuntimeException)e;
}
appLogAggregators.remove(appId);
closeFileSystems(userUgi);
throw appDirException;
}
// TODO Get the user configuration for the list of containers that need log
@ -270,6 +281,10 @@ public void run() {
}
};
this.threadPool.execute(aggregatorWrapper);
if (appDirException != null) {
throw appDirException;
}
}
protected void closeFileSystems(final UserGroupInformation userUgi) {
@ -307,17 +322,20 @@ private void stopApp(ApplicationId appId) {
// App is complete. Finish up any containers' pending log aggregation and
// close the application specific logFile.
AppLogAggregator aggregator = this.appLogAggregators.get(appId);
if (aggregator == null) {
LOG.warn("Log aggregation is not initialized for " + appId
+ ", did it fail to start?");
this.dispatcher.getEventHandler().handle(
new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
return;
try {
AppLogAggregator aggregator = this.appLogAggregators.get(appId);
if (aggregator == null) {
LOG.warn("Log aggregation is not initialized for " + appId
+ ", did it fail to start?");
this.dispatcher.getEventHandler().handle(new ApplicationEvent(appId,
ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
return;
}
aggregator.finishLogAggregation();
} finally {
// Remove invalid Token Apps
invalidTokenApps.remove(appId);
}
aggregator.finishLogAggregation();
}
@Override
@ -344,12 +362,47 @@ public void handle(LogHandlerEvent event) {
(LogHandlerAppFinishedEvent) event;
stopApp(appFinishedEvent.getApplicationId());
break;
case LOG_AGG_TOKEN_UPDATE:
checkAndEnableAppAggregators();
break;
default:
; // Ignore
}
}
private void checkAndEnableAppAggregators() {
for (ApplicationId appId : invalidTokenApps) {
try {
AppLogAggregator aggregator = appLogAggregators.get(appId);
if (aggregator != null) {
Credentials credentials =
context.getSystemCredentialsForApps().get(appId);
if (credentials != null) {
// Create the app dir again with
LogAggregationFileController logAggregationFileController =
getLogAggregationFileController(getConfig());
UserGroupInformation userUgi =
aggregator.updateCredentials(credentials);
logAggregationFileController
.createAppDir(userUgi.getShortUserName(), appId, userUgi);
aggregator.enableLogAggregation();
}
invalidTokenApps.remove(appId);
LOG.info("LogAggregation enabled for application {}", appId);
}
} catch (Exception e) {
//Ignore exception
LOG.warn("Enable aggregators failed {}", appId);
}
}
}
@Override
public Set<ApplicationId> getInvalidTokenApps() {
return invalidTokenApps;
}
@VisibleForTesting
public ConcurrentMap<ApplicationId, AppLogAggregator> getAppLogAggregators() {
return this.appLogAggregators;

View File

@ -18,9 +18,16 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import java.util.Set;
public interface LogHandler extends EventHandler<LogHandlerEvent> {
public void handle(LogHandlerEvent event);
public Set<ApplicationId> getInvalidTokenApps();
}

View File

@ -19,8 +19,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@ -204,6 +208,11 @@ public void handle(LogHandlerEvent event) {
}
}
@Override
public Set<ApplicationId> getInvalidTokenApps() {
return Collections.emptySet();
}
ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
Configuration conf) {
ThreadFactory tf =

View File

@ -19,5 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
public enum LogHandlerEventType {
APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
APPLICATION_STARTED,
CONTAINER_FINISHED,
APPLICATION_FINISHED, LOG_AGG_TOKEN_UPDATE
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
public class LogHandlerTokenUpdatedEvent extends LogHandlerEvent {
public LogHandlerTokenUpdatedEvent() {
super(LogHandlerEventType.LOG_AGG_TOKEN_UPDATE);
}
}

View File

@ -24,6 +24,8 @@
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -187,6 +189,11 @@ public void handle(LogHandlerEvent event) {
// Ignore
}
}
@Override
public Set<ApplicationId> getInvalidTokenApps() {
return Collections.emptySet();
}
};
}

View File

@ -73,6 +73,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@ -128,6 +129,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -823,7 +825,8 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM()
.getFileControllerForWrite();
LogAggregationFileController spyLogAggregationFileFormat =
spy(logAggregationFileFormat);
Exception e = new RuntimeException("KABOOM!");
Exception e =
new YarnRuntimeException(new SecretManager.InvalidToken("KABOOM!"));
doThrow(e).when(spyLogAggregationFileFormat)
.createAppDir(any(String.class), any(ApplicationId.class),
any(UserGroupInformation.class));
@ -862,29 +865,40 @@ public LogAggregationFileController getLogAggregationFileController(
};
checkEvents(appEventHandler, expectedEvents, false,
"getType", "getApplicationID", "getDiagnostic");
Assert.assertEquals(logAggregationService.getInvalidTokenApps().size(), 1);
// verify trying to collect logs for containers/apps we don't know about
// doesn't blow up and tear down the NM
logAggregationService.handle(new LogHandlerContainerFinishedEvent(
BuilderUtils.newContainerId(4, 1, 1, 1),
ContainerType.APPLICATION_MASTER, 0));
dispatcher.await();
AppLogAggregator appAgg =
logAggregationService.getAppLogAggregators().get(appId);
Assert.assertFalse("Aggregation should be disabled",
appAgg.isAggregationEnabled());
// Enabled aggregation
logAggregationService.handle(new LogHandlerTokenUpdatedEvent());
dispatcher.await();
appAgg =
logAggregationService.getAppLogAggregators().get(appId);
Assert.assertFalse("Aggregation should be enabled",
appAgg.isAggregationEnabled());
// Check disabled apps are cleared
Assert.assertEquals(0, logAggregationService.getInvalidTokenApps().size());
logAggregationService.handle(new LogHandlerAppFinishedEvent(
BuilderUtils.newApplicationId(1, 5)));
dispatcher.await();
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
// local log dir shouldn't be deleted given log aggregation cannot
// continue due to aggregated log dir creation failure on remoteFS.
FileDeletionTask deletionTask = new FileDeletionTask(spyDelSrvc, user,
null, null);
verify(spyDelSrvc, never()).delete(deletionTask);
verify(spyDelSrvc).delete(any(FileDeletionTask.class));
verify(logAggregationService).closeFileSystems(
any(UserGroupInformation.class));
// make sure local log dir is not deleted in case log aggregation
// service cannot be initiated.
assertTrue(appLogDir.exists());
}
private void writeContainerLogs(File appLogDir, ContainerId containerId,