MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs may subsequently report as running. (Contributed by Vinod Kumar Vavilapalli)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231297 13f79535-47bb-0310-9956-ffa450edef68
parent be33949bef
commit 78ff0b720e
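The race this patch addresses: the RM can schedule a container for cleanup (released by the AM, or belonging to an application that was killed) while the NM's next heartbeat still reports that same container as RUNNING. Before the patch, that stale report reached the scheduler, which had already forgotten the container. The fix filters such reports in RMNodeImpl before they reach the scheduler and, as a second line of defense, has the schedulers tell the node to kill any container they do not recognize. Below is a condensed, self-contained model of the heartbeat-time filter, using plain strings and collections instead of the real YARN records; HeartbeatFilterModel and its method names are illustrative, not classes from the patch.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Minimal model of the filtering added in RMNodeImpl: container statuses are
// dropped when the container, or its whole application, is already slated for
// cleanup, so the scheduler never hears about them again.
public class HeartbeatFilterModel {
  private final Set<String> containersToClean = new TreeSet<String>();
  private final Set<String> finishedApplications = new HashSet<String>();

  public void markContainerForCleanup(String containerId) {
    containersToClean.add(containerId);
  }

  public void markApplicationFinished(String appId) {
    finishedApplications.add(appId);
  }

  /** Returns only the container reports the scheduler still needs to see. */
  public List<String> processHeartbeat(Map<String, String> containerToApp) {
    List<String> forScheduler = new ArrayList<String>();
    for (Map.Entry<String, String> e : containerToApp.entrySet()) {
      String containerId = e.getKey();
      String appId = e.getValue();
      if (containersToClean.contains(containerId)) {
        continue; // already scheduled for cleanup, ignore the report
      }
      if (finishedApplications.contains(appId)) {
        continue; // application already killed, ignore the report
      }
      forScheduler.add(containerId);
    }
    // The NM pulls the cleanup lists with this heartbeat's response, so the
    // bookkeeping can be cleared afterwards (mirrors the patch's clear() calls).
    containersToClean.clear();
    finishedApplications.clear();
    return forScheduler;
  }

  public static void main(String[] args) {
    HeartbeatFilterModel node = new HeartbeatFilterModel();
    node.markContainerForCleanup("container_2");
    node.markApplicationFinished("app_9");
    Map<String, String> heartbeat = new LinkedHashMap<String, String>();
    heartbeat.put("container_1", "app_1"); // healthy, passes through
    heartbeat.put("container_2", "app_1"); // being cleaned up, dropped
    heartbeat.put("container_3", "app_9"); // app already killed, dropped
    System.out.println(node.processHeartbeat(heartbeat)); // [container_1]
  }
}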
@@ -481,6 +481,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3625. CapacityScheduler web-ui display of queue's used capacity is broken.
     (Jason Lowe via mahadev)
 
+    MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs
+    may subsequently report as running. (Vinod Kumar Vavilapalli via sseth)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
@@ -205,6 +205,17 @@ public static NodeId newNodeId(String host, int port) {
     return nodeId;
   }
 
+  public static ContainerStatus newContainerStatus(ContainerId containerId,
+      ContainerState containerState, String diagnostics, int exitStatus) {
+    ContainerStatus containerStatus = recordFactory
+        .newRecordInstance(ContainerStatus.class);
+    containerStatus.setState(containerState);
+    containerStatus.setContainerId(containerId);
+    containerStatus.setDiagnostics(diagnostics);
+    containerStatus.setExitStatus(exitStatus);
+    return containerStatus;
+  }
+
   public static Container newContainer(ContainerId containerId,
       NodeId nodeId, String nodeHttpAddress,
       Resource resource, Priority priority, ContainerToken containerToken) {
@@ -65,6 +65,7 @@
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class ContainerImpl implements Container {
@@ -370,13 +371,8 @@ public ContainerLaunchContext getLaunchContext() {
   public ContainerStatus cloneAndGetContainerStatus() {
     this.readLock.lock();
     try {
-      ContainerStatus containerStatus =
-          recordFactory.newRecordInstance(ContainerStatus.class);
-      containerStatus.setState(getCurrentState());
-      containerStatus.setContainerId(this.launchContext.getContainerId());
-      containerStatus.setDiagnostics(diagnostics.toString());
-      containerStatus.setExitStatus(exitCode);
-      return containerStatus;
+      return BuilderUtils.newContainerStatus(this.getContainerID(),
+          getCurrentState(), diagnostics.toString(), exitCode);
     } finally {
       this.readLock.unlock();
     }
@@ -67,16 +67,16 @@
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
+import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
-import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
-import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
-import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
-import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
 
 /**
  * The ResourceManager is the main class that is a set of components.
@@ -256,7 +256,7 @@ protected RMAppManager createRMAppManager() {
   }
 
   @Private
-  public static final class SchedulerEventDispatcher extends AbstractService
+  public static class SchedulerEventDispatcher extends AbstractService
     implements EventHandler<SchedulerEvent> {
 
     private final ResourceScheduler scheduler;
@@ -265,8 +265,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
     HeartbeatResponse latestResponse = recordFactory
         .newRecordInstance(HeartbeatResponse.class);
     latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
-    latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
-    latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
+    latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp());
+    latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup());
     latestResponse.setNodeAction(NodeAction.NORMAL);
 
     // 4. Send status to RMNode, saving the latest response.
@@ -101,9 +101,9 @@ public interface RMNode {
 
   public RMNodeState getState();
 
-  public List<ContainerId> pullContainersToCleanUp();
+  public List<ContainerId> getContainersToCleanUp();
 
-  public List<ApplicationId> pullAppsToCleanup();
+  public List<ApplicationId> getAppsToCleanup();
 
   public HeartbeatResponse getLastHeartBeatResponse();
 }
@@ -89,7 +89,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   /* set of containers that have just launched */
   private final Map<ContainerId, ContainerStatus> justLaunchedContainers =
     new HashMap<ContainerId, ContainerStatus>();
 
-
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
@@ -248,54 +247,38 @@ public RMNodeState getState() {
   }
 
   @Override
-  public List<ApplicationId> pullAppsToCleanup() {
-    this.writeLock.lock();
-
-    try {
-      List<ApplicationId> lastfinishedApplications = new ArrayList<ApplicationId>();
-      lastfinishedApplications.addAll(this.finishedApplications);
-      this.finishedApplications.clear();
-      return lastfinishedApplications;
-    } finally {
-      this.writeLock.unlock();
-    }
-
-  }
-
-  @Private
-  public List<ContainerId> getContainersToCleanUp() {
+  public List<ApplicationId> getAppsToCleanup() {
     this.readLock.lock();
 
     try {
-      return new ArrayList<ContainerId>(containersToClean);
+      return new ArrayList<ApplicationId>(this.finishedApplications);
     } finally {
       this.readLock.unlock();
    }
 
   }
 
   @Override
-  public List<ContainerId> pullContainersToCleanUp() {
+  public List<ContainerId> getContainersToCleanUp() {
 
-    this.writeLock.lock();
+    this.readLock.lock();
 
     try {
-      List<ContainerId> containersToCleanUp = new ArrayList<ContainerId>();
-      containersToCleanUp.addAll(this.containersToClean);
-      this.containersToClean.clear();
-      return containersToCleanUp;
+      return new ArrayList<ContainerId>(this.containersToClean);
     } finally {
-      this.writeLock.unlock();
+      this.readLock.unlock();
     }
   };
 
   @Override
   public HeartbeatResponse getLastHeartBeatResponse() {
 
-    this.writeLock.lock();
+    this.readLock.lock();
 
     try {
       return this.latestHeartBeatResponse;
     } finally {
-      this.writeLock.unlock();
+      this.readLock.unlock();
     }
   }
 
@@ -407,14 +390,22 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
       for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
         ContainerId containerId = remoteContainer.getContainerId();
 
-        // Don't bother with containers already scheduled for cleanup,
-        // the scheduler doens't need to know any more about this container
+        // Don't bother with containers already scheduled for cleanup, or for
+        // applications already killed. The scheduler doesn't need to know any
+        // more about this container
         if (rmNode.containersToClean.contains(containerId)) {
           LOG.info("Container " + containerId + " already scheduled for " +
               "cleanup, no further processing");
           continue;
         }
+
+        if (rmNode.finishedApplications.contains(containerId
+            .getApplicationAttemptId().getApplicationId())) {
+          LOG.info("Container " + containerId
+              + " belongs to an application that is already killed,"
+              + " no further processing");
+          continue;
+        }
 
         // Process running containers
         if (remoteContainer.getState() == ContainerState.RUNNING) {
           if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
@@ -435,6 +426,12 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
 
       rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
           statusEvent.getKeepAliveAppIds());
+
+      // HeartBeat processing from our end is done, as node pulls the following
+      // lists before sending status-updates. Clear data-structures
+      rmNode.containersToClean.clear();
+      rmNode.finishedApplications.clear();
+
       return RMNodeState.RUNNING;
     }
   }
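The accessor change above is worth calling out: the destructive pullAppsToCleanup/pullContainersToCleanUp (write lock, clear on read) become read-only getAppsToCleanup/getContainersToCleanUp snapshots, and the clearing moves into the heartbeat transition, after incoming container statuses have been checked against the lists. A minimal sketch contrasting the two styles; CleanupList is a hypothetical stand-in for RMNodeImpl's bookkeeping, not code from the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class CleanupList {
  private final Set<String> containersToClean = new TreeSet<String>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  void add(String containerId) {
    lock.writeLock().lock();
    try {
      containersToClean.add(containerId);
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Old style: copy, clear, return. After one read the list is empty, so a
  // late RUNNING report can no longer be matched against it.
  List<String> pullContainersToCleanUp() {
    lock.writeLock().lock();
    try {
      List<String> copy = new ArrayList<String>(containersToClean);
      containersToClean.clear();
      return copy;
    } finally {
      lock.writeLock().unlock();
    }
  }

  // New style: read-only snapshot. The list survives until the heartbeat
  // transition clears it explicitly, after the filtering has happened.
  List<String> getContainersToCleanUp() {
    lock.readLock().lock();
    try {
      return new ArrayList<String>(containersToClean);
    } finally {
      lock.readLock().unlock();
    }
  }

  void clearAfterHeartbeat() {
    lock.writeLock().lock();
    try {
      containersToClean.clear();
    } finally {
      lock.writeLock().unlock();
    }
  }
}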
@@ -39,9 +39,9 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -52,6 +52,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
@@ -61,6 +62,7 @@
  * Each running Application in the RM corresponds to one instance
  * of this class.
  */
+@SuppressWarnings("unchecked")
 public class SchedulerApp {
 
   private static final Log LOG = LogFactory.getLog(SchedulerApp.class);
@@ -174,13 +176,20 @@ public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
     this.appSchedulingInfo.stop(rmAppAttemptFinalState);
   }
 
-  synchronized public void containerLaunchedOnNode(ContainerId containerId) {
+  public synchronized void containerLaunchedOnNode(ContainerId containerId,
+      NodeId nodeId) {
     // Inform the container
     RMContainer rmContainer =
         getRMContainer(containerId);
-    rmContainer.handle(
-        new RMContainerEvent(containerId,
-            RMContainerEventType.LAUNCHED));
+    if (rmContainer == null) {
+      // Some unknown container sneaked into the system. Kill it.
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+      return;
+    }
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.LAUNCHED));
   }
 
   synchronized public void containerCompleted(RMContainer rmContainer,
@@ -57,6 +57,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -76,6 +77,7 @@
 
 @LimitedPrivate("yarn")
 @Evolving
+@SuppressWarnings("unchecked")
 public class CapacityScheduler
 implements ResourceScheduler, CapacitySchedulerContext {
 
@@ -588,10 +590,12 @@ private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node
       LOG.info("Unknown application: " + applicationAttemptId +
           " launched container " + containerId +
           " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
       return;
     }
 
-    application.containerLaunchedOnNode(containerId);
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
   }
 
   @Override
@@ -65,6 +65,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -87,6 +88,7 @@
 
 @LimitedPrivate("yarn")
 @Evolving
+@SuppressWarnings("unchecked")
 public class FifoScheduler implements ResourceScheduler {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -282,7 +284,6 @@ private SchedulerNode getNode(NodeId nodeId) {
     return nodes.get(nodeId);
   }
 
-  @SuppressWarnings("unchecked")
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String user) {
     // TODO: Fix store
@@ -655,10 +656,14 @@ private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node
       LOG.info("Unknown application: " + applicationAttemptId +
           " launched container " + containerId +
           " on node: " + node);
+      // Some unknown container sneaked into the system. Kill it.
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+
       return;
     }
 
-    application.containerLaunchedOnNode(containerId);
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
   }
 
   @Lock(FifoScheduler.class)
@@ -39,15 +39,17 @@ public class MockNM {
 
   private int responseId;
   private NodeId nodeId;
   private final String nodeIdStr;
   private final int memory;
   private final ResourceTrackerService resourceTracker;
   private final int httpPort = 2;
 
   MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
     this.nodeIdStr = nodeIdStr;
     this.memory = memory;
     this.resourceTracker = resourceTracker;
+    String[] splits = nodeIdStr.split(":");
+    nodeId = Records.newRecord(NodeId.class);
+    nodeId.setHost(splits[0]);
+    nodeId.setPort(Integer.parseInt(splits[1]));
   }
 
   public NodeId getNodeId() {
@@ -63,14 +65,10 @@ public void containerStatus(Container container) throws Exception {
         new HashMap<ApplicationId, List<ContainerStatus>>();
     conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
         Arrays.asList(new ContainerStatus[] { container.getContainerStatus() }));
-    nodeHeartbeat(conts, true,nodeId);
+    nodeHeartbeat(conts, true);
   }
 
   public NodeId registerNode() throws Exception {
-    String[] splits = nodeIdStr.split(":");
-    nodeId = Records.newRecord(NodeId.class);
-    nodeId.setHost(splits[0]);
-    nodeId.setPort(Integer.parseInt(splits[1]));
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
@@ -83,11 +81,11 @@ public NodeId registerNode() throws Exception {
   }
 
   public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
-    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b,nodeId);
+    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
   }
 
   public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
-      List<ContainerStatus>> conts, boolean isHealthy, NodeId nodeId) throws Exception {
+      List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setNodeId(nodeId);
@@ -152,13 +152,13 @@ public RMNodeState getState() {
   }
 
   @Override
-  public List<ApplicationId> pullAppsToCleanup() {
+  public List<ApplicationId> getAppsToCleanup() {
     // TODO Auto-generated method stub
     return null;
   }
 
   @Override
-  public List<ContainerId> pullContainersToCleanUp() {
+  public List<ContainerId> getContainersToCleanUp() {
     // TODO Auto-generated method stub
     return null;
   }
@@ -19,26 +19,39 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Test;
-import org.mortbay.log.Log;
 
 public class TestApplicationCleanup {
 
+  private static final Log LOG = LogFactory
+      .getLog(TestApplicationCleanup.class);
+
   @Test
   public void testAppCleanup() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
@@ -67,11 +80,13 @@ public void testAppCleanup() throws Exception {
     List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>()).getAllocatedContainers();
     int contReceived = conts.size();
-    while (contReceived < request) {
+    int waitCount = 0;
+    while (contReceived < request && waitCount++ < 20) {
       conts = am.allocate(new ArrayList<ResourceRequest>(),
           new ArrayList<ContainerId>()).getAllocatedContainers();
       contReceived += conts.size();
-      Log.info("Got " + contReceived + " containers. Waiting to get " + request);
+      LOG.info("Got " + contReceived + " containers. Waiting to get "
+          + request);
       Thread.sleep(2000);
     }
     Assert.assertEquals(request, conts.size());
@@ -86,11 +101,12 @@ public void testAppCleanup() throws Exception {
 
     //currently only containers are cleaned via this
     //AM container is cleaned via container launcher
-    while (cleanedConts < 2 || cleanedApps < 1) {
+    waitCount = 0;
+    while ((cleanedConts < 3 || cleanedApps < 1) && waitCount++ < 20) {
       HeartbeatResponse resp = nm1.nodeHeartbeat(true);
       contsToClean = resp.getContainersToCleanupList();
       apps = resp.getApplicationsToCleanupList();
-      Log.info("Waiting to get cleanup events.. cleanedConts: "
+      LOG.info("Waiting to get cleanup events.. cleanedConts: "
           + cleanedConts + " cleanedApps: " + cleanedApps);
       cleanedConts += contsToClean.size();
       cleanedApps += apps.size();
@@ -99,6 +115,130 @@ public void testAppCleanup() throws Exception {
 
     Assert.assertEquals(1, apps.size());
     Assert.assertEquals(app.getApplicationId(), apps.get(0));
+    Assert.assertEquals(1, cleanedApps);
+    Assert.assertEquals(3, cleanedConts);
 
     rm.stop();
   }
+
+  @Test
+  public void testContainerCleanup() throws Exception {
+
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = new MockRM() {
+      @Override
+      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+        return new SchedulerEventDispatcher(this.scheduler) {
+          @Override
+          public void handle(SchedulerEvent event) {
+            scheduler.handle(event);
+          }
+        };
+      }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("h1:1234", 5000);
+
+    RMApp app = rm.submitApp(2000);
+
+    //kick the scheduling
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+
+    //request for containers
+    int request = 2;
+    am.allocate("h1", 1000, request,
+        new ArrayList<ContainerId>());
+    dispatcher.await();
+
+    //kick the scheduler
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    int contReceived = conts.size();
+    int waitCount = 0;
+    while (contReceived < request && waitCount++ < 20) {
+      conts = am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers();
+      dispatcher.await();
+      contReceived += conts.size();
+      LOG.info("Got " + contReceived + " containers. Waiting to get "
+          + request);
+      Thread.sleep(2000);
+    }
+    Assert.assertEquals(request, conts.size());
+
+    // Release a container.
+    ArrayList<ContainerId> release = new ArrayList<ContainerId>();
+    release.add(conts.get(1).getId());
+    am.allocate(new ArrayList<ResourceRequest>(), release);
+    dispatcher.await();
+
+    // Send one more heartbeat with a fake running container. This is to
+    // simulate the situation that can happen if the NM reports that container
+    // is running in the same heartbeat when the RM asks it to clean it up.
+    Map<ApplicationId, List<ContainerStatus>> containerStatuses =
+        new HashMap<ApplicationId, List<ContainerStatus>>();
+    ArrayList<ContainerStatus> containerStatusList =
+        new ArrayList<ContainerStatus>();
+    containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1)
+        .getId(), ContainerState.RUNNING, "nothing", 0));
+    containerStatuses.put(app.getApplicationId(), containerStatusList);
+
+    HeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
+    dispatcher.await();
+    List<ContainerId> contsToClean = resp.getContainersToCleanupList();
+    int cleanedConts = contsToClean.size();
+    waitCount = 0;
+    while (cleanedConts < 1 && waitCount++ < 20) {
+      resp = nm1.nodeHeartbeat(true);
+      dispatcher.await();
+      contsToClean = resp.getContainersToCleanupList();
+      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
+      cleanedConts += contsToClean.size();
+      Thread.sleep(1000);
+    }
+    LOG.info("Got cleanup for " + contsToClean.get(0));
+    Assert.assertEquals(1, cleanedConts);
+
+    // Now to test the case when RM already gave cleanup, and NM suddenly
+    // realizes that the container is running.
+    LOG.info("Testing container launch much after release and "
+        + "NM getting cleanup");
+    containerStatuses.clear();
+    containerStatusList.clear();
+    containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1)
+        .getId(), ContainerState.RUNNING, "nothing", 0));
+    containerStatuses.put(app.getApplicationId(), containerStatusList);
+
+    resp = nm1.nodeHeartbeat(containerStatuses, true);
+    dispatcher.await();
+    contsToClean = resp.getContainersToCleanupList();
+    cleanedConts = contsToClean.size();
+    // The cleanup list won't be instantaneous as it is given out by scheduler
+    // and not RMNodeImpl.
+    waitCount = 0;
+    while (cleanedConts < 1 && waitCount++ < 20) {
+      resp = nm1.nodeHeartbeat(true);
+      dispatcher.await();
+      contsToClean = resp.getContainersToCleanupList();
+      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
+      cleanedConts += contsToClean.size();
+      Thread.sleep(1000);
+    }
+    LOG.info("Got cleanup for " + contsToClean.get(0));
+    Assert.assertEquals(1, cleanedConts);
+
+    rm.stop();
+  }
@@ -164,8 +164,7 @@ public void testReboot() throws Exception {
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
 
     nodeHeartbeat = nm2.nodeHeartbeat(
-      new HashMap<ApplicationId, List<ContainerStatus>>(), true,
-      recordFactory.newRecordInstance(NodeId.class));
+      new HashMap<ApplicationId, List<ContainerStatus>>(), true);
     Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
     checkRebootedNMCount(rm, ++initialMetricCount);
   }