From c86674a3a4d99aa56bb8ed3f6df51e3fef215eba Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Wed, 24 Sep 2014 17:50:26 -0700 Subject: [PATCH] YARN-2581. Passed LogAggregationContext to NM via ContainerTokenIdentifier. Contributed by Xuan Gong. --- hadoop-yarn-project/CHANGES.txt | 3 + .../security/ContainerTokenIdentifier.java | 34 +++++++++++ .../ContainerManagerImpl.java | 26 ++++++-- .../application/ApplicationImpl.java | 18 +++++- .../application/ApplicationInitEvent.java | 13 ++++ .../event/LogHandlerAppStartedEvent.java | 13 ++++ .../yarn_server_nodemanager_recovery.proto | 1 + .../TestContainerManager.java | 12 +++- .../TestContainerManagerRecovery.java | 25 +++++++- .../SchedulerApplicationAttempt.java | 6 +- .../RMContainerTokenSecretManager.java | 23 ++++++- .../yarn/server/resourcemanager/MockRM.java | 22 +++++-- .../capacity/TestContainerAllocation.java | 60 ++++++++++++++++++- 13 files changed, 239 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c33bf45618..4e71c1bd80 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -97,6 +97,9 @@ Release 2.6.0 - UNRELEASED YARN-2102. Added the concept of a Timeline Domain to handle read/write ACLs on Timeline service event data. (Zhijie Shen via vinodkv) + YARN-2581. Passed LogAggregationContext to NM via ContainerTokenIdentifier. + (Xuan Gong via zjshen) + IMPROVEMENTS YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index ca847e0726..0bb016a053 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -34,8 +34,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto; /** * TokenIdentifier for a container. Encodes {@link ContainerId}, @@ -59,10 +62,19 @@ public class ContainerTokenIdentifier extends TokenIdentifier { private long rmIdentifier; private Priority priority; private long creationTime; + private LogAggregationContext logAggregationContext; public ContainerTokenIdentifier(ContainerId containerID, String hostName, String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, long rmIdentifier, Priority priority, long creationTime) { + this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId, + rmIdentifier, priority, creationTime, null); + } + + public ContainerTokenIdentifier(ContainerId containerID, String hostName, + String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId, + long rmIdentifier, Priority priority, long creationTime, + LogAggregationContext logAggregationContext) { this.containerId = containerID; this.nmHostAddr = hostName; this.appSubmitter = appSubmitter; @@ -72,6 +84,7 @@ public ContainerTokenIdentifier(ContainerId containerID, this.rmIdentifier = rmIdentifier; this.priority = priority; this.creationTime = creationTime; + this.logAggregationContext = logAggregationContext; } /** @@ -119,6 +132,10 @@ public long getRMIdentifer() { return this.rmIdentifier; } + public LogAggregationContext getLogAggregationContext() { + return this.logAggregationContext; + } + @Override public void write(DataOutput out) throws IOException { LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this); @@ -138,6 +155,15 @@ public void write(DataOutput out) throws IOException { out.writeLong(this.rmIdentifier); out.writeInt(this.priority.getPriority()); out.writeLong(this.creationTime); + if (this.logAggregationContext == null) { + out.writeInt(-1); + } else { + byte[] logAggregationContext = + ((LogAggregationContextPBImpl) this.logAggregationContext).getProto() + .toByteArray(); + out.writeInt(logAggregationContext.length); + out.write(logAggregationContext); + } } @Override @@ -158,6 +184,14 @@ public void readFields(DataInput in) throws IOException { this.rmIdentifier = in.readLong(); this.priority = Priority.newInstance(in.readInt()); this.creationTime = in.readLong(); + int size = in.readInt(); + if (size != -1) { + byte[] bytes = new byte[size]; + in.readFully(bytes); + this.logAggregationContext = + new LogAggregationContextPBImpl( + LogAggregationContextProto.parseFrom(bytes)); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 12166e0fa6..17c9e3e26b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -72,9 +72,11 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -275,11 +277,17 @@ private void recoverApplication(ContainerManagerApplicationProto p) aclProto.getAcl()); } + LogAggregationContext logAggregationContext = null; + if (p.getLogAggregationContext() != null) { + logAggregationContext = + new LogAggregationContextPBImpl(p.getLogAggregationContext()); + } + LOG.info("Recovering application " + appId); ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId, creds, context); context.getApplications().put(appId, app); - app.handle(new ApplicationInitEvent(appId, acls)); + app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } @SuppressWarnings("unchecked") @@ -719,13 +727,19 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier, private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, String user, Credentials credentials, - Map appAcls) { + Map appAcls, + LogAggregationContext logAggregationContext) { ContainerManagerApplicationProto.Builder builder = ContainerManagerApplicationProto.newBuilder(); builder.setId(((ApplicationIdPBImpl) appId).getProto()); builder.setUser(user); + if (logAggregationContext != null) { + builder.setLogAggregationContext(( + (LogAggregationContextPBImpl)logAggregationContext).getProto()); + } + builder.clearCredentials(); if (credentials != null) { DataOutputBuffer dob = new DataOutputBuffer(); @@ -826,12 +840,16 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, if (null == context.getApplications().putIfAbsent(applicationID, application)) { LOG.info("Creating a new application reference for app " + applicationID); + LogAggregationContext logAggregationContext = + containerTokenIdentifier.getLogAggregationContext(); Map appAcls = container.getLaunchContext().getApplicationACLs(); context.getNMStateStore().storeApplication(applicationID, - buildAppProto(applicationID, user, credentials, appAcls)); + buildAppProto(applicationID, user, credentials, appAcls, + logAggregationContext)); dispatcher.getEventHandler().handle( - new ApplicationInitEvent(applicationID, appAcls)); + new ApplicationInitEvent(applicationID, appAcls, + logAggregationContext)); } this.context.getNMStateStore().storeContainer(containerId, request); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index cc5544cc47..fbcd4a1990 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -54,6 +55,8 @@ import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import com.google.common.annotations.VisibleForTesting; + /** * The state machine for the representation of an Application * within the NodeManager. @@ -72,6 +75,8 @@ public class ApplicationImpl implements Application { private static final Log LOG = LogFactory.getLog(Application.class); + private LogAggregationContext logAggregationContext; + Map containers = new HashMap(); @@ -234,10 +239,11 @@ public void transition(ApplicationImpl app, ApplicationEvent event) { app.applicationACLs = initEvent.getApplicationACLs(); app.aclsManager.addApplication(app.getAppId(), app.applicationACLs); // Inform the logAggregator + app.logAggregationContext = initEvent.getLogAggregationContext(); app.dispatcher.getEventHandler().handle( new LogHandlerAppStartedEvent(app.appId, app.user, app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS, - app.applicationACLs)); + app.applicationACLs, app.logAggregationContext)); } } @@ -467,4 +473,14 @@ public void handle(ApplicationEvent event) { public String toString() { return appId.toString(); } + + @VisibleForTesting + public LogAggregationContext getLogAggregationContext() { + try { + this.readLock.lock(); + return this.logAggregationContext; + } finally { + this.readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java index 5746b6a708..097cfb5173 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java @@ -22,18 +22,31 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; public class ApplicationInitEvent extends ApplicationEvent { private final Map applicationACLs; + private final LogAggregationContext logAggregationContext; public ApplicationInitEvent(ApplicationId appId, Map acls) { + this(appId, acls, null); + } + + public ApplicationInitEvent(ApplicationId appId, + Map acls, + LogAggregationContext logAggregationContext) { super(appId, ApplicationEventType.INIT_APPLICATION); this.applicationACLs = acls; + this.logAggregationContext = logAggregationContext; } public Map getApplicationACLs() { return this.applicationACLs; } + + public LogAggregationContext getLogAggregationContext() { + return this.logAggregationContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java index 6c076748a4..993f69c061 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java @@ -23,6 +23,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; public class LogHandlerAppStartedEvent extends LogHandlerEvent { @@ -32,16 +33,25 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent { private final String user; private final Credentials credentials; private final Map appAcls; + private final LogAggregationContext logAggregationContext; public LogHandlerAppStartedEvent(ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, Map appAcls) { + this(appId, user, credentials, retentionPolicy, appAcls, null); + } + + public LogHandlerAppStartedEvent(ApplicationId appId, String user, + Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy, + Map appAcls, + LogAggregationContext logAggregationContext) { super(LogHandlerEventType.APPLICATION_STARTED); this.applicationId = appId; this.user = user; this.credentials = credentials; this.retentionPolicy = retentionPolicy; this.appAcls = appAcls; + this.logAggregationContext = logAggregationContext; } public ApplicationId getApplicationId() { @@ -64,4 +74,7 @@ public Map getApplicationAcls() { return this.appAcls; } + public LogAggregationContext getLogAggregationContext() { + return this.logAggregationContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index e6f39f6c5f..d8fdd8b721 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -29,6 +29,7 @@ message ContainerManagerApplicationProto { optional string user = 2; optional bytes credentials = 3; repeated ApplicationACLMapProto acls = 4; + optional LogAggregationContextProto log_aggregation_context = 5; } message DeletionServiceDeleteTaskProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index f2109b519f..da39cb6553 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -795,11 +796,20 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier, NodeId nodeId, String user, NMContainerTokenSecretManager containerTokenSecretManager) throws IOException { + return createContainerToken(cId, rmIdentifier, nodeId, user, + containerTokenSecretManager, null); + } + + public static Token createContainerToken(ContainerId cId, long rmIdentifier, + NodeId nodeId, String user, + NMContainerTokenSecretManager containerTokenSecretManager, + LogAggregationContext logAggregationContext) + throws IOException { Resource r = BuilderUtils.newResource(1024, 1); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier(cId, nodeId.toString(), user, r, System.currentTimeMillis() + 100000L, 123, rmIdentifier, - Priority.newInstance(0), 0); + Priority.newInstance(0), 0, logAggregationContext); Token containerToken = BuilderUtils .newContainerToken(nodeId, containerTokenSecretManager diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 0319664c39..2c69843fd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; @@ -126,8 +128,12 @@ public void testApplicationRecovery() throws Exception { ContainerLaunchContext clc = ContainerLaunchContext.newInstance( localResources, containerEnv, containerCmds, serviceData, containerTokens, acls); + // create the logAggregationContext + LogAggregationContext logAggregationContext = + LogAggregationContext.newInstance("includePattern", "excludePattern", + 1000); StartContainersResponse startResponse = startContainer(context, cm, cid, - clc); + clc, logAggregationContext); assertTrue(startResponse.getFailedRequests().isEmpty()); assertEquals(1, context.getApplications().size()); Application app = context.getApplications().get(appId); @@ -157,6 +163,18 @@ public void testApplicationRecovery() throws Exception { assertEquals(1, context.getApplications().size()); app = context.getApplications().get(appId); assertNotNull(app); + + // check whether LogAggregationContext is recovered correctly + LogAggregationContext recovered = + ((ApplicationImpl) app).getLogAggregationContext(); + assertNotNull(recovered); + assertEquals(logAggregationContext.getRollingIntervalSeconds(), + recovered.getRollingIntervalSeconds()); + assertEquals(logAggregationContext.getIncludePattern(), + recovered.getIncludePattern()); + assertEquals(logAggregationContext.getExcludePattern(), + recovered.getExcludePattern()); + waitForAppState(app, ApplicationState.INITING); assertTrue(context.getApplicationACLsManager().checkAccess( UserGroupInformation.createRemoteUser(modUser), @@ -224,13 +242,14 @@ public void testApplicationRecovery() throws Exception { private StartContainersResponse startContainer(Context context, final ContainerManagerImpl cm, ContainerId cid, - ContainerLaunchContext clc) throws Exception { + ContainerLaunchContext clc, LogAggregationContext logAggregationContext) + throws Exception { UserGroupInformation user = UserGroupInformation.createRemoteUser( cid.getApplicationAttemptId().toString()); StartContainerRequest scReq = StartContainerRequest.newInstance( clc, TestContainerManager.createContainerToken(cid, 0, context.getNodeId(), user.getShortUserName(), - context.getContainerTokenSecretManager())); + context.getContainerTokenSecretManager(), logAggregationContext)); final List scReqList = new ArrayList(); scReqList.add(scReq); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index c9b0303df8..84975b6b84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -91,6 +92,7 @@ public class SchedulerApplicationAttempt { private Resource amResource = Resources.none(); private boolean unmanagedAM = true; private boolean amRunning = false; + private LogAggregationContext logAggregationContext; protected List newlyAllocatedContainers = new ArrayList(); @@ -138,6 +140,8 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, .getApplicationSubmissionContext(); if (appSubmissionContext != null) { unmanagedAM = appSubmissionContext.getUnmanagedAM(); + this.logAggregationContext = + appSubmissionContext.getLogAggregationContext(); } } } @@ -444,7 +448,7 @@ public List getNMTokenList() { container.setContainerToken(rmContext.getContainerTokenSecretManager() .createContainerToken(container.getId(), container.getNodeId(), getUser(), container.getResource(), container.getPriority(), - rmContainer.getCreationTime())); + rmContainer.getCreationTime(), this.logAggregationContext)); NMToken nmToken = rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), getApplicationAttemptId(), container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java index 13943f8571..15dd1a9219 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -177,6 +178,25 @@ public void run() { public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, Priority priority, long createTime) { + return createContainerToken(containerId, nodeId, appSubmitter, capability, + priority, createTime, null); + } + + /** + * Helper function for creating ContainerTokens + * + * @param containerId + * @param nodeId + * @param appSubmitter + * @param capability + * @param priority + * @param createTime + * @param logAggregationContext + * @return the container-token + */ + public Token createContainerToken(ContainerId containerId, NodeId nodeId, + String appSubmitter, Resource capability, Priority priority, + long createTime, LogAggregationContext logAggregationContext) { byte[] password; ContainerTokenIdentifier tokenIdentifier; long expiryTimeStamp = @@ -189,7 +209,8 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId, new ContainerTokenIdentifier(containerId, nodeId.toString(), appSubmitter, capability, expiryTimeStamp, this.currentMasterKey .getMasterKey().getKeyId(), - ResourceManager.getClusterTimeStamp(), priority, createTime); + ResourceManager.getClusterTimeStamp(), priority, createTime, + logAggregationContext); password = this.createPassword(tokenIdentifier); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 1338a6ccfd..4f5fdebfbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; @@ -278,7 +279,7 @@ public RMApp submitApp(int masterMemory, String name, String user, boolean waitForAccepted, boolean keepContainers) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - false, null, 0); + false, null, 0, null); } public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) @@ -287,7 +288,7 @@ public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval) .getShortUserName(), null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, - false, null, attemptFailuresValidityInterval); + false, null, attemptFailuresValidityInterval, null); } public RMApp submitApp(int masterMemory, String name, String user, @@ -297,14 +298,24 @@ public RMApp submitApp(int masterMemory, String name, String user, ApplicationId applicationId) throws Exception { return submitApp(masterMemory, name, user, acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted, keepContainers, - isAppIdProvided, applicationId, 0); + isAppIdProvided, applicationId, 0, null); } + public RMApp submitApp(int masterMemory, + LogAggregationContext logAggregationContext) throws Exception { + return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser() + .getShortUserName(), null, false, null, + super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false, + false, null, 0, logAggregationContext); + } + public RMApp submitApp(int masterMemory, String name, String user, Map acls, boolean unmanaged, String queue, int maxAppAttempts, Credentials ts, String appType, boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, - ApplicationId applicationId, long attemptFailuresValidityInterval) + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); @@ -342,6 +353,9 @@ public RMApp submitApp(int masterMemory, String name, String user, } sub.setAMContainerSpec(clc); sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); + if (logAggregationContext != null) { + sub.setLogAggregationContext(logAggregationContext); + } req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"}); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index a9bfc2fc93..85ef381a01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -28,12 +28,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -195,6 +198,58 @@ public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{ Assert.assertEquals(1, containers.size()); } + // This is to test whether LogAggregationContext is passed into + // container tokens correctly + @Test + public void testLogAggregationContextPassedIntoContainerToken() + throws Exception { + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000); + MockNM nm2 = rm1.registerNode("127.0.0.1:2345", 8000); + // LogAggregationContext is set as null + Assert + .assertNull(getLogAggregationContextFromContainerToken(rm1, nm1, null)); + + // create a not-null LogAggregationContext + final int interval = 2000; + LogAggregationContext logAggregationContext = + LogAggregationContext.newInstance( + "includePattern", "excludePattern", interval); + LogAggregationContext returned = + getLogAggregationContextFromContainerToken(rm1, nm2, + logAggregationContext); + Assert.assertEquals("includePattern", returned.getIncludePattern()); + Assert.assertEquals("excludePattern", returned.getExcludePattern()); + Assert.assertEquals(interval, returned.getRollingIntervalSeconds()); + rm1.stop(); + } + + private LogAggregationContext getLogAggregationContextFromContainerToken( + MockRM rm1, MockNM nm1, LogAggregationContext logAggregationContext) + throws Exception { + RMApp app2 = rm1.submitApp(200, logAggregationContext); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + nm1.nodeHeartbeat(true); + // request a container. + am2.allocate("127.0.0.1", 512, 1, new ArrayList()); + ContainerId containerId = + ContainerId.newInstance(am2.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId, RMContainerState.ALLOCATED); + + // acquire the container. + List containers = + am2.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + Assert.assertEquals(containerId, containers.get(0).getId()); + // container token is generated. + Assert.assertNotNull(containers.get(0).getContainerToken()); + ContainerTokenIdentifier token = + BuilderUtils.newContainerTokenIdentifier(containers.get(0) + .getContainerToken()); + return token.getLogAggregationContext(); + } + private volatile int numRetries = 0; private class TestRMSecretManagerService extends RMSecretManagerService { @@ -210,10 +265,11 @@ protected RMContainerTokenSecretManager createContainerTokenSecretManager( @Override public Token createContainerToken(ContainerId containerId, NodeId nodeId, String appSubmitter, Resource capability, - Priority priority, long createTime) { + Priority priority, long createTime, + LogAggregationContext logAggregationContext) { numRetries++; return super.createContainerToken(containerId, nodeId, appSubmitter, - capability, priority, createTime); + capability, priority, createTime, logAggregationContext); } }; }