From fc2b69eba1c5df59f6175205c27dc7b584df50c0 Mon Sep 17 00:00:00 2001
From: Robert Kanter
Date: Tue, 1 Nov 2016 20:47:25 -0700
Subject: [PATCH] MAPREDUCE-6765. MR should not schedule container requests in
 cases where reducer or mapper containers demand resource larger than the
 maximum supported (haibochen via rkanter)

---
 .../v2/app/rm/RMContainerAllocator.java       | 171 +++++++++++-------
 .../v2/app/rm/TestRMContainerAllocator.java   |  96 +++++++++-
 2 files changed, 195 insertions(+), 72 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index e3b673a3d9..db8f3376ca 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -151,10 +151,10 @@ added to the pending and are ramped up (added to scheduled) based
 
   //holds information about the assigned containers to task attempts
   private final AssignedRequests assignedRequests;
-  
+
   //holds scheduled requests to be fulfilled by RM
   private final ScheduledRequests scheduledRequests = new ScheduledRequests();
-  
+
   private int containersAllocated = 0;
   private int containersReleased = 0;
   private int hostLocalAssigned = 0;
@@ -370,76 +370,16 @@ public void handle(ContainerAllocatorEvent event) {
     }
   }
 
-  @SuppressWarnings({ "unchecked" })
   protected synchronized void handleEvent(ContainerAllocatorEvent event) {
     recalculateReduceSchedule = true;
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
       ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
-      JobId jobId = getJob().getID();
-      Resource supportedMaxContainerCapability = getMaxContainerCapability();
-      if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
-        if (mapResourceRequest.equals(Resources.none())) {
-          mapResourceRequest = reqEvent.getCapability();
-          eventHandler.handle(new JobHistoryEvent(jobId,
-              new NormalizedResourceEvent(
-                  org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
-                      .getMemorySize())));
-          LOG.info("mapResourceRequest:" + mapResourceRequest);
-          if (mapResourceRequest.getMemorySize() > supportedMaxContainerCapability
-              .getMemorySize()
-              || mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability
-                  .getVirtualCores()) {
-            String diagMsg =
-                "MAP capability required is more than the supported "
-                    + "max container capability in the cluster. Killing the Job. mapResourceRequest: "
-                    + mapResourceRequest + " maxContainerCapability:"
-                    + supportedMaxContainerCapability;
-            LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
-            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
-          }
-        }
-        // set the resources
-        reqEvent.getCapability().setMemorySize(mapResourceRequest.getMemorySize());
-        reqEvent.getCapability().setVirtualCores(
-            mapResourceRequest.getVirtualCores());
-        scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
+      boolean isMap = reqEvent.getAttemptID().getTaskId().getTaskType().
+          equals(TaskType.MAP);
+      if (isMap) {
+        handleMapContainerRequest(reqEvent);
       } else {
-        if (reduceResourceRequest.equals(Resources.none())) {
-          reduceResourceRequest = reqEvent.getCapability();
-          eventHandler.handle(new JobHistoryEvent(jobId,
-              new NormalizedResourceEvent(
-                  org.apache.hadoop.mapreduce.TaskType.REDUCE,
-                  reduceResourceRequest.getMemorySize())));
-          LOG.info("reduceResourceRequest:" + reduceResourceRequest);
-          if (reduceResourceRequest.getMemorySize() > supportedMaxContainerCapability
-              .getMemorySize()
-              || reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability
-                  .getVirtualCores()) {
-            String diagMsg =
-                "REDUCE capability required is more than the "
-                    + "supported max container capability in the cluster. Killing the "
-                    + "Job. reduceResourceRequest: " + reduceResourceRequest
-                    + " maxContainerCapability:"
-                    + supportedMaxContainerCapability;
-            LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
-            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
-          }
-        }
-        // set the resources
-        reqEvent.getCapability().setMemorySize(reduceResourceRequest.getMemorySize());
-        reqEvent.getCapability().setVirtualCores(
-            reduceResourceRequest.getVirtualCores());
-        if (reqEvent.getEarlierAttemptFailed()) {
-          //add to the front of queue for fail fast
-          pendingReduces.addFirst(new ContainerRequest(reqEvent,
-              PRIORITY_REDUCE, reduceNodeLabelExpression));
-        } else {
-          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE,
-              reduceNodeLabelExpression));
-          //reduces are added to pending and are slowly ramped up
-        }
+        handleReduceContainerRequest(reqEvent);
       }
     } else if (
@@ -476,6 +416,103 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
     }
   }
 
+  @SuppressWarnings({ "unchecked" })
+  private void handleReduceContainerRequest(ContainerRequestEvent reqEvent) {
+    assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
+        TaskType.REDUCE));
+
+    Resource supportedMaxContainerCapability = getMaxContainerCapability();
+    JobId jobId = getJob().getID();
+
+    if (reduceResourceRequest.equals(Resources.none())) {
+      reduceResourceRequest = reqEvent.getCapability();
+      eventHandler.handle(new JobHistoryEvent(jobId,
+          new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.REDUCE,
+              reduceResourceRequest.getMemorySize())));
+      LOG.info("reduceResourceRequest:" + reduceResourceRequest);
+    }
+
+    boolean reduceContainerRequestAccepted = true;
+    if (reduceResourceRequest.getMemorySize() >
+        supportedMaxContainerCapability.getMemorySize()
+        ||
+        reduceResourceRequest.getVirtualCores() >
+        supportedMaxContainerCapability.getVirtualCores()) {
+      reduceContainerRequestAccepted = false;
+    }
+
+    if (reduceContainerRequestAccepted) {
+      // set the resources
+      reqEvent.getCapability().setVirtualCores(
+          reduceResourceRequest.getVirtualCores());
+      reqEvent.getCapability().setMemorySize(
+          reduceResourceRequest.getMemorySize());
+
+      if (reqEvent.getEarlierAttemptFailed()) {
+        //previously failed reducers are added to the front for fail fast
+        pendingReduces.addFirst(new ContainerRequest(reqEvent,
+            PRIORITY_REDUCE, reduceNodeLabelExpression));
+      } else {
+        //reduces are added to pending queue and are slowly ramped up
+        pendingReduces.add(new ContainerRequest(reqEvent,
+            PRIORITY_REDUCE, reduceNodeLabelExpression));
+      }
+    } else {
+      String diagMsg = "REDUCE capability required is more than the "
+          + "supported max container capability in the cluster. Killing"
+          + " the Job. reduceResourceRequest: " + reduceResourceRequest
+          + " maxContainerCapability:" + supportedMaxContainerCapability;
+      LOG.info(diagMsg);
+      eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
+      eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    }
+  }
+
+  @SuppressWarnings({ "unchecked" })
+  private void handleMapContainerRequest(ContainerRequestEvent reqEvent) {
+    assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals(
+        TaskType.MAP));
+
+    Resource supportedMaxContainerCapability = getMaxContainerCapability();
+    JobId jobId = getJob().getID();
+
+    if (mapResourceRequest.equals(Resources.none())) {
+      mapResourceRequest = reqEvent.getCapability();
+      eventHandler.handle(new JobHistoryEvent(jobId,
+          new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.MAP,
+              mapResourceRequest.getMemorySize())));
+      LOG.info("mapResourceRequest:" + mapResourceRequest);
+    }
+
+    boolean mapContainerRequestAccepted = true;
+    if (mapResourceRequest.getMemorySize() >
+        supportedMaxContainerCapability.getMemorySize()
+        ||
+        mapResourceRequest.getVirtualCores() >
+        supportedMaxContainerCapability.getVirtualCores()) {
+      mapContainerRequestAccepted = false;
+    }
+
+    if(mapContainerRequestAccepted) {
+      // set the resources
+      reqEvent.getCapability().setMemorySize(
+          mapResourceRequest.getMemorySize());
+      reqEvent.getCapability().setVirtualCores(
+          mapResourceRequest.getVirtualCores());
+      scheduledRequests.addMap(reqEvent); //maps are immediately scheduled
+    } else {
+      String diagMsg = "The required MAP capability is more than the "
+          + "supported max container capability in the cluster. Killing"
+          + " the Job. mapResourceRequest: " + mapResourceRequest
+          + " maxContainerCapability:" + supportedMaxContainerCapability;
+      LOG.info(diagMsg);
+      eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
+      eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    }
+  }
+
   private static String getHost(String contMgrAddress) {
     String host = contMgrAddress;
     String[] hostport = host.split(":");
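
Note: both new handlers gate scheduling on the same capability check: the
normalized request must fit within the maximum container capability the RM
advertises. A minimal standalone sketch of that predicate, for illustration
only (the helper name fitsWithin is ours, not part of this patch):

    import org.apache.hadoop.yarn.api.records.Resource;

    // Returns true iff the requested capability fits within the cluster's
    // maximum container capability. When it does not, RMContainerAllocator
    // now logs a diagnostic, fires a JobDiagnosticsUpdateEvent, and issues
    // JOB_KILL instead of enqueuing a request the RM can never satisfy.
    static boolean fitsWithin(Resource requested, Resource clusterMax) {
      return requested.getMemorySize() <= clusterMax.getMemorySize()
          && requested.getVirtualCores() <= clusterMax.getVirtualCores();
    }
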
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index bcce7937eb..f9ee9cc606 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -1792,12 +1792,18 @@ public synchronized Allocation allocate(
 
   private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
       int memory, String[] hosts) {
-    return createReq(jobId, taskAttemptId, memory, hosts, false, false);
+    return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false);
   }
 
-  private ContainerRequestEvent
-      createReq(JobId jobId, int taskAttemptId, int memory, String[] hosts,
-          boolean earlierFailedAttempt, boolean reduce) {
+  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+      int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
+    return createReq(jobId, taskAttemptId, mem,
+        1, hosts, earlierFailedAttempt, reduce);
+  }
+
+  private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
+      int memory, int vcore, String[] hosts, boolean earlierFailedAttempt,
+      boolean reduce) {
     TaskId taskId;
     if (reduce) {
       taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
@@ -1806,7 +1812,7 @@ private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
     }
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
        taskAttemptId);
-    Resource containerNeed = Resource.newInstance(memory, 1);
+    Resource containerNeed = Resource.newInstance(memory, vcore);
     if (earlierFailedAttempt) {
       return ContainerRequestEvent
           .createContainerRequestEventForFailedContainer(attemptId,
@@ -2607,6 +2613,86 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart()
 
   }
 
+  @Test
+  public void testUnsupportedMapContainerRequirement() throws Exception {
+    final Resource maxContainerSupported = Resource.newInstance(1, 1);
+
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    final ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    final JobId jobId =
+        MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    final Configuration conf = new Configuration();
+
+    final MyContainerAllocator allocator = new MyContainerAllocator(null,
+        conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) {
+      @Override
+      protected void register() {
+      }
+      @Override
+      protected ApplicationMasterProtocol createSchedulerProxy() {
+        return mockScheduler;
+      }
+      @Override
+      protected Resource getMaxContainerCapability() {
+        return maxContainerSupported;
+      }
+    };
+
+    ContainerRequestEvent mapRequestEvt = createReq(jobId, 0,
+        (int) (maxContainerSupported.getMemorySize() + 10),
+        maxContainerSupported.getVirtualCores(),
+        new String[0], false, false);
+    allocator.sendRequests(Arrays.asList(mapRequestEvt));
+    allocator.schedule();
+
+    Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+  }
+
+  @Test
+  public void testUnsupportedReduceContainerRequirement() throws Exception {
+    final Resource maxContainerSupported = Resource.newInstance(1, 1);
+
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    final ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 1);
+    final JobId jobId =
+        MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+
+    final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+    final Configuration conf = new Configuration();
+
+    final MyContainerAllocator allocator = new MyContainerAllocator(null,
+        conf, appAttemptId, mock(Job.class), SystemClock.getInstance()) {
+      @Override
+      protected void register() {
+      }
+      @Override
+      protected ApplicationMasterProtocol createSchedulerProxy() {
+        return mockScheduler;
+      }
+      @Override
+      protected Resource getMaxContainerCapability() {
+        return maxContainerSupported;
+      }
+    };
+
+    ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0,
+        (int) (maxContainerSupported.getMemorySize() + 10),
+        maxContainerSupported.getVirtualCores(),
+        new String[0], false, true);
+    allocator.sendRequests(Arrays.asList(reduceRequestEvt));
+    // Reducer container requests are added to the pending queue upon request,
+    // schedule all reducers here so that we can observe if reducer requests
+    // are accepted by RMContainerAllocator on RM side.
+    allocator.scheduleAllReduces();
+    allocator.schedule();
+
+    Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+  }
+
   @Test
   public void testRMUnavailable() throws Exception {
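
For context, the condition these tests simulate arises when a job's configured
per-task resources exceed the cluster's maximum container size. A hedged
end-to-end illustration, not part of this patch (the class name, job name, and
the 16384 MB figure are made up; it assumes the cluster's
yarn.scheduler.maximum-allocation-mb is below that value):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class OversizedContainerDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Request more per-task memory than the cluster's maximum container
        // size (an assumed, made-up value). With this patch the MR AM kills
        // the job with a diagnostic message instead of also scheduling
        // container requests that the RM can never fulfill.
        conf.setInt("mapreduce.map.memory.mb", 16384);
        conf.setInt("mapreduce.reduce.memory.mb", 16384);
        Job job = Job.getInstance(conf, "oversized-container-demo");
        // ... set mapper/reducer classes and input/output paths as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
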