diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java new file mode 100644 index 0000000000..39a9ddcf8e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestCreator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.rm; + +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.yarn.api.records.Resource; + +final class ContainerRequestCreator { + + private ContainerRequestCreator() {} + + static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId, + Resource resource, String[] hosts) { + return createRequest(jobId, taskAttemptId, resource, hosts, + false, false); + } + + static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId, + Resource resource, String[] hosts, boolean earlierFailedAttempt, + boolean reduce) { + final TaskId taskId; + if (reduce) { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); + } else { + taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); + } + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, + taskAttemptId); + + if (earlierFailedAttempt) { + return ContainerRequestEvent + .createContainerRequestEventForFailedContainer(attemptId, + resource); + } + return new ContainerRequestEvent(attemptId, resource, hosts, + new String[]{NetworkTopology.DEFAULT_RACK}); + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 7875917b68..427e6ea228 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm; +import static org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestCreator.createRequest; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyFloat; @@ -96,7 +97,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.CollectorInfo; @@ -203,7 +203,7 @@ public void testSimple() throws Exception { JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); @@ -215,13 +215,13 @@ public void testSimple() throws Exception { rm.drainEvents(); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(jobId, + 1, Resource.newInstance(1024, 1), new String[] {"h1"}); allocator.sendRequest(event1); // send 1 more request with different resource req - ContainerRequestEvent event2 = createReq(jobId, 2, 1024, - new String[] { "h2" }); + ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(jobId, + 2, Resource.newInstance(1024, 1), new String[] {"h2"}); allocator.sendRequest(event2); // this tells the scheduler about the requests @@ -232,8 +232,8 @@ public void testSimple() throws Exception { Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size()); // send another request with different resource and priority - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h3" }); + ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(jobId, + 3, Resource.newInstance(1024, 1), new String[] {"h3"}); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -242,7 +242,7 @@ public void testSimple() throws Exception { rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size()); - + // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat @@ -252,21 +252,21 @@ public void testSimple() throws Exception { assigned = allocator.schedule(); rm.drainEvents(); Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size()); - checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, + checkAssignments(new ContainerRequestEvent[] {event1, event2, event3}, assigned, false); - + // check that the assigned container requests are cancelled allocator.schedule(); rm.drainEvents(); Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size()); } - - @Test + + @Test public void testMapNodeLocality() throws Exception { - // test checks that ordering of allocated containers list from the RM does - // not affect the map->container assignment done by the AM. If there is a - // node local container available for a map then it should be assigned to - // that container and not a rack-local container that happened to be seen + // test checks that ordering of allocated containers list from the RM does + // not affect the map->container assignment done by the AM. If there is a + // node local container available for a map then it should be assigned to + // that container and not a rack-local container that happened to be seen // earlier in the allocated containers list from the RM. // Regression test for MAPREDUCE-4893 LOG.info("Running testMapNodeLocality"); @@ -291,26 +291,29 @@ public void testMapNodeLocality() throws Exception { JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( - MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob); // add resources to scheduler - MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps + MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map rm.drainEvents(); // create the container requests for maps - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = ContainerRequestCreator.createRequest( + jobId, 1, Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); - ContainerRequestEvent event2 = createReq(jobId, 2, 1024, - new String[] { "h1" }); + ContainerRequestEvent event2 = ContainerRequestCreator.createRequest( + jobId, 2, Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event2); - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h2" }); + ContainerRequestEvent event3 = ContainerRequestCreator.createRequest( + jobId, 3, Resource.newInstance(1024, 1), + new String[]{"h2"}); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -323,14 +326,14 @@ public void testMapNodeLocality() throws Exception { // Node heartbeat from rack-local first. This makes node h3 the first in the // list of allocated containers but it should not be assigned to task1. nodeManager3.nodeHeartbeat(true); - // Node heartbeat from node-local next. This allocates 2 node local + // Node heartbeat from node-local next. This allocates 2 node local // containers for task1 and task2. These should be matched with those tasks. nodeManager1.nodeHeartbeat(true); rm.drainEvents(); assigned = allocator.schedule(); rm.drainEvents(); - checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, + checkAssignments(new ContainerRequestEvent[] {event1, event2, event3}, assigned, false); // remove the rack-local assignment that should have happened for task3 for(TaskAttemptContainerAssignedEvent event : assigned) { @@ -340,7 +343,7 @@ public void testMapNodeLocality() throws Exception { break; } } - checkAssignments(new ContainerRequestEvent[] { event1, event2}, + checkAssignments(new ContainerRequestEvent[] {event1, event2}, assigned, true); } @@ -381,13 +384,15 @@ public void testResource() throws Exception { rm.drainEvents(); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = ContainerRequestCreator.createRequest( + jobId, 1, Resource.newInstance(1024, 1), + new String[] {"h1"}); allocator.sendRequest(event1); // send 1 more request with different resource req - ContainerRequestEvent event2 = createReq(jobId, 2, 2048, - new String[] { "h2" }); + ContainerRequestEvent event2 = ContainerRequestCreator.createRequest( + jobId, 2, Resource.newInstance(1024, 1), + new String[] {"h2"}); allocator.sendRequest(event2); // this tells the scheduler about the requests @@ -404,7 +409,7 @@ public void testResource() throws Exception { assigned = allocator.schedule(); rm.drainEvents(); - checkAssignments(new ContainerRequestEvent[] { event1, event2 }, + checkAssignments(new ContainerRequestEvent[] {event1, event2}, assigned, false); } @@ -439,15 +444,19 @@ public void testReducerRampdownDiagnostics() throws Exception { rm.drainEvents(); // create the container request - final String[] locations = new String[] { host }; - allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); + final String[] locations = new String[] {host}; + allocator.sendRequest(createRequest(jobId, 0, + Resource.newInstance(1024, 1), + locations, false, true)); for (int i = 0; i < 1;) { rm.drainEvents(); i += allocator.schedule().size(); nm.nodeHeartbeat(true); } - allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false)); + allocator.sendRequest(createRequest(jobId, 0, + Resource.newInstance(1024, 1), + locations, true, false)); while (allocator.getTaskAttemptKillEvents().size() == 0) { rm.drainEvents(); allocator.schedule().size(); @@ -494,9 +503,10 @@ public void testPreemptReducers() throws Exception { RMContainerAllocator.ScheduledRequests scheduledRequests = allocator.getScheduledRequests(); ContainerRequestEvent event1 = - createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + createRequest(jobId, 1, Resource.newInstance(2048, 1), + new String[] {"h1"}, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), - new RMContainerRequestor.ContainerRequest(event1, null,null)); + new RMContainerRequestor.ContainerRequest(event1, null, null)); assignedRequests.reduces.put(mock(TaskAttemptId.class), mock(Container.class)); @@ -547,9 +557,12 @@ public void testNonAggressivelyPreemptReducers() throws Exception { RMContainerAllocator.ScheduledRequests scheduledRequests = allocator.getScheduledRequests(); ContainerRequestEvent event1 = - createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + createRequest(jobId, 1, + Resource.newInstance(2048, 1), + new String[] {"h1"}, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), - new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); + new RMContainerRequestor.ContainerRequest(event1, null, + clock.getTime())); assignedRequests.reduces.put(mock(TaskAttemptId.class), mock(Container.class)); @@ -561,7 +574,7 @@ public void testNonAggressivelyPreemptReducers() throws Exception { clock.setTime(clock.getTime() + (preemptThreshold) * 1000); allocator.preemptReducesIfNeeded(); Assert.assertEquals("The reducer is not preeempted", 1, - assignedRequests.preemptionWaitingReduces.size()); + assignedRequests.preemptionWaitingReduces.size()); } @Test(timeout = 30000) @@ -608,9 +621,12 @@ public void testUnconditionalPreemptReducers() throws Exception { RMContainerAllocator.ScheduledRequests scheduledRequests = allocator.getScheduledRequests(); ContainerRequestEvent event1 = - createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + createRequest(jobId, 1, + Resource.newInstance(2048, 1), + new String[] {"h1"}, false, false); scheduledRequests.maps.put(mock(TaskAttemptId.class), - new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); + new RMContainerRequestor.ContainerRequest(event1, null, + clock.getTime())); assignedRequests.reduces.put(mock(TaskAttemptId.class), mock(Container.class)); @@ -651,13 +667,17 @@ public void testExcessReduceContainerAssign() throws Exception { appAttemptId, mockJob, SystemClock.getInstance()); // request to allocate two reduce priority containers - final String[] locations = new String[] { host }; - allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); + final String[] locations = new String[] {host}; + allocator.sendRequest(createRequest(jobId, 0, + Resource.newInstance(1024, 1), + locations, false, true)); allocator.scheduleAllReduces(); allocator.makeRemoteRequest(); nm.nodeHeartbeat(true); rm.drainEvents(); - allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false)); + allocator.sendRequest(createRequest(jobId, 1, + Resource.newInstance(1024, 1), + locations, false, false)); int assignedContainer; for (assignedContainer = 0; assignedContainer < 1;) { @@ -684,7 +704,7 @@ public void testMapReduceAllocationWithNodeLabelExpression() throws Exception { conf.set(MRJobConfig.REDUCE_NODE_LABEL_EXP, "ReduceNodes"); ApplicationId appId = ApplicationId.newInstance(1, 1); ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); + ApplicationAttemptId.newInstance(appId, 1); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( @@ -706,13 +726,16 @@ protected ApplicationMasterProtocol createSchedulerProxy() { // create some map requests ContainerRequestEvent reqMapEvents; - reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" }); + reqMapEvents = ContainerRequestCreator.createRequest(jobId, 0, + Resource.newInstance(1024, 1), new String[]{"map"}); allocator.sendRequests(Arrays.asList(reqMapEvents)); // create some reduce requests ContainerRequestEvent reqReduceEvents; reqReduceEvents = - createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true); + createRequest(jobId, 0, + Resource.newInstance(2048, 1), + new String[] {"reduce"}, false, true); allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.schedule(); // verify all of the host-specific asks were sent plus one for the @@ -883,18 +906,21 @@ public void testMapReduceScheduling() throws Exception { // create the container request // send MAP request - ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { - "h1", "h2" }, true, false); + ContainerRequestEvent event1 = createRequest(jobId, 1, + Resource.newInstance(2048, 1), + new String[] {"h1", "h2"}, true, false); allocator.sendRequest(event1); // send REDUCE request - ContainerRequestEvent event2 = createReq(jobId, 2, 3000, - new String[] { "h1" }, false, true); + ContainerRequestEvent event2 = createRequest(jobId, 2, + Resource.newInstance(3000, 1), + new String[] {"h1"}, false, true); allocator.sendRequest(event2); // send MAP request - ContainerRequestEvent event3 = createReq(jobId, 3, 2048, - new String[] { "h3" }, false, false); + ContainerRequestEvent event3 = createRequest(jobId, 3, + Resource.newInstance(2048, 1), + new String[] {"h3"}, false, false); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -911,7 +937,7 @@ public void testMapReduceScheduling() throws Exception { assigned = allocator.schedule(); rm.drainEvents(); - checkAssignments(new ContainerRequestEvent[] { event1, event3 }, + checkAssignments(new ContainerRequestEvent[] {event1, event3}, assigned, false); // validate that no container is assigned to h1 as it doesn't have 2048 @@ -921,10 +947,10 @@ public void testMapReduceScheduling() throws Exception { } } - private static class MyResourceManager extends MockRM { + static class MyResourceManager extends MockRM { private static long fakeClusterTimeStamp = System.currentTimeMillis(); - + public MyResourceManager(Configuration conf) { super(conf); } @@ -955,7 +981,7 @@ public void handle(SchedulerEvent event) { protected ResourceScheduler createScheduler() { return new MyFifoScheduler(this.getRMContext()); } - + MyFifoScheduler getMyFifoScheduler() { return (MyFifoScheduler) scheduler; } @@ -1221,7 +1247,7 @@ protected ContainerAllocator createContainerAllocator( Assert.assertEquals(0.95f, job.getProgress(), 0.001f); Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); } - + @Test public void testUpdatedNodes() throws Exception { Configuration conf = new Configuration(); @@ -1251,11 +1277,13 @@ public void testUpdatedNodes() throws Exception { rm.drainEvents(); // create the map container request - ContainerRequestEvent event = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event = + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[] {"h1"}); allocator.sendRequest(event); TaskAttemptId attemptId = event.getAttemptID(); - + TaskAttempt mockTaskAttempt = mock(TaskAttempt.class); when(mockTaskAttempt.getNodeId()).thenReturn(nm1.getNodeId()); Task mockTask = mock(Task.class); @@ -1279,7 +1307,7 @@ public void testUpdatedNodes() throws Exception { // no updated nodes reported Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); Assert.assertTrue(allocator.getTaskAttemptKillEvents().isEmpty()); - + // mark nodes bad nm1.nodeHeartbeat(false); nm2.nodeHeartbeat(false); @@ -1292,11 +1320,13 @@ public void testUpdatedNodes() throws Exception { // updated nodes are reported Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size()); - Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); - Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); + Assert.assertEquals(2, + allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); + Assert.assertEquals(attemptId, + allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); allocator.getJobUpdatedNodeEvents().clear(); allocator.getTaskAttemptKillEvents().clear(); - + assigned = allocator.schedule(); rm.drainEvents(); Assert.assertEquals(0, assigned.size()); @@ -1307,7 +1337,7 @@ public void testUpdatedNodes() throws Exception { @Test public void testBlackListedNodes() throws Exception { - + LOG.info("Running testBlackListedNodes"); Configuration conf = new Configuration(); @@ -1315,7 +1345,7 @@ public void testBlackListedNodes() throws Exception { conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); conf.setInt( MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); - + MyResourceManager rm = new MyResourceManager(conf); rm.start(); @@ -1331,7 +1361,7 @@ public void testBlackListedNodes() throws Exception { .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); rm.drainEvents(); - + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( @@ -1347,18 +1377,24 @@ public void testBlackListedNodes() throws Exception { rm.drainEvents(); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[] {"h1"}); allocator.sendRequest(event1); // send 1 more request with different resource req - ContainerRequestEvent event2 = createReq(jobId, 2, 1024, - new String[] { "h2" }); + ContainerRequestEvent event2 = + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(1024, 1), + new String[] {"h2"}); allocator.sendRequest(event2); // send another request with different resource and priority - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h3" }); + ContainerRequestEvent event3 = + ContainerRequestCreator.createRequest(jobId, 3, + Resource.newInstance(1024, 1), + new String[] {"h3"}); allocator.sendRequest(event3); // this tells the scheduler about the requests @@ -1368,9 +1404,9 @@ public void testBlackListedNodes() throws Exception { Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Send events to blacklist nodes h1 and h2 - ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); + ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); allocator.sendFailure(f1); - ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false); + ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false); allocator.sendFailure(f2); // update resources in scheduler @@ -1392,23 +1428,23 @@ public void testBlackListedNodes() throws Exception { assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); nodeManager3.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - + Assert.assertTrue("No of assignments must be 3", assigned.size() == 3); - + // validate that all containers are assigned to h3 for (TaskAttemptContainerAssignedEvent assig : assigned) { Assert.assertTrue("Assigned container host not correct", "h3".equals(assig .getContainer().getNodeId().getHost())); } } - + @Test public void testIgnoreBlacklisting() throws Exception { LOG.info("Running testIgnoreBlacklisting"); @@ -1448,7 +1484,7 @@ public void testIgnoreBlacklisting() throws Exception { // Known=1, blacklisted=0, ignore should be false - assign first container assigned = - getContainerOnHost(jobId, 1, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 1, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); @@ -1463,47 +1499,47 @@ public void testIgnoreBlacklisting() throws Exception { // Because makeRemoteRequest will not be aware of it until next call // The current call will send blacklisted node "h1" to RM assigned = - getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 2, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 1, 0, 0, 1, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Known=1, blacklisted=1, ignore should be true - assign 1 assigned = - getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 2, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=2, blacklisted=1, ignore should be true - assign 1 anyway. assigned = - getContainerOnHost(jobId, 3, 1024, new String[] { "h2" }, + getContainerOnHost(jobId, 3, 1024, new String[] {"h2"}, nodeManagers[1], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. assigned = - getContainerOnHost(jobId, 4, 1024, new String[] { "h3" }, + getContainerOnHost(jobId, 4, 1024, new String[] {"h3"}, nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Known=3, blacklisted=1, ignore should be true - assign 1 assigned = - getContainerOnHost(jobId, 5, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 5, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=4, blacklisted=1, ignore should be false - assign 1 anyway assigned = - getContainerOnHost(jobId, 6, 1024, new String[] { "h4" }, + getContainerOnHost(jobId, 6, 1024, new String[] {"h4"}, nodeManagers[3], allocator, 0, 0, 1, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Test blacklisting re-enabled. // Known=4, blacklisted=1, ignore should be false - no assignment on h1 assigned = - getContainerOnHost(jobId, 7, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 7, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // RMContainerRequestor would have created a replacement request. @@ -1516,20 +1552,20 @@ public void testIgnoreBlacklisting() throws Exception { // Known=4, blacklisted=2, ignore should be true. Should assign 0 // container for the same reason above. assigned = - getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 8, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 1, 0, 0, 2, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Known=4, blacklisted=2, ignore should be true. Should assign 2 // containers. assigned = - getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, + getContainerOnHost(jobId, 8, 1024, new String[] {"h1"}, nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); // Known=4, blacklisted=2, ignore should be true. assigned = - getContainerOnHost(jobId, 9, 1024, new String[] { "h2" }, + getContainerOnHost(jobId, 9, 1024, new String[] {"h2"}, nodeManagers[1], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); @@ -1540,23 +1576,23 @@ public void testIgnoreBlacklisting() throws Exception { nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=5, blacklisted=3, ignore should be true. assigned = - getContainerOnHost(jobId, 10, 1024, new String[] { "h3" }, + getContainerOnHost(jobId, 10, 1024, new String[] {"h3"}, nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - + // Assign on 5 more nodes - to re-enable blacklisting for (int i = 0; i < 5; i++) { nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); assigned = getContainerOnHost(jobId, 11 + i, 1024, - new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i], + new String[] {String.valueOf(5 + i)}, nodeManagers[4 + i], allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); } // Test h3 (blacklisted while ignoring blacklisting) is blacklisted. assigned = - getContainerOnHost(jobId, 20, 1024, new String[] { "h3" }, + getContainerOnHost(jobId, 20, 1024, new String[] {"h3"}, nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); } @@ -1576,7 +1612,8 @@ List getContainerOnHost(JobId jobId, int expectedAdditions2, int expectedRemovals2, MyResourceManager rm) throws Exception { ContainerRequestEvent reqEvent = - createReq(jobId, taskAttemptId, memory, hosts); + ContainerRequestCreator.createRequest(jobId, taskAttemptId, + Resource.newInstance(memory, 1), hosts); allocator.sendRequest(reqEvent); // Send the request to the RM @@ -1596,7 +1633,7 @@ List getContainerOnHost(JobId jobId, expectedAdditions2, expectedRemovals2, rm); return assigned; } - + @Test public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("Running testBlackListedNodesWithSchedulingToThatNode"); @@ -1606,7 +1643,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1); conf.setInt( MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT, -1); - + MyResourceManager rm = new MyResourceManager(conf); rm.start(); @@ -1622,7 +1659,7 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); rm.drainEvents(); - + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); when(mockJob.getReport()).thenReturn( @@ -1638,8 +1675,10 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { LOG.info("Requesting 1 Containers _1 on H1"); // create the container request - ContainerRequestEvent event1 = createReq(jobId, 1, 1024, - new String[] { "h1" }); + ContainerRequestEvent event1 = + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[] {"h1"}); allocator.sendRequest(event1); LOG.info("RM Heartbeat (to send the container requests)"); @@ -1653,13 +1692,13 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); - + LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - + Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); + LOG.info("Failing container _1 on H1 (should blacklist the node)"); // Send events to blacklist nodes h1 and h2 ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false); @@ -1667,8 +1706,9 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { //At this stage, a request should be created for a fast fail map //Create a FAST_FAIL request for a previously failed map. - ContainerRequestEvent event1f = createReq(jobId, 1, 1024, - new String[] { "h1" }, true, false); + ContainerRequestEvent event1f = createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[] {"h1"}, true, false); allocator.sendRequest(event1f); //Update the Scheduler with the new requests. @@ -1678,24 +1718,26 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // send another request with different resource and priority - ContainerRequestEvent event3 = createReq(jobId, 3, 1024, - new String[] { "h1", "h3" }); + ContainerRequestEvent event3 = + ContainerRequestCreator.createRequest(jobId, 3, + Resource.newInstance(1024, 1), + new String[] {"h1", "h3"}); allocator.sendRequest(event3); - + //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container. //RM is only aware of the prio:5 container - + LOG.info("h1 Heartbeat (To actually schedule the containers)"); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); - + LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); - + Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); + //RMContainerAllocator gets assigned a p:5 on a blacklisted node. //Send a release for the p:5 container + another request. @@ -1704,26 +1746,26 @@ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception { rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); - + //Hearbeat from H3 to schedule on this host. LOG.info("h3 Heartbeat (To re-schedule the containers)"); nodeManager3.nodeHeartbeat(true); // Node heartbeat rm.drainEvents(); - + LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)"); assigned = allocator.schedule(); assertBlacklistAdditionsAndRemovals(0, 0, rm); rm.drainEvents(); - + // For debugging for (TaskAttemptContainerAssignedEvent assig : assigned) { LOG.info(assig.getTaskAttemptID() + " assgined to " + assig.getContainer().getId() + " with priority " + assig.getContainer().getPriority()); } - + Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); - + // validate that all containers are assigned to h3 for (TaskAttemptContainerAssignedEvent assig : assigned) { Assert.assertEquals("Assigned container " + assig.getContainer().getId() @@ -1759,13 +1801,13 @@ public MyFifoScheduler(RMContext rmContext) { assert (false); } } - + List lastAsk = null; List lastRelease = null; List lastBlacklistAdditions; List lastBlacklistRemovals; Resource forceResourceLimit = null; - + // override this to copy the objects otherwise FifoScheduler updates the // numContainers in same objects as kept by RMContainerAllocator @Override @@ -1855,38 +1897,6 @@ public synchronized Allocation allocate( } } - private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, - int memory, String[] hosts) { - return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false); - } - - private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, - int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) { - return createReq(jobId, taskAttemptId, mem, - 1, hosts, earlierFailedAttempt, reduce); - } - - private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, - int memory, int vcore, String[] hosts, boolean earlierFailedAttempt, - boolean reduce) { - TaskId taskId; - if (reduce) { - taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); - } else { - taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); - } - TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, - taskAttemptId); - Resource containerNeed = Resource.newInstance(memory, vcore); - if (earlierFailedAttempt) { - return ContainerRequestEvent - .createContainerRequestEventForFailedContainer(attemptId, - containerNeed); - } - return new ContainerRequestEvent(attemptId, containerNeed, hosts, - new String[] { NetworkTopology.DEFAULT_RACK }); - } - private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId, String host, boolean reduce) { TaskId taskId; @@ -1897,9 +1907,9 @@ private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId, } TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, taskAttemptId); - return new ContainerFailedEvent(attemptId, host); + return new ContainerFailedEvent(attemptId, host); } - + private ContainerAllocatorEvent createDeallocateEvent(JobId jobId, int taskAttemptId, boolean reduce) { TaskId taskId; @@ -1957,14 +1967,14 @@ private void checkAssignment(ContainerRequestEvent request, // Mock RMContainerAllocator // Instead of talking to remote Scheduler,uses the local Scheduler - private static class MyContainerAllocator extends RMContainerAllocator { - static final List events - = new ArrayList(); - static final List taskAttemptKillEvents - = new ArrayList(); - static final List jobUpdatedNodeEvents - = new ArrayList(); - static final List jobEvents = new ArrayList(); + static class MyContainerAllocator extends RMContainerAllocator { + static final List events = + new ArrayList<>(); + static final List taskAttemptKillEvents = + new ArrayList<>(); + static final List jobUpdatedNodeEvents = + new ArrayList<>(); + static final List jobEvents = new ArrayList<>(); private MyResourceManager rm; private boolean isUnregistered = false; private AllocateResponse allocateResponse; @@ -2069,7 +2079,7 @@ protected Resource getMaxContainerCapability() { } public void sendRequest(ContainerRequestEvent req) { - sendRequests(Arrays.asList(new ContainerRequestEvent[] { req })); + sendRequests(Arrays.asList(new ContainerRequestEvent[] {req})); } public void sendRequests(List reqs) { @@ -2081,7 +2091,7 @@ public void sendRequests(List reqs) { public void sendFailure(ContainerFailedEvent f) { super.handleEvent(f); } - + public void sendDeallocate(ContainerAllocatorEvent f) { super.handleEvent(f); } @@ -2099,16 +2109,15 @@ public Boolean get() { // run the scheduler super.heartbeat(); - List result - = new ArrayList(events); + List result = new ArrayList<>(events); events.clear(); return result; } - + static List getTaskAttemptKillEvents() { return taskAttemptKillEvents; } - + static List getJobUpdatedNodeEvents() { return jobUpdatedNodeEvents; } @@ -2117,12 +2126,12 @@ static List getJobUpdatedNodeEvents() { protected void startAllocatorThread() { // override to NOT start thread } - + @Override protected boolean isApplicationMasterRegistered() { return super.isApplicationMasterRegistered(); } - + public boolean isUnregistered() { return isUnregistered; } @@ -2164,7 +2173,7 @@ public void testReduceScheduling() throws Exception { int numPendingReduces = 4; float maxReduceRampupLimit = 0.5f; float reduceSlowStart = 0.2f; - + RMContainerAllocator allocator = mock(RMContainerAllocator.class); doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class), @@ -2174,14 +2183,14 @@ public void testReduceScheduling() throws Exception { // Test slow-start allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, never()).setIsReduceStarted(true); - + // verify slow-start still in effect when no more maps need to // be scheduled but some have yet to complete allocator.scheduleReduces( @@ -2197,23 +2206,23 @@ public void testReduceScheduling() throws Exception { succeededMaps = 3; doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, times(1)).setIsReduceStarted(true); - + // Test reduce ramp-up doReturn(BuilderUtils.newResource(100 * 1024, 100 * 1)).when(allocator) .getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator).rampUpReduces(anyInt()); verify(allocator, never()).rampDownReduces(anyInt()); @@ -2232,18 +2241,18 @@ public void testReduceScheduling() throws Exception { verify(allocator).rampDownReduces(anyInt()); // Test reduce ramp-down for when there are scheduled maps - // Since we have two scheduled Maps, rampDownReducers + // Since we have two scheduled Maps, rampDownReducers // should be invoked twice. scheduledMaps = 2; assignedReduces = 2; doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator) .getResourceLimit(); allocator.scheduleReduces( - totalMaps, succeededMaps, - scheduledMaps, scheduledReduces, - assignedMaps, assignedReduces, - mapResourceReqt, reduceResourceReqt, - numPendingReduces, + totalMaps, succeededMaps, + scheduledMaps, scheduledReduces, + assignedMaps, assignedReduces, + mapResourceReqt, reduceResourceReqt, + numPendingReduces, maxReduceRampupLimit, reduceSlowStart); verify(allocator, times(2)).rampDownReduces(anyInt()); @@ -2288,7 +2297,7 @@ public void scheduleReduces(int totalMaps, int completedMaps, recalculatedReduceSchedule = true; } } - + @Test public void testCompletedTasksRecalculateSchedule() throws Exception { LOG.info("Running testCompletedTasksRecalculateSchedule"); @@ -2400,31 +2409,33 @@ public void testCompletedContainerEvent() { RMContainerAllocator allocator = new RMContainerAllocator( mock(ClientService.class), mock(AppContext.class), new NoopAMPreemptionPolicy()); - + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( MRBuilderUtils.newTaskId( MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1); ApplicationId applicationId = ApplicationId.newInstance(1, 1); - ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance( - applicationId, 1); - ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, 1); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 1); + ContainerId containerId = + ContainerId.newContainerId(applicationAttemptId, 1); ContainerStatus status = ContainerStatus.newInstance( containerId, ContainerState.RUNNING, "", 0); ContainerStatus abortedStatus = ContainerStatus.newInstance( containerId, ContainerState.RUNNING, "", ContainerExitStatus.ABORTED); - + TaskAttemptEvent event = allocator.createContainerFinishedEvent(status, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED, event.getType()); - + TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent( abortedStatus, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType()); - - ContainerId containerId2 = ContainerId.newContainerId(applicationAttemptId, 2); + + ContainerId containerId2 = + ContainerId.newContainerId(applicationAttemptId, 2); ContainerStatus status2 = ContainerStatus.newInstance(containerId2, ContainerState.RUNNING, "", 0); @@ -2440,7 +2451,7 @@ public void testCompletedContainerEvent() { preemptedStatus, attemptId); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType()); } - + @Test public void testUnregistrationOnlyIfRegistered() throws Exception { Configuration conf = new Configuration(); @@ -2483,7 +2494,7 @@ protected ContainerAllocator createContainerAllocator( mrApp.stop(); Assert.assertTrue(allocator.isUnregistered()); } - + // Step-1 : AM send allocate request for 2 ContainerRequests and 1 // blackListeNode // Step-2 : 2 containers are allocated by RM. @@ -2542,11 +2553,15 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // create the container request // send MAP request ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 2048, new String[] { "h1", "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(2048, 1), + new String[] {"h1", "h2"}); allocator.sendRequest(event2); // Send events to blacklist h2 @@ -2584,7 +2599,9 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // RM // send container request ContainerRequestEvent event3 = - createReq(jobId, 3, 1000, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 3, + Resource.newInstance(1000, 1), + new String[]{"h1"}); allocator.sendRequest(event3); // send deallocate request @@ -2628,7 +2645,9 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() allocator.sendFailure(f2); ContainerRequestEvent event4 = - createReq(jobId, 4, 2000, new String[] { "h1", "h2" }); + ContainerRequestCreator.createRequest(jobId, 4, + Resource.newInstance(2000, 1), + new String[]{"h1", "h2"}); allocator.sendRequest(event4); // send allocate request to 2nd RM and get resync command @@ -2639,7 +2658,9 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // asks,release,blacklistAaddition // and another containerRequest(event5) ContainerRequestEvent event5 = - createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" }); + ContainerRequestCreator.createRequest(jobId, 5, + Resource.newInstance(3000, 1), + new String[]{"h1", "h2", "h3"}); allocator.sendRequest(event5); // send all outstanding request again. @@ -2696,9 +2717,10 @@ protected Resource getMaxContainerCapability() { } }; - ContainerRequestEvent mapRequestEvt = createReq(jobId, 0, - (int) (maxContainerSupported.getMemorySize() + 10), - maxContainerSupported.getVirtualCores(), + final int memory = (int) (maxContainerSupported.getMemorySize() + 10); + ContainerRequestEvent mapRequestEvt = createRequest(jobId, 0, + Resource.newInstance(memory, + maxContainerSupported.getVirtualCores()), new String[0], false, false); allocator.sendRequests(Arrays.asList(mapRequestEvt)); allocator.schedule(); @@ -2734,10 +2756,11 @@ protected Resource getMaxContainerCapability() { } }; - ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0, - (int) (maxContainerSupported.getMemorySize() + 10), - maxContainerSupported.getVirtualCores(), - new String[0], false, true); + final int memory = (int) (maxContainerSupported.getMemorySize() + 10); + ContainerRequestEvent reduceRequestEvt = createRequest(jobId, 0, + Resource.newInstance(memory, + maxContainerSupported.getVirtualCores()), + new String[0], false, true); allocator.sendRequests(Arrays.asList(reduceRequestEvt)); // Reducer container requests are added to the pending queue upon request, // schedule all reducers here so that we can observe if reducer requests @@ -2787,8 +2810,9 @@ public void testRMUnavailable() rm1.drainEvents(); Assert.assertEquals("Should Have 1 Job Event", 1, allocator.jobEvents.size()); - JobEvent event = allocator.jobEvents.get(0); - Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT)); + JobEvent event = allocator.jobEvents.get(0); + Assert.assertTrue("Should Reboot", + event.getType().equals(JobEventType.JOB_AM_REBOOT)); } @Test(timeout=60000) @@ -2920,7 +2944,9 @@ protected void setRequestLimit(Priority priority, // create some map requests ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; for (int i = 0; i < reqMapEvents.length; ++i) { - reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); + reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i, + Resource.newInstance(1024, 1), + new String[] {"h" + i}); } allocator.sendRequests(Arrays.asList(reqMapEvents)); // create some reduce requests @@ -2928,7 +2954,8 @@ protected void setRequestLimit(Priority priority, new ContainerRequestEvent[REDUCE_COUNT]; for (int i = 0; i < reqReduceEvents.length; ++i) { reqReduceEvents[i] = - createReq(jobId, i, 1024, new String[] {}, false, true); + createRequest(jobId, i, Resource.newInstance(1024, 1), + new String[] {}, false, true); } allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.schedule(); @@ -2975,14 +3002,17 @@ protected ApplicationMasterProtocol createSchedulerProxy() { // create some map requests ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; for (int i = 0; i < reqMapEvents.length; ++i) { - reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); + reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i, + Resource.newInstance(1024, 1), new String[] {"h" + i}); } allocator.sendRequests(Arrays.asList(reqMapEvents)); // create some reduce requests - ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT]; + ContainerRequestEvent[] reqReduceEvents = + new ContainerRequestEvent[REDUCE_COUNT]; for (int i = 0; i < reqReduceEvents.length; ++i) { - reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {}, - false, true); + reqReduceEvents[i] = + createRequest(jobId, i, Resource.newInstance(1024, 1), + new String[] {}, false, true); } allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.schedule(); @@ -3137,13 +3167,19 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception { // Request 2 maps and 1 reducer(sone on nodes which are not registered). ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 1024, new String[] { "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(1024, 1), + new String[]{"h2"}); allocator.sendRequest(event2); ContainerRequestEvent event3 = - createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); + createRequest(jobId, 3, + Resource.newInstance(1024, 1), + new String[]{"h2"}, false, true); allocator.sendRequest(event3); // This will tell the scheduler about the requests but there will be no @@ -3156,7 +3192,8 @@ public void testUpdateAskOnRampDownAllReduces() throws Exception { // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = - createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + createRequest(jobId, 4, Resource.newInstance(1024, 1), + new String[] {"h3"}, false, true); allocator.sendRequest(event4); allocator.schedule(); @@ -3301,13 +3338,18 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() // Request 2 maps and 1 reducer(sone on nodes which are not registered). ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 1024, new String[] { "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(1024, 1), + new String[]{"h2"}); allocator.sendRequest(event2); ContainerRequestEvent event3 = - createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); + createRequest(jobId, 3, Resource.newInstance(1024, 1), + new String[]{"h2"}, false, true); allocator.sendRequest(event3); // This will tell the scheduler about the requests but there will be no @@ -3320,7 +3362,8 @@ public void testAvoidAskMoreReducersWhenReducerPreemptionIsRequired() // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = - createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + createRequest(jobId, 4, Resource.newInstance(1024, 1), + new String[]{"h3"}, false, true); allocator.sendRequest(event4); allocator.schedule(); @@ -3433,13 +3476,19 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { // Request 2 maps and 1 reducer(sone on nodes which are not registered). ContainerRequestEvent event1 = - createReq(jobId, 1, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 1, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event1); ContainerRequestEvent event2 = - createReq(jobId, 2, 1024, new String[] { "h2" }); + ContainerRequestCreator.createRequest(jobId, 2, + Resource.newInstance(1024, 1), + new String[]{"h2"}); allocator.sendRequest(event2); ContainerRequestEvent event3 = - createReq(jobId, 3, 1024, new String[] { "h1" }, false, true); + createRequest(jobId, 3, + Resource.newInstance(1024, 1), + new String[]{"h1"}, false, true); allocator.sendRequest(event3); // This will tell the scheduler about the requests but there will be no @@ -3449,7 +3498,8 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = - createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); + createRequest(jobId, 4, Resource.newInstance(1024, 1), + new String[] {"h3"}, false, true); allocator.sendRequest(event4); allocator.schedule(); @@ -3486,7 +3536,9 @@ public void testExcludeSchedReducesFromHeadroom() throws Exception { // Send request for one more mapper. ContainerRequestEvent event5 = - createReq(jobId, 5, 1024, new String[] { "h1" }); + ContainerRequestCreator.createRequest(jobId, 5, + Resource.newInstance(1024, 1), + new String[]{"h1"}); allocator.sendRequest(event5); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2)); @@ -3528,7 +3580,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster( return RegisterApplicationMasterResponse.newInstance( Resource.newInstance(512, 1), Resource.newInstance(512000, 1024), - Collections.emptyMap(), + Collections.emptyMap(), ByteBuffer.wrap("fake_key".getBytes()), Collections.emptyList(), "default", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java index 7a212e163d..1da2fedc08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java @@ -175,16 +175,8 @@ public static long convert(String fromUnit, String toUnit, long fromValue) { */ public static int compare(String unitA, long valueA, String unitB, long valueB) { - if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA) - || !KNOWN_UNITS.contains(unitB)) { - throw new IllegalArgumentException("Units cannot be null"); - } - if (!KNOWN_UNITS.contains(unitA)) { - throw new IllegalArgumentException("Unknown unit '" + unitA + "'"); - } - if (!KNOWN_UNITS.contains(unitB)) { - throw new IllegalArgumentException("Unknown unit '" + unitB + "'"); - } + checkUnitArgument(unitA); + checkUnitArgument(unitB); if (unitA.equals(unitB)) { return Long.compare(valueA, valueB); } @@ -218,4 +210,36 @@ public static int compare(String unitA, long valueA, String unitB, return tmpA.compareTo(tmpB); } } + + private static void checkUnitArgument(String unit) { + if (unit == null) { + throw new IllegalArgumentException("Unit cannot be null"); + } else if (!KNOWN_UNITS.contains(unit)) { + throw new IllegalArgumentException("Unknown unit '" + unit + "'"); + } + } + + /** + * Compare a unit to another unit. + *
+ * Examples:
+ * 1. 'm' (milli) is smaller than 'k' (kilo), so compareUnits("m", "k") + * will return -1.
+ * 2. 'M' (MEGA) is greater than 'k' (kilo), so compareUnits("M", "k") will + * return 1. + * + * @param unitA first unit + * @param unitB second unit + * @return +1, 0 or -1 depending on whether the relationship between units + * is smaller than, + * equal to or lesser than. + */ + public static int compareUnits(String unitA, String unitB) { + checkUnitArgument(unitA); + checkUnitArgument(unitB); + int unitAPos = SORTED_UNITS.indexOf(unitA); + int unitBPos = SORTED_UNITS.indexOf(unitB); + + return Integer.compare(unitAPos, unitBPos); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java new file mode 100644 index 0000000000..98a8a003b2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.resourcetypes; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Contains helper methods to create Resource and ResourceInformation objects. + * ResourceInformation can be created from a resource name + * and a resource descriptor as well that comprises amount and unit. + */ +public final class ResourceTypesTestHelper { + + private static final Pattern RESOURCE_VALUE_AND_UNIT_PATTERN = + Pattern.compile("(\\d+)([A-za-z]*)"); + + private ResourceTypesTestHelper() {} + + private static final RecordFactory RECORD_FACTORY = RecordFactoryProvider + .getRecordFactory(null); + + private static final class ResourceValueAndUnit { + private final Long value; + private final String unit; + + private ResourceValueAndUnit(Long value, String unit) { + this.value = value; + this.unit = unit; + } + } + + public static Resource newResource(long memory, int vCores, Map customResources) { + Resource resource = RECORD_FACTORY.newRecordInstance(Resource.class); + resource.setMemorySize(memory); + resource.setVirtualCores(vCores); + + for (Map.Entry customResource : + customResources.entrySet()) { + String resourceName = customResource.getKey(); + ResourceInformation resourceInformation = + createResourceInformation(resourceName, + customResource.getValue()); + resource.setResourceInformation(resourceName, resourceInformation); + } + return resource; + } + + public static ResourceInformation createResourceInformation(String + resourceName, String descriptor) { + ResourceValueAndUnit resourceValueAndUnit = + getResourceValueAndUnit(descriptor); + return ResourceInformation + .newInstance(resourceName, resourceValueAndUnit.unit, + resourceValueAndUnit.value); + } + + private static ResourceValueAndUnit getResourceValueAndUnit(String val) { + Matcher matcher = RESOURCE_VALUE_AND_UNIT_PATTERN.matcher(val); + if (!matcher.find()) { + throw new RuntimeException("Invalid pattern of resource descriptor: " + + val); + } else if (matcher.groupCount() != 2) { + throw new RuntimeException("Capturing group count in string " + + val + " is not 2!"); + } + long value = Long.parseLong(matcher.group(1)); + + return new ResourceValueAndUnit(value, matcher.group(2)); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 0de834c43f..e06b55e4c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -183,7 +183,7 @@ public static ContainerId newContainerId(RecordFactory recordFactory, public static NodeId newNodeId(String host, int port) { return NodeId.newInstance(host, port); } - + public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState, String httpAddress, String rackName, Resource used, Resource capability, int numContainers, String healthReport, long lastHealthReportTime) { @@ -422,7 +422,7 @@ public static ApplicationReport newApplicationReport( report.setPriority(priority); return report; } - + public static ApplicationSubmissionContext newApplicationSubmissionContext( ApplicationId applicationId, String applicationName, String queue, Priority priority, ContainerLaunchContext amContainer, @@ -477,6 +477,10 @@ public static Resource newResource(long memory, int vCores) { return resource; } + public static Resource newEmptyResource() { + return recordFactory.newRecordInstance(Resource.class); + } + public static URL newURL(String scheme, String host, int port, String file) { URL url = recordFactory.newRecordInstance(URL.class); url.setScheme(scheme); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index c0d7d86ff8..9b3c20a0e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; @@ -283,24 +284,10 @@ public static void normalizeAndvalidateRequest(ResourceRequest resReq, private static void validateResourceRequest(ResourceRequest resReq, Resource maximumResource, QueueInfo queueInfo, RMContext rmContext) throws InvalidResourceRequestException { - Resource requestedResource = resReq.getCapability(); - for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { - ResourceInformation reqRI = requestedResource.getResourceInformation(i); - ResourceInformation maxRI = maximumResource.getResourceInformation(i); - if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) { - throw new InvalidResourceRequestException( - "Invalid resource request, requested resource type=[" + reqRI - .getName() - + "] < 0 or greater than maximum allowed allocation. Requested " - + "resource=" + requestedResource - + ", maximum allowed allocation=" + maximumResource - + ", please note that maximum allowed allocation is calculated " - + "by scheduler based on maximum resource of registered " - + "NodeManagers, which might be less than configured " - + "maximum allocation=" + ResourceUtils - .getResourceTypesMaximumAllocation()); - } - } + final Resource requestedResource = resReq.getCapability(); + checkResourceRequestAgainstAvailableResource(requestedResource, + maximumResource); + String labelExp = resReq.getNodeLabelExpression(); // we don't allow specify label expression other than resourceName=ANY now if (!ResourceRequest.ANY.equals(resReq.getResourceName()) @@ -338,6 +325,78 @@ private static void validateResourceRequest(ResourceRequest resReq, } } + @Private + @VisibleForTesting + static void checkResourceRequestAgainstAvailableResource(Resource reqResource, + Resource availableResource) throws InvalidResourceRequestException { + for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { + final ResourceInformation requestedRI = + reqResource.getResourceInformation(i); + final String reqResourceName = requestedRI.getName(); + + if (requestedRI.getValue() < 0) { + throwInvalidResourceException(reqResource, availableResource, + reqResourceName); + } + + final ResourceInformation availableRI = + availableResource.getResourceInformation(reqResourceName); + + long requestedResourceValue = requestedRI.getValue(); + long availableResourceValue = availableRI.getValue(); + int unitsRelation = UnitsConversionUtil + .compareUnits(requestedRI.getUnits(), availableRI.getUnits()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Requested resource information: " + requestedRI); + LOG.debug("Available resource information: " + availableRI); + LOG.debug("Relation of units: " + unitsRelation); + } + + // requested resource unit is less than available resource unit + // e.g. requestedUnit: "m", availableUnit: "K") + if (unitsRelation < 0) { + availableResourceValue = + UnitsConversionUtil.convert(availableRI.getUnits(), + requestedRI.getUnits(), availableRI.getValue()); + + // requested resource unit is greater than available resource unit + // e.g. requestedUnit: "G", availableUnit: "M") + } else if (unitsRelation > 0) { + requestedResourceValue = + UnitsConversionUtil.convert(requestedRI.getUnits(), + availableRI.getUnits(), requestedRI.getValue()); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Requested resource value after conversion: " + + requestedResourceValue); + LOG.info("Available resource value after conversion: " + + availableResourceValue); + } + + if (requestedResourceValue > availableResourceValue) { + throwInvalidResourceException(reqResource, availableResource, + reqResourceName); + } + } + } + + private static void throwInvalidResourceException(Resource reqResource, + Resource availableResource, String reqResourceName) + throws InvalidResourceRequestException { + throw new InvalidResourceRequestException( + "Invalid resource request, requested resource type=[" + reqResourceName + + "] < 0 or greater than maximum allowed allocation. Requested " + + "resource=" + reqResource + ", maximum allowed allocation=" + + availableResource + + ", please note that maximum allowed allocation is calculated " + + "by scheduler based on maximum resource of registered " + + "NodeManagers, which might be less than configured " + + "maximum allocation=" + + ResourceUtils.getResourceTypesMaximumAllocation()); + } + private static void checkQueueLabelInLabelManager(String labelExpression, RMContext rmContext) throws InvalidLabelResourceRequestException { // check node label manager contains this label diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 90e4be83d0..9696741f1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -22,9 +22,13 @@ import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; + +import static org.junit.Assert.fail; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -61,6 +65,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -75,6 +80,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair + .FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -365,7 +373,7 @@ public void testInvalidContainerReleaseRequest() throws Exception { am2.addContainerToBeReleased(cId); try { am2.schedule(); - Assert.fail("Exception was expected!!"); + fail("Exception was expected!!"); } catch (InvalidContainerReleaseException e) { StringBuilder sb = new StringBuilder("Cannot release container : "); sb.append(cId.toString()); @@ -460,7 +468,7 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { FinalApplicationStatus.FAILED, "", ""); try { am1.unregisterAppAttempt(req, false); - Assert.fail("ApplicationMasterNotRegisteredException should be thrown"); + fail("ApplicationMasterNotRegisteredException should be thrown"); } catch (ApplicationMasterNotRegisteredException e) { Assert.assertNotNull(e); Assert.assertNotNull(e.getMessage()); @@ -468,7 +476,7 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { "Application Master is trying to unregister before registering for:" )); } catch (Exception e) { - Assert.fail("ApplicationMasterNotRegisteredException should be thrown"); + fail("ApplicationMasterNotRegisteredException should be thrown"); } am1.registerAppAttempt(); @@ -627,9 +635,7 @@ public void testInvalidIncreaseDecreaseRequest() throws Exception { Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", response.getUpdateErrors().get(0).getReason()); } finally { - if (rm != null) { - rm.close(); - } + rm.close(); } } @@ -709,34 +715,48 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); - CapacitySchedulerConfiguration csconf = - new CapacitySchedulerConfiguration(); - csconf.setResourceComparator(DominantResourceCalculator.class); + final YarnConfiguration yarnConf; + if (schedulerCls.getCanonicalName() + .equals(CapacityScheduler.class.getCanonicalName())) { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setResourceComparator(DominantResourceCalculator.class); + yarnConf = new YarnConfiguration(csConf); + } else if (schedulerCls.getCanonicalName() + .equals(FairScheduler.class.getCanonicalName())) { + FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); + yarnConf = new YarnConfiguration(fsConf); + } else { + throw new IllegalStateException( + "Scheduler class is of wrong type: " + schedulerCls); + } - YarnConfiguration conf = new YarnConfiguration(csconf); // Don't reset resource types since we have already configured resource // types - conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); - conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerCls, + yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, + false); + yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerCls, ResourceScheduler.class); - conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); + yarnConf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); - MockRM rm = new MockRM(conf); + MockRM rm = new MockRM(yarnConf); rm.start(); MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null)); - RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); // Now request resource, memory > allowed boolean exception = false; try { - am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - Resource.newInstance(9 * GB, 1)).numContainers(1).resourceName("*") - .build()), null); + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(Resource.newInstance(9 * GB, 1)) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -744,10 +764,12 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul exception = false; try { - // Now request resource, vcore > allowed - am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - Resource.newInstance(8 * GB, 18)).numContainers(1).resourceName("*") - .build()), null); + // Now request resource, vcores > allowed + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(Resource.newInstance(8 * GB, 18)) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -756,6 +778,73 @@ private void testValidateRequestCapacityAgainstMinMaxAllocation(Class schedul rm.close(); } + @Test + public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits() + throws Exception { + + // Initialize resource map for 2 types. + Map riMap = new HashMap<>(); + + // Initialize mandatory resources + ResourceInformation memory = + ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = + ResourceInformation.newInstance(ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + ResourceInformation res1 = + ResourceInformation.newInstance("res_1", "G", 0, 4); + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + riMap.put("res_1", res1); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + FairSchedulerConfiguration fsConf = + new FairSchedulerConfiguration(); + + YarnConfiguration yarnConf = new YarnConfiguration(fsConf); + // Don't reset resource types since we have already configured resource + // types + yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, + false); + yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + yarnConf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); + + MockRM rm = new MockRM(yarnConf); + rm.start(); + + MockNM nm1 = rm.registerNode("199.99.99.1:1234", + ResourceTypesTestHelper.newResource( + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, + ImmutableMap. builder() + .put("res_1", "5G").build())); + + RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // Now request res_1, 500M < 5G so it should be allowed + try { + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(ResourceTypesTestHelper.newResource(4 * GB, 1, + ImmutableMap. builder() + .put("res_1", "500M") + .build())) + .numContainers(1).resourceName("*").build()), null); + } catch (InvalidResourceRequestException e) { + fail( + "Allocate request should be accepted but exception was thrown: " + e); + } + + rm.close(); + } + @Test(timeout = 300000) public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes() throws Exception { @@ -774,11 +863,11 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType ResourceInformation.VCORES.getUnits(), YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - ResourceInformation res_1 = ResourceInformation.newInstance("res_1", + ResourceInformation res1 = ResourceInformation.newInstance("res_1", ResourceInformation.VCORES.getUnits(), 0, 4); riMap.put(ResourceInformation.MEMORY_URI, memory); riMap.put(ResourceInformation.VCORES_URI, vcores); - riMap.put("res_1", res_1); + riMap.put("res_1", res1); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); @@ -786,15 +875,16 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType new CapacitySchedulerConfiguration(); csconf.setResourceComparator(DominantResourceCalculator.class); - YarnConfiguration conf = new YarnConfiguration(csconf); + YarnConfiguration yarnConf = new YarnConfiguration(csconf); // Don't reset resource types since we have already configured resource // types - conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, + false); + yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); + yarnConf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); - MockRM rm = new MockRM(conf); + MockRM rm = new MockRM(yarnConf); rm.start(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); @@ -805,18 +895,21 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, ImmutableMap.of("res_1", 4))); - RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + RMApp app1 = rm.submitApp(GB, "app", "user", null, "default"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); - Assert.assertEquals(Resource.newInstance(1 * GB, 1), + Assert.assertEquals(Resource.newInstance(GB, 1), leafQueue.getUsedResources()); // Now request resource, memory > allowed boolean exception = false; try { - am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - TestUtils.createResource(9 * GB, 1, ImmutableMap.of("res_1", 1))) - .numContainers(1).resourceName("*").build()), null); + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(9 * GB, 1, + ImmutableMap.of("res_1", 1))) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -824,11 +917,13 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType exception = false; try { - // Now request resource, vcore > allowed - am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1))) - .numContainers(1).resourceName("*") - .build()), null); + // Now request resource, vcores > allowed + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability( + TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1))) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -837,10 +932,12 @@ public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceType exception = false; try { // Now request resource, res_1 > allowed - am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( - TestUtils.createResource(8 * GB, 1, ImmutableMap.of("res_1", 100))) - .numContainers(1).resourceName("*") - .build()), null); + am1.allocate(Collections.singletonList(ResourceRequest.newBuilder() + .capability(TestUtils.createResource(8 * GB, 1, + ImmutableMap.of("res_1", 100))) + .numContainers(1) + .resourceName("*") + .build()), null); } catch (InvalidResourceRequestException e) { exception = true; } @@ -856,7 +953,7 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { rmContainer.handle( new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); } else { - Assert.fail("Cannot find RMContainer"); + fail("Cannot find RMContainer"); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index cb1f794190..15cfdb01e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -26,7 +26,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.Arrays; @@ -35,6 +37,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,6 +45,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; @@ -63,8 +67,10 @@ import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; @@ -83,20 +89,79 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import org.junit.rules.ExpectedException; public class TestSchedulerUtils { private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class); - + private static Resource configuredMaxAllocation; + + private static class CustomResourceTypesConfigurationProvider + extends LocalConfigurationProvider { + + @Override + public InputStream getConfigurationInputStream(Configuration bootstrapConf, + String name) throws YarnException, IOException { + if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) { + return new ByteArrayInputStream( + ("\n" + + " \n" + + " yarn.resource-types\n" + + " custom-resource-1," + + "custom-resource-2,custom-resource-3\n" + + " \n" + + " \n" + + " yarn.resource-types" + + ".custom-resource-1.units\n" + + " G\n" + + " \n" + + " \n" + + " yarn.resource-types" + + ".custom-resource-2.units\n" + + " G\n" + + " \n" + + "\n").getBytes()); + } else { + return super.getConfigurationInputStream(bootstrapConf, name); + } + } + } private RMContext rmContext = getMockRMContext(); + private static YarnConfiguration conf = new YarnConfiguration(); + @Rule + public ExpectedException exception = ExpectedException.none(); + + private void initResourceTypes() { + Configuration yarnConf = new Configuration(); + yarnConf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + CustomResourceTypesConfigurationProvider.class.getName()); + ResourceUtils.resetResourceTypes(yarnConf); + } + + @Before + public void setUp() { + initResourceTypes(); + //this needs to be initialized after initResourceTypes is called + configuredMaxAllocation = Resource.newInstance(8192, 4, + ImmutableMap.builder() + .put("custom-resource-1", Long.MAX_VALUE) + .put("custom-resource-2", Long.MAX_VALUE) + .put("custom-resource-3", Long.MAX_VALUE) + .build()); + } + @Test (timeout = 30000) public void testNormalizeRequest() { ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); @@ -150,16 +215,18 @@ public void testNormalizeRequest() { // multiple of minMemory > maxMemory, then reduce to maxMemory SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, maxResource); - assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize()); + assertEquals(maxResource.getMemorySize(), + ask.getCapability().getMemorySize()); // ask is more than max maxResource = Resources.createResource(maxMemory, 0); ask.setCapability(Resources.createResource(maxMemory + 100)); SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, maxResource); - assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize()); + assertEquals(maxResource.getMemorySize(), + ask.getCapability().getMemorySize()); } - + @Test (timeout = 30000) public void testNormalizeRequestWithDominantResourceCalculator() { ResourceCalculator resourceCalculator = new DominantResourceCalculator(); @@ -201,10 +268,11 @@ public void testValidateResourceRequestWithErrorLabelsPermission() Set queueAccessibleNodeLabels = Sets.newHashSet(); QueueInfo queueInfo = mock(QueueInfo.class); when(queueInfo.getQueueName()).thenReturn("queue"); - when(queueInfo.getAccessibleNodeLabels()).thenReturn(queueAccessibleNodeLabels); + when(queueInfo.getAccessibleNodeLabels()) + .thenReturn(queueAccessibleNodeLabels); when(scheduler.getQueueInfo(any(String.class), anyBoolean(), anyBoolean())) .thenReturn(queueInfo); - + Resource maxResource = Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); @@ -363,7 +431,7 @@ public void testValidateResourceRequestWithErrorLabelsPermission() rmContext.getNodeLabelManager().removeFromClusterNodeLabels( Arrays.asList("x")); } - Assert.assertTrue("InvalidLabelResourceRequestException excpeted", + Assert.assertTrue("InvalidLabelResourceRequestException expected", invalidlabelexception); // queue is "*", always succeeded try { @@ -610,11 +678,9 @@ public void testValidateResourceRequest() { // more than max vcores try { - Resource resource = - Resources - .createResource( - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1); + Resource resource = Resources.createResource( + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1); ResourceRequest resReq = BuilderUtils.newResourceRequest(mock(Priority.class), ResourceRequest.ANY, resource, 1); @@ -648,10 +714,10 @@ public void testValidateResourceBlacklistRequest() throws Exception { waitForLaunchedState(attempt); // Create a client to the RM. - final Configuration conf = rm.getConfig(); - final YarnRPC rpc = YarnRPC.create(conf); + final Configuration yarnConf = rm.getConfig(); + final YarnRPC rpc = YarnRPC.create(yarnConf); - UserGroupInformation currentUser = + UserGroupInformation currentUser = UserGroupInformation.createRemoteUser(applicationAttemptId.toString()); Credentials credentials = containerManager.getContainerCredentials(); final InetSocketAddress rmBindAddress = @@ -665,7 +731,7 @@ public void testValidateResourceBlacklistRequest() throws Exception { @Override public ApplicationMasterProtocol run() { return (ApplicationMasterProtocol) rpc.getProxy( - ApplicationMasterProtocol.class, rmBindAddress, conf); + ApplicationMasterProtocol.class, rmBindAddress, yarnConf); } }); @@ -775,6 +841,127 @@ public void testNormalizeNodeLabelExpression() } } + @Test + public void testCustomResourceRequestedUnitIsSmallerThanAvailableUnit() + throws InvalidResourceRequestException { + Resource requestedResource = + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "11")); + + Resource availableResource = + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "0G")); + + exception.expect(InvalidResourceRequestException.class); + exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator + .create().withRequestedResourceType("custom-resource-1") + .withRequestedResource(requestedResource) + .withAvailableAllocation(availableResource) + .withMaxAllocation(configuredMaxAllocation).build()); + + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } + + @Test + public void testCustomResourceRequestedUnitIsSmallerThanAvailableUnit2() { + Resource requestedResource = + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "11")); + + Resource availableResource = + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "1G")); + + try { + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } catch (InvalidResourceRequestException e) { + fail(String.format( + "Resource request should be accepted. Requested: %s, available: %s", + requestedResource, availableResource)); + } + } + + @Test + public void testCustomResourceRequestedUnitIsGreaterThanAvailableUnit() + throws InvalidResourceRequestException { + Resource requestedResource = + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "1M")); + + Resource availableResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap. builder().put("custom-resource-1", "120k") + .build()); + + exception.expect(InvalidResourceRequestException.class); + exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator + .create().withRequestedResourceType("custom-resource-1") + .withRequestedResource(requestedResource) + .withAvailableAllocation(availableResource) + .withMaxAllocation(configuredMaxAllocation).build()); + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } + + @Test + public void testCustomResourceRequestedUnitIsGreaterThanAvailableUnit2() { + Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap. builder().put("custom-resource-1", "11M") + .build()); + + Resource availableResource = + ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "1G")); + + try { + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } catch (InvalidResourceRequestException e) { + fail(String.format( + "Resource request should be accepted. Requested: %s, available: %s", + requestedResource, availableResource)); + } + } + + @Test + public void testCustomResourceRequestedUnitIsSameAsAvailableUnit() { + Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "11M")); + + Resource availableResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "100M")); + + try { + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } catch (InvalidResourceRequestException e) { + fail(String.format( + "Resource request should be accepted. Requested: %s, available: %s", + requestedResource, availableResource)); + } + } + + @Test + public void testCustomResourceRequestedUnitIsSameAsAvailableUnit2() + throws InvalidResourceRequestException { + Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "110M")); + + Resource availableResource = ResourceTypesTestHelper.newResource(1, 1, + ImmutableMap.of("custom-resource-1", "100M")); + + exception.expect(InvalidResourceRequestException.class); + exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator + .create().withRequestedResourceType("custom-resource-1") + .withRequestedResource(requestedResource) + .withAvailableAllocation(availableResource) + .withMaxAllocation(configuredMaxAllocation).build()); + + SchedulerUtils.checkResourceRequestAgainstAvailableResource( + requestedResource, availableResource); + } + public static void waitSchedulerApplicationAttemptStopped( AbstractYarnScheduler ys, ApplicationAttemptId attemptId) throws InterruptedException { @@ -801,8 +988,7 @@ public static void waitSchedulerApplicationAttemptStopped( public static SchedulerApplication verifyAppAddedAndRemovedFromScheduler( Map> applications, - EventHandler handler, String queueName) - throws Exception { + EventHandler handler, String queueName) { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); @@ -832,4 +1018,60 @@ private static RMContext getMockRMContext() { when(rmContext.getNodeLabelManager()).thenReturn(nlm); return rmContext; } + + private static class InvalidResourceRequestExceptionMessageGenerator { + + private StringBuilder sb; + private Resource requestedResource; + private Resource availableAllocation; + private Resource configuredMaxAllowedAllocation; + private String resourceType; + + InvalidResourceRequestExceptionMessageGenerator(StringBuilder sb) { + this.sb = sb; + } + + public static InvalidResourceRequestExceptionMessageGenerator create() { + return new InvalidResourceRequestExceptionMessageGenerator( + new StringBuilder()); + } + + InvalidResourceRequestExceptionMessageGenerator withRequestedResource( + Resource r) { + this.requestedResource = r; + return this; + } + + InvalidResourceRequestExceptionMessageGenerator withRequestedResourceType( + String rt) { + this.resourceType = rt; + return this; + } + + InvalidResourceRequestExceptionMessageGenerator withAvailableAllocation( + Resource r) { + this.availableAllocation = r; + return this; + } + + InvalidResourceRequestExceptionMessageGenerator withMaxAllocation( + Resource r) { + this.configuredMaxAllowedAllocation = r; + return this; + } + + public String build() { + return sb + .append("Invalid resource request, requested resource type=[") + .append(resourceType).append("]") + .append(" < 0 or greater than maximum allowed allocation. ") + .append("Requested resource=").append(requestedResource).append(", ") + .append("maximum allowed allocation=").append(availableAllocation) + .append(", please note that maximum allowed allocation is calculated " + + "by scheduler based on maximum resource of " + + "registered NodeManagers, which might be less than " + + "configured maximum allocation=") + .append(configuredMaxAllowedAllocation).toString(); + } + } }