diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index cde6d929c6..6cf7abbbff 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -570,6 +570,9 @@ Release 2.8.0 - UNRELEASED
     position/key information for uncompressed input sometimes. (Zhihai Xu via
     jlowe)
 
+    MAPREDUCE-5002. AM could potentially allocate a reduce container to a map
+    attempt (Chang Li via jlowe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index ac4c586bcc..78b0dc434f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -1004,6 +1004,7 @@ private void assign(List<Container> allocatedContainers) {
     Iterator<Container> it = allocatedContainers.iterator();
     LOG.info("Got allocated containers " + allocatedContainers.size());
     containersAllocated += allocatedContainers.size();
+    int reducePending = reduces.size();
     while (it.hasNext()) {
       Container allocated = it.next();
       if (LOG.isDebugEnabled()) {
@@ -1034,13 +1035,14 @@ mapResourceRequest, getSchedulerResourceTypes()) <= 0
       else if (PRIORITY_REDUCE.equals(priority)) {
         if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
             reduceResourceRequest, getSchedulerResourceTypes()) <= 0
-            || reduces.isEmpty()) {
-          LOG.info("Cannot assign container " + allocated
+            || (reducePending <= 0)) {
+          LOG.info("Cannot assign container " + allocated
               + " for a reduce as either "
               + " container memory less than required " + reduceResourceRequest
-              + " or no pending reduce tasks - reduces.isEmpty="
-              + reduces.isEmpty());
+              + " or no pending reduce tasks.");
           isAssignable = false;
+        } else {
+          reducePending--;
         }
       } else {
         LOG.warn("Container allocated at unwanted priority: " + priority +
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index e148c32f1b..c98ccd3580 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -127,6 +127,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@@ -562,6 +563,52 @@ public void testNonAggressivelyPreemptReducers() throws Exception {
         assignedRequests.preemptionWaitingReduces.size());
   }
 
+  @Test(timeout = 30000)
+  public void testExcessReduceContainerAssign() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f);
+    final MyResourceManager2 rm = new MyResourceManager2(conf);
+    rm.start();
+    final DrainDispatcher dispatcher = (DrainDispatcher)rm.getRMContext()
+        .getDispatcher();
+    final RMApp app = rm.submitApp(2048);
+    dispatcher.await();
+    final String host = "host1";
+    final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096);
+    nm.nodeHeartbeat(true);
+    dispatcher.await();
+    final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+    final JobId jobId = MRBuilderUtils
+        .newJobId(appAttemptId.getApplicationId(), 0);
+    final Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // request to allocate two reduce priority containers
+    final String[] locations = new String[] { host };
+    allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true));
+    allocator.scheduleAllReduces();
+    allocator.makeRemoteRequest();
+    nm.nodeHeartbeat(true);
+    dispatcher.await();
+    allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false));
+
+    int assignedContainer;
+    for (assignedContainer = 0; assignedContainer < 1;) {
+      assignedContainer += allocator.schedule().size();
+      nm.nodeHeartbeat(true);
+      dispatcher.await();
+    }
+    // only 1 allocated container should be assigned
+    Assert.assertEquals(assignedContainer, 1);
+  }
+
   @Test
   public void testMapReduceAllocationWithNodeLabelExpression()
       throws Exception {
@@ -770,6 +817,17 @@ MyFifoScheduler getMyFifoScheduler() {
     }
   }
 
+  private static class MyResourceManager2 extends MyResourceManager {
+    public MyResourceManager2(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    protected ResourceScheduler createScheduler() {
+      return new ExcessReduceContainerAllocateScheduler(this.getRMContext());
+    }
+  }
+
   @Test
   public void testReportedAppProgress() throws Exception {
 
@@ -1595,6 +1653,58 @@ public synchronized Allocation allocate(
     }
   }
 
+  private static class ExcessReduceContainerAllocateScheduler extends FifoScheduler {
+
+    public ExcessReduceContainerAllocateScheduler(RMContext rmContext) {
+      super();
+      try {
+        Configuration conf = new Configuration();
+        reinitialize(conf, rmContext);
+      } catch (IOException ie) {
+        LOG.info("add application failed with ", ie);
+        assert (false);
+      }
+    }
+
+    @Override
+    public synchronized Allocation allocate(
+        ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
+        List<ContainerId> release,
+        List<String> blacklistAdditions, List<String> blacklistRemovals) {
+      List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+      for (ResourceRequest req : ask) {
+        ResourceRequest reqCopy = ResourceRequest.newInstance(req
+            .getPriority(), req.getResourceName(), req.getCapability(), req
+            .getNumContainers(), req.getRelaxLocality());
+        askCopy.add(reqCopy);
+      }
+      SecurityUtil.setTokenServiceUseIp(false);
+      Allocation normalAlloc = super.allocate(
+          applicationAttemptId, askCopy, release,
+          blacklistAdditions, blacklistRemovals);
+      List<Container> containers = normalAlloc.getContainers();
+      if(containers.size() > 0) {
+        // allocate excess container
+        FiCaSchedulerApp application = super.getApplicationAttempt(applicationAttemptId);
+        ContainerId containerId = BuilderUtils.newContainerId(application
+            .getApplicationAttemptId(), application.getNewContainerId());
+        Container excessC = mock(Container.class);
+        when(excessC.getId()).thenReturn(containerId);
+        when(excessC.getPriority()).thenReturn(RMContainerAllocator.PRIORITY_REDUCE);
+        Resource mockR = mock(Resource.class);
+        when(mockR.getMemory()).thenReturn(2048);
+        when(excessC.getResource()).thenReturn(mockR);
+        NodeId nId = mock(NodeId.class);
+        when(nId.getHost()).thenReturn("local");
+        when(excessC.getNodeId()).thenReturn(nId);
+        containers.add(excessC);
+      }
+      Allocation excessAlloc = mock(Allocation.class);
+      when(excessAlloc.getContainers()).thenReturn(containers);
+      return excessAlloc;
+    }
+  }
+
   private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
       int memory, String[] hosts) {
     return createReq(jobId, taskAttemptId, memory, hosts, false, false);