diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index af76c67ea8..24cd4fbeee 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -272,6 +272,9 @@ Release 2.1.2 - UNRELEASED tests jar is breaking tests for downstream components (Robert Kanter via Sandy Ryza) + MAPREDUCE-5489. MR jobs hangs as it does not use the node-blacklisting + feature in RM requests (Zhijie Shen via bikas) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 67dd30e164..a9b5ce5847 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -86,6 +87,10 @@ public abstract class RMContainerRequestor extends RMCommunicator { private final Map nodeFailures = new HashMap(); private final Set blacklistedNodes = Collections .newSetFromMap(new ConcurrentHashMap()); + private final Set blacklistAdditions = Collections + .newSetFromMap(new ConcurrentHashMap()); + private final Set blacklistRemovals = Collections + .newSetFromMap(new ConcurrentHashMap()); public RMContainerRequestor(ClientService clientService, AppContext context) { super(clientService, context); @@ -145,10 +150,13 @@ protected void serviceInit(Configuration conf) throws Exception { } protected AllocateResponse makeRemoteRequest() throws IOException { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance(new ArrayList(blacklistAdditions), + new ArrayList(blacklistRemovals)); AllocateRequest allocateRequest = AllocateRequest.newInstance(lastResponseID, super.getApplicationProgress(), new ArrayList(ask), - new ArrayList(release), null); + new ArrayList(release), blacklistRequest); AllocateResponse allocateResponse; try { allocateResponse = scheduler.allocate(allocateRequest); @@ -172,6 +180,14 @@ protected AllocateResponse makeRemoteRequest() throws IOException { ask.clear(); release.clear(); + + if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) { + LOG.info("Update the blacklist for " + applicationId + + ": blacklistAdditions=" + blacklistAdditions.size() + + " blacklistRemovals=" + blacklistRemovals.size()); + } + blacklistAdditions.clear(); + blacklistRemovals.clear(); return allocateResponse; } @@ -195,11 +211,17 @@ protected void computeIgnoreBlacklisting() { if (ignoreBlacklisting.compareAndSet(false, true)) { LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%"); + // notify RM to ignore all the blacklisted nodes + blacklistAdditions.clear(); + blacklistRemovals.addAll(blacklistedNodes); } } else { if (ignoreBlacklisting.compareAndSet(true, false)) { LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%"); + // notify RM of all the blacklisted nodes + blacklistAdditions.addAll(blacklistedNodes); + blacklistRemovals.clear(); } } } @@ -221,6 +243,9 @@ protected void containerFailedOnHost(String hostName) { LOG.info(failures + " failures on node " + hostName); if (failures >= maxTaskFailuresPerNode) { blacklistedNodes.add(hostName); + if (!ignoreBlacklisting.get()) { + blacklistAdditions.add(hostName); + } //Even if blacklisting is ignored, continue to remove the host from // the request table. The RM may have additional nodes it can allocate on. LOG.info("Blacklisted host " + hostName); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 8e0f7f8960..ee0544a053 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -880,8 +880,10 @@ public void testBlackListedNodes() throws Exception { dispatcher.await(); assigned = allocator.schedule(); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); dispatcher.await(); - Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + assertBlacklistAdditionsAndRemovals(2, 0, rm); // mark h1/h2 as bad nodes nodeManager1.nodeHeartbeat(false); @@ -890,12 +892,14 @@ public void testBlackListedNodes() throws Exception { assigned = allocator.schedule(); dispatcher.await(); + assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); nodeManager3.nodeHeartbeat(true); // Node heartbeat dispatcher.await(); - assigned = allocator.schedule(); + assigned = allocator.schedule(); dispatcher.await(); + assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertTrue("No of assignments must be 3", assigned.size() == 3); @@ -948,7 +952,7 @@ public void testIgnoreBlacklisting() throws Exception { // Known=1, blacklisted=0, ignore should be false - assign first container assigned = getContainerOnHost(jobId, 1, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator); + nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); LOG.info("Failing container _1 on H1 (Node should be blacklisted and" @@ -958,44 +962,52 @@ public void testIgnoreBlacklisting() throws Exception { allocator.sendFailure(f1); // Test single node. + // Known=1, blacklisted=1, ignore should be true - assign 0 + // Because makeRemoteRequest will not be aware of it until next call + // The current call will send blacklisted node "h1" to RM + assigned = + getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, + nodeManagers[0], dispatcher, allocator, 1, 0, 0, 1, rm); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + // Known=1, blacklisted=1, ignore should be true - assign 1 assigned = getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator); + nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); // Known=2, blacklisted=1, ignore should be true - assign 1 anyway. assigned = getContainerOnHost(jobId, 3, 1024, new String[] { "h2" }, - nodeManagers[1], dispatcher, allocator); + nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. assigned = getContainerOnHost(jobId, 4, 1024, new String[] { "h3" }, - nodeManagers[2], dispatcher, allocator); + nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Known=3, blacklisted=1, ignore should be true - assign 1 assigned = getContainerOnHost(jobId, 5, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator); + nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); // Known=4, blacklisted=1, ignore should be false - assign 1 anyway assigned = getContainerOnHost(jobId, 6, 1024, new String[] { "h4" }, - nodeManagers[3], dispatcher, allocator); + nodeManagers[3], dispatcher, allocator, 0, 0, 1, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Test blacklisting re-enabled. // Known=4, blacklisted=1, ignore should be false - no assignment on h1 assigned = getContainerOnHost(jobId, 7, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator); + nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // RMContainerRequestor would have created a replacement request. @@ -1004,17 +1016,24 @@ public void testIgnoreBlacklisting() throws Exception { allocator.sendFailure(f2); // Test ignore blacklisting re-enabled + // Known=4, blacklisted=2, ignore should be true. Should assign 0 + // container for the same reason above. + assigned = + getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, + nodeManagers[0], dispatcher, allocator, 1, 0, 0, 2, rm); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + // Known=4, blacklisted=2, ignore should be true. Should assign 2 // containers. assigned = getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator); + nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); // Known=4, blacklisted=2, ignore should be true. assigned = getContainerOnHost(jobId, 9, 1024, new String[] { "h2" }, - nodeManagers[1], dispatcher, allocator); + nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Test blacklist while ignore blacklisting enabled @@ -1025,7 +1044,7 @@ public void testIgnoreBlacklisting() throws Exception { // Known=5, blacklisted=3, ignore should be true. assigned = getContainerOnHost(jobId, 10, 1024, new String[] { "h3" }, - nodeManagers[2], dispatcher, allocator); + nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Assign on 5 more nodes - to re-enable blacklisting @@ -1034,14 +1053,14 @@ public void testIgnoreBlacklisting() throws Exception { assigned = getContainerOnHost(jobId, 11 + i, 1024, new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i], - dispatcher, allocator); + dispatcher, allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); } // Test h3 (blacklisted while ignoring blacklisting) is blacklisted. assigned = getContainerOnHost(jobId, 20, 1024, new String[] { "h3" }, - nodeManagers[2], dispatcher, allocator); + nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); } @@ -1055,7 +1074,9 @@ private MockNM registerNodeManager(int i, MyResourceManager rm, private List getContainerOnHost(JobId jobId, int taskAttemptId, int memory, String[] hosts, MockNM mockNM, - DrainDispatcher dispatcher, MyContainerAllocator allocator) + DrainDispatcher dispatcher, MyContainerAllocator allocator, + int expectedAdditions1, int expectedRemovals1, + int expectedAdditions2, int expectedRemovals2, MyResourceManager rm) throws Exception { ContainerRequestEvent reqEvent = createReq(jobId, taskAttemptId, memory, hosts); @@ -1064,6 +1085,8 @@ List getContainerOnHost(JobId jobId, // Send the request to the RM List assigned = allocator.schedule(); dispatcher.await(); + assertBlacklistAdditionsAndRemovals( + expectedAdditions1, expectedRemovals1, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Heartbeat from the required nodeManager @@ -1072,6 +1095,8 @@ List getContainerOnHost(JobId jobId, assigned = allocator.schedule(); dispatcher.await(); + assertBlacklistAdditionsAndRemovals( + expectedAdditions2, expectedRemovals2, rm); return assigned; } @@ -1137,6 +1162,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); dispatcher.await(); + assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); LOG.info("Failing container _1 on H1 (should blacklist the node)"); @@ -1153,6 +1179,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { //Update the Scheduler with the new requests. assigned = allocator.schedule(); dispatcher.await(); + assertBlacklistAdditionsAndRemovals(1, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // send another request with different resource and priority @@ -1171,6 +1198,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); dispatcher.await(); + assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); //RMContainerAllocator gets assigned a p:5 on a blacklisted node. @@ -1179,6 +1207,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("RM Heartbeat (To process the re-scheduled containers)"); assigned = allocator.schedule(); dispatcher.await(); + assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); //Hearbeat from H3 to schedule on this host. @@ -1188,6 +1217,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)"); assigned = allocator.schedule(); + assertBlacklistAdditionsAndRemovals(0, 0, rm); dispatcher.await(); // For debugging @@ -1205,7 +1235,15 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { + " host not correct", "h3", assig.getContainer().getNodeId().getHost()); } } - + + private static void assertBlacklistAdditionsAndRemovals( + int expectedAdditions, int expectedRemovals, MyResourceManager rm) { + Assert.assertEquals(expectedAdditions, + rm.getMyFifoScheduler().lastBlacklistAdditions.size()); + Assert.assertEquals(expectedRemovals, + rm.getMyFifoScheduler().lastBlacklistRemovals.size()); + } + private static class MyFifoScheduler extends FifoScheduler { public MyFifoScheduler(RMContext rmContext) { @@ -1220,6 +1258,8 @@ public MyFifoScheduler(RMContext rmContext) { } List lastAsk = null; + List lastBlacklistAdditions; + List lastBlacklistRemovals; // override this to copy the objects otherwise FifoScheduler updates the // numContainers in same objects as kept by RMContainerAllocator @@ -1236,6 +1276,8 @@ public synchronized Allocation allocate( askCopy.add(reqCopy); } lastAsk = ask; + lastBlacklistAdditions = blacklistAdditions; + lastBlacklistRemovals = blacklistRemovals; return super.allocate( applicationAttemptId, askCopy, release, blacklistAdditions, blacklistRemovals);