YARN-2790. Fixed a NodeManager bug that was causing log-aggregation to fail beyond HDFS delegation-token expiry even when RM is a proxy-user (YARN-2704). Contributed by Jian He.
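For context, the core of the fix is that the per-application log aggregator now refreshes its UGI from the RM-forwarded system credentials right before uploading logs, instead of copying those credentials once when the aggregator is initialized. A minimal sketch of that refresh pattern follows; the helper class and method names are illustrative and not part of the patch, but every call used here (isSecurityEnabled, getSystemCredentialsForApps, getAllTokens, addCredentials) appears in the changed files below.

import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.nodemanager.Context;

// Illustrative helper (not part of the patch): pull the latest RM-forwarded
// tokens for an application out of the NM context and add them to the UGI
// that performs the HDFS writes for log aggregation.
final class SystemCredentialsRefreshSketch {
  static void refresh(Context nmContext, ApplicationId appId,
      UserGroupInformation aggregatorUgi) {
    if (!UserGroupInformation.isSecurityEnabled()) {
      return; // without Kerberos there are no delegation tokens to refresh
    }
    // Credentials the RM (acting as proxy-user, YARN-2704) re-obtained and
    // shipped to the NodeManager in heartbeat responses.
    Credentials systemCredentials =
        nmContext.getSystemCredentialsForApps().get(appId);
    if (systemCredentials == null) {
      return;
    }
    for (Token<?> token : systemCredentials.getAllTokens()) {
      System.out.println("Refreshing token for " + appId + ": " + token);
    }
    // Per the patch's own comment, this replaces the old token, so a stale
    // HDFS delegation token is superseded by the fresh one.
    aggregatorUgi.addCredentials(systemCredentials);
  }
}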

Vinod Kumar Vavilapalli 2014-11-01 16:32:35 -07:00
parent 36ccf097a9
commit 5c0381c96a
7 changed files with 100 additions and 23 deletions


@@ -832,6 +832,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2711. Fixed TestDefaultContainerExecutor#testContainerLaunchError failure on
     Windows. (Varun Vasudev via zjshen)
+
+    YARN-2790. Fixed a NodeManager bug that was causing log-aggregation to fail
+    beyond HDFS delegation-token expiry even when RM is a proxy-user (YARN-2704).
+    (Jian He via vinodkv)
 Release 2.5.1 - 2014-09-05
   INCOMPATIBLE CHANGES


@@ -433,7 +433,7 @@ public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
      return systemCredentials;
    }

-    public void setSystemCrendentials(
+    public void setSystemCrendentialsForApps(
        Map<ApplicationId, Credentials> systemCredentials) {
      this.systemCredentials = systemCredentials;
    }


@@ -626,7 +626,7 @@ public void run() {
              response.getSystemCredentialsForApps();
          if (systemCredentials != null && !systemCredentials.isEmpty()) {
            ((NMContext) context)
-              .setSystemCrendentials(parseCredentials(systemCredentials));
+              .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
          }
        } catch (ConnectException e) {
          //catch and throw the exception if tried MAX wait time to connect RM


@@ -1122,9 +1122,9 @@ private Credentials getSystemCredentialsSentFromRM(
    if (systemCredentials == null) {
      return null;
    }
-    LOG.info("Adding new framework tokens from RM for " + appId);
    for (Token<?> token : systemCredentials.getAllTokens()) {
-      LOG.info("Adding new application-token for localization: " + token);
+      LOG.info("Adding new framework-token for " + appId
+          + " for localization: " + token);
    }
    return systemCredentials;
  }


@@ -21,7 +21,6 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -43,19 +42,21 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -197,6 +198,19 @@ private void uploadLogsForContainers() {
      return;
    }
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials systemCredentials =
+          context.getSystemCredentialsForApps().get(appId);
+      if (systemCredentials != null) {
+        for (Token<?> token : systemCredentials.getAllTokens()) {
+          LOG.info("Adding new framework-token for " + appId
+              + " for log-aggregation: " + token + " user=" + userUgi);
+        }
+        // this will replace old token
+        userUgi.addCredentials(systemCredentials);
+      }
+    }
+
    // Create a set of Containers whose logs will be uploaded in this cycle.
    // It includes:
    // a) all containers in pendingContainers: those containers are finished
@@ -538,4 +552,10 @@ public boolean apply(String next) {
      return logValue.getCurrentUpLoadedFilesPath();
    }
  }
+
+  // only for test
+  @VisibleForTesting
+  public UserGroupInformation getUgi() {
+    return this.userUgi;
+  }
}


@@ -39,7 +39,6 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -344,18 +343,6 @@ protected void initAppAggregator(final ApplicationId appId, String user,
      Map<ApplicationAccessType, String> appAcls,
      LogAggregationContext logAggregationContext) {

-    if (UserGroupInformation.isSecurityEnabled()) {
-      Credentials systemCredentials =
-          context.getSystemCredentialsForApps().get(appId);
-      if (systemCredentials != null) {
-        LOG.info("Adding new framework tokens from RM for " + appId);
-        for (Token<?> token : systemCredentials.getAllTokens()) {
-          LOG.info("Adding new application-token for log-aggregation: " + token);
-        }
-        credentials = systemCredentials;
-      }
-    }
-
    // Get user's FileSystem credentials
    final UserGroupInformation userUgi =
        UserGroupInformation.createRemoteUser(user);
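The hunk above removes the one-shot copy of the RM-forwarded credentials from initAppAggregator. Because that copy was taken only once, at application start, a long-running application could outlive the HDFS delegation token it captured, and log aggregation would then fail even though the RM (acting as a proxy-user per YARN-2704) had already pushed fresh tokens to the NodeManager. The replacement in the AppLogAggregatorImpl hunk earlier re-reads the system credentials right before each upload cycle, so the aggregator always writes with the newest tokens.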


@@ -55,11 +55,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.junit.Assert;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -67,8 +67,11 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -93,10 +96,11 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -107,19 +111,22 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mortbay.util.MultiException;
+
+import com.google.common.base.Supplier;

//@Ignore
public class TestLogAggregationService extends BaseContainerManagerTest {
@@ -152,6 +159,7 @@ public void setup() throws IOException {
    dispatcher = createDispatcher();
    appEventHandler = mock(EventHandler.class);
    dispatcher.register(ApplicationEventType.class, appEventHandler);
+    UserGroupInformation.setConfiguration(conf);
  }

  @Override
@@ -1424,6 +1432,64 @@ private void testLogAggregationService(boolean retentionSizeLimitation)
    dispatcher.stop();
  }

+  @Test (timeout = 20000)
+  public void testAddNewTokenSentFromRMForLogAggregation() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    DrainDispatcher dispatcher = createDispatcher();
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    Application mockApp = mock(Application.class);
+    when(mockApp.getContainers()).thenReturn(
+        new HashMap<ContainerId, Container>());
+    this.context.getApplications().put(application1, mockApp);
+    @SuppressWarnings("resource")
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+            super.dirsHandler);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+    logAggregationService.handle(new LogHandlerAppStartedEvent(application1,
+        this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
+        Records.newRecord(LogAggregationContext.class)));
+
+    // Inject new token for log-aggregation after app log-aggregator init
+    Text userText1 = new Text("user1");
+    RMDelegationTokenIdentifier dtId1 =
+        new RMDelegationTokenIdentifier(userText1, new Text("renewer1"),
+            userText1);
+    final Token<RMDelegationTokenIdentifier> token1 =
+        new Token<RMDelegationTokenIdentifier>(dtId1.getBytes(),
+            "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, token1);
+    this.context.getSystemCredentialsForApps().put(application1, credentials);
+
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
+
+    final UserGroupInformation ugi =
+        ((AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
+            .get(application1)).getUgi();
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        boolean hasNewToken = false;
+        for (Token<?> token : ugi.getCredentials().getAllTokens()) {
+          if (token.equals(token1)) {
+            hasNewToken = true;
+          }
+        }
+        return hasNewToken;
+      }
+    }, 1000, 20000);
+    logAggregationService.stop();
+    dispatcher.stop();
+  }
+
  private int numOfLogsAvailable(LogAggregationService logAggregationService,
      ApplicationId appId, boolean sizeLimited, String lastLogFile)
      throws IOException {