YARN-1372. Ensure all completed containers are reported to the AMs across RM restart. Contributed by Anubhav Dhoot

This commit is contained in:
Jian He 2014-09-22 10:30:53 -07:00
parent 376233cdd4
commit 0a641496c7
21 changed files with 701 additions and 180 deletions

View File

@ -238,6 +238,9 @@ Release 2.6.0 - UNRELEASED
YARN-2001. Added a time threshold for RM to wait before starting container YARN-2001. Added a time threshold for RM to wait before starting container
allocations after restart/failover. (Jian He via vinodkv) allocations after restart/failover. (Jian He via vinodkv)
YARN-1372. Ensure all completed containers are reported to the AMs across
RM restart. (Anubhav Dhoot via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -30,6 +30,7 @@ public interface NodeHeartbeatResponse {
NodeAction getNodeAction(); NodeAction getNodeAction();
List<ContainerId> getContainersToCleanup(); List<ContainerId> getContainersToCleanup();
List<ContainerId> getFinishedContainersPulledByAM();
List<ApplicationId> getApplicationsToCleanup(); List<ApplicationId> getApplicationsToCleanup();
@ -43,6 +44,10 @@ public interface NodeHeartbeatResponse {
void setNMTokenMasterKey(MasterKey secretKey); void setNMTokenMasterKey(MasterKey secretKey);
void addAllContainersToCleanup(List<ContainerId> containers); void addAllContainersToCleanup(List<ContainerId> containers);
// This tells NM to remove finished containers only after the AM
// has actually received it in a previous allocate response
void addFinishedContainersPulledByAM(List<ContainerId> containers);
void addAllApplicationsToCleanup(List<ApplicationId> applications); void addAllApplicationsToCleanup(List<ApplicationId> applications);

View File

@ -46,6 +46,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
boolean viaProto = false; boolean viaProto = false;
private List<ContainerId> containersToCleanup = null; private List<ContainerId> containersToCleanup = null;
private List<ContainerId> finishedContainersPulledByAM = null;
private List<ApplicationId> applicationsToCleanup = null; private List<ApplicationId> applicationsToCleanup = null;
private MasterKey containerTokenMasterKey = null; private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null; private MasterKey nmTokenMasterKey = null;
@ -73,6 +74,9 @@ private void mergeLocalToBuilder() {
if (this.applicationsToCleanup != null) { if (this.applicationsToCleanup != null) {
addApplicationsToCleanupToProto(); addApplicationsToCleanupToProto();
} }
if (this.finishedContainersPulledByAM != null) {
addFinishedContainersPulledByAMToProto();
}
if (this.containerTokenMasterKey != null) { if (this.containerTokenMasterKey != null) {
builder.setContainerTokenMasterKey( builder.setContainerTokenMasterKey(
convertToProtoFormat(this.containerTokenMasterKey)); convertToProtoFormat(this.containerTokenMasterKey));
@ -199,6 +203,12 @@ public List<ContainerId> getContainersToCleanup() {
return this.containersToCleanup; return this.containersToCleanup;
} }
@Override
public List<ContainerId> getFinishedContainersPulledByAM() {
initFinishedContainersPulledByAM();
return this.finishedContainersPulledByAM;
}
private void initContainersToCleanup() { private void initContainersToCleanup() {
if (this.containersToCleanup != null) { if (this.containersToCleanup != null) {
return; return;
@ -212,6 +222,19 @@ private void initContainersToCleanup() {
} }
} }
private void initFinishedContainersPulledByAM() {
if (this.finishedContainersPulledByAM != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerIdProto> list = p.getFinishedContainersPulledByAmList();
this.finishedContainersPulledByAM = new ArrayList<ContainerId>();
for (ContainerIdProto c : list) {
this.finishedContainersPulledByAM.add(convertFromProtoFormat(c));
}
}
@Override @Override
public void addAllContainersToCleanup( public void addAllContainersToCleanup(
final List<ContainerId> containersToCleanup) { final List<ContainerId> containersToCleanup) {
@ -221,6 +244,15 @@ public void addAllContainersToCleanup(
this.containersToCleanup.addAll(containersToCleanup); this.containersToCleanup.addAll(containersToCleanup);
} }
@Override
public void addFinishedContainersPulledByAM(
final List<ContainerId> finishedContainersPulledByAM) {
if (finishedContainersPulledByAM == null)
return;
initFinishedContainersPulledByAM();
this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM);
}
private void addContainersToCleanupToProto() { private void addContainersToCleanupToProto() {
maybeInitBuilder(); maybeInitBuilder();
builder.clearContainersToCleanup(); builder.clearContainersToCleanup();
@ -256,6 +288,41 @@ public void remove() {
builder.addAllContainersToCleanup(iterable); builder.addAllContainersToCleanup(iterable);
} }
private void addFinishedContainersPulledByAMToProto() {
maybeInitBuilder();
builder.clearFinishedContainersPulledByAm();
if (finishedContainersPulledByAM == null)
return;
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
@Override
public Iterator<ContainerIdProto> iterator() {
return new Iterator<ContainerIdProto>() {
Iterator<ContainerId> iter = finishedContainersPulledByAM.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerIdProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllFinishedContainersPulledByAm(iterable);
}
@Override @Override
public List<ApplicationId> getApplicationsToCleanup() { public List<ApplicationId> getApplicationsToCleanup() {
initApplicationsToCleanup(); initApplicationsToCleanup();

View File

@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto {
repeated ApplicationIdProto applications_to_cleanup = 6; repeated ApplicationIdProto applications_to_cleanup = 6;
optional int64 nextHeartBeatInterval = 7; optional int64 nextHeartBeatInterval = 7;
optional string diagnostics_message = 8; optional string diagnostics_message = 8;
repeated ContainerIdProto finished_containers_pulled_by_am = 9;
} }
message NMContainerStatusProto { message NMContainerStatusProto {

View File

@ -311,7 +311,7 @@ public void run() {
public static class NMContext implements Context { public static class NMContext implements Context {
private NodeId nodeId = null; private NodeId nodeId = null;
private final ConcurrentMap<ApplicationId, Application> applications = protected final ConcurrentMap<ApplicationId, Application> applications =
new ConcurrentHashMap<ApplicationId, Application>(); new ConcurrentHashMap<ApplicationId, Application>();
protected final ConcurrentMap<ContainerId, Container> containers = protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>(); new ConcurrentSkipListMap<ContainerId, Container>();

View File

@ -104,11 +104,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// Duration for which to track recently stopped container. // Duration for which to track recently stopped container.
private long durationToTrackStoppedContainers; private long durationToTrackStoppedContainers;
// This is used to track the current completed containers when nodeheartBeat
// is called. These completed containers will be removed from NM context after
// nodeHeartBeat succeeds and the response from the nodeHeartBeat is
// processed.
private final Set<ContainerId> previousCompletedContainers;
private final NodeHealthCheckerService healthChecker; private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics; private final NodeManagerMetrics metrics;
@ -125,7 +120,6 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
this.metrics = metrics; this.metrics = metrics;
this.recentlyStoppedContainers = this.recentlyStoppedContainers =
new LinkedHashMap<ContainerId, Long>(); new LinkedHashMap<ContainerId, Long>();
this.previousCompletedContainers = new HashSet<ContainerId>();
} }
@Override @Override
@ -331,7 +325,7 @@ private List<ApplicationId> createKeepAliveApplicationList() {
return appList; return appList;
} }
private NodeStatus getNodeStatus(int responseId) { private NodeStatus getNodeStatus(int responseId) throws IOException {
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
@ -352,11 +346,18 @@ private NodeStatus getNodeStatus(int responseId) {
// Iterate through the NMContext and clone and get all the containers' // Iterate through the NMContext and clone and get all the containers'
// statuses. If it's a completed container, add into the // statuses. If it's a completed container, add into the
// recentlyStoppedContainers and previousCompletedContainers collections. // recentlyStoppedContainers collections.
@VisibleForTesting @VisibleForTesting
protected List<ContainerStatus> getContainerStatuses() { protected List<ContainerStatus> getContainerStatuses() throws IOException {
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>(); List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
for (Container container : this.context.getContainers().values()) { for (Container container : this.context.getContainers().values()) {
ContainerId containerId = container.getContainerId();
ApplicationId applicationId = container.getContainerId()
.getApplicationAttemptId().getApplicationId();
if (!this.context.getApplications().containsKey(applicationId)) {
context.getContainers().remove(containerId);
continue;
}
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus(); container.cloneAndGetContainerStatus();
containerStatuses.add(containerStatus); containerStatuses.add(containerStatus);
@ -381,10 +382,17 @@ private List<ApplicationId> getRunningApplications() {
} }
// These NMContainerStatus are sent on NM registration and used by YARN only. // These NMContainerStatus are sent on NM registration and used by YARN only.
private List<NMContainerStatus> getNMContainerStatuses() { private List<NMContainerStatus> getNMContainerStatuses() throws IOException {
List<NMContainerStatus> containerStatuses = List<NMContainerStatus> containerStatuses =
new ArrayList<NMContainerStatus>(); new ArrayList<NMContainerStatus>();
for (Container container : this.context.getContainers().values()) { for (Container container : this.context.getContainers().values()) {
ContainerId containerId = container.getContainerId();
ApplicationId applicationId = container.getContainerId()
.getApplicationAttemptId().getApplicationId();
if (!this.context.getApplications().containsKey(applicationId)) {
context.getContainers().remove(containerId);
continue;
}
NMContainerStatus status = NMContainerStatus status =
container.getNMContainerStatus(); container.getNMContainerStatus();
containerStatuses.add(status); containerStatuses.add(status);
@ -402,26 +410,30 @@ private List<NMContainerStatus> getNMContainerStatuses() {
@Override @Override
public void addCompletedContainer(ContainerId containerId) { public void addCompletedContainer(ContainerId containerId) {
synchronized (previousCompletedContainers) {
previousCompletedContainers.add(containerId);
}
synchronized (recentlyStoppedContainers) { synchronized (recentlyStoppedContainers) {
removeVeryOldStoppedContainersFromCache(); removeVeryOldStoppedContainersFromCache();
recentlyStoppedContainers.put(containerId, if (!recentlyStoppedContainers.containsKey(containerId)) {
System.currentTimeMillis() + durationToTrackStoppedContainers); recentlyStoppedContainers.put(containerId,
System.currentTimeMillis() + durationToTrackStoppedContainers);
}
} }
} }
private void removeCompletedContainersFromContext() { @VisibleForTesting
synchronized (previousCompletedContainers) { @Private
if (!previousCompletedContainers.isEmpty()) { public void removeCompletedContainersFromContext(
for (ContainerId containerId : previousCompletedContainers) { List<ContainerId>containerIds) throws IOException {
this.context.getContainers().remove(containerId); Set<ContainerId> removedContainers = new HashSet<ContainerId>();
}
LOG.info("Removed completed containers from NM context: " // If the AM has pulled the completedContainer it can be removed
+ previousCompletedContainers); for (ContainerId containerId : containerIds) {
previousCompletedContainers.clear(); context.getContainers().remove(containerId);
} removedContainers.add(containerId);
}
if (!removedContainers.isEmpty()) {
LOG.info("Removed completed containers from NM context: " +
removedContainers);
} }
} }
@ -454,7 +466,7 @@ public boolean isContainerRecentlyStopped(ContainerId containerId) {
return recentlyStoppedContainers.containsKey(containerId); return recentlyStoppedContainers.containsKey(containerId);
} }
} }
@Override @Override
public void clearFinishedContainersFromCache() { public void clearFinishedContainersFromCache() {
synchronized (recentlyStoppedContainers) { synchronized (recentlyStoppedContainers) {
@ -472,11 +484,13 @@ public void removeVeryOldStoppedContainersFromCache() {
while (i.hasNext()) { while (i.hasNext()) {
ContainerId cid = i.next(); ContainerId cid = i.next();
if (recentlyStoppedContainers.get(cid) < currentTime) { if (recentlyStoppedContainers.get(cid) < currentTime) {
i.remove(); if (!context.getContainers().containsKey(cid)) {
try { i.remove();
context.getNMStateStore().removeContainer(cid); try {
} catch (IOException e) { context.getNMStateStore().removeContainer(cid);
LOG.error("Unable to remove container " + cid + " in store", e); } catch (IOException e) {
LOG.error("Unable to remove container " + cid + " in store", e);
}
} }
} else { } else {
break; break;
@ -542,7 +556,9 @@ public void run() {
// don't want to remove the completed containers before resync // don't want to remove the completed containers before resync
// because these completed containers will be reported back to RM // because these completed containers will be reported back to RM
// when NM re-registers with RM. // when NM re-registers with RM.
removeCompletedContainersFromContext(); // Only remove the cleanedup containers that are acked
removeCompletedContainersFromContext(response
.getFinishedContainersPulledByAM());
lastHeartBeatID = response.getResponseId(); lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response List<ContainerId> containersToCleanup = response

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -247,6 +248,10 @@ public RegisterNodeManagerResponse registerNodeManager(
// put the completed container into the context // put the completed container into the context
getNMContext().getContainers().put( getNMContext().getContainers().put(
testCompleteContainer.getContainerId(), container); testCompleteContainer.getContainerId(), container);
getNMContext().getApplications().put(
testCompleteContainer.getContainerId()
.getApplicationAttemptId().getApplicationId(),
mock(Application.class));
} else { } else {
// second register contains the completed container info. // second register contains the completed container info.
List<NMContainerStatus> statuses = List<NMContainerStatus> statuses =
@ -382,9 +387,17 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
if (containersShouldBePreserved) { if (containersShouldBePreserved) {
Assert.assertFalse(containers.isEmpty()); Assert.assertFalse(containers.isEmpty());
Assert.assertTrue(containers.containsKey(existingCid)); Assert.assertTrue(containers.containsKey(existingCid));
Assert.assertEquals(ContainerState.RUNNING,
containers.get(existingCid)
.cloneAndGetContainerStatus().getState());
} else { } else {
// ensure that containers are empty before restart nodeStatusUpdater // ensure that containers are empty or are completed before
Assert.assertTrue(containers.isEmpty()); // restart nodeStatusUpdater
if (!containers.isEmpty()) {
Assert.assertEquals(ContainerState.COMPLETE,
containers.get(existingCid)
.cloneAndGetContainerStatus().getState());
}
} }
super.rebootNodeStatusUpdaterAndRegisterWithRM(); super.rebootNodeStatusUpdaterAndRegisterWithRM();
} }
@ -465,7 +478,12 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
try { try {
// ensure that containers are empty before restart nodeStatusUpdater // ensure that containers are empty before restart nodeStatusUpdater
Assert.assertTrue(containers.isEmpty()); if (!containers.isEmpty()) {
for (Container container: containers.values()) {
Assert.assertEquals(ContainerState.COMPLETE,
container.cloneAndGetContainerStatus().getState());
}
}
super.rebootNodeStatusUpdaterAndRegisterWithRM(); super.rebootNodeStatusUpdaterAndRegisterWithRM();
// After this point new containers are free to be launched, except // After this point new containers are free to be launched, except
// containers from previous RM // containers from previous RM

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.RMProxy; import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
@ -180,7 +181,7 @@ private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap(
Map<ApplicationId, List<ContainerStatus>> map = Map<ApplicationId, List<ContainerStatus>> map =
new HashMap<ApplicationId, List<ContainerStatus>>(); new HashMap<ApplicationId, List<ContainerStatus>>();
for (ContainerStatus cs : containers) { for (ContainerStatus cs : containers) {
ApplicationId applicationId = ApplicationId applicationId =
cs.getContainerId().getApplicationAttemptId().getApplicationId(); cs.getContainerId().getApplicationAttemptId().getApplicationId();
List<ContainerStatus> appContainers = map.get(applicationId); List<ContainerStatus> appContainers = map.get(applicationId);
if (appContainers == null) { if (appContainers == null) {
@ -205,10 +206,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
Map<ApplicationId, List<ContainerStatus>> appToContainers = Map<ApplicationId, List<ContainerStatus>> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
ApplicationId appId1 = ApplicationId.newInstance(0, 1); ApplicationId appId1 = ApplicationId.newInstance(0, 1);
ApplicationId appId2 = ApplicationId.newInstance(0, 2); ApplicationId appId2 = ApplicationId.newInstance(0, 2);
if (heartBeatID == 1) { if (heartBeatID == 1) {
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
@ -419,7 +420,7 @@ protected void stopRMProxy() {
} }
private class MyNodeManager extends NodeManager { private class MyNodeManager extends NodeManager {
private MyNodeStatusUpdater3 nodeStatusUpdater; private MyNodeStatusUpdater3 nodeStatusUpdater;
@Override @Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@ -433,7 +434,7 @@ public MyNodeStatusUpdater3 getNodeStatusUpdater() {
return this.nodeStatusUpdater; return this.nodeStatusUpdater;
} }
} }
private class MyNodeManager2 extends NodeManager { private class MyNodeManager2 extends NodeManager {
public boolean isStopped = false; public boolean isStopped = false;
private NodeStatusUpdater nodeStatusUpdater; private NodeStatusUpdater nodeStatusUpdater;
@ -467,7 +468,7 @@ protected void serviceStop() throws Exception {
syncBarrier.await(10000, TimeUnit.MILLISECONDS); syncBarrier.await(10000, TimeUnit.MILLISECONDS);
} }
} }
// //
private class MyResourceTracker2 implements ResourceTracker { private class MyResourceTracker2 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL;
@ -478,7 +479,7 @@ private class MyResourceTracker2 implements ResourceTracker {
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, RegisterNodeManagerRequest request) throws YarnException,
IOException { IOException {
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction ); response.setNodeAction(registerNodeAction );
@ -493,7 +494,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException { throws YarnException, IOException {
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils. NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null, newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
null, null, null, 1000L); null, null, null, 1000L);
@ -501,7 +502,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
return nhResponse; return nhResponse;
} }
} }
private class MyResourceTracker3 implements ResourceTracker { private class MyResourceTracker3 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL;
@ -513,7 +514,7 @@ private class MyResourceTracker3 implements ResourceTracker {
MyResourceTracker3(Context context) { MyResourceTracker3(Context context) {
this.context = context; this.context = context;
} }
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, RegisterNodeManagerRequest request) throws YarnException,
@ -564,6 +565,14 @@ private class MyResourceTracker4 implements ResourceTracker {
public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL;
public NodeAction heartBeatNodeAction = NodeAction.NORMAL; public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
private Context context; private Context context;
private final ContainerStatus containerStatus2 =
createContainerStatus(2, ContainerState.RUNNING);
private final ContainerStatus containerStatus3 =
createContainerStatus(3, ContainerState.COMPLETE);
private final ContainerStatus containerStatus4 =
createContainerStatus(4, ContainerState.RUNNING);
private final ContainerStatus containerStatus5 =
createContainerStatus(5, ContainerState.COMPLETE);
public MyResourceTracker4(Context context) { public MyResourceTracker4(Context context) {
this.context = context; this.context = context;
@ -583,6 +592,8 @@ public RegisterNodeManagerResponse registerNodeManager(
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException { throws YarnException, IOException {
List<ContainerId> finishedContainersPulledByAM = new ArrayList
<ContainerId>();
try { try {
if (heartBeatID == 0) { if (heartBeatID == 0) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses() Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
@ -594,10 +605,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Assert.assertEquals(statuses.size(), 2); Assert.assertEquals(statuses.size(), 2);
Assert.assertEquals(context.getContainers().size(), 2); Assert.assertEquals(context.getContainers().size(), 2);
ContainerStatus containerStatus2 =
createContainerStatus(2, ContainerState.RUNNING);
ContainerStatus containerStatus3 =
createContainerStatus(3, ContainerState.COMPLETE);
boolean container2Exist = false, container3Exist = false; boolean container2Exist = false, container3Exist = false;
for (ContainerStatus status : statuses) { for (ContainerStatus status : statuses) {
if (status.getContainerId().equals( if (status.getContainerId().equals(
@ -619,23 +626,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// nodeStatusUpdaterRunnable, otherwise nm just shuts down and the // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the
// test passes. // test passes.
throw new YarnRuntimeException("Lost the heartbeat response"); throw new YarnRuntimeException("Lost the heartbeat response");
} else if (heartBeatID == 2) { } else if (heartBeatID == 2 || heartBeatID == 3) {
List<ContainerStatus> statuses = List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses(); request.getNodeStatus().getContainersStatuses();
Assert.assertEquals(statuses.size(), 4); Assert.assertEquals(statuses.size(), 4);
Assert.assertEquals(context.getContainers().size(), 4); Assert.assertEquals(context.getContainers().size(), 4);
ContainerStatus containerStatus2 = boolean container2Exist = false, container3Exist = false,
createContainerStatus(2, ContainerState.RUNNING); container4Exist = false, container5Exist = false;
ContainerStatus containerStatus3 =
createContainerStatus(3, ContainerState.COMPLETE);
ContainerStatus containerStatus4 =
createContainerStatus(4, ContainerState.RUNNING);
ContainerStatus containerStatus5 =
createContainerStatus(5, ContainerState.COMPLETE);
boolean container2Exist = false, container3Exist = false, container4Exist =
false, container5Exist = false;
for (ContainerStatus status : statuses) { for (ContainerStatus status : statuses) {
if (status.getContainerId().equals( if (status.getContainerId().equals(
containerStatus2.getContainerId())) { containerStatus2.getContainerId())) {
@ -664,6 +662,24 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
} }
Assert.assertTrue(container2Exist && container3Exist Assert.assertTrue(container2Exist && container3Exist
&& container4Exist && container5Exist); && container4Exist && container5Exist);
if (heartBeatID == 3) {
finishedContainersPulledByAM.add(containerStatus3.getContainerId());
}
} else if (heartBeatID == 4) {
List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses();
Assert.assertEquals(statuses.size(), 3);
Assert.assertEquals(context.getContainers().size(), 3);
boolean container3Exist = false;
for (ContainerStatus status : statuses) {
if (status.getContainerId().equals(
containerStatus3.getContainerId())) {
container3Exist = true;
}
}
Assert.assertFalse(container3Exist);
} }
} catch (AssertionError error) { } catch (AssertionError error) {
error.printStackTrace(); error.printStackTrace();
@ -676,6 +692,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeHeartbeatResponse nhResponse = NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
heartBeatNodeAction, null, null, null, null, 1000L); heartBeatNodeAction, null, null, null, null, 1000L);
nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM);
return nhResponse; return nhResponse;
} }
} }
@ -686,7 +703,7 @@ private class MyResourceTracker5 implements ResourceTracker {
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, RegisterNodeManagerRequest request) throws YarnException,
IOException { IOException {
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction ); response.setNodeAction(registerNodeAction );
@ -694,7 +711,7 @@ public RegisterNodeManagerResponse registerNodeManager(
response.setNMTokenMasterKey(createMasterKey()); response.setNMTokenMasterKey(createMasterKey());
return response; return response;
} }
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException { throws YarnException, IOException {
@ -767,11 +784,11 @@ public void deleteBaseDir() throws IOException {
lfs.delete(new Path(basedir.getPath()), true); lfs.delete(new Path(basedir.getPath()), true);
} }
@Test(timeout = 90000) @Test(timeout = 90000)
public void testRecentlyFinishedContainers() throws Exception { public void testRecentlyFinishedContainers() throws Exception {
NodeManager nm = new NodeManager(); NodeManager nm = new NodeManager();
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set( conf.set(
NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
"10000"); "10000");
nm.init(conf); nm.init(conf);
@ -780,27 +797,112 @@ public void testRecentlyFinishedContainers() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 0); ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0); ApplicationAttemptId.newInstance(appId, 0);
ContainerId cId = ContainerId.newInstance(appAttemptId, 0); ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
nm.getNMContext().getApplications().putIfAbsent(appId,
mock(Application.class));
nm.getNMContext().getContainers().putIfAbsent(cId, mock(Container.class));
nodeStatusUpdater.addCompletedContainer(cId); nodeStatusUpdater.addCompletedContainer(cId);
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
nm.getNMContext().getContainers().remove(cId);
long time1 = System.currentTimeMillis(); long time1 = System.currentTimeMillis();
int waitInterval = 15; int waitInterval = 15;
while (waitInterval-- > 0 while (waitInterval-- > 0
&& nodeStatusUpdater.isContainerRecentlyStopped(cId)) { && nodeStatusUpdater.isContainerRecentlyStopped(cId)) {
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
Thread.sleep(1000); Thread.sleep(1000);
} }
long time2 = System.currentTimeMillis(); long time2 = System.currentTimeMillis();
// By this time the container will be removed from cache. need to verify. // By this time the container will be removed from cache. need to verify.
Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000); Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000);
} }
@Test(timeout = 90000)
public void testRemovePreviousCompletedContainersFromContext() throws Exception {
NodeManager nm = new NodeManager();
YarnConfiguration conf = new YarnConfiguration();
conf.set(
NodeStatusUpdaterImpl
.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
"10000");
nm.init(conf);
NodeStatusUpdaterImpl nodeStatusUpdater =
(NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
ContainerId cId = ContainerId.newInstance(appAttemptId, 1);
Token containerToken =
BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0);
Container anyCompletedContainer = new ContainerImpl(conf, null,
null, null, null, null,
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
@Override
public ContainerState getCurrentState() {
return ContainerState.COMPLETE;
}
};
nm.getNMContext().getApplications().putIfAbsent(appId,
mock(Application.class));
nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
ackedContainers.add(cId);
nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers);
Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty());
}
@Test
public void testCleanedupApplicationContainerCleanup() throws IOException {
NodeManager nm = new NodeManager();
YarnConfiguration conf = new YarnConfiguration();
conf.set(NodeStatusUpdaterImpl
.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
"1000000");
nm.init(conf);
NodeStatusUpdaterImpl nodeStatusUpdater =
(NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
ContainerId cId = ContainerId.newInstance(appAttemptId, 1);
Token containerToken =
BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
BuilderUtils.newResource(1024, 1), 0, 123,
"password".getBytes(), 0);
Container anyCompletedContainer = new ContainerImpl(conf, null,
null, null, null, null,
BuilderUtils.newContainerTokenIdentifier(containerToken)) {
@Override
public ContainerState getCurrentState() {
return ContainerState.COMPLETE;
}
};
nm.getNMContext().getApplications().putIfAbsent(appId,
mock(Application.class));
nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
nm.getNMContext().getApplications().remove(appId);
nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList
<ContainerId>());
Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
}
@Test @Test
public void testNMRegistration() throws InterruptedException { public void testNMRegistration() throws InterruptedException {
nm = new NodeManager() { nm = new NodeManager() {
@ -860,7 +962,7 @@ public void run() {
nm.stop(); nm.stop();
} }
@Test @Test
public void testStopReentrant() throws Exception { public void testStopReentrant() throws Exception {
final AtomicInteger numCleanups = new AtomicInteger(0); final AtomicInteger numCleanups = new AtomicInteger(0);
@ -875,7 +977,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
myNodeStatusUpdater.resourceTracker = myResourceTracker2; myNodeStatusUpdater.resourceTracker = myResourceTracker2;
return myNodeStatusUpdater; return myNodeStatusUpdater;
} }
@Override @Override
protected ContainerManagerImpl createContainerManager(Context context, protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del, ContainerExecutor exec, DeletionService del,
@ -897,7 +999,7 @@ public void cleanUpApplicationsOnNMShutDown() {
YarnConfiguration conf = createNMConfig(); YarnConfiguration conf = createNMConfig();
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
int waitCount = 0; int waitCount = 0;
while (heartBeatID < 1 && waitCount++ != 200) { while (heartBeatID < 1 && waitCount++ != 200) {
Thread.sleep(500); Thread.sleep(500);
@ -906,7 +1008,7 @@ public void cleanUpApplicationsOnNMShutDown() {
// Meanwhile call stop directly as the shutdown hook would // Meanwhile call stop directly as the shutdown hook would
nm.stop(); nm.stop();
// NM takes a while to reach the STOPPED state. // NM takes a while to reach the STOPPED state.
waitCount = 0; waitCount = 0;
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
@ -1172,9 +1274,13 @@ protected NMContext createNMContext(
nm.start(); nm.start();
int waitCount = 0; int waitCount = 0;
while (heartBeatID <= 3 && waitCount++ != 20) { while (heartBeatID <= 4 && waitCount++ != 20) {
Thread.sleep(500); Thread.sleep(500);
} }
if (heartBeatID <= 4) {
Assert.fail("Failed to get all heartbeats in time, " +
"heartbeatID:" + heartBeatID);
}
if(assertionFailedInThread.get()) { if(assertionFailedInThread.get()) {
Assert.fail("ContainerStatus Backup failed"); Assert.fail("ContainerStatus Backup failed");
} }
@ -1182,7 +1288,7 @@ protected NMContext createNMContext(
} }
@Test(timeout = 200000) @Test(timeout = 200000)
public void testNodeStatusUpdaterRetryAndNMShutdown() public void testNodeStatusUpdaterRetryAndNMShutdown()
throws Exception { throws Exception {
final long connectionWaitSecs = 1000; final long connectionWaitSecs = 1000;
final long connectionRetryIntervalMs = 1000; final long connectionRetryIntervalMs = 1000;
@ -1190,7 +1296,7 @@ public void testNodeStatusUpdaterRetryAndNMShutdown()
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
connectionWaitSecs); connectionWaitSecs);
conf.setLong(YarnConfiguration conf.setLong(YarnConfiguration
.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
connectionRetryIntervalMs); connectionRetryIntervalMs);
conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000); conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
@ -1281,30 +1387,36 @@ public ConcurrentMap<ContainerId, Container> getContainers() {
} else if (heartBeatID == 1) { } else if (heartBeatID == 1) {
ContainerStatus containerStatus2 = ContainerStatus containerStatus2 =
createContainerStatus(2, ContainerState.RUNNING); createContainerStatus(2, ContainerState.RUNNING);
Container container2 = getMockContainer(containerStatus2); putMockContainer(containerStatus2);
containers.put(containerStatus2.getContainerId(), container2);
ContainerStatus containerStatus3 = ContainerStatus containerStatus3 =
createContainerStatus(3, ContainerState.COMPLETE); createContainerStatus(3, ContainerState.COMPLETE);
Container container3 = getMockContainer(containerStatus3); putMockContainer(containerStatus3);
containers.put(containerStatus3.getContainerId(), container3);
return containers; return containers;
} else if (heartBeatID == 2) { } else if (heartBeatID == 2) {
ContainerStatus containerStatus4 = ContainerStatus containerStatus4 =
createContainerStatus(4, ContainerState.RUNNING); createContainerStatus(4, ContainerState.RUNNING);
Container container4 = getMockContainer(containerStatus4); putMockContainer(containerStatus4);
containers.put(containerStatus4.getContainerId(), container4);
ContainerStatus containerStatus5 = ContainerStatus containerStatus5 =
createContainerStatus(5, ContainerState.COMPLETE); createContainerStatus(5, ContainerState.COMPLETE);
Container container5 = getMockContainer(containerStatus5); putMockContainer(containerStatus5);
containers.put(containerStatus5.getContainerId(), container5); return containers;
} else if (heartBeatID == 3 || heartBeatID == 4) {
return containers; return containers;
} else { } else {
containers.clear(); containers.clear();
return containers; return containers;
} }
} }
private void putMockContainer(ContainerStatus containerStatus) {
Container container = getMockContainer(containerStatus);
containers.put(containerStatus.getContainerId(), container);
applications.putIfAbsent(containerStatus.getContainerId()
.getApplicationAttemptId().getApplicationId(),
mock(Application.class));
}
} }
public static ContainerStatus createContainerStatus(int id, public static ContainerStatus createContainerStatus(int id,
@ -1345,7 +1457,7 @@ private void verifyNodeStartFailure(String errMessage) throws Exception {
throw e; throw e;
} }
} }
// the service should be stopped // the service should be stopped
Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm
.getServiceState()); .getServiceState());
@ -1364,7 +1476,7 @@ private YarnConfiguration createNMConfig() {
} }
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345"); conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345");
conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346"); conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogsDir.getAbsolutePath()); remoteLogsDir.getAbsolutePath());
@ -1372,7 +1484,7 @@ private YarnConfiguration createNMConfig() {
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf; return conf;
} }
private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) { private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
return new NodeManager() { return new NodeManager() {
@Override @Override

View File

@ -198,7 +198,7 @@ protected void serviceStop() throws Exception {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@VisibleForTesting @VisibleForTesting
void handleNMContainerStatus(NMContainerStatus containerStatus) { void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) {
ApplicationAttemptId appAttemptId = ApplicationAttemptId appAttemptId =
containerStatus.getContainerId().getApplicationAttemptId(); containerStatus.getContainerId().getApplicationAttemptId();
RMApp rmApp = RMApp rmApp =
@ -229,7 +229,8 @@ void handleNMContainerStatus(NMContainerStatus containerStatus) {
containerStatus.getContainerExitStatus()); containerStatus.getContainerExitStatus());
// sending master container finished event. // sending master container finished event.
RMAppAttemptContainerFinishedEvent evt = RMAppAttemptContainerFinishedEvent evt =
new RMAppAttemptContainerFinishedEvent(appAttemptId, status); new RMAppAttemptContainerFinishedEvent(appAttemptId, status,
nodeId);
rmContext.getDispatcher().getEventHandler().handle(evt); rmContext.getDispatcher().getEventHandler().handle(evt);
} }
} }
@ -324,7 +325,7 @@ public RegisterNodeManagerResponse registerNodeManager(
LOG.info("received container statuses on node manager register :" LOG.info("received container statuses on node manager register :"
+ request.getNMContainerStatuses()); + request.getNMContainerStatuses());
for (NMContainerStatus status : request.getNMContainerStatuses()) { for (NMContainerStatus status : request.getNMContainerStatuses()) {
handleNMContainerStatus(status); handleNMContainerStatus(status, nodeId);
} }
} }
} }

View File

@ -1181,7 +1181,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
int numberOfFailure = app.getNumFailedAppAttempts(); int numberOfFailure = app.getNumFailedAppAttempts();
if (!app.submissionContext.getUnmanagedAM() if (!app.submissionContext.getUnmanagedAM()
&& numberOfFailure < app.maxAppAttempts) { && numberOfFailure < app.maxAppAttempts) {
boolean transferStateFromPreviousAttempt = false; boolean transferStateFromPreviousAttempt;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt = transferStateFromPreviousAttempt =
failedEvent.getTransferStateFromPreviousAttempt(); failedEvent.getTransferStateFromPreviousAttempt();
@ -1191,11 +1191,11 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
// Transfer the state from the previous attempt to the current attempt. // Transfer the state from the previous attempt to the current attempt.
// Note that the previous failed attempt may still be collecting the // Note that the previous failed attempt may still be collecting the
// container events from the scheduler and update its data structures // container events from the scheduler and update its data structures
// before the new attempt is created. // before the new attempt is created. We always transferState for
if (transferStateFromPreviousAttempt) { // finished containers so that they can be acked to NM,
((RMAppAttemptImpl) app.currentAttempt) // but when pulling finished container we will check this flag again.
.transferStateFromPreviousAttempt(oldAttempt); ((RMAppAttemptImpl) app.currentAttempt)
} .transferStateFromPreviousAttempt(oldAttempt);
return initialState; return initialState;
} else { } else {
if (numberOfFailure >= app.maxAppAttempts) { if (numberOfFailure >= app.maxAppAttempts) {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentMap;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
@ -31,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -120,13 +122,28 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
List<ContainerStatus> pullJustFinishedContainers(); List<ContainerStatus> pullJustFinishedContainers();
/** /**
* Return the list of last set of finished containers. This does not reset the * Returns a reference to the map of last set of finished containers to the
* finished containers. * corresponding node. This does not reset the finished containers.
* @return the list of just finished contianers, this does not reset the * @return the list of just finished containers, this does not reset the
* finished containers. * finished containers.
*/ */
ConcurrentMap<NodeId, List<ContainerStatus>>
getJustFinishedContainersReference();
/**
* Return the list of last set of finished containers. This does not reset
* the finished containers.
* @return the list of just finished containers
*/
List<ContainerStatus> getJustFinishedContainers(); List<ContainerStatus> getJustFinishedContainers();
/**
* The map of conatiners per Node that are already sent to the AM.
* @return map of per node list of finished container status sent to AM
*/
ConcurrentMap<NodeId, List<ContainerStatus>>
getFinishedContainersSentToAMReference();
/** /**
* The container on which the Application Master is running. * The container on which the Application Master is running.
* @return the {@link Container} on which the application master is running. * @return the {@link Container} on which the application master is running.

View File

@ -24,9 +24,12 @@
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -52,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@ -83,6 +87,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -129,9 +134,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private final ApplicationSubmissionContext submissionContext; private final ApplicationSubmissionContext submissionContext;
private Token<AMRMTokenIdentifier> amrmToken = null; private Token<AMRMTokenIdentifier> amrmToken = null;
private SecretKey clientTokenMasterKey = null; private SecretKey clientTokenMasterKey = null;
private List<ContainerStatus> justFinishedContainers = private ConcurrentMap<NodeId, List<ContainerStatus>>
new ArrayList<ContainerStatus>(); justFinishedContainers =
new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
// Tracks the previous finished containers that are waiting to be
// verified as received by the AM. If the AM sends the next allocate
// request it implicitly acks this list.
private ConcurrentMap<NodeId, List<ContainerStatus>>
finishedContainersSentToAM =
new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
private Container masterContainer; private Container masterContainer;
private float progress = 0; private float progress = 0;
@ -627,9 +639,27 @@ public float getProgress() {
} }
} }
@VisibleForTesting
@Override @Override
public List<ContainerStatus> getJustFinishedContainers() { public List<ContainerStatus> getJustFinishedContainers() {
this.readLock.lock(); this.readLock.lock();
try {
List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
for (Collection<ContainerStatus> containerStatusList :
justFinishedContainers.values()) {
returnList.addAll(containerStatusList);
}
return returnList;
} finally {
this.readLock.unlock();
}
}
@Override
public ConcurrentMap<NodeId, List<ContainerStatus>>
getJustFinishedContainersReference
() {
this.readLock.lock();
try { try {
return this.justFinishedContainers; return this.justFinishedContainers;
} finally { } finally {
@ -637,15 +667,68 @@ public List<ContainerStatus> getJustFinishedContainers() {
} }
} }
@Override
public ConcurrentMap<NodeId, List<ContainerStatus>>
getFinishedContainersSentToAMReference() {
this.readLock.lock();
try {
return this.finishedContainersSentToAM;
} finally {
this.readLock.unlock();
}
}
@Override @Override
public List<ContainerStatus> pullJustFinishedContainers() { public List<ContainerStatus> pullJustFinishedContainers() {
this.writeLock.lock(); this.writeLock.lock();
try { try {
List<ContainerStatus> returnList = new ArrayList<ContainerStatus>( List<ContainerStatus> returnList = new ArrayList<ContainerStatus>();
this.justFinishedContainers.size());
returnList.addAll(this.justFinishedContainers); // A new allocate means the AM received the previously sent
this.justFinishedContainers.clear(); // finishedContainers. We can ack this to NM now
for (NodeId nodeId:finishedContainersSentToAM.keySet()) {
// Clear and get current values
List<ContainerStatus> currentSentContainers =
finishedContainersSentToAM
.put(nodeId, new ArrayList<ContainerStatus>());
List<ContainerId> containerIdList = new ArrayList<ContainerId>
(currentSentContainers.size());
for (ContainerStatus containerStatus:currentSentContainers) {
containerIdList.add(containerStatus.getContainerId());
}
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(
nodeId, containerIdList));
}
// Mark every containerStatus as being sent to AM though we may return
// only the ones that belong to the current attempt
boolean keepContainersAcressAttempts = this.submissionContext
.getKeepContainersAcrossApplicationAttempts();
for (NodeId nodeId:justFinishedContainers.keySet()) {
// Clear and get current values
List<ContainerStatus> finishedContainers = justFinishedContainers.put
(nodeId, new ArrayList<ContainerStatus>());
if (keepContainersAcressAttempts) {
returnList.addAll(finishedContainers);
} else {
// Filter out containers from previous attempt
for (ContainerStatus containerStatus: finishedContainers) {
if (containerStatus.getContainerId().getApplicationAttemptId()
.equals(this.getAppAttemptId())) {
returnList.add(containerStatus);
}
}
}
finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList
<ContainerStatus>());
finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
}
return returnList; return returnList;
} finally { } finally {
this.writeLock.unlock(); this.writeLock.unlock();
@ -732,7 +815,7 @@ public void recover(RMState state) throws Exception {
} }
setMasterContainer(attemptState.getMasterContainer()); setMasterContainer(attemptState.getMasterContainer());
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
attemptState.getState()); attemptState.getState());
this.recoveredFinalState = attemptState.getState(); this.recoveredFinalState = attemptState.getState();
this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@ -744,7 +827,9 @@ public void recover(RMState state) throws Exception {
} }
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainers(); this.justFinishedContainers = attempt.getJustFinishedContainersReference();
this.finishedContainersSentToAM =
attempt.getFinishedContainersSentToAMReference();
} }
private void recoverAppAttemptCredentials(Credentials appAttemptTokens, private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
@ -1507,6 +1592,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
ContainerStatus containerStatus = ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus(); containerFinishedEvent.getContainerStatus();
// Add all finished containers so that they can be acked to NM
addJustFinishedContainer(appAttempt, containerFinishedEvent);
// Is this container the AmContainer? If the finished container is same as // Is this container the AmContainer? If the finished container is same as
// the AMContainer, AppAttempt fails // the AMContainer, AppAttempt fails
if (appAttempt.masterContainer != null if (appAttempt.masterContainer != null
@ -1519,12 +1607,18 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
return RMAppAttemptState.FINAL_SAVING; return RMAppAttemptState.FINAL_SAVING;
} }
// Normal container.Put it in completed containers list
appAttempt.justFinishedContainers.add(containerStatus);
return this.currentState; return this.currentState;
} }
} }
private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
.getNodeId(), new ArrayList<ContainerStatus>());
appAttempt.justFinishedContainers.get(containerFinishedEvent
.getNodeId()).add(containerFinishedEvent.getContainerStatus());
}
private static final class ContainerFinishedAtFinalStateTransition private static final class ContainerFinishedAtFinalStateTransition
extends BaseTransition { extends BaseTransition {
@Override @Override
@ -1533,10 +1627,8 @@ private static final class ContainerFinishedAtFinalStateTransition
RMAppAttemptContainerFinishedEvent containerFinishedEvent = RMAppAttemptContainerFinishedEvent containerFinishedEvent =
(RMAppAttemptContainerFinishedEvent) event; (RMAppAttemptContainerFinishedEvent) event;
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
// Normal container. Add it in completed containers list // Normal container. Add it in completed containers list
appAttempt.justFinishedContainers.add(containerStatus); addJustFinishedContainer(appAttempt, containerFinishedEvent);
} }
} }
@ -1569,6 +1661,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
ContainerStatus containerStatus = ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus(); containerFinishedEvent.getContainerStatus();
// Add all finished containers so that they can be acked to NM.
addJustFinishedContainer(appAttempt, containerFinishedEvent);
// Is this container the ApplicationMaster container? // Is this container the ApplicationMaster container?
if (appAttempt.masterContainer.getId().equals( if (appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) { containerStatus.getContainerId())) {
@ -1576,8 +1671,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
appAttempt, containerFinishedEvent); appAttempt, containerFinishedEvent);
return RMAppAttemptState.FINISHED; return RMAppAttemptState.FINISHED;
} }
// Normal container.
appAttempt.justFinishedContainers.add(containerStatus);
return RMAppAttemptState.FINISHING; return RMAppAttemptState.FINISHING;
} }
} }
@ -1592,6 +1686,9 @@ private static class ContainerFinishedAtFinalSavingTransition extends
ContainerStatus containerStatus = ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus(); containerFinishedEvent.getContainerStatus();
// Add all finished containers so that they can be acked to NM.
addJustFinishedContainer(appAttempt, containerFinishedEvent);
// If this is the AM container, it means the AM container is finished, // If this is the AM container, it means the AM container is finished,
// but we are not yet acknowledged that the final state has been saved. // but we are not yet acknowledged that the final state has been saved.
// Thus, we still return FINAL_SAVING state here. // Thus, we still return FINAL_SAVING state here.
@ -1611,8 +1708,6 @@ private static class ContainerFinishedAtFinalSavingTransition extends
appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED); appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
return; return;
} }
// Normal container.
appAttempt.justFinishedContainers.add(containerStatus);
} }
} }
@ -1629,7 +1724,7 @@ public AMFinishedAfterFinalSavingTransition(
transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent); appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent);
new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt, new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt,
event); event);
} }
} }

View File

@ -20,21 +20,27 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent { public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent {
private final ContainerStatus containerStatus; private final ContainerStatus containerStatus;
private final NodeId nodeId;
public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId, public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId,
ContainerStatus containerStatus) { ContainerStatus containerStatus, NodeId nodeId) {
super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED); super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED);
this.containerStatus = containerStatus; this.containerStatus = containerStatus;
this.nodeId = nodeId;
} }
public ContainerStatus getContainerStatus() { public ContainerStatus getContainerStatus() {
return this.containerStatus; return this.containerStatus;
} }
public NodeId getNodeId() {
return this.nodeId;
}
} }

View File

@ -78,13 +78,13 @@ RMContainerEventType.RESERVED, new ContainerReservedTransition())
RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
// Transitions from RESERVED state // Transitions from RESERVED state
.addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition()) RMContainerEventType.RESERVED, new ContainerReservedTransition())
.addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED, .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED,
RMContainerEventType.START, new ContainerStartedTransition()) RMContainerEventType.START, new ContainerStartedTransition())
.addTransition(RMContainerState.RESERVED, RMContainerState.KILLED, .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED,
RMContainerEventType.KILL) // nothing to do RMContainerEventType.KILL) // nothing to do
.addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED, .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED,
RMContainerEventType.RELEASED) // nothing to do RMContainerEventType.RELEASED) // nothing to do
@ -100,7 +100,7 @@ RMContainerEventType.KILL, new FinishedTransition())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
RMContainerEventType.LAUNCHED, new LaunchedTransition()) RMContainerEventType.LAUNCHED, new LaunchedTransition())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED, .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState()) RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED, .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
RMContainerEventType.RELEASED, new KillTransition()) RMContainerEventType.RELEASED, new KillTransition())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED, .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED,
@ -495,7 +495,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
updateAttemptMetrics(container); updateAttemptMetrics(container);
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, finishedEvent.getRemoteContainerStatus())); container.appAttemptId, finishedEvent.getRemoteContainerStatus(),
container.getAllocatedNode()));
container.rmContext.getRMApplicationHistoryWriter().containerFinished( container.rmContext.getRMApplicationHistoryWriter().containerFinished(
container); container);

View File

@ -40,6 +40,9 @@ public enum RMNodeEventType {
CONTAINER_ALLOCATED, CONTAINER_ALLOCATED,
CLEANUP_CONTAINER, CLEANUP_CONTAINER,
// Source: RMAppAttempt
FINISHED_CONTAINERS_PULLED_BY_AM,
// Source: NMLivelinessMonitor // Source: NMLivelinessMonitor
EXPIRE EXPIRE
} }

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import java.util.List;
// Happens after an implicit ack from AM that the container completion has
// been notified successfully to the AM
public class RMNodeFinishedContainersPulledByAMEvent extends RMNodeEvent {
private List<ContainerId> containers;
public RMNodeFinishedContainersPulledByAMEvent(NodeId nodeId,
List<ContainerId> containers) {
super(nodeId, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM);
this.containers = containers;
}
public List<ContainerId> getContainers() {
return this.containers;
}
}

View File

@ -112,6 +112,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>( private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
new ContainerIdComparator()); new ContainerIdComparator());
/* set of containers that were notified to AM about their completion */
private final Set<ContainerId> finishedContainersPulledByAM =
new HashSet<ContainerId>();
/* the list of applications that have finished and need to be purged */ /* the list of applications that have finished and need to be purged */
private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>(); private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
@ -135,7 +139,7 @@ RMNodeEventType.STARTED, new AddNodeTransition())
new UpdateNodeResourceWhenUnusableTransition()) new UpdateNodeResourceWhenUnusableTransition())
//Transitions from RUNNING state //Transitions from RUNNING state
.addTransition(NodeState.RUNNING, .addTransition(NodeState.RUNNING,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition()) RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
.addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED, .addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
@ -151,6 +155,9 @@ RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition()) RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING, .addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new FinishedContainersPulledByAMTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING, .addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING, .addTransition(NodeState.RUNNING, NodeState.RUNNING,
@ -158,23 +165,30 @@ RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
//Transitions from REBOOTED state //Transitions from REBOOTED state
.addTransition(NodeState.REBOOTED, NodeState.REBOOTED, .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
RMNodeEventType.RESOURCE_UPDATE, RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition()) new UpdateNodeResourceWhenUnusableTransition())
//Transitions from DECOMMISSIONED state //Transitions from DECOMMISSIONED state
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED, .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
RMNodeEventType.RESOURCE_UPDATE, RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition()) new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new FinishedContainersPulledByAMTransition())
//Transitions from LOST state //Transitions from LOST state
.addTransition(NodeState.LOST, NodeState.LOST, .addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.RESOURCE_UPDATE, RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition()) new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new FinishedContainersPulledByAMTransition())
//Transitions from UNHEALTHY state //Transitions from UNHEALTHY state
.addTransition(NodeState.UNHEALTHY, .addTransition(NodeState.UNHEALTHY,
EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING), EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition()) RMNodeEventType.STATUS_UPDATE,
new StatusUpdateWhenUnHealthyTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED, .addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION, RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED)) new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
@ -192,7 +206,10 @@ RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition()) RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new FinishedContainersPulledByAMTransition())
// create the topology tables // create the topology tables
.installTopology(); .installTopology();
@ -365,8 +382,11 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response
response.addAllContainersToCleanup( response.addAllContainersToCleanup(
new ArrayList<ContainerId>(this.containersToClean)); new ArrayList<ContainerId>(this.containersToClean));
response.addAllApplicationsToCleanup(this.finishedApplications); response.addAllApplicationsToCleanup(this.finishedApplications);
response.addFinishedContainersPulledByAM(
new ArrayList<ContainerId>(this.finishedContainersPulledByAM));
this.containersToClean.clear(); this.containersToClean.clear();
this.finishedApplications.clear(); this.finishedApplications.clear();
this.finishedContainersPulledByAM.clear();
} finally { } finally {
this.writeLock.unlock(); this.writeLock.unlock();
} }
@ -652,6 +672,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
} }
} }
public static class FinishedContainersPulledByAMTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.finishedContainersPulledByAM.addAll(((
RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
}
}
public static class DeactivateNodeTransition public static class DeactivateNodeTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@ -726,7 +756,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
new ArrayList<ContainerStatus>(); new ArrayList<ContainerStatus>();
for (ContainerStatus remoteContainer : statusEvent.getContainers()) { for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
ContainerId containerId = remoteContainer.getContainerId(); ContainerId containerId = remoteContainer.getContainerId();
// Don't bother with containers already scheduled for cleanup, or for // Don't bother with containers already scheduled for cleanup, or for
// applications already killed. The scheduler doens't need to know any // applications already killed. The scheduler doens't need to know any
// more about this container // more about this container

View File

@ -491,7 +491,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception {
ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1), ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
ContainerState.COMPLETE, Resource.newInstance(1024, 1), ContainerState.COMPLETE, Resource.newInstance(1024, 1),
"Dummy Completed", 0, Priority.newInstance(10), 1234); "Dummy Completed", 0, Priority.newInstance(10), 1234);
rm.getResourceTrackerService().handleNMContainerStatus(report); rm.getResourceTrackerService().handleNMContainerStatus(report, null);
verify(handler, never()).handle((Event) any()); verify(handler, never()).handle((Event) any());
// Case 1.2: Master container is null // Case 1.2: Master container is null
@ -502,7 +502,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception {
ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0), ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
ContainerState.COMPLETE, Resource.newInstance(1024, 1), ContainerState.COMPLETE, Resource.newInstance(1024, 1),
"Dummy Completed", 0, Priority.newInstance(10), 1234); "Dummy Completed", 0, Priority.newInstance(10), 1234);
rm.getResourceTrackerService().handleNMContainerStatus(report); rm.getResourceTrackerService().handleNMContainerStatus(report, null);
verify(handler, never()).handle((Event)any()); verify(handler, never()).handle((Event)any());
// Case 2: Managed AM // Case 2: Managed AM
@ -515,7 +515,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception {
ContainerState.COMPLETE, Resource.newInstance(1024, 1), ContainerState.COMPLETE, Resource.newInstance(1024, 1),
"Dummy Completed", 0, Priority.newInstance(10), 1234); "Dummy Completed", 0, Priority.newInstance(10), 1234);
try { try {
rm.getResourceTrackerService().handleNMContainerStatus(report); rm.getResourceTrackerService().handleNMContainerStatus(report, null);
} catch (Exception e) { } catch (Exception e) {
// expected - ignore // expected - ignore
} }
@ -530,7 +530,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception {
ContainerState.COMPLETE, Resource.newInstance(1024, 1), ContainerState.COMPLETE, Resource.newInstance(1024, 1),
"Dummy Completed", 0, Priority.newInstance(10), 1234); "Dummy Completed", 0, Priority.newInstance(10), 1234);
try { try {
rm.getResourceTrackerService().handleNMContainerStatus(report); rm.getResourceTrackerService().handleNMContainerStatus(report, null);
} catch (Exception e) { } catch (Exception e) {
// expected - ignore // expected - ignore
} }

View File

@ -98,6 +98,9 @@ public void testAMRestartWithExistingContainers() throws Exception {
Thread.sleep(200); Thread.sleep(200);
} }
ContainerId amContainerId = ContainerId.newInstance(am1
.getApplicationAttemptId(), 1);
// launch the 2nd container, for testing running container transferred. // launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING); nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 = ContainerId containerId2 =
@ -196,11 +199,15 @@ public void testAMRestartWithExistingContainers() throws Exception {
// completed containerId4 is also transferred to the new attempt. // completed containerId4 is also transferred to the new attempt.
RMAppAttempt newAttempt = RMAppAttempt newAttempt =
app1.getRMAppAttempt(am2.getApplicationAttemptId()); app1.getRMAppAttempt(am2.getApplicationAttemptId());
// 4 containers finished, acquired/allocated/reserved/completed. // 4 containers finished, acquired/allocated/reserved/completed + AM
waitForContainersToFinish(4, newAttempt); // container.
waitForContainersToFinish(5, newAttempt);
boolean container3Exists = false, container4Exists = false, container5Exists = boolean container3Exists = false, container4Exists = false, container5Exists =
false, container6Exists = false; false, container6Exists = false, amContainerExists = false;
for(ContainerStatus status : newAttempt.getJustFinishedContainers()) { for(ContainerStatus status : newAttempt.getJustFinishedContainers()) {
if(status.getContainerId().equals(amContainerId)) {
amContainerExists = true;
}
if(status.getContainerId().equals(containerId3)) { if(status.getContainerId().equals(containerId3)) {
// containerId3 is the container ran by previous attempt but finished by the // containerId3 is the container ran by previous attempt but finished by the
// new attempt. // new attempt.
@ -220,8 +227,11 @@ public void testAMRestartWithExistingContainers() throws Exception {
container6Exists = true; container6Exists = true;
} }
} }
Assert.assertTrue(container3Exists && container4Exists && container5Exists Assert.assertTrue(amContainerExists);
&& container6Exists); Assert.assertTrue(container3Exists);
Assert.assertTrue(container4Exists);
Assert.assertTrue(container5Exists);
Assert.assertTrue(container6Exists);
// New SchedulerApplicationAttempt also has the containers info. // New SchedulerApplicationAttempt also has the containers info.
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
@ -240,14 +250,14 @@ public void testAMRestartWithExistingContainers() throws Exception {
// all 4 normal containers finished. // all 4 normal containers finished.
System.out.println("New attempt's just finished containers: " System.out.println("New attempt's just finished containers: "
+ newAttempt.getJustFinishedContainers()); + newAttempt.getJustFinishedContainers());
waitForContainersToFinish(5, newAttempt); waitForContainersToFinish(6, newAttempt);
rm1.stop(); rm1.stop();
} }
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
throws InterruptedException { throws InterruptedException {
int count = 0; int count = 0;
while (attempt.getJustFinishedContainers().size() != expectedNum while (attempt.getJustFinishedContainers().size() < expectedNum
&& count < 500) { && count < 500) {
Thread.sleep(100); Thread.sleep(100);
count++; count++;

View File

@ -28,6 +28,7 @@
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -35,6 +36,7 @@
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -91,6 +93,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@ -151,6 +158,7 @@ public class TestRMAppAttemptTransitions {
private NMTokenSecretManagerInRM nmTokenManager = private NMTokenSecretManagerInRM nmTokenManager =
spy(new NMTokenSecretManagerInRM(conf)); spy(new NMTokenSecretManagerInRM(conf));
private boolean transferStateFromPreviousAttempt = false; private boolean transferStateFromPreviousAttempt = false;
private EventHandler<RMNodeEvent> rmnodeEventHandler;
private final class TestApplicationAttemptEventDispatcher implements private final class TestApplicationAttemptEventDispatcher implements
EventHandler<RMAppAttemptEvent> { EventHandler<RMAppAttemptEvent> {
@ -203,7 +211,7 @@ public void handle(AMLauncherEvent event) {
applicationMasterLauncher.handle(event); applicationMasterLauncher.handle(event);
} }
} }
private static int appId = 1; private static int appId = 1;
private ApplicationSubmissionContext submissionContext = null; private ApplicationSubmissionContext submissionContext = null;
@ -268,6 +276,9 @@ public void setUp() throws Exception {
rmDispatcher.register(AMLauncherEventType.class, rmDispatcher.register(AMLauncherEventType.class,
new TestAMLauncherEventDispatcher()); new TestAMLauncherEventDispatcher());
rmnodeEventHandler = mock(RMNodeImpl.class);
rmDispatcher.register(RMNodeEventType.class, rmnodeEventHandler);
rmDispatcher.init(conf); rmDispatcher.init(conf);
rmDispatcher.start(); rmDispatcher.start();
@ -575,6 +586,8 @@ private void testAppAttemptFinishedState(Container container,
} }
assertEquals(finishedContainerCount, applicationAttempt assertEquals(finishedContainerCount, applicationAttempt
.getJustFinishedContainers().size()); .getJustFinishedContainers().size());
Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
.size());
assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
@ -704,7 +717,8 @@ private void testUnmanagedAMSuccess(String url) {
application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(),
container.getNodeId())); container.getNodeId()));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class),
container.getNodeId()));
// complete AM // complete AM
String diagnostics = "Successful"; String diagnostics = "Successful";
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
@ -752,10 +766,11 @@ public void testUsageReport() {
when(appResUsgRpt.getMemorySeconds()).thenReturn(223456L); when(appResUsgRpt.getMemorySeconds()).thenReturn(223456L);
when(appResUsgRpt.getVcoreSeconds()).thenReturn(75544L); when(appResUsgRpt.getVcoreSeconds()).thenReturn(75544L);
sendAttemptUpdateSavedEvent(applicationAttempt); sendAttemptUpdateSavedEvent(applicationAttempt);
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
attemptId, attemptId,
ContainerStatus.newInstance( ContainerStatus.newInstance(
amContainer.getId(), ContainerState.COMPLETE, "", 0))); amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
when(scheduler.getSchedulerAppInfo(eq(attemptId))).thenReturn(null); when(scheduler.getSchedulerAppInfo(eq(attemptId))).thenReturn(null);
@ -857,8 +872,9 @@ public void testAMCrashAtScheduled() {
SchedulerUtils.LOST_CONTAINER); SchedulerUtils.LOST_CONTAINER);
// send CONTAINER_FINISHED event at SCHEDULED state, // send CONTAINER_FINISHED event at SCHEDULED state,
// The state should be FINAL_SAVING with previous state SCHEDULED // The state should be FINAL_SAVING with previous state SCHEDULED
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), cs)); applicationAttempt.getAppAttemptId(), cs, anyNodeId));
// createApplicationAttemptState will return previous state (SCHEDULED), // createApplicationAttemptState will return previous state (SCHEDULED),
// if the current state is FINAL_SAVING. // if the current state is FINAL_SAVING.
assertEquals(YarnApplicationAttemptState.SCHEDULED, assertEquals(YarnApplicationAttemptState.SCHEDULED,
@ -904,8 +920,9 @@ public void testAMCrashAtAllocated() {
ContainerStatus cs = ContainerStatus cs =
BuilderUtils.newContainerStatus(amContainer.getId(), BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, containerDiagMsg, exitCode); ContainerState.COMPLETE, containerDiagMsg, exitCode);
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), cs)); applicationAttempt.getAppAttemptId(), cs, anyNodeId));
assertEquals(YarnApplicationAttemptState.ALLOCATED, assertEquals(YarnApplicationAttemptState.ALLOCATED,
applicationAttempt.createApplicationAttemptState()); applicationAttempt.createApplicationAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt); sendAttemptUpdateSavedEvent(applicationAttempt);
@ -928,16 +945,17 @@ public void testRunningToFailed() {
ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(), ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, containerDiagMsg, exitCode); ContainerState.COMPLETE, containerDiagMsg, exitCode);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs)); appAttemptId, cs, anyNodeId));
// ignored ContainerFinished and Expire at FinalSaving if we were supposed // ignored ContainerFinished and Expire at FinalSaving if we were supposed
// to Failed state. // to Failed state.
assertEquals(RMAppAttemptState.FINAL_SAVING, assertEquals(RMAppAttemptState.FINAL_SAVING,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
amContainer.getId(), ContainerState.COMPLETE, "", 0))); amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.handle(new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
assertEquals(RMAppAttemptState.FINAL_SAVING, assertEquals(RMAppAttemptState.FINAL_SAVING,
@ -947,7 +965,7 @@ public void testRunningToFailed() {
sendAttemptUpdateSavedEvent(applicationAttempt); sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED, assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, application.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@ -971,10 +989,11 @@ public void testRunningToKilled() {
// ignored ContainerFinished and Expire at FinalSaving if we were supposed // ignored ContainerFinished and Expire at FinalSaving if we were supposed
// to Killed state. // to Killed state.
assertEquals(RMAppAttemptState.FINAL_SAVING, assertEquals(RMAppAttemptState.FINAL_SAVING,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
amContainer.getId(), ContainerState.COMPLETE, "", 0))); amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.handle(new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
assertEquals(RMAppAttemptState.FINAL_SAVING, assertEquals(RMAppAttemptState.FINAL_SAVING,
@ -984,7 +1003,7 @@ public void testRunningToKilled() {
sendAttemptUpdateSavedEvent(applicationAttempt); sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.KILLED, assertEquals(RMAppAttemptState.KILLED,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(1,applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, application.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@ -1144,13 +1163,14 @@ public void testFinishingToFinishing() {
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
diagnostics); diagnostics);
// container must be AM container to move from FINISHING to FINISHED // container must be AM container to move from FINISHING to FINISHED
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle( applicationAttempt.handle(
new RMAppAttemptContainerFinishedEvent( new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), applicationAttempt.getAppAttemptId(),
BuilderUtils.newContainerStatus( BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId( BuilderUtils.newContainerId(
applicationAttempt.getAppAttemptId(), 42), applicationAttempt.getAppAttemptId(), 42),
ContainerState.COMPLETE, "", 0))); ContainerState.COMPLETE, "", 0), anyNodeId));
testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl, testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
diagnostics); diagnostics);
} }
@ -1165,13 +1185,14 @@ public void testSuccessfulFinishingToFinished() {
String diagnostics = "Successful"; String diagnostics = "Successful";
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
diagnostics); diagnostics);
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle( applicationAttempt.handle(
new RMAppAttemptContainerFinishedEvent( new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), applicationAttempt.getAppAttemptId(),
BuilderUtils.newContainerStatus(amContainer.getId(), BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, "", 0))); ContainerState.COMPLETE, "", 0), anyNodeId));
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
diagnostics, 0, false); diagnostics, 1, false);
} }
// While attempt is at FINAL_SAVING, Contaienr_Finished event may come before // While attempt is at FINAL_SAVING, Contaienr_Finished event may come before
@ -1195,15 +1216,16 @@ public void testSuccessfulFinishingToFinished() {
assertEquals(YarnApplicationAttemptState.RUNNING, assertEquals(YarnApplicationAttemptState.RUNNING,
applicationAttempt.createApplicationAttemptState()); applicationAttempt.createApplicationAttemptState());
// Container_finished event comes before Attempt_Saved event. // Container_finished event comes before Attempt_Saved event.
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
amContainer.getId(), ContainerState.COMPLETE, "", 0))); amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
assertEquals(RMAppAttemptState.FINAL_SAVING, assertEquals(RMAppAttemptState.FINAL_SAVING,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
// send attempt_saved // send attempt_saved
sendAttemptUpdateSavedEvent(applicationAttempt); sendAttemptUpdateSavedEvent(applicationAttempt);
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
diagnostics, 0, false); diagnostics, 1, false);
} }
// While attempt is at FINAL_SAVING, Expire event may come before // While attempt is at FINAL_SAVING, Expire event may come before
@ -1235,6 +1257,71 @@ public void testFinalSavingToFinishedWithExpire() {
diagnostics, 0, false); diagnostics, 0, false);
} }
@Test
public void testFinishedContainer() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
// Complete one container
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
.getAppAttemptId(), 2);
Container container1 = mock(Container.class);
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
when(container1.getId()).thenReturn(
containerId1);
when(containerStatus1.getContainerId()).thenReturn(containerId1);
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
application.handle(new RMAppRunningOnNodeEvent(application
.getApplicationId(),
container1.getNodeId()));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), containerStatus1,
container1.getNodeId()));
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
// Verify justFinishedContainers
Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
.size());
Assert.assertEquals(container1.getId(), applicationAttempt
.getJustFinishedContainers().get(0).getContainerId());
Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
.size());
// Verify finishedContainersSentToAM gets container after pull
List<ContainerStatus> containerStatuses = applicationAttempt
.pullJustFinishedContainers();
Assert.assertEquals(1, containerStatuses.size());
Mockito.verify(rmnodeEventHandler, never()).handle(Mockito
.any(RMNodeEvent.class));
Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty());
Assert.assertEquals(1, getFinishedContainersSentToAM(applicationAttempt)
.size());
// Verify container is acked to NM via the RMNodeEvent after second pull
containerStatuses = applicationAttempt.pullJustFinishedContainers();
Assert.assertEquals(0, containerStatuses.size());
Mockito.verify(rmnodeEventHandler).handle(captor.capture());
Assert.assertEquals(container1.getId(), captor.getValue().getContainers()
.get(0));
Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty());
Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
.size());
}
private static List<ContainerStatus> getFinishedContainersSentToAM(
RMAppAttempt applicationAttempt) {
List<ContainerStatus> containers = new ArrayList<ContainerStatus>();
for (List<ContainerStatus> containerStatuses: applicationAttempt
.getFinishedContainersSentToAMReference().values()) {
containers.addAll(containerStatuses);
}
return containers;
}
// this is to test user can get client tokens only after the client token // this is to test user can get client tokens only after the client token
// master key is saved in the state store and also registered in // master key is saved in the state store and also registered in
// ClientTokenSecretManager // ClientTokenSecretManager
@ -1281,8 +1368,9 @@ public void testFailedToFailed() {
ContainerStatus.newInstance(amContainer.getId(), ContainerStatus.newInstance(amContainer.getId(),
ContainerState.COMPLETE, "some error", 123); ContainerState.COMPLETE, "some error", 123);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs1)); appAttemptId, cs1, anyNodeId));
assertEquals(YarnApplicationAttemptState.RUNNING, assertEquals(YarnApplicationAttemptState.RUNNING,
applicationAttempt.createApplicationAttemptState()); applicationAttempt.createApplicationAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt); sendAttemptUpdateSavedEvent(applicationAttempt);
@ -1293,15 +1381,21 @@ public void testFailedToFailed() {
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
// failed attempt captured the container finished event. // failed attempt captured the container finished event.
assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
ContainerStatus cs2 = ContainerStatus cs2 =
ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2), ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
ContainerState.COMPLETE, "", 0); ContainerState.COMPLETE, "", 0);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs2)); appAttemptId, cs2, anyNodeId));
assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
assertEquals(cs2.getContainerId(), applicationAttempt boolean found = false;
.getJustFinishedContainers().get(0).getContainerId()); for (ContainerStatus containerStatus:applicationAttempt
.getJustFinishedContainers()) {
if (cs2.getContainerId().equals(containerStatus.getContainerId())) {
found = true;
}
}
assertTrue(found);
} }
@ -1322,8 +1416,9 @@ scheduler, masterService, submissionContext, new Configuration(),
ContainerStatus.newInstance(amContainer.getId(), ContainerStatus.newInstance(amContainer.getId(),
ContainerState.COMPLETE, "some error", 123); ContainerState.COMPLETE, "some error", 123);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs1)); appAttemptId, cs1, anyNodeId));
assertEquals(YarnApplicationAttemptState.RUNNING, assertEquals(YarnApplicationAttemptState.RUNNING,
applicationAttempt.createApplicationAttemptState()); applicationAttempt.createApplicationAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt); sendAttemptUpdateSavedEvent(applicationAttempt);

View File

@ -161,7 +161,7 @@ public void testTokenExpiry() throws Exception {
.getEventHandler() .getEventHandler()
.handle( .handle(
new RMAppAttemptContainerFinishedEvent(applicationAttemptId, new RMAppAttemptContainerFinishedEvent(applicationAttemptId,
containerStatus)); containerStatus, nm1.getNodeId()));
// Make sure the RMAppAttempt is at Finished State. // Make sure the RMAppAttempt is at Finished State.
// Both AMRMToken and ClientToAMToken have been removed. // Both AMRMToken and ClientToAMToken have been removed.