YARN-3974. Refactor the reservation system test cases to use parameterized base test. (subru via curino)

This commit is contained in:
Carlo Curino 2015-08-02 01:51:38 -07:00
parent f4c523b69b
commit 8572a5a14b
11 changed files with 338 additions and 401 deletions

View File

@ -377,6 +377,9 @@ Release 2.8.0 - UNRELEASED
YARN-4019. Add JvmPauseMonitor to ResourceManager and NodeManager. (Robert Kanter
via junping_du)
YARN-3974. Refactor the reservation system test cases to use parameterized
base test. (subru via curino)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
/**
* A Plan represents the central data structure of a reservation system that
@ -28,7 +27,7 @@
* previously accepted will be honored.
*
* {@link ReservationDefinition} submitted by the users through the RM public
* APIs are passed to appropriate {@link ReservationAgent}s, which in turn will
* APIs are passed to appropriate {@code ReservationAgent}s, which in turn will
* consult the Plan (via the {@link PlanView} interface) and try to determine
* whether there are sufficient resources available in this Plan to satisfy the
* temporal and resource constraints of a {@link ReservationDefinition}. If a

View File

@ -17,15 +17,14 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import java.util.Set;
/**
* This interface provides a read-only view on the allocations made in this
* plan. This methods are used for example by {@link ReservationAgent}s to
* plan. This methods are used for example by {@code ReservationAgent}s to
* determine the free resources in a certain point in time, and by
* PlanFollowerPolicy to publish this plan to the scheduler.
*/
@ -66,7 +65,7 @@ public interface PlanView extends PlanContext {
* @return the total {@link Resource} reserved for all users at the specified
* time
*/
public Resource getTotalCommittedResources(long tick);
Resource getTotalCommittedResources(long tick);
/**
* Returns the total {@link Resource} reserved for a given user at the
@ -88,7 +87,7 @@ public interface PlanView extends PlanContext {
* @return the overall capacity in terms of {@link Resource} assigned to this
* plan
*/
public Resource getTotalCapacity();
Resource getTotalCapacity();
/**
* Gets the time (UTC in ms) at which the first reservation starts

View File

@ -21,8 +21,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner;
public abstract class ReservationSchedulerConfiguration extends Configuration {
@ -64,7 +62,7 @@ public ReservationSchedulerConfiguration(
/**
* Checks if the queue participates in reservation based scheduling
* @param queue
* @param queue name of the queue
* @return true if the queue participates in reservation based scheduling
*/
public abstract boolean isReservable(String queue);
@ -110,10 +108,10 @@ public String getReservationAdmissionPolicy(String queue) {
}
/**
* Gets the name of the {@link ReservationAgent} class associated with the
* Gets the name of the {@code ReservationAgent} class associated with the
* queue
* @param queue name of the queue
* @return the class name of the {@link ReservationAgent}
* @return the class name of the {@code ReservationAgent}
*/
public String getReservationAgent(String queue) {
return DEFAULT_RESERVATION_AGENT_NAME;
@ -129,10 +127,10 @@ public boolean getShowReservationAsQueues(String queuePath) {
}
/**
* Gets the name of the {@link Planner} class associated with the
* Gets the name of the {@code Planner} class associated with the
* queue
* @param queue name of the queue
* @return the class name of the {@link Planner}
* @return the class name of the {@code Planner}
*/
public String getReplanner(String queue) {
return DEFAULT_RESERVATION_PLANNER_NAME;
@ -150,7 +148,7 @@ public boolean getMoveOnExpiry(String queue) {
}
/**
* Gets the time in milliseconds for which the {@link Planner} will verify
* Gets the time in milliseconds for which the {@code Planner} will verify
* the {@link Plan}s satisfy the constraints
* @param queue name of the queue
* @return the time in milliseconds for which to check constraints

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -28,16 +26,15 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent;
import java.util.Map;
/**
* This interface is the one implemented by any system that wants to support
* Reservations i.e. make {@link Resource} allocations in future. Implementors
* Reservations i.e. make {@code Resource} allocations in future. Implementors
* need to bootstrap all configured {@link Plan}s in the active
* {@link ResourceScheduler} along with their corresponding
* {@link ReservationAgent} and {@link SharingPolicy}. It is also responsible
* {@code ReservationAgent} and {@link SharingPolicy}. It is also responsible
* for managing the {@link PlanFollower} to ensure the {@link Plan}s are in sync
* with the {@link ResourceScheduler}.
*/
@ -49,7 +46,7 @@ public interface ReservationSystem {
* Set RMContext for {@link ReservationSystem}. This method should be called
* immediately after instantiating a reservation system once.
*
* @param rmContext created by {@link ResourceManager}
* @param rmContext created by {@code ResourceManager}
*/
void setRMContext(RMContext rmContext);
@ -57,7 +54,7 @@ public interface ReservationSystem {
* Re-initialize the {@link ReservationSystem}.
*
* @param conf configuration
* @param rmContext current context of the {@link ResourceManager}
* @param rmContext current context of the {@code ResourceManager}
* @throws YarnException
*/
void reinitialize(Configuration conf, RMContext rmContext)

View File

@ -1,19 +1,19 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
@ -73,11 +73,11 @@ public static ReservationId getNewReservationId() {
public static ReservationSchedulerConfiguration createConf(
String reservationQ, long timeWindow, float instConstraint,
float avgConstraint) {
ReservationSchedulerConfiguration conf = mock
(ReservationSchedulerConfiguration.class);
ReservationSchedulerConfiguration conf =
mock(ReservationSchedulerConfiguration.class);
when(conf.getReservationWindow(reservationQ)).thenReturn(timeWindow);
when(conf.getInstantaneousMaxCapacity(reservationQ)).thenReturn
(instConstraint);
when(conf.getInstantaneousMaxCapacity(reservationQ))
.thenReturn(instConstraint);
when(conf.getAverageCapacity(reservationQ)).thenReturn(avgConstraint);
return conf;
}
@ -91,21 +91,8 @@ public static void validateReservationQueue(
Assert.assertEquals(8192, plan.getTotalCapacity().getMemory());
Assert.assertTrue(
plan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
Assert.assertTrue(
plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
public static void validateNewReservationQueue(
AbstractReservationSystem reservationSystem, String newQ) {
Plan newPlan = reservationSystem.getPlan(newQ);
Assert.assertNotNull(newPlan);
Assert.assertTrue(newPlan instanceof InMemoryPlan);
Assert.assertEquals(newQ, newPlan.getQueueName());
Assert.assertEquals(1024, newPlan.getTotalCapacity().getMemory());
Assert
.assertTrue(newPlan.getReservationAgent() instanceof AlignedPlannerWithGreedy);
Assert
.assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy);
.assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
}
public static void setupFSAllocationFile(String allocationFile)
@ -129,7 +116,8 @@ public static void setupFSAllocationFile(String allocationFile)
out.println("<reservation></reservation>");
out.println("<weight>8</weight>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println(
"<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
}
@ -153,21 +141,20 @@ public static void updateFSAllocationFile(String allocationFile)
out.println("</queue>");
out.println("<queue name=\"dedicated\">");
out.println("<reservation></reservation>");
out.println("<weight>80</weight>");
out.println("<weight>10</weight>");
out.println("</queue>");
out.println("<queue name=\"reservation\">");
out.println("<reservation></reservation>");
out.println("<weight>10</weight>");
out.println("<weight>80</weight>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println(
"<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
}
public static FairScheduler setupFairScheduler(
ReservationSystemTestUtil testUtil,
RMContext rmContext, Configuration conf, int numContainers) throws
IOException {
public static FairScheduler setupFairScheduler(RMContext rmContext,
Configuration conf, int numContainers) throws IOException {
FairScheduler scheduler = new FairScheduler();
scheduler.setRMContext(rmContext);
@ -178,7 +165,8 @@ public static FairScheduler setupFairScheduler(
scheduler.reinitialize(conf, rmContext);
Resource resource = testUtil.calculateClusterResource(numContainers);
Resource resource =
ReservationSystemTestUtil.calculateClusterResource(numContainers);
RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
@ -224,8 +212,9 @@ public CapacityScheduler mockCapacityScheduler(int numContainers)
return cs;
}
public static void initializeRMContext(int numContainers,
AbstractYarnScheduler scheduler, RMContext mockRMContext) {
@SuppressWarnings("rawtypes") public static void initializeRMContext(
int numContainers, AbstractYarnScheduler scheduler,
RMContext mockRMContext) {
when(mockRMContext.getScheduler()).thenReturn(scheduler);
Resource r = calculateClusterResource(numContainers);
@ -233,18 +222,17 @@ public static void initializeRMContext(int numContainers,
}
public static RMContext createRMContext(Configuration conf) {
RMContext mockRmContext =
Mockito.spy(new RMContextImpl(null, null, null, null, null, null,
RMContext mockRmContext = Mockito.spy(
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null));
RMNodeLabelsManager nlm = mock(RMNodeLabelsManager.class);
when(
nlm.getQueueResource(any(String.class), anySetOf(String.class),
when(nlm.getQueueResource(any(String.class), anySetOf(String.class),
any(Resource.class))).thenAnswer(new Answer<Resource>() {
@Override
public Resource answer(InvocationOnMock invocation) throws Throwable {
@Override public Resource answer(InvocationOnMock invocation)
throws Throwable {
Object[] args = invocation.getArguments();
return (Resource) args[2];
}
@ -252,8 +240,8 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
when(nlm.getResourceByLabel(any(String.class), any(Resource.class)))
.thenAnswer(new Answer<Resource>() {
@Override
public Resource answer(InvocationOnMock invocation) throws Throwable {
@Override public Resource answer(InvocationOnMock invocation)
throws Throwable {
Object[] args = invocation.getArguments();
return (Resource) args[1];
}
@ -263,21 +251,22 @@ public Resource answer(InvocationOnMock invocation) throws Throwable {
return mockRmContext;
}
public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
public static void setupQueueConfiguration(
CapacitySchedulerConfiguration conf) {
// Define default queue
final String defQ = CapacitySchedulerConfiguration.ROOT + ".default";
conf.setCapacity(defQ, 10);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
"default", "a", reservationQ });
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "default", "a", reservationQ });
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
conf.setCapacity(A, 10);
final String dedicated =
CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ;
CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT
+ reservationQ;
conf.setCapacity(dedicated, 80);
// Set as reservation queue
conf.setReservable(dedicated, true);
@ -290,44 +279,43 @@ public static void setupQueueConfiguration(CapacitySchedulerConfiguration conf)
conf.setCapacity(A2, 70);
}
public String getFullReservationQueueName() {
public static String getFullReservationQueueName() {
return CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ;
}
public String getreservationQueueName() {
public static String getReservationQueueName() {
return reservationQ;
}
public void updateQueueConfiguration(CapacitySchedulerConfiguration conf,
String newQ) {
public static void updateQueueConfiguration(
CapacitySchedulerConfiguration conf, String newQ) {
// Define default queue
final String prefix =
CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT;
final String prefix = CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT;
final String defQ = prefix + "default";
conf.setCapacity(defQ, 5);
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {
"default", "a", reservationQ, newQ });
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "default", "a", reservationQ, newQ });
final String A = prefix + "a";
conf.setCapacity(A, 5);
final String dedicated = prefix + reservationQ;
conf.setCapacity(dedicated, 80);
conf.setCapacity(dedicated, 10);
// Set as reservation queue
conf.setReservable(dedicated, true);
conf.setCapacity(prefix + newQ, 10);
conf.setCapacity(prefix + newQ, 80);
// Set as reservation queue
conf.setReservable(prefix + newQ, true);
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[]{"a1", "a2"});
conf.setQueues(A, new String[] { "a1", "a2" });
conf.setCapacity(A1, 30);
conf.setCapacity(A2, 70);
}
@ -349,9 +337,8 @@ public static ReservationDefinition generateRandomRR(Random rand, long i) {
int gang = 1 + rand.nextInt(9);
int par = (rand.nextInt(1000) + 1) * gang;
long dur = rand.nextInt(2 * 3600 * 1000); // random duration within 2h
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
gang, dur);
ReservationRequest r = ReservationRequest
.newInstance(Resource.newInstance(1024, 1), par, gang, dur);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
rand.nextInt(3);
@ -364,53 +351,19 @@ public static ReservationDefinition generateRandomRR(Random rand, long i) {
}
public static ReservationDefinition generateBigRR(Random rand, long i) {
rand.setSeed(i);
long now = System.currentTimeMillis();
// start time at random in the next 2 hours
long arrival = rand.nextInt(2 * 3600 * 1000);
// deadline at random in the next day
long deadline = rand.nextInt(24 * 3600 * 1000);
// create a request with a single atomic ask
ReservationDefinition rr = new ReservationDefinitionPBImpl();
rr.setArrival(now + arrival);
rr.setDeadline(now + deadline);
int gang = 1;
int par = 100000; // 100k tasks
long dur = rand.nextInt(60 * 1000); // 1min tasks
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1), par,
gang, dur);
ReservationRequests reqs = new ReservationRequestsPBImpl();
reqs.setReservationResources(Collections.singletonList(r));
rand.nextInt(3);
ReservationRequestInterpreter[] type =
ReservationRequestInterpreter.values();
reqs.setInterpreter(type[rand.nextInt(type.length)]);
rr.setReservationRequests(reqs);
return rr;
}
public static Map<ReservationInterval, Resource> generateAllocation(
long startTime, long step, int[] alloc) {
Map<ReservationInterval, Resource> req =
new TreeMap<ReservationInterval, Resource>();
Map<ReservationInterval, Resource> req = new TreeMap<>();
for (int i = 0; i < alloc.length; i++) {
req.put(new ReservationInterval(startTime + i * step, startTime + (i + 1)
* step), ReservationSystemUtil.toResource(ReservationRequest
.newInstance(
Resource.newInstance(1024, 1), alloc[i])));
req.put(new ReservationInterval(startTime + i * step,
startTime + (i + 1) * step), ReservationSystemUtil.toResource(
ReservationRequest
.newInstance(Resource.newInstance(1024, 1), alloc[i])));
}
return req;
}
public static Resource calculateClusterResource(int numContainers) {
Resource clusterResource = Resource.newInstance(numContainers * 1024,
numContainers);
return clusterResource;
return Resource.newInstance(numContainers * 1024, numContainers);
}
}

View File

@ -1,94 +0,0 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Assert;
import org.junit.Test;
public class TestCapacityReservationSystem {
@Test
public void testInitialize() {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler capScheduler = null;
try {
capScheduler = testUtil.mockCapacityScheduler(10);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
CapacityReservationSystem reservationSystem =
new CapacityReservationSystem();
reservationSystem.setRMContext(capScheduler.getRMContext());
try {
reservationSystem.reinitialize(capScheduler.getConf(),
capScheduler.getRMContext());
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
String planQName = testUtil.getreservationQueueName();
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
planQName);
}
@Test
public void testReinitialize() {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
CapacityScheduler capScheduler = null;
try {
capScheduler = testUtil.mockCapacityScheduler(10);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
CapacityReservationSystem reservationSystem =
new CapacityReservationSystem();
CapacitySchedulerConfiguration conf = capScheduler.getConfiguration();
RMContext mockContext = capScheduler.getRMContext();
reservationSystem.setRMContext(mockContext);
try {
reservationSystem.reinitialize(capScheduler.getConfiguration(),
mockContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
// Assert queue in original config
String planQName = testUtil.getreservationQueueName();
ReservationSystemTestUtil.validateReservationQueue(reservationSystem, planQName);
// Dynamically add a plan
String newQ = "reservation";
Assert.assertNull(reservationSystem.getPlan(newQ));
testUtil.updateQueueConfiguration(conf, newQ);
try {
capScheduler.reinitialize(conf, mockContext);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
try {
reservationSystem.reinitialize(conf, mockContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
ReservationSystemTestUtil.validateNewReservationQueue(reservationSystem, newQ);
}
}

View File

@ -57,7 +57,8 @@
import org.mockito.Matchers;
import org.mockito.Mockito;
public class TestCapacitySchedulerPlanFollower extends TestSchedulerPlanFollowerBase {
public class TestCapacitySchedulerPlanFollower extends
TestSchedulerPlanFollowerBase {
private RMContext rmContext;
private RMContext spyRMContext;
@ -116,11 +117,11 @@ public void setUp() throws Exception {
}
private void setupPlanFollower() throws Exception {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
mClock = mock(Clock.class);
mAgent = mock(ReservationAgent.class);
String reservationQ = testUtil.getFullReservationQueueName();
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
CapacitySchedulerConfiguration csConf = cs.getConfiguration();
csConf.setReservationWindow(reservationQ, 20L);
csConf.setMaximumCapacity(reservationQ, 40);
@ -144,7 +145,7 @@ public void testWithKillOnExpiry() throws PlanningException,
@Override
protected void verifyCapacity(Queue defQ) {
CSQueue csQueue = (CSQueue)defQ;
CSQueue csQueue = (CSQueue) defQ;
assertTrue(csQueue.getCapacity() > 0.9);
}
@ -155,7 +156,7 @@ protected Queue getDefaultQueue() {
@Override
protected int getNumberOfApplications(Queue queue) {
CSQueue csQueue = (CSQueue)queue;
CSQueue csQueue = (CSQueue) queue;
int numberOfApplications = csQueue.getNumApplications();
return numberOfApplications;
}

View File

@ -1,127 +0,0 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
public class TestFairReservationSystem {
private final static String ALLOC_FILE = new File(FairSchedulerTestBase.
TEST_DIR,
TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath();
private Configuration conf;
private FairScheduler scheduler;
private FairSchedulerTestBase testHelper = new FairSchedulerTestBase();
protected Configuration createConfiguration() {
Configuration conf = testHelper.createConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
return conf;
}
@Before
public void setup() throws IOException {
conf = createConfiguration();
}
@After
public void teardown() {
conf = null;
}
@Test
public void testFairReservationSystemInitialize() throws IOException {
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
// Setup
RMContext mockRMContext = testUtil.createRMContext(conf);
scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil,
mockRMContext, conf, 10);
FairReservationSystem reservationSystem = new FairReservationSystem();
reservationSystem.setRMContext(mockRMContext);
try {
reservationSystem.reinitialize(scheduler.getConf(), mockRMContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
testUtil.getFullReservationQueueName());
}
@Test
public void testFairReservationSystemReinitialize() throws IOException {
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
// Setup
RMContext mockRMContext = testUtil.createRMContext(conf);
scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil,
mockRMContext, conf, 10);
FairReservationSystem reservationSystem = new FairReservationSystem();
reservationSystem.setRMContext(mockRMContext);
try {
reservationSystem.reinitialize(scheduler.getConf(), mockRMContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
// Assert queue in original config
final String planQNam = testUtil.getFullReservationQueueName();
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
planQNam);
// Dynamically add a plan
ReservationSystemTestUtil.updateFSAllocationFile(ALLOC_FILE);
scheduler.reinitialize(conf, mockRMContext);
try {
reservationSystem.reinitialize(conf, mockRMContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
String newQueue = "root.reservation";
ReservationSystemTestUtil.validateNewReservationQueue
(reservationSystem, newQueue);
}
}

View File

@ -1,20 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertNotNull;
@ -62,9 +62,9 @@
public class TestFairSchedulerPlanFollower extends
TestSchedulerPlanFollowerBase {
private final static String ALLOC_FILE = new File(FairSchedulerTestBase.
TEST_DIR,
TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath();
private final static String ALLOC_FILE = new File(
FairSchedulerTestBase.TEST_DIR,
TestSchedulerPlanFollowerBase.class.getName() + ".xml").getAbsolutePath();
private RMContext rmContext;
private RMContext spyRMContext;
private FairScheduler fs;
@ -86,13 +86,11 @@ protected Configuration createConfiguration() {
public void setUp() throws Exception {
conf = createConfiguration();
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
// Setup
rmContext = TestUtils.getMockRMContext();
spyRMContext = spy(rmContext);
fs = ReservationSystemTestUtil.setupFairScheduler(testUtil,
spyRMContext, conf, 125);
fs = ReservationSystemTestUtil.setupFairScheduler(spyRMContext, conf, 125);
scheduler = fs;
ConcurrentMap<ApplicationId, RMApp> spyApps =
@ -108,11 +106,11 @@ public void setUp() throws Exception {
}
private void setupPlanFollower() throws Exception {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
mClock = mock(Clock.class);
mAgent = mock(ReservationAgent.class);
String reservationQ = testUtil.getFullReservationQueueName();
String reservationQ =
ReservationSystemTestUtil.getFullReservationQueueName();
AllocationConfiguration allocConf = fs.getAllocationConfiguration();
allocConf.setReservationWindow(20L);
allocConf.setAverageCapacity(20);
@ -135,14 +133,13 @@ public void testWithKillOnExpiry() throws PlanningException,
@Override
protected void verifyCapacity(Queue defQ) {
assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) >
0.9);
assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) > 0.9);
}
@Override
protected Queue getDefaultQueue() {
return getReservationQueue("dedicated" +
ReservationConstants.DEFAULT_QUEUE_SUFFIX);
return getReservationQueue("dedicated"
+ ReservationConstants.DEFAULT_QUEUE_SUFFIX);
}
@Override
@ -153,8 +150,7 @@ protected int getNumberOfApplications(Queue queue) {
@Override
protected AbstractSchedulerPlanFollower createPlanFollower() {
FairSchedulerPlanFollower planFollower =
new FairSchedulerPlanFollower();
FairSchedulerPlanFollower planFollower = new FairSchedulerPlanFollower();
planFollower.init(mClock, scheduler, Collections.singletonList(plan));
return planFollower;
}
@ -168,13 +164,13 @@ protected void assertReservationQueueExists(ReservationId r) {
@Override
protected void assertReservationQueueExists(ReservationId r,
double expectedCapacity, double expectedMaxCapacity) {
FSLeafQueue q = fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" +
"." +
r, false);
FSLeafQueue q =
fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" + "." + r,
false);
assertNotNull(q);
// For now we are setting both to same weight
Assert.assertEquals(expectedCapacity, q.getWeights().getWeight
(ResourceType.MEMORY), 0.01);
Assert.assertEquals(expectedCapacity,
q.getWeights().getWeight(ResourceType.MEMORY), 0.01);
}
@Override
@ -185,9 +181,8 @@ protected void assertReservationQueueDoesNotExist(ReservationId r) {
@Override
protected Queue getReservationQueue(String r) {
return fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" +
"." +
r, false);
return fs.getQueueManager().getLeafQueue(
plan.getQueueName() + "" + "." + r, false);
}
public static ApplicationACLsManager mockAppACLsManager() {

View File

@ -0,0 +1,213 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@SuppressWarnings({ "rawtypes" })
public class TestReservationSystem extends
ParameterizedSchedulerTestBase {
private final static String ALLOC_FILE = new File(
FairSchedulerTestBase.TEST_DIR, TestReservationSystem.class.getName()
+ ".xml").getAbsolutePath();
private AbstractYarnScheduler scheduler;
private AbstractReservationSystem reservationSystem;
private RMContext rmContext;
private Configuration conf;
private RMContext mockRMContext;
public TestReservationSystem(SchedulerType type) {
super(type);
}
@Before
public void setUp() throws IOException {
scheduler = initializeScheduler();
rmContext = getRMContext();
reservationSystem = configureReservationSystem();
reservationSystem.setRMContext(rmContext);
DefaultMetricsSystem.setMiniClusterMode(true);
}
@After
public void tearDown() {
conf = null;
reservationSystem = null;
rmContext = null;
scheduler = null;
clearRMContext();
QueueMetrics.clearQueueMetrics();
}
@Test
public void testInitialize() throws IOException {
try {
reservationSystem.reinitialize(scheduler.getConfig(), rmContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
ReservationSystemTestUtil.getReservationQueueName());
} else {
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
ReservationSystemTestUtil.getFullReservationQueueName());
}
}
@Test
public void testReinitialize() throws IOException {
conf = scheduler.getConfig();
try {
reservationSystem.reinitialize(conf, rmContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
ReservationSystemTestUtil.getReservationQueueName());
} else {
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
ReservationSystemTestUtil.getFullReservationQueueName());
}
// Dynamically add a plan
String newQ = "reservation";
Assert.assertNull(reservationSystem.getPlan(newQ));
updateSchedulerConf(conf, newQ);
try {
scheduler.reinitialize(conf, rmContext);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
try {
reservationSystem.reinitialize(conf, rmContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
newQ);
} else {
ReservationSystemTestUtil.validateReservationQueue(reservationSystem,
"root." + newQ);
}
}
@SuppressWarnings("rawtypes")
public AbstractYarnScheduler initializeScheduler() throws IOException {
switch (getSchedulerType()) {
case CAPACITY:
return initializeCapacityScheduler();
case FAIR:
return initializeFairScheduler();
}
return null;
}
public AbstractReservationSystem configureReservationSystem() {
switch (getSchedulerType()) {
case CAPACITY:
return new CapacityReservationSystem();
case FAIR:
return new FairReservationSystem();
}
return null;
}
public void updateSchedulerConf(Configuration conf, String newQ)
throws IOException {
switch (getSchedulerType()) {
case CAPACITY:
ReservationSystemTestUtil.updateQueueConfiguration(
(CapacitySchedulerConfiguration) conf, newQ);
case FAIR:
ReservationSystemTestUtil.updateFSAllocationFile(ALLOC_FILE);
}
}
public RMContext getRMContext() {
return mockRMContext;
}
public void clearRMContext() {
mockRMContext = null;
}
private CapacityScheduler initializeCapacityScheduler() {
// stolen from TestCapacityScheduler
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
cs.setConf(conf);
mockRMContext = ReservationSystemTestUtil.createRMContext(conf);
cs.setRMContext(mockRMContext);
try {
cs.serviceInit(conf);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
ReservationSystemTestUtil.initializeRMContext(10, cs, mockRMContext);
return cs;
}
private Configuration createFSConfiguration() {
FairSchedulerTestBase testHelper = new FairSchedulerTestBase();
Configuration conf = testHelper.createConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
return conf;
}
private FairScheduler initializeFairScheduler() throws IOException {
Configuration conf = createFSConfiguration();
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
// Setup
mockRMContext = ReservationSystemTestUtil.createRMContext(conf);
return ReservationSystemTestUtil
.setupFairScheduler(mockRMContext, conf, 10);
}
}