YARN-8512. ATSv2 entities are not published to HBase from second attempt onwards. Contributed by Rohith Sharma K S.

This commit is contained in:
Sunil G 2018-07-11 12:26:32 +05:30
parent a47ec5dac4
commit 7f1d3d0e9d
4 changed files with 180 additions and 27 deletions

View File

@ -1102,24 +1102,8 @@ protected void startContainerInternal(
// Create the application
// populate the flow context from the launch context if the timeline
// service v.2 is enabled
FlowContext flowContext = null;
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
String flowName = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
String flowVersion = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
String flowRunIdStr = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
long flowRunId = 0L;
if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
flowRunId = Long.parseLong(flowRunIdStr);
}
flowContext = new FlowContext(flowName, flowVersion, flowRunId);
if (LOG.isDebugEnabled()) {
LOG.debug("Flow context: " + flowContext
+ " created for an application " + applicationID);
}
}
FlowContext flowContext =
getFlowContext(launchContext, applicationID);
Application application =
new ApplicationImpl(dispatcher, user, flowContext,
@ -1138,6 +1122,31 @@ protected void startContainerInternal(
dispatcher.getEventHandler().handle(new ApplicationInitEvent(
applicationID, appAcls, logAggregationContext));
}
} else if (containerTokenIdentifier.getContainerType()
== ContainerType.APPLICATION_MASTER) {
FlowContext flowContext =
getFlowContext(launchContext, applicationID);
if (flowContext != null) {
ApplicationImpl application =
(ApplicationImpl) context.getApplications().get(applicationID);
// update flowContext reference in ApplicationImpl
application.setFlowContext(flowContext);
// Required to update state store for recovery.
context.getNMStateStore().storeApplication(applicationID,
buildAppProto(applicationID, user, credentials,
container.getLaunchContext().getApplicationACLs(),
containerTokenIdentifier.getLogAggregationContext(),
flowContext));
LOG.info(
"Updated application reference with flowContext " + flowContext
+ " for app " + applicationID);
} else {
LOG.info("TimelineService V2.0 is not enabled. Skipping updating "
+ "flowContext for application " + applicationID);
}
}
this.context.getNMStateStore().storeContainer(containerId,
@ -1163,6 +1172,30 @@ protected void startContainerInternal(
}
}
private FlowContext getFlowContext(ContainerLaunchContext launchContext,
ApplicationId applicationID) {
FlowContext flowContext = null;
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
String flowName = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
String flowVersion = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
String flowRunIdStr = launchContext.getEnvironment()
.get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
long flowRunId = 0L;
if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
flowRunId = Long.parseLong(flowRunIdStr);
}
flowContext = new FlowContext(flowName, flowVersion, flowRunId);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Flow context: " + flowContext + " created for an application "
+ applicationID);
}
}
return flowContext;
}
protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
org.apache.hadoop.yarn.api.records.Token token,
ContainerTokenIdentifier containerTokenIdentifier) throws YarnException,

View File

@ -25,6 +25,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,7 +68,6 @@
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* The state machine for the representation of an Application
@ -688,4 +689,8 @@ public String getFlowVersion() {
public long getFlowRunId() {
return flowContext == null ? 0L : flowContext.getFlowRunId();
}
public void setFlowContext(FlowContext fc) {
this.flowContext = fc;
}
}

View File

@ -428,6 +428,16 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
containerTokenSecretManager, logAggregationContext);
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user,
NMContainerTokenSecretManager containerTokenSecretManager,
LogAggregationContext logAggregationContext, ContainerType containerType)
throws IOException {
Resource r = BuilderUtils.newResource(1024, 1);
return createContainerToken(cId, rmIdentifier, nodeId, user, r,
containerTokenSecretManager, logAggregationContext, containerType);
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,
@ -442,6 +452,21 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
containerTokenIdentifier);
}
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,
LogAggregationContext logAggregationContext, ContainerType continerType)
throws IOException {
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0, logAggregationContext, null,
continerType);
return BuilderUtils.newContainerToken(nodeId,
containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
containerTokenIdentifier);
}
public static Token createContainerToken(ContainerId cId, int version,
long rmIdentifier, NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,

View File

@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
@ -74,6 +75,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@ -205,7 +207,7 @@ public void testApplicationRecovery() throws Exception {
"includePatternInRollingAggregation",
"excludePatternInRollingAggregation");
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
clc, logAggregationContext, ContainerType.TASK);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
Application app = context.getApplications().get(appId);
@ -342,7 +344,7 @@ public void testNMRecoveryForAppFinishedWithLogAggregationFailure()
null, null);
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, null);
clc, null, ContainerType.TASK);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
Application app = context.getApplications().get(appId);
@ -579,7 +581,7 @@ public void testContainerCleanupOnShutdown() throws Exception {
cm.init(conf);
cm.start();
StartContainersResponse startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
clc, logAggregationContext, ContainerType.TASK);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
verify(cm).handle(isA(CMgrCompletedAppsEvent.class));
@ -595,7 +597,7 @@ public void testContainerCleanupOnShutdown() throws Exception {
cm.init(conf);
cm.start();
startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
clc, logAggregationContext, ContainerType.TASK);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
memStore.close();
@ -612,7 +614,7 @@ public void testContainerCleanupOnShutdown() throws Exception {
cm.init(conf);
cm.start();
startResponse = startContainer(context, cm, cid,
clc, logAggregationContext);
clc, logAggregationContext, ContainerType.TASK);
assertEquals(1, startResponse.getSuccessfullyStartedContainers().size());
cm.stop();
memStore.close();
@ -661,7 +663,7 @@ private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
localResources, containerEnv, commands, serviceData,
containerTokens, acls);
StartContainersResponse startResponse = startContainer(
context, cm, cid, clc, null);
context, cm, cid, clc, null, ContainerType.TASK);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
// make sure the container reaches RUNNING state
@ -736,14 +738,15 @@ public int getHttpPort() {
private StartContainersResponse startContainer(Context context,
final ContainerManagerImpl cm, ContainerId cid,
ContainerLaunchContext clc, LogAggregationContext logAggregationContext)
ContainerLaunchContext clc, LogAggregationContext logAggregationContext,
ContainerType containerType)
throws Exception {
UserGroupInformation user = UserGroupInformation.createRemoteUser(
cid.getApplicationAttemptId().toString());
StartContainerRequest scReq = StartContainerRequest.newInstance(
clc, TestContainerManager.createContainerToken(cid, 0,
context.getNodeId(), user.getShortUserName(),
context.getContainerTokenSecretManager(), logAggregationContext));
context.getContainerTokenSecretManager(), logAggregationContext, containerType));
final List<StartContainerRequest> scReqList =
new ArrayList<StartContainerRequest>();
scReqList.add(scReq);
@ -910,4 +913,91 @@ private static void setFlowTags(Map<String, String> environment,
}
}
@Test
public void testApplicationRecoveryAfterFlowContextUpdated()
throws Exception {
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
NMStateStoreService stateStore = new NMMemoryStateStoreService();
stateStore.init(conf);
stateStore.start();
Context context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context);
cm.init(conf);
cm.start();
// add an application by starting a container
String appName = "app_name1";
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
// create 1nd attempt container with containerId 2
ContainerId cid = ContainerId.newContainerId(attemptId, 2);
Map<String, LocalResource> localResources = Collections.emptyMap();
Map<String, String> containerEnv = new HashMap<>();
List<String> containerCmds = Collections.emptyList();
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
Credentials containerCreds = new Credentials();
DataOutputBuffer dob = new DataOutputBuffer();
containerCreds.writeTokenStorageToStream(dob);
ByteBuffer containerTokens =
ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
Map<ApplicationAccessType, String> acls =
new HashMap<ApplicationAccessType, String>();
ContainerLaunchContext clc = ContainerLaunchContext
.newInstance(localResources, containerEnv, containerCmds, serviceData,
containerTokens, acls);
// create the logAggregationContext
LogAggregationContext logAggregationContext = LogAggregationContext
.newInstance("includePattern", "excludePattern",
"includePatternInRollingAggregation",
"excludePatternInRollingAggregation");
StartContainersResponse startResponse =
startContainer(context, cm, cid, clc, logAggregationContext,
ContainerType.TASK);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
ApplicationImpl app =
(ApplicationImpl) context.getApplications().get(appId);
assertNotNull(app);
waitForAppState(app, ApplicationState.INITING);
assertNull(app.getFlowName());
// 2nd attempt
ApplicationAttemptId attemptId2 =
ApplicationAttemptId.newInstance(appId, 2);
// create 2nd attempt master container
ContainerId cid2 = ContainerId.newContainerId(attemptId, 1);
setFlowContext(containerEnv, appName, appId);
// once again create for updating launch context
clc = ContainerLaunchContext
.newInstance(localResources, containerEnv, containerCmds, serviceData,
containerTokens, acls);
// start container with container type AM.
startResponse =
startContainer(context, cm, cid2, clc, logAggregationContext,
ContainerType.APPLICATION_MASTER);
assertTrue(startResponse.getFailedRequests().isEmpty());
assertEquals(1, context.getApplications().size());
waitForAppState(app, ApplicationState.INITING);
assertEquals(appName, app.getFlowName());
// reset container manager and verify flow context information
cm.stop();
context = createContext(conf, stateStore);
cm = createContainerManager(context);
cm.init(conf);
cm.start();
assertEquals(1, context.getApplications().size());
app = (ApplicationImpl) context.getApplications().get(appId);
assertNotNull(app);
assertEquals(appName, app.getFlowName());
waitForAppState(app, ApplicationState.INITING);
cm.stop();
}
}