YARN-2581. Passed LogAggregationContext to NM via ContainerTokenIdentifier. Contributed by Xuan Gong.

This commit is contained in:
Zhijie Shen 2014-09-24 17:50:26 -07:00
parent 116f83157a
commit c86674a3a4
13 changed files with 239 additions and 17 deletions

View File

@ -97,6 +97,9 @@ Release 2.6.0 - UNRELEASED
YARN-2102. Added the concept of a Timeline Domain to handle read/write ACLs
on Timeline service event data. (Zhijie Shen via vinodkv)
YARN-2581. Passed LogAggregationContext to NM via ContainerTokenIdentifier.
(Xuan Gong via zjshen)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -34,8 +34,11 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
/**
* TokenIdentifier for a container. Encodes {@link ContainerId},
@ -59,10 +62,19 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
private long rmIdentifier;
private Priority priority;
private long creationTime;
private LogAggregationContext logAggregationContext;
public ContainerTokenIdentifier(ContainerId containerID,
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
rmIdentifier, priority, creationTime, null);
}
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext) {
this.containerId = containerID;
this.nmHostAddr = hostName;
this.appSubmitter = appSubmitter;
@ -72,6 +84,7 @@ public ContainerTokenIdentifier(ContainerId containerID,
this.rmIdentifier = rmIdentifier;
this.priority = priority;
this.creationTime = creationTime;
this.logAggregationContext = logAggregationContext;
}
/**
@ -119,6 +132,10 @@ public long getRMIdentifer() {
return this.rmIdentifier;
}
public LogAggregationContext getLogAggregationContext() {
return this.logAggregationContext;
}
@Override
public void write(DataOutput out) throws IOException {
LOG.debug("Writing ContainerTokenIdentifier to RPC layer: " + this);
@ -138,6 +155,15 @@ public void write(DataOutput out) throws IOException {
out.writeLong(this.rmIdentifier);
out.writeInt(this.priority.getPriority());
out.writeLong(this.creationTime);
if (this.logAggregationContext == null) {
out.writeInt(-1);
} else {
byte[] logAggregationContext =
((LogAggregationContextPBImpl) this.logAggregationContext).getProto()
.toByteArray();
out.writeInt(logAggregationContext.length);
out.write(logAggregationContext);
}
}
@Override
@ -158,6 +184,14 @@ public void readFields(DataInput in) throws IOException {
this.rmIdentifier = in.readLong();
this.priority = Priority.newInstance(in.readInt());
this.creationTime = in.readLong();
int size = in.readInt();
if (size != -1) {
byte[] bytes = new byte[size];
in.readFully(bytes);
this.logAggregationContext =
new LogAggregationContextPBImpl(
LogAggregationContextProto.parseFrom(bytes));
}
}
@Override

View File

@ -72,9 +72,11 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -275,11 +277,17 @@ private void recoverApplication(ContainerManagerApplicationProto p)
aclProto.getAcl());
}
LogAggregationContext logAggregationContext = null;
if (p.getLogAggregationContext() != null) {
logAggregationContext =
new LogAggregationContextPBImpl(p.getLogAggregationContext());
}
LOG.info("Recovering application " + appId);
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
creds, context);
context.getApplications().put(appId, app);
app.handle(new ApplicationInitEvent(appId, acls));
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
}
@SuppressWarnings("unchecked")
@ -719,13 +727,19 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials,
Map<ApplicationAccessType, String> appAcls) {
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
ContainerManagerApplicationProto.Builder builder =
ContainerManagerApplicationProto.newBuilder();
builder.setId(((ApplicationIdPBImpl) appId).getProto());
builder.setUser(user);
if (logAggregationContext != null) {
builder.setLogAggregationContext((
(LogAggregationContextPBImpl)logAggregationContext).getProto());
}
builder.clearCredentials();
if (credentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
@ -826,12 +840,16 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
if (null == context.getApplications().putIfAbsent(applicationID,
application)) {
LOG.info("Creating a new application reference for app " + applicationID);
LogAggregationContext logAggregationContext =
containerTokenIdentifier.getLogAggregationContext();
Map<ApplicationAccessType, String> appAcls =
container.getLaunchContext().getApplicationACLs();
context.getNMStateStore().storeApplication(applicationID,
buildAppProto(applicationID, user, credentials, appAcls));
buildAppProto(applicationID, user, credentials, appAcls,
logAggregationContext));
dispatcher.getEventHandler().handle(
new ApplicationInitEvent(applicationID, appAcls));
new ApplicationInitEvent(applicationID, appAcls,
logAggregationContext));
}
this.context.getNMStateStore().storeContainer(containerId, request);

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -54,6 +55,8 @@
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* The state machine for the representation of an Application
* within the NodeManager.
@ -72,6 +75,8 @@ public class ApplicationImpl implements Application {
private static final Log LOG = LogFactory.getLog(Application.class);
private LogAggregationContext logAggregationContext;
Map<ContainerId, Container> containers =
new HashMap<ContainerId, Container>();
@ -234,10 +239,11 @@ public void transition(ApplicationImpl app, ApplicationEvent event) {
app.applicationACLs = initEvent.getApplicationACLs();
app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
// Inform the logAggregator
app.logAggregationContext = initEvent.getLogAggregationContext();
app.dispatcher.getEventHandler().handle(
new LogHandlerAppStartedEvent(app.appId, app.user,
app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
app.applicationACLs));
app.applicationACLs, app.logAggregationContext));
}
}
@ -467,4 +473,14 @@ public void handle(ApplicationEvent event) {
public String toString() {
return appId.toString();
}
@VisibleForTesting
public LogAggregationContext getLogAggregationContext() {
try {
this.readLock.lock();
return this.logAggregationContext;
} finally {
this.readLock.unlock();
}
}
}

View File

@ -22,18 +22,31 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
public class ApplicationInitEvent extends ApplicationEvent {
private final Map<ApplicationAccessType, String> applicationACLs;
private final LogAggregationContext logAggregationContext;
public ApplicationInitEvent(ApplicationId appId,
Map<ApplicationAccessType, String> acls) {
this(appId, acls, null);
}
public ApplicationInitEvent(ApplicationId appId,
Map<ApplicationAccessType, String> acls,
LogAggregationContext logAggregationContext) {
super(appId, ApplicationEventType.INIT_APPLICATION);
this.applicationACLs = acls;
this.logAggregationContext = logAggregationContext;
}
public Map<ApplicationAccessType, String> getApplicationACLs() {
return this.applicationACLs;
}
public LogAggregationContext getLogAggregationContext() {
return this.logAggregationContext;
}
}

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
public class LogHandlerAppStartedEvent extends LogHandlerEvent {
@ -32,16 +33,25 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
private final String user;
private final Credentials credentials;
private final Map<ApplicationAccessType, String> appAcls;
private final LogAggregationContext logAggregationContext;
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
this(appId, user, credentials, retentionPolicy, appAcls, null);
}
public LogHandlerAppStartedEvent(ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext) {
super(LogHandlerEventType.APPLICATION_STARTED);
this.applicationId = appId;
this.user = user;
this.credentials = credentials;
this.retentionPolicy = retentionPolicy;
this.appAcls = appAcls;
this.logAggregationContext = logAggregationContext;
}
public ApplicationId getApplicationId() {
@ -64,4 +74,7 @@ public Map<ApplicationAccessType, String> getApplicationAcls() {
return this.appAcls;
}
public LogAggregationContext getLogAggregationContext() {
return this.logAggregationContext;
}
}

View File

@ -29,6 +29,7 @@ message ContainerManagerApplicationProto {
optional string user = 2;
optional bytes credentials = 3;
repeated ApplicationACLMapProto acls = 4;
optional LogAggregationContextProto log_aggregation_context = 5;
}
message DeletionServiceDeleteTaskProto {

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -795,11 +796,20 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager)
throws IOException {
return createContainerToken(cId, rmIdentifier, nodeId, user,
containerTokenSecretManager, null);
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager,
LogAggregationContext logAggregationContext)
throws IOException {
Resource r = BuilderUtils.newResource(1024, 1);
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0);
Priority.newInstance(0), 0, logAggregationContext);
Token containerToken =
BuilderUtils
.newContainerToken(nodeId, containerTokenSecretManager

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
@ -58,6 +59,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
@ -126,8 +128,12 @@ public void testApplicationRecovery() throws Exception {
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
localResources, containerEnv, containerCmds, serviceData,
containerTokens, acls);
// create the logAggregationContext
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance("includePattern", "excludePattern",
1000);
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc);
clc, logAggregationContext);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
Application app = context.getApplications().get(appId);
@ -157,6 +163,18 @@ public void testApplicationRecovery() throws Exception {
assertEquals(1, context.getApplications().size());
app = context.getApplications().get(appId);
assertNotNull(app);
// check whether LogAggregationContext is recovered correctly
LogAggregationContext recovered =
((ApplicationImpl) app).getLogAggregationContext();
assertNotNull(recovered);
assertEquals(logAggregationContext.getRollingIntervalSeconds(),
recovered.getRollingIntervalSeconds());
assertEquals(logAggregationContext.getIncludePattern(),
recovered.getIncludePattern());
assertEquals(logAggregationContext.getExcludePattern(),
recovered.getExcludePattern());
waitForAppState(app, ApplicationState.INITING);
assertTrue(context.getApplicationACLsManager().checkAccess(
UserGroupInformation.createRemoteUser(modUser),
@ -224,13 +242,14 @@ public void testApplicationRecovery() throws Exception {
private StartContainersResponse startContainer(Context context,
final ContainerManagerImpl cm, ContainerId cid,
ContainerLaunchContext clc) throws Exception {
ContainerLaunchContext clc, LogAggregationContext logAggregationContext)
throws Exception {
UserGroupInformation user = UserGroupInformation.createRemoteUser(
cid.getApplicationAttemptId().toString());
StartContainerRequest scReq = StartContainerRequest.newInstance(
clc, TestContainerManager.createContainerToken(cid, 0,
context.getNodeId(), user.getShortUserName(),
context.getContainerTokenSecretManager()));
context.getContainerTokenSecretManager(), logAggregationContext));
final List<StartContainerRequest> scReqList =
new ArrayList<StartContainerRequest>();
scReqList.add(scReq);

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@ -91,6 +92,7 @@ public class SchedulerApplicationAttempt {
private Resource amResource = Resources.none();
private boolean unmanagedAM = true;
private boolean amRunning = false;
private LogAggregationContext logAggregationContext;
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
@ -138,6 +140,8 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
.getApplicationSubmissionContext();
if (appSubmissionContext != null) {
unmanagedAM = appSubmissionContext.getUnmanagedAM();
this.logAggregationContext =
appSubmissionContext.getLogAggregationContext();
}
}
}
@ -444,7 +448,7 @@ public List<NMToken> getNMTokenList() {
container.setContainerToken(rmContext.getContainerTokenSecretManager()
.createContainerToken(container.getId(), container.getNodeId(),
getUser(), container.getResource(), container.getPriority(),
rmContainer.getCreationTime()));
rmContainer.getCreationTime(), this.logAggregationContext));
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -177,6 +178,25 @@ public void run() {
public Token createContainerToken(ContainerId containerId, NodeId nodeId,
String appSubmitter, Resource capability, Priority priority,
long createTime) {
return createContainerToken(containerId, nodeId, appSubmitter, capability,
priority, createTime, null);
}
/**
* Helper function for creating ContainerTokens
*
* @param containerId
* @param nodeId
* @param appSubmitter
* @param capability
* @param priority
* @param createTime
* @param logAggregationContext
* @return the container-token
*/
public Token createContainerToken(ContainerId containerId, NodeId nodeId,
String appSubmitter, Resource capability, Priority priority,
long createTime, LogAggregationContext logAggregationContext) {
byte[] password;
ContainerTokenIdentifier tokenIdentifier;
long expiryTimeStamp =
@ -189,7 +209,8 @@ public Token createContainerToken(ContainerId containerId, NodeId nodeId,
new ContainerTokenIdentifier(containerId, nodeId.toString(),
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
.getMasterKey().getKeyId(),
ResourceManager.getClusterTimeStamp(), priority, createTime);
ResourceManager.getClusterTimeStamp(), priority, createTime,
logAggregationContext);
password = this.createPassword(tokenIdentifier);
} finally {

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
@ -278,7 +279,7 @@ public RMApp submitApp(int masterMemory, String name, String user,
boolean waitForAccepted, boolean keepContainers) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
false, null, 0);
false, null, 0, null);
}
public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
@ -287,7 +288,7 @@ public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval)
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, attemptFailuresValidityInterval);
false, null, attemptFailuresValidityInterval, null);
}
public RMApp submitApp(int masterMemory, String name, String user,
@ -297,14 +298,24 @@ public RMApp submitApp(int masterMemory, String name, String user,
ApplicationId applicationId) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, 0);
isAppIdProvided, applicationId, 0, null);
}
public RMApp submitApp(int masterMemory,
LogAggregationContext logAggregationContext) throws Exception {
return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
.getShortUserName(), null, false, null,
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
false, null, 0, logAggregationContext);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval)
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext)
throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
@ -342,6 +353,9 @@ public RMApp submitApp(int masterMemory, String name, String user,
}
sub.setAMContainerSpec(clc);
sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
if (logAggregationContext != null) {
sub.setLogAggregationContext(logAggregationContext);
}
req.setApplicationSubmissionContext(sub);
UserGroupInformation fakeUser =
UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});

View File

@ -28,12 +28,14 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -47,6 +49,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -195,6 +198,58 @@ public void testNormalContainerAllocationWhenDNSUnavailable() throws Exception{
Assert.assertEquals(1, containers.size());
}
// This is to test whether LogAggregationContext is passed into
// container tokens correctly
@Test
public void testLogAggregationContextPassedIntoContainerToken()
throws Exception {
MockRM rm1 = new MockRM(conf);
rm1.start();
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
MockNM nm2 = rm1.registerNode("127.0.0.1:2345", 8000);
// LogAggregationContext is set as null
Assert
.assertNull(getLogAggregationContextFromContainerToken(rm1, nm1, null));
// create a not-null LogAggregationContext
final int interval = 2000;
LogAggregationContext logAggregationContext =
LogAggregationContext.newInstance(
"includePattern", "excludePattern", interval);
LogAggregationContext returned =
getLogAggregationContextFromContainerToken(rm1, nm2,
logAggregationContext);
Assert.assertEquals("includePattern", returned.getIncludePattern());
Assert.assertEquals("excludePattern", returned.getExcludePattern());
Assert.assertEquals(interval, returned.getRollingIntervalSeconds());
rm1.stop();
}
private LogAggregationContext getLogAggregationContextFromContainerToken(
MockRM rm1, MockNM nm1, LogAggregationContext logAggregationContext)
throws Exception {
RMApp app2 = rm1.submitApp(200, logAggregationContext);
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
nm1.nodeHeartbeat(true);
// request a container.
am2.allocate("127.0.0.1", 512, 1, new ArrayList<ContainerId>());
ContainerId containerId =
ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId, RMContainerState.ALLOCATED);
// acquire the container.
List<Container> containers =
am2.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
Assert.assertEquals(containerId, containers.get(0).getId());
// container token is generated.
Assert.assertNotNull(containers.get(0).getContainerToken());
ContainerTokenIdentifier token =
BuilderUtils.newContainerTokenIdentifier(containers.get(0)
.getContainerToken());
return token.getLogAggregationContext();
}
private volatile int numRetries = 0;
private class TestRMSecretManagerService extends RMSecretManagerService {
@ -210,10 +265,11 @@ protected RMContainerTokenSecretManager createContainerTokenSecretManager(
@Override
public Token createContainerToken(ContainerId containerId,
NodeId nodeId, String appSubmitter, Resource capability,
Priority priority, long createTime) {
Priority priority, long createTime,
LogAggregationContext logAggregationContext) {
numRetries++;
return super.createContainerToken(containerId, nodeId, appSubmitter,
capability, priority, createTime);
capability, priority, createTime, logAggregationContext);
}
};
}