From 78ff0b720e0418785d53802a1b4e72085c1a3556 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Fri, 13 Jan 2012 21:15:22 +0000 Subject: [PATCH 01/11] MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs may subsequently report as running. (Contributed by Vinod Kumar Vavilapalli) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231297 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../apache/hadoop/yarn/util/BuilderUtils.java | 11 ++ .../container/ContainerImpl.java | 10 +- .../resourcemanager/ResourceManager.java | 10 +- .../ResourceTrackerService.java | 4 +- .../server/resourcemanager/rmnode/RMNode.java | 4 +- .../resourcemanager/rmnode/RMNodeImpl.java | 57 ++++--- .../scheduler/SchedulerApp.java | 19 ++- .../scheduler/capacity/CapacityScheduler.java | 6 +- .../scheduler/fifo/FifoScheduler.java | 9 +- .../yarn/server/resourcemanager/MockNM.java | 16 +- .../server/resourcemanager/MockNodes.java | 4 +- .../TestApplicationCleanup.java | 150 +++++++++++++++++- .../TestResourceTrackerService.java | 3 +- 14 files changed, 234 insertions(+), 72 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ea08bb1682..8e4ec55b58 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -481,6 +481,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3625. CapacityScheduler web-ui display of queue's used capacity is broken. (Jason Lowe via mahadev) + MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs + may subsequently report as running. (Vinod Kumar Vavilapalli via sseth) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java index 0650e206fe..8edade260d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java @@ -205,6 +205,17 @@ public static NodeId newNodeId(String host, int port) { return nodeId; } + public static ContainerStatus newContainerStatus(ContainerId containerId, + ContainerState containerState, String diagnostics, int exitStatus) { + ContainerStatus containerStatus = recordFactory + .newRecordInstance(ContainerStatus.class); + containerStatus.setState(containerState); + containerStatus.setContainerId(containerId); + containerStatus.setDiagnostics(diagnostics); + containerStatus.setExitStatus(exitStatus); + return containerStatus; + } + public static Container newContainer(ContainerId containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority, ContainerToken containerToken) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c6ba07aa7d..2c2d2baaa4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; public class ContainerImpl implements Container { @@ -370,13 +371,8 @@ public ContainerLaunchContext getLaunchContext() { public ContainerStatus cloneAndGetContainerStatus() { this.readLock.lock(); try { - ContainerStatus containerStatus = - recordFactory.newRecordInstance(ContainerStatus.class); - containerStatus.setState(getCurrentState()); - containerStatus.setContainerId(this.launchContext.getContainerId()); - containerStatus.setDiagnostics(diagnostics.toString()); - containerStatus.setExitStatus(exitCode); - return containerStatus; + return BuilderUtils.newContainerStatus(this.getContainerID(), + getCurrentState(), diagnostics.toString(), exitCode); } finally { this.readLock.unlock(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 1a69bbfc8b..18bdf8dbea 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -67,16 +67,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; +import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; +import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; +import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; import org.apache.hadoop.yarn.webapp.WebApps.Builder; -import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; -import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; -import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet; -import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; /** * The ResourceManager is the main class that is a set of components. @@ -256,7 +256,7 @@ protected RMAppManager createRMAppManager() { } @Private - public static final class SchedulerEventDispatcher extends AbstractService + public static class SchedulerEventDispatcher extends AbstractService implements EventHandler { private final ResourceScheduler scheduler; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 58697486d5..ccebe3a890 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -265,8 +265,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) HeartbeatResponse latestResponse = recordFactory .newRecordInstance(HeartbeatResponse.class); latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1); - latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp()); - latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup()); + latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp()); + latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup()); latestResponse.setNodeAction(NodeAction.NORMAL); // 4. Send status to RMNode, saving the latest response. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 494dffcf25..8dda6eda1d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -101,9 +101,9 @@ public interface RMNode { public RMNodeState getState(); - public List pullContainersToCleanUp(); + public List getContainersToCleanUp(); - public List pullAppsToCleanup(); + public List getAppsToCleanup(); public HeartbeatResponse getLastHeartBeatResponse(); } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 2cadd89071..dd3e25fe16 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -89,7 +89,6 @@ public class RMNodeImpl implements RMNode, EventHandler { /* set of containers that have just launched */ private final Map justLaunchedContainers = new HashMap(); - /* set of containers that need to be cleaned */ private final Set containersToClean = new TreeSet( @@ -248,54 +247,38 @@ public RMNodeState getState() { } @Override - public List pullAppsToCleanup() { - this.writeLock.lock(); - - try { - List lastfinishedApplications = new ArrayList(); - lastfinishedApplications.addAll(this.finishedApplications); - this.finishedApplications.clear(); - return lastfinishedApplications; - } finally { - this.writeLock.unlock(); - } - - } - - @Private - public List getContainersToCleanUp() { + public List getAppsToCleanup() { this.readLock.lock(); + try { - return new ArrayList(containersToClean); + return new ArrayList(this.finishedApplications); } finally { this.readLock.unlock(); } + } @Override - public List pullContainersToCleanUp() { + public List getContainersToCleanUp() { - this.writeLock.lock(); + this.readLock.lock(); try { - List containersToCleanUp = new ArrayList(); - containersToCleanUp.addAll(this.containersToClean); - this.containersToClean.clear(); - return containersToCleanUp; + return new ArrayList(this.containersToClean); } finally { - this.writeLock.unlock(); + this.readLock.unlock(); } }; @Override public HeartbeatResponse getLastHeartBeatResponse() { - this.writeLock.lock(); + this.readLock.lock(); try { return this.latestHeartBeatResponse; } finally { - this.writeLock.unlock(); + this.readLock.unlock(); } } @@ -407,14 +390,22 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { for (ContainerStatus remoteContainer : statusEvent.getContainers()) { ContainerId containerId = remoteContainer.getContainerId(); - // Don't bother with containers already scheduled for cleanup, - // the scheduler doens't need to know any more about this container + // Don't bother with containers already scheduled for cleanup, or for + // applications already killed. The scheduler doens't need to know any + // more about this container if (rmNode.containersToClean.contains(containerId)) { LOG.info("Container " + containerId + " already scheduled for " + "cleanup, no further processing"); continue; } - + if (rmNode.finishedApplications.contains(containerId + .getApplicationAttemptId().getApplicationId())) { + LOG.info("Container " + containerId + + " belongs to an application that is already killed," + + " no further processing"); + continue; + } + // Process running containers if (remoteContainer.getState() == ContainerState.RUNNING) { if (!rmNode.justLaunchedContainers.containsKey(containerId)) { @@ -435,6 +426,12 @@ public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications( statusEvent.getKeepAliveAppIds()); + + // HeartBeat processing from our end is done, as node pulls the following + // lists before sending status-updates. Clear data-structures + rmNode.containersToClean.clear(); + rmNode.finishedApplications.clear(); + return RMNodeState.RUNNING; } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java index e3798c0def..94ddb2af8a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java @@ -39,9 +39,9 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; @@ -61,6 +62,7 @@ * Each running Application in the RM corresponds to one instance * of this class. */ +@SuppressWarnings("unchecked") public class SchedulerApp { private static final Log LOG = LogFactory.getLog(SchedulerApp.class); @@ -174,13 +176,20 @@ public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { this.appSchedulingInfo.stop(rmAppAttemptFinalState); } - synchronized public void containerLaunchedOnNode(ContainerId containerId) { + public synchronized void containerLaunchedOnNode(ContainerId containerId, + NodeId nodeId) { // Inform the container RMContainer rmContainer = getRMContainer(containerId); - rmContainer.handle( - new RMContainerEvent(containerId, - RMContainerEventType.LAUNCHED)); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); + return; + } + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); } synchronized public void containerCompleted(RMContainer rmContainer, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index a542d0339c..364494b76c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -76,6 +77,7 @@ @LimitedPrivate("yarn") @Evolving +@SuppressWarnings("unchecked") public class CapacityScheduler implements ResourceScheduler, CapacitySchedulerContext { @@ -588,10 +590,12 @@ private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); return; } - application.containerLaunchedOnNode(containerId); + application.containerLaunchedOnNode(containerId, node.getNodeID()); } @Override diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 8e274956b5..0f6a8a84c8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -87,6 +88,7 @@ @LimitedPrivate("yarn") @Evolving +@SuppressWarnings("unchecked") public class FifoScheduler implements ResourceScheduler { private static final Log LOG = LogFactory.getLog(FifoScheduler.class); @@ -282,7 +284,6 @@ private SchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } - @SuppressWarnings("unchecked") private synchronized void addApplication(ApplicationAttemptId appAttemptId, String user) { // TODO: Fix store @@ -655,10 +656,14 @@ private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + " on node: " + node); + // Some unknown container sneaked into the system. Kill it. + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; } - application.containerLaunchedOnNode(containerId); + application.containerLaunchedOnNode(containerId, node.getNodeID()); } @Lock(FifoScheduler.class) diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index d5e87f137c..bd44f10b9e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -39,15 +39,17 @@ public class MockNM { private int responseId; private NodeId nodeId; - private final String nodeIdStr; private final int memory; private final ResourceTrackerService resourceTracker; private final int httpPort = 2; MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) { - this.nodeIdStr = nodeIdStr; this.memory = memory; this.resourceTracker = resourceTracker; + String[] splits = nodeIdStr.split(":"); + nodeId = Records.newRecord(NodeId.class); + nodeId.setHost(splits[0]); + nodeId.setPort(Integer.parseInt(splits[1])); } public NodeId getNodeId() { @@ -63,14 +65,10 @@ public void containerStatus(Container container) throws Exception { new HashMap>(); conts.put(container.getId().getApplicationAttemptId().getApplicationId(), Arrays.asList(new ContainerStatus[] { container.getContainerStatus() })); - nodeHeartbeat(conts, true,nodeId); + nodeHeartbeat(conts, true); } public NodeId registerNode() throws Exception { - String[] splits = nodeIdStr.split(":"); - nodeId = Records.newRecord(NodeId.class); - nodeId.setHost(splits[0]); - nodeId.setPort(Integer.parseInt(splits[1])); RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); @@ -83,11 +81,11 @@ public NodeId registerNode() throws Exception { } public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception { - return nodeHeartbeat(new HashMap>(), b,nodeId); + return nodeHeartbeat(new HashMap>(), b); } public HeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy, NodeId nodeId) throws Exception { + List> conts, boolean isHealthy) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setNodeId(nodeId); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index a7df0229f1..90b43504c1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -152,13 +152,13 @@ public RMNodeState getState() { } @Override - public List pullAppsToCleanup() { + public List getAppsToCleanup() { // TODO Auto-generated method stub return null; } @Override - public List pullContainersToCleanUp() { + public List getContainersToCleanUp() { // TODO Auto-generated method stub return null; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 86bf29055b..ae2c07f048 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -19,26 +19,39 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Test; -import org.mortbay.log.Log; public class TestApplicationCleanup { + private static final Log LOG = LogFactory + .getLog(TestApplicationCleanup.class); + @Test public void testAppCleanup() throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -67,11 +80,13 @@ public void testAppCleanup() throws Exception { List conts = am.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); int contReceived = conts.size(); - while (contReceived < request) { + int waitCount = 0; + while (contReceived < request && waitCount++ < 20) { conts = am.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); contReceived += conts.size(); - Log.info("Got " + contReceived + " containers. Waiting to get " + request); + LOG.info("Got " + contReceived + " containers. Waiting to get " + + request); Thread.sleep(2000); } Assert.assertEquals(request, conts.size()); @@ -86,11 +101,12 @@ public void testAppCleanup() throws Exception { //currently only containers are cleaned via this //AM container is cleaned via container launcher - while (cleanedConts < 2 || cleanedApps < 1) { + waitCount = 0; + while ((cleanedConts < 3 || cleanedApps < 1) && waitCount++ < 20) { HeartbeatResponse resp = nm1.nodeHeartbeat(true); contsToClean = resp.getContainersToCleanupList(); apps = resp.getApplicationsToCleanupList(); - Log.info("Waiting to get cleanup events.. cleanedConts: " + LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts + " cleanedApps: " + cleanedApps); cleanedConts += contsToClean.size(); cleanedApps += apps.size(); @@ -99,6 +115,130 @@ public void testAppCleanup() throws Exception { Assert.assertEquals(1, apps.size()); Assert.assertEquals(app.getApplicationId(), apps.get(0)); + Assert.assertEquals(1, cleanedApps); + Assert.assertEquals(3, cleanedConts); + + rm.stop(); + } + + @Test + public void testContainerCleanup() throws Exception { + + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm = new MockRM() { + @Override + protected EventHandler createSchedulerEventDispatcher() { + return new SchedulerEventDispatcher(this.scheduler) { + @Override + public void handle(SchedulerEvent event) { + scheduler.handle(event); + } + }; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + + MockNM nm1 = rm.registerNode("h1:1234", 5000); + + RMApp app = rm.submitApp(2000); + + //kick the scheduling + nm1.nodeHeartbeat(true); + + RMAppAttempt attempt = app.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + + //request for containers + int request = 2; + am.allocate("h1" , 1000, request, + new ArrayList()); + dispatcher.await(); + + //kick the scheduler + nm1.nodeHeartbeat(true); + List conts = am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + int contReceived = conts.size(); + int waitCount = 0; + while (contReceived < request && waitCount++ < 20) { + conts = am.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + dispatcher.await(); + contReceived += conts.size(); + LOG.info("Got " + contReceived + " containers. Waiting to get " + + request); + Thread.sleep(2000); + } + Assert.assertEquals(request, conts.size()); + + // Release a container. + ArrayList release = new ArrayList(); + release.add(conts.get(1).getId()); + am.allocate(new ArrayList(), release); + dispatcher.await(); + + // Send one more heartbeat with a fake running container. This is to + // simulate the situation that can happen if the NM reports that container + // is running in the same heartbeat when the RM asks it to clean it up. + Map> containerStatuses = + new HashMap>(); + ArrayList containerStatusList = + new ArrayList(); + containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1) + .getId(), ContainerState.RUNNING, "nothing", 0)); + containerStatuses.put(app.getApplicationId(), containerStatusList); + + HeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); + dispatcher.await(); + List contsToClean = resp.getContainersToCleanupList(); + int cleanedConts = contsToClean.size(); + waitCount = 0; + while (cleanedConts < 1 && waitCount++ < 20) { + resp = nm1.nodeHeartbeat(true); + dispatcher.await(); + contsToClean = resp.getContainersToCleanupList(); + LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); + cleanedConts += contsToClean.size(); + Thread.sleep(1000); + } + LOG.info("Got cleanup for " + contsToClean.get(0)); + Assert.assertEquals(1, cleanedConts); + + // Now to test the case when RM already gave cleanup, and NM suddenly + // realizes that the container is running. + LOG.info("Testing container launch much after release and " + + "NM getting cleanup"); + containerStatuses.clear(); + containerStatusList.clear(); + containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1) + .getId(), ContainerState.RUNNING, "nothing", 0)); + containerStatuses.put(app.getApplicationId(), containerStatusList); + + resp = nm1.nodeHeartbeat(containerStatuses, true); + dispatcher.await(); + contsToClean = resp.getContainersToCleanupList(); + cleanedConts = contsToClean.size(); + // The cleanup list won't be instantaneous as it is given out by scheduler + // and not RMNodeImpl. + waitCount = 0; + while (cleanedConts < 1 && waitCount++ < 20) { + resp = nm1.nodeHeartbeat(true); + dispatcher.await(); + contsToClean = resp.getContainersToCleanupList(); + LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts); + cleanedConts += contsToClean.size(); + Thread.sleep(1000); + } + LOG.info("Got cleanup for " + contsToClean.get(0)); + Assert.assertEquals(1, cleanedConts); rm.stop(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index d40b7abad8..183396092b 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -164,8 +164,7 @@ public void testReboot() throws Exception { Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); nodeHeartbeat = nm2.nodeHeartbeat( - new HashMap>(), true, - recordFactory.newRecordInstance(NodeId.class)); + new HashMap>(), true); Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction())); checkRebootedNMCount(rm, ++initialMetricCount); } From 0c278b0f636a01c81aba9e46fe7658fcdfb0f33c Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 13 Jan 2012 21:31:40 +0000 Subject: [PATCH 02/11] MAPREDUCE-3656. Fixed a race condition in MR AM which is failing the sort benchmark consistently. Contributed by Siddarth Seth. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231314 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../mapred/TaskAttemptListenerImpl.java | 51 ++++++++++++------- .../mapreduce/v2/app/TaskAttemptListener.java | 3 +- .../v2/app/TaskHeartbeatHandler.java | 1 + .../v2/app/job/impl/TaskAttemptImpl.java | 2 +- .../mapred/TestTaskAttemptListenerImpl.java | 9 ++-- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 5 +- 7 files changed, 49 insertions(+), 25 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8e4ec55b58..cc6ca36a8a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -484,6 +484,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs may subsequently report as running. (Vinod Kumar Vavilapalli via sseth) + MAPREDUCE-3656. Fixed a race condition in MR AM which is failing the sort + benchmark consistently. (Siddarth Seth via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 0b4ea94e27..6d78a6a8c0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -22,7 +22,9 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -77,6 +79,9 @@ public class TaskAttemptListenerImpl extends CompositeService private ConcurrentMap jvmIDToActiveAttemptMap = new ConcurrentHashMap(); + private Set launchedJVMs = Collections + .newSetFromMap(new ConcurrentHashMap()); + private JobTokenSecretManager jobTokenSecretManager = null; public TaskAttemptListenerImpl(AppContext context, @@ -412,22 +417,28 @@ public JvmTask getTask(JvmContext context) throws IOException { // Try to look up the task. We remove it directly as we don't give // multiple tasks to a JVM - org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap - .remove(wJvmID); - if (task != null) { - LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); - jvmTask = new JvmTask(task, false); - - // remove the task as it is no more needed and free up the memory - // Also we have already told the JVM to process a task, so it is no - // longer pending, and further request should ask it to exit. - } else { + if (!jvmIDToActiveAttemptMap.containsKey(wJvmID)) { LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); jvmTask = TASK_FOR_INVALID_JVM; + } else { + if (!launchedJVMs.contains(wJvmID)) { + jvmTask = null; + LOG.info("JVM with ID: " + jvmId + + " asking for task before AM launch registered. Given null task"); + } else { + // remove the task as it is no more needed and free up the memory. + // Also we have already told the JVM to process a task, so it is no + // longer pending, and further request should ask it to exit. + org.apache.hadoop.mapred.Task task = + jvmIDToActiveAttemptMap.remove(wJvmID); + launchedJVMs.remove(wJvmID); + LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); + jvmTask = new JvmTask(task, false); + } } return jvmTask; } - + @Override public void registerPendingTask( org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { @@ -440,13 +451,12 @@ public void registerPendingTask( @Override public void registerLaunchedTask( - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) { + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, + WrappedJvmID jvmId) { + // The AM considers the task to be launched (Has asked the NM to launch it) + // The JVM will only be given a task after this registartion. + launchedJVMs.add(jvmId); - // The task is launched. Register this for expiry-tracking. - - // Timing can cause this to happen after the real JVM launches and gets a - // task which is still fine as we will only be tracking for expiry a little - // late than usual. taskHeartbeatHandler.register(attemptID); } @@ -459,7 +469,12 @@ public void unregister( // registration. Events are ordered at TaskAttempt, so unregistration will // always come after registration. - // remove the mapping if not already removed + // Remove from launchedJVMs before jvmIDToActiveAttemptMap to avoid + // synchronization issue with getTask(). getTask should be checking + // jvmIDToActiveAttemptMap before it checks launchedJVMs. + + // remove the mappings if not already removed + launchedJVMs.remove(jvmID); jvmIDToActiveAttemptMap.remove(jvmID); //unregister this attempt diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java index 7002e69d52..1d2a0a4061 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java @@ -45,8 +45,9 @@ public interface TaskAttemptListener { * * @param attemptID * the id of the attempt for this JVM. + * @param jvmID the ID of the JVM. */ - void registerLaunchedTask(TaskAttemptId attemptID); + void registerLaunchedTask(TaskAttemptId attemptID, WrappedJvmID jvmID); /** * Unregister the JVM and the attempt associated with it. This should be diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java index a9e8be6258..b827a2cdf3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java @@ -93,6 +93,7 @@ public void stop() { public void receivedPing(TaskAttemptId attemptID) { //only put for the registered attempts + //TODO throw an exception if the task isn't registered. runningAttempts.replace(attemptID, clock.getTime()); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index d9ed1e53f8..b296d02d55 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1201,7 +1201,7 @@ public void transition(TaskAttemptImpl taskAttempt, // register it to TaskAttemptListener so that it can start monitoring it. taskAttempt.taskAttemptListener - .registerLaunchedTask(taskAttempt.attemptId); + .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 47a0e55f58..8737864e41 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -79,21 +80,21 @@ public void testGetTask() throws IOException { assertNotNull(result); assertTrue(result.shouldDie); - // Verify ask after registration but before launch + // Verify ask after registration but before launch. + // Don't kill, should be null. TaskAttemptId attemptID = mock(TaskAttemptId.class); Task task = mock(Task.class); //Now put a task with the ID listener.registerPendingTask(task, wid); result = listener.getTask(context); - assertNotNull(result); - assertFalse(result.shouldDie); + assertNull(result); // Unregister for more testing. listener.unregister(attemptID, wid); // Verify ask after registration and launch //Now put a task with the ID listener.registerPendingTask(task, wid); - listener.registerLaunchedTask(attemptID); + listener.registerLaunchedTask(attemptID, wid); verify(hbHandler).register(attemptID); result = listener.getTask(context); assertNotNull(result); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index f17bf6f8af..3eb214d79c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -324,7 +324,9 @@ public InetSocketAddress getAddress() { return NetUtils.createSocketAddr("localhost:54321"); } @Override - public void registerLaunchedTask(TaskAttemptId attemptID) {} + public void registerLaunchedTask(TaskAttemptId attemptID, + WrappedJvmID jvmID) { + } @Override public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) { } @@ -463,6 +465,7 @@ protected StateMachine getStateMachine() { return localStateMachine; } + @SuppressWarnings("rawtypes") public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Clock clock, From 90f096d86c93d2e5acaa3d9fc1ce1e0ae9de050a Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 13 Jan 2012 22:21:33 +0000 Subject: [PATCH 03/11] MAPREDUCE-3532. Modified NM to report correct http address when an ephemeral web port is configured. Contributed by Bhallamudi Venkata Siva Kamesh. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231342 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../server/nodemanager/webapp/WebServer.java | 4 ++ .../nodemanager/webapp/TestNMWebServer.java | 41 +++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index cc6ca36a8a..5826ca22be 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -487,6 +487,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3656. Fixed a race condition in MR AM which is failing the sort benchmark consistently. (Siddarth Seth via vinodkv) + MAPREDUCE-3532. Modified NM to report correct http address when an ephemeral + web port is configured. (Bhallamudi Venkata Siva Kamesh via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java index 7a799fd25b..31282179f1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.yarn.util.StringHelper.pajoin; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -65,6 +66,9 @@ public synchronized void start() { this.webApp = WebApps.$for("node", Context.class, this.nmContext, "ws") .at(bindAddress).with(getConfig()).start(this.nmWebApp); + int port = this.webApp.httpServer().getPort(); + String webAddress = StringUtils.split(bindAddress, ':')[0] + ":" + port; + getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddress); } catch (Exception e) { String msg = "NMWebapps failed to start."; LOG.error(msg, e); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index ebba63fcc0..5808b865db 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.Writer; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -72,6 +74,45 @@ public void tearDown() { FileUtil.fullyDelete(testRootDir); FileUtil.fullyDelete(testLogDir); } + + private String startNMWebAppServer(String webAddr) { + Context nmContext = new NodeManager.NMContext(); + ResourceView resourceView = new ResourceView() { + @Override + public long getVmemAllocatedForContainers() { + return 0; + } + @Override + public long getPmemAllocatedForContainers() { + return 0; + } + }; + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath()); + NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(); + healthChecker.init(conf); + LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); + conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddr); + WebServer server = new WebServer(nmContext, resourceView, + new ApplicationACLsManager(conf), dirsHandler); + server.init(conf); + server.start(); + String webAppAddr = conf.get(YarnConfiguration.NM_WEBAPP_ADDRESS); + return StringUtils.split(webAppAddr, ':')[1]; + } + + @Test + public void testNMWebAppWithOutPort() throws IOException { + String port = startNMWebAppServer("0.0.0.0"); + Assert.assertTrue("Port is not updated", Integer.parseInt(port) > 0); + } + + @Test + public void testNMWebAppWithEphemeralPort() throws IOException { + String port = startNMWebAppServer("0.0.0.0:0"); + Assert.assertTrue("Port is not updated", Integer.parseInt(port) > 0); + } @Test public void testNMWebApp() throws IOException { From b62c1b8563c7b870ace40bed424b4e1f90a058d7 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 13 Jan 2012 23:41:23 +0000 Subject: [PATCH 04/11] MAPREDUCE-3404. Corrected MR AM to honor speculative configuration and enable speculating either maps or reduces. Contributed by Eric Payne. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231395 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 30 +- .../v2/TestSpeculativeExecution.java | 309 ++++++++++++++++++ 3 files changed, 338 insertions(+), 4 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5826ca22be..c994d91a52 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -490,6 +490,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3532. Modified NM to report correct http address when an ephemeral web port is configured. (Bhallamudi Venkata Siva Kamesh via vinodkv) + MAPREDUCE-3404. Corrected MR AM to honor speculative configuration and enable + speculating either maps or reduces. (Eric Payne via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 8ba241ec02..6097e377d1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -258,7 +258,7 @@ public void init(final Configuration conf) { dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); dispatcher.register(TaskCleaner.EventType.class, taskCleaner); - + if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) { //optional service to speculate on task attempts' progress @@ -881,9 +881,31 @@ public SpeculatorEventDispatcher(Configuration config) { } @Override public void handle(SpeculatorEvent event) { - if (!disabled && - (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false) - || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) { + if (disabled) { + return; + } + + TaskId tId = event.getTaskID(); + TaskType tType = null; + /* event's TaskId will be null if the event type is JOB_CREATE or + * ATTEMPT_STATUS_UPDATE + */ + if (tId != null) { + tType = tId.getTaskType(); + } + boolean shouldMapSpec = + conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false); + boolean shouldReduceSpec = + conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + + /* The point of the following is to allow the MAP and REDUCE speculative + * config values to be independent: + * IF spec-exec is turned on for maps AND the task is a map task + * OR IF spec-exec is turned on for reduces AND the task is a reduce task + * THEN call the speculator to handle the event. + */ + if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP)) + || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) { // Speculator IS enabled, direct the event to there. speculator.handle(event); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java new file mode 100644 index 0000000000..12bb5ac0e7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java @@ -0,0 +1,309 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator; +import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestSpeculativeExecution { + + /* + * This class is used to control when speculative execution happens. + */ + public static class TestSpecEstimator extends LegacyTaskRuntimeEstimator { + private static final long SPECULATE_THIS = 999999L; + + public TestSpecEstimator() { + super(); + } + + /* + * This will only be called if speculative execution is turned on. + * + * If either mapper or reducer speculation is turned on, this will be + * called. + * + * This will cause speculation to engage for the first mapper or first + * reducer (that is, attempt ID "*_m_000000_0" or "*_r_000000_0") + * + * If this attempt is killed, the retry will have attempt id 1, so it + * will not engage speculation again. + */ + @Override + public long estimatedRuntime(TaskAttemptId id) { + if ((id.getTaskId().getId() == 0) && (id.getId() == 0)) { + return SPECULATE_THIS; + } + return super.estimatedRuntime(id); + } + } + + private static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class); + + protected static MiniMRYarnCluster mrCluster; + + private static Configuration initialConf = new Configuration(); + private static FileSystem localFs; + static { + try { + localFs = FileSystem.getLocal(initialConf); + } catch (IOException io) { + throw new RuntimeException("problem getting local fs", io); + } + } + + private static Path TEST_ROOT_DIR = + new Path("target",TestSpeculativeExecution.class.getName() + "-tmpDir") + .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); + static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar"); + private static Path TEST_OUT_DIR = new Path(TEST_ROOT_DIR, "test.out.dir"); + + @BeforeClass + public static void setup() throws IOException { + + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; + } + + if (mrCluster == null) { + mrCluster = new MiniMRYarnCluster(TestSpeculativeExecution.class.getName(), 4); + Configuration conf = new Configuration(); + mrCluster.init(conf); + mrCluster.start(); + } + + // workaround the absent public distcache. + localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR); + localFs.setPermission(APP_JAR, new FsPermission("700")); + } + + @AfterClass + public static void tearDown() { + if (mrCluster != null) { + mrCluster.stop(); + mrCluster = null; + } + } + + public static class SpeculativeMapper extends + Mapper { + + public void map(Object key, Text value, Context context) + throws IOException, InterruptedException { + // Make one mapper slower for speculative execution + TaskAttemptID taid = context.getTaskAttemptID(); + long sleepTime = 100; + Configuration conf = context.getConfiguration(); + boolean test_speculate_map = + conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false); + + // IF TESTING MAPPER SPECULATIVE EXECUTION: + // Make the "*_m_000000_0" attempt take much longer than the others. + // When speculative execution is enabled, this should cause the attempt + // to be killed and restarted. At that point, the attempt ID will be + // "*_m_000000_1", so sleepTime will still remain 100ms. + if ( (taid.getTaskType() == TaskType.MAP) && test_speculate_map + && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) { + sleepTime = 10000; + } + try{ + Thread.sleep(sleepTime); + } catch(InterruptedException ie) { + // Ignore + } + context.write(value, new IntWritable(1)); + } + } + + public static class SpeculativeReducer extends + Reducer { + + public void reduce(Text key, Iterable values, + Context context) throws IOException, InterruptedException { + // Make one reducer slower for speculative execution + TaskAttemptID taid = context.getTaskAttemptID(); + long sleepTime = 100; + Configuration conf = context.getConfiguration(); + boolean test_speculate_reduce = + conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + + // IF TESTING REDUCE SPECULATIVE EXECUTION: + // Make the "*_r_000000_0" attempt take much longer than the others. + // When speculative execution is enabled, this should cause the attempt + // to be killed and restarted. At that point, the attempt ID will be + // "*_r_000000_1", so sleepTime will still remain 100ms. + if ( (taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce + && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) { + sleepTime = 10000; + } + try{ + Thread.sleep(sleepTime); + } catch(InterruptedException ie) { + // Ignore + } + context.write(key,new IntWritable(0)); + } + } + + @Test + public void testSpeculativeExecution() throws Exception { + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; + } + + /*------------------------------------------------------------------ + * Test that Map/Red does not speculate if MAP_SPECULATIVE and + * REDUCE_SPECULATIVE are both false. + * ----------------------------------------------------------------- + */ + Job job = runSpecTest(false, false); + + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + Counters counters = job.getCounters(); + Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) + .getValue()); + Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES) + .getValue()); + Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS) + .getValue()); + + /*---------------------------------------------------------------------- + * Test that Mapper speculates if MAP_SPECULATIVE is true and + * REDUCE_SPECULATIVE is false. + * --------------------------------------------------------------------- + */ + job = runSpecTest(true, false); + + succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + counters = job.getCounters(); + + // The long-running map will be killed and a new one started. + Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) + .getValue()); + Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES) + .getValue()); + Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_FAILED_MAPS) + .getValue()); + + /*---------------------------------------------------------------------- + * Test that Reducer speculates if REDUCE_SPECULATIVE is true and + * MAP_SPECULATIVE is false. + * --------------------------------------------------------------------- + */ + job = runSpecTest(false, true); + + succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + counters = job.getCounters(); + + // The long-running map will be killed and a new one started. + Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS) + .getValue()); + Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES) + .getValue()); + } + + private Path createTempFile(String filename, String contents) + throws IOException { + Path path = new Path(TEST_ROOT_DIR, filename); + FSDataOutputStream os = localFs.create(path); + os.writeBytes(contents); + os.close(); + localFs.setPermission(path, new FsPermission("700")); + return path; + } + + private Job runSpecTest(boolean mapspec, boolean redspec) + throws IOException, ClassNotFoundException, InterruptedException { + + Path first = createTempFile("specexec_map_input1", "a\nz"); + Path secnd = createTempFile("specexec_map_input2", "a\nz"); + + Configuration conf = mrCluster.getConfig(); + conf.setBoolean(MRJobConfig.MAP_SPECULATIVE,mapspec); + conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE,redspec); + conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR, + TestSpecEstimator.class, + TaskRuntimeEstimator.class); + + Job job = Job.getInstance(conf); + job.setJarByClass(TestSpeculativeExecution.class); + job.setMapperClass(SpeculativeMapper.class); + job.setReducerClass(SpeculativeReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + job.setNumReduceTasks(2); + FileInputFormat.setInputPaths(job, first); + FileInputFormat.addInputPath(job, secnd); + FileOutputFormat.setOutputPath(job, TEST_OUT_DIR); + + // Delete output directory if it exists. + try { + localFs.delete(TEST_OUT_DIR,true); + } catch (IOException e) { + // ignore + } + + // Creates the Job Configuration + job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. + job.createSymlink(); + job.setMaxMapAttempts(2); + + job.submit(); + + return job; + } +} From 6b8bcfa9163568bbd84a12ad9d449cbea74d93cf Mon Sep 17 00:00:00 2001 From: Eli Collins Date: Sat, 14 Jan 2012 19:45:12 +0000 Subject: [PATCH 05/11] HDFS-2788. HdfsServerConstants#DN_KEEPALIVE_TIMEOUT is dead code. Contributed by Eli Collins git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231569 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 ++ .../apache/hadoop/hdfs/server/common/HdfsServerConstants.java | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2b33c120a9..8cb8bd4659 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -264,6 +264,8 @@ Release 0.23.1 - UNRELEASED HDFS-69. Improve the 'dfsadmin' commandline help. (harsh) + HDFS-2788. HdfsServerConstants#DN_KEEPALIVE_TIMEOUT is dead code (eli) + OPTIMIZATIONS HDFS-2130. Switch default checksum to CRC32C. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index 256e5d663e..a457e5e880 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -86,7 +86,6 @@ public String getClusterId() { public static int READ_TIMEOUT_EXTENSION = 5 * 1000; public static int WRITE_TIMEOUT = 8 * 60 * 1000; public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline - public static int DN_KEEPALIVE_TIMEOUT = 5 * 1000; /** * Defines the NameNode role. From f02ea82bd14798849d5f0d2649cd261ce1394148 Mon Sep 17 00:00:00 2001 From: Eli Collins Date: Sat, 14 Jan 2012 19:50:14 +0000 Subject: [PATCH 06/11] HDFS-2790. FSNamesystem.setTimes throws exception with wrong configuration name in the message. Contributed by Arpit Gupta git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231572 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8cb8bd4659..82bc6877d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -329,6 +329,9 @@ Release 0.23.1 - UNRELEASED HDFS-2707. HttpFS should read the hadoop-auth secret from a file instead inline from the configuration. (tucu) + HDFS-2790. FSNamesystem.setTimes throws exception with wrong + configuration name in the message. (Arpit Gupta via eli) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 7fa3fe50bb..1503e0850e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -984,7 +984,7 @@ void setTimes(String src, long mtime, long atime) throws IOException, UnresolvedLinkException { if (!isAccessTimeSupported() && atime != -1) { throw new IOException("Access time for hdfs is not configured. " + - " Please set dfs.support.accessTime configuration parameter."); + " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter."); } writeLock(); try { From 69290d383c9a4e9444a8e80ec7ef655e6270b38e Mon Sep 17 00:00:00 2001 From: Harsh J Date: Sun, 15 Jan 2012 05:18:59 +0000 Subject: [PATCH 07/11] HADOOP-7975. Add LZ4 as an entry in the default codec list, missed by HADOOP-7657 (harsh) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231627 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-common-project/hadoop-common/CHANGES.txt | 2 ++ .../hadoop-common/src/main/resources/core-default.xml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 3a19047483..6f951e095a 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -203,6 +203,8 @@ Release 0.23.1 - Unreleased HADOOP-7348. Change 'addnl' in getmerge util to be a flag '-nl' instead. (XieXianshan via harsh) + HADOOP-7975. Add LZ4 as an entry in the default codec list, missed by HADOOP-7657 (harsh) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 9763845786..9cf1eaf311 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -161,7 +161,7 @@ io.compression.codecs - org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec + org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec A list of the compression codec classes that can be used for compression/decompression. From aee1bb81c69f11ace3d25a89c66441250b61e79d Mon Sep 17 00:00:00 2001 From: Eli Collins Date: Sun, 15 Jan 2012 08:51:48 +0000 Subject: [PATCH 08/11] HADOOP-7974. TestViewFsTrash incorrectly determines the user's home directory. Contributed by Harsh J git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231640 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 +++ .../hadoop/fs/viewfs/TestViewFsTrash.java | 19 +++++++++---------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 6f951e095a..3026e078cb 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -271,6 +271,9 @@ Release 0.23.1 - Unreleased HADOOP-7964. Deadlock in NetUtils and SecurityUtil class initialization. (Daryn Sharp via suresh) + HADOOP-7974. TestViewFsTrash incorrectly determines the user's home + directory. (harsh via eli) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java index 9239f2f1f8..7795c3f5f0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java @@ -78,17 +78,16 @@ public void setUp() throws Exception { // set up viewfs's home dir root to point to home dir root on target // But home dir is different on linux, mac etc. // Figure it out by calling home dir on target - - String homeDir = fsTarget.getHomeDirectory().toUri().getPath(); - int indexOf2ndSlash = homeDir.indexOf('/', 1); - String homeDirRoot = homeDir.substring(0, indexOf2ndSlash); - ConfigUtil.addLink(conf, homeDirRoot, - fsTarget.makeQualified(new Path(homeDirRoot)).toUri()); - ConfigUtil.setHomeDirConf(conf, homeDirRoot); - Log.info("Home dir base " + homeDirRoot); - + + String homeDirRoot = fsTarget.getHomeDirectory() + .getParent().toUri().getPath(); + ConfigUtil.addLink(conf, homeDirRoot, + fsTarget.makeQualified(new Path(homeDirRoot)).toUri()); + ConfigUtil.setHomeDirConf(conf, homeDirRoot); + Log.info("Home dir base " + homeDirRoot); + fsView = ViewFileSystemTestSetup.setupForViewFs(conf, fsTarget); - + // set working dir so that relative paths //fsView.setWorkingDirectory(new Path(fsTarget.getWorkingDirectory().toUri().getPath())); conf.set("fs.defaultFS", FsConstants.VIEWFS_URI.toString()); From a9002bfea1e02c4e565cc708dbf55607f5a6d458 Mon Sep 17 00:00:00 2001 From: Harsh J Date: Sun, 15 Jan 2012 19:15:04 +0000 Subject: [PATCH 09/11] HADOOP-7968. Errant println left in RPC.getHighestSupportedProtocol (Sho Shimauchi via harsh) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1231732 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-common-project/hadoop-common/CHANGES.txt | 2 ++ .../src/main/java/org/apache/hadoop/ipc/RPC.java | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 3026e078cb..a784fcf090 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -83,6 +83,8 @@ Trunk (unreleased changes) HADOOP-4515. Configuration#getBoolean must not be case sensitive. (Sho Shimauchi via harsh) + HADOOP-7968. Errant println left in RPC.getHighestSupportedProtocol (Sho Shimauchi via harsh) + BUGS HADOOP-7851. Configuration.getClasses() never returns the default value. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index cf59746868..321c1d8ef3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -791,7 +791,10 @@ VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind, String protocolName) { Long highestVersion = 0L; ProtoClassProtoImpl highest = null; - System.out.println("Size of protoMap for " + rpcKind + " =" + getProtocolImplMap(rpcKind).size()); + if (LOG.isDebugEnabled()) { + LOG.debug("Size of protoMap for " + rpcKind + " =" + + getProtocolImplMap(rpcKind).size()); + } for (Map.Entry pv : getProtocolImplMap(rpcKind).entrySet()) { if (pv.getKey().protocol.equals(protocolName)) { From a24339e50df4d05928db96498f1bebad93fd6083 Mon Sep 17 00:00:00 2001 From: Mahadev Konar Date: Mon, 16 Jan 2012 19:35:06 +0000 Subject: [PATCH 10/11] MAPREDUCE-3649. Job End notification gives an error on calling back. (Ravi Prakash via mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1232126 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../mapreduce/v2/app/JobEndNotifier.java | 32 +++++++++++-------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c994d91a52..bc1337586d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -493,6 +493,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3404. Corrected MR AM to honor speculative configuration and enable speculating either maps or reduces. (Eric Payne via vinodkv) + MAPREDUCE-3649. Job End notification gives an error on calling back. + (Ravi Prakash via mahadev) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java index ae92cc0a3e..9ffe2181c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java @@ -19,12 +19,11 @@ package org.apache.hadoop.mapreduce.v2.app; import java.io.IOException; -import java.io.InputStream; +import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLConnection; import java.net.Proxy; +import java.net.URL; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -40,7 +39,8 @@ * User can specify number of retry attempts and a time interval at which to * attempt retries
  • * Cluster administrators can set final parameters to set maximum number of - * tries (0 would disable job end notification) and max time interval
  • + * tries (0 would disable job end notification) and max time interval and a + * proxy if needed
  • * The URL may contain sentinels which will be replaced by jobId and jobStatus * (eg. SUCCEEDED/KILLED/FAILED)
  • *

    @@ -59,8 +59,8 @@ public class JobEndNotifier implements Configurable { /** * Parse the URL that needs to be notified of the end of the job, along - * with the number of retries in case of failure and the amount of time to - * wait between retries + * with the number of retries in case of failure, the amount of time to + * wait between retries and proxy settings * @param conf the configuration */ public void setConf(Configuration conf) { @@ -119,15 +119,19 @@ protected boolean notifyURLOnce() { boolean success = false; try { Log.info("Job end notification trying " + urlToNotify); - URLConnection conn = urlToNotify.openConnection(proxyToUse); + HttpURLConnection conn = (HttpURLConnection) urlToNotify.openConnection(); conn.setConnectTimeout(5*1000); conn.setReadTimeout(5*1000); conn.setAllowUserInteraction(false); - InputStream is = conn.getInputStream(); - conn.getContent(); - is.close(); - success = true; - Log.info("Job end notification to " + urlToNotify + " succeeded"); + if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) { + Log.warn("Job end notification to " + urlToNotify +" failed with code: " + + conn.getResponseCode() + " and message \"" + conn.getResponseMessage() + +"\""); + } + else { + success = true; + Log.info("Job end notification to " + urlToNotify + " succeeded"); + } } catch(IOException ioe) { Log.warn("Job end notification to " + urlToNotify + " failed", ioe); } @@ -135,8 +139,8 @@ protected boolean notifyURLOnce() { } /** - * Notify a server of the completion of a submitted job. The server must have - * configured MRConfig.JOB_END_NOTIFICATION_URLS + * Notify a server of the completion of a submitted job. The user must have + * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL * @param jobReport JobReport used to read JobId and JobStatus * @throws InterruptedException */ From c81995e37029b2b822f99f0ff76d606a81acd2c9 Mon Sep 17 00:00:00 2001 From: Mahadev Konar Date: Mon, 16 Jan 2012 21:22:21 +0000 Subject: [PATCH 11/11] MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe via mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1232167 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml | 1 + .../hadoop-yarn-server-resourcemanager/pom.xml | 1 + 3 files changed, 5 insertions(+) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index bc1337586d..f18bbb201f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -496,6 +496,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3649. Job End notification gives an error on calling back. (Ravi Prakash via mahadev) + MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe + via mahadev) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 859a1464c7..73c7031808 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -109,6 +109,7 @@ org.apache.hadoop.yarn.util.VisualizeStateMachine + compile NodeManager org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl, diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 106c0a3247..b65a524696 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -137,6 +137,7 @@ org.apache.hadoop.yarn.util.VisualizeStateMachine + compile ResourceManager org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl,