YARN-2308. Changed CapacityScheduler to explicitly throw exception if the queue
to which the apps were submitted is changed across RM restart. Contributed by Craig Welch & Chang Li
This commit is contained in:
parent
a56ea01002
commit
f9680d9a16
@ -648,6 +648,19 @@ private synchronized void addApplication(ApplicationId applicationId,
|
|||||||
// sanity checks.
|
// sanity checks.
|
||||||
CSQueue queue = getQueue(queueName);
|
CSQueue queue = getQueue(queueName);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
|
//During a restart, this indicates a queue was removed, which is
|
||||||
|
//not presently supported
|
||||||
|
if (isAppRecovering) {
|
||||||
|
//throwing RuntimeException because some other exceptions are caught
|
||||||
|
//(including YarnRuntimeException) and we want this to force an exit
|
||||||
|
String queueErrorMsg = "Queue named " + queueName
|
||||||
|
+ " missing during application recovery."
|
||||||
|
+ " Queue removal during recovery is not presently supported by the"
|
||||||
|
+ " capacity scheduler, please restart with all queues configured"
|
||||||
|
+ " which were present before shutdown/restart.";
|
||||||
|
LOG.fatal(queueErrorMsg);
|
||||||
|
throw new RuntimeException(queueErrorMsg);
|
||||||
|
}
|
||||||
String message = "Application " + applicationId +
|
String message = "Application " + applicationId +
|
||||||
" submitted by user " + user + " to unknown queue: " + queueName;
|
" submitted by user " + user + " to unknown queue: " + queueName;
|
||||||
this.rmContext.getDispatcher().getEventHandler()
|
this.rmContext.getDispatcher().getEventHandler()
|
||||||
|
@ -250,6 +250,15 @@ public RMApp submitApp(int masterMemory, String name, String user,
|
|||||||
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public RMApp submitApp(int masterMemory, String name, String user,
|
||||||
|
Map<ApplicationAccessType, String> acls, String queue,
|
||||||
|
boolean waitForAccepted) throws Exception {
|
||||||
|
return submitApp(masterMemory, name, user, acls, false, queue,
|
||||||
|
super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
||||||
|
waitForAccepted);
|
||||||
|
}
|
||||||
|
|
||||||
public RMApp submitApp(int masterMemory, String name, String user,
|
public RMApp submitApp(int masterMemory, String name, String user,
|
||||||
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
||||||
int maxAppAttempts, Credentials ts) throws Exception {
|
int maxAppAttempts, Credentials ts) throws Exception {
|
||||||
|
@ -336,6 +336,8 @@ private void checkFifoQueue(SchedulerApplication schedulerApp,
|
|||||||
private static final String R = "Default";
|
private static final String R = "Default";
|
||||||
private static final String A = "QueueA";
|
private static final String A = "QueueA";
|
||||||
private static final String B = "QueueB";
|
private static final String B = "QueueB";
|
||||||
|
//don't ever create the below queue ;-)
|
||||||
|
private static final String QUEUE_DOESNT_EXIST = "NoSuchQueue";
|
||||||
private static final String USER_1 = "user1";
|
private static final String USER_1 = "user1";
|
||||||
private static final String USER_2 = "user2";
|
private static final String USER_2 = "user2";
|
||||||
|
|
||||||
@ -352,6 +354,18 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
|||||||
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
|
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setupQueueConfigurationOnlyA(
|
||||||
|
CapacitySchedulerConfiguration conf) {
|
||||||
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R });
|
||||||
|
final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R;
|
||||||
|
conf.setCapacity(Q_R, 100);
|
||||||
|
final String Q_A = Q_R + "." + A;
|
||||||
|
conf.setQueues(Q_R, new String[] {A});
|
||||||
|
conf.setCapacity(Q_A, 100);
|
||||||
|
conf.setDouble(CapacitySchedulerConfiguration
|
||||||
|
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 1.0f);
|
||||||
|
}
|
||||||
|
|
||||||
// Test CS recovery with multi-level queues and multi-users:
|
// Test CS recovery with multi-level queues and multi-users:
|
||||||
// 1. setup 2 NMs each with 8GB memory;
|
// 1. setup 2 NMs each with 8GB memory;
|
||||||
// 2. setup 2 level queues: Default -> (QueueA, QueueB)
|
// 2. setup 2 level queues: Default -> (QueueA, QueueB)
|
||||||
@ -471,6 +485,70 @@ public void testCapacitySchedulerRecovery() throws Exception {
|
|||||||
totalUsedResource.getVirtualCores());
|
totalUsedResource.getVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Test that we receive a meaningful exit-causing exception if a queue
|
||||||
|
//is removed during recovery
|
||||||
|
//1. Add some apps to two queues, attempt to add an app to a non-existant
|
||||||
|
// queue to verify that the new logic is not in effect during normal app
|
||||||
|
// submission
|
||||||
|
//2. Remove one of the queues, restart the RM
|
||||||
|
//3. Verify that the expected exception was thrown
|
||||||
|
@Test (timeout = 30000)
|
||||||
|
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
|
||||||
|
if (!schedulerClass.equals(CapacityScheduler.class)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||||
|
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||||
|
DominantResourceCalculator.class.getName());
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
setupQueueConfiguration(csConf);
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(csConf);
|
||||||
|
rm1 = new MockRM(csConf, memStore);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||||
|
MockNM nm2 =
|
||||||
|
new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
nm2.registerNode();
|
||||||
|
RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A);
|
||||||
|
MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1);
|
||||||
|
RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A);
|
||||||
|
MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2);
|
||||||
|
|
||||||
|
RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B);
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
|
||||||
|
|
||||||
|
//Submit an app with a non existant queue to make sure it does not
|
||||||
|
//cause a fatal failure in the non-recovery case
|
||||||
|
RMApp appNA = rm1.submitApp(1024, "app1_2", USER_1, null,
|
||||||
|
QUEUE_DOESNT_EXIST, false);
|
||||||
|
|
||||||
|
// clear queue metrics
|
||||||
|
rm1.clearQueueMetrics(app1_1);
|
||||||
|
rm1.clearQueueMetrics(app1_2);
|
||||||
|
rm1.clearQueueMetrics(app2);
|
||||||
|
|
||||||
|
// Re-start RM
|
||||||
|
csConf =
|
||||||
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
setupQueueConfigurationOnlyA(csConf);
|
||||||
|
rm2 = new MockRM(csConf, memStore);
|
||||||
|
boolean runtimeThrown = false;
|
||||||
|
try {
|
||||||
|
rm2.start();
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
//we're catching it because we want to verify the message
|
||||||
|
//and we don't want to set it as an expected exception for the
|
||||||
|
//test because we only want it to happen here
|
||||||
|
assertTrue(e.getMessage().contains(B + " missing"));
|
||||||
|
runtimeThrown = true;
|
||||||
|
}
|
||||||
|
assertTrue(runtimeThrown);
|
||||||
|
}
|
||||||
|
|
||||||
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
|
private void checkParentQueue(ParentQueue parentQueue, int numContainers,
|
||||||
Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) {
|
Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) {
|
||||||
assertEquals(numContainers, parentQueue.getNumContainers());
|
assertEquals(numContainers, parentQueue.getNumContainers());
|
||||||
|
Loading…
Reference in New Issue
Block a user