YARN-11141. Capacity Scheduler does not support ambiguous queue names when moving application across queues. Contributed by Andras Gyori
This commit is contained in:
parent
09a69f996b
commit
b62d6ce6fd
@ -2678,7 +2678,12 @@ public class CapacityScheduler extends
|
||||
if (application == null) {
|
||||
throw new YarnException("App to be moved " + appId + " not found.");
|
||||
}
|
||||
String sourceQueueName = application.getQueue().getQueueName();
|
||||
if (!(application.getQueue() instanceof CSQueue)) {
|
||||
throw new YarnException("Source queue is not a Capacity Scheduler queue");
|
||||
}
|
||||
|
||||
CSQueue csQueue = (CSQueue) application.getQueue();
|
||||
String sourceQueueName = csQueue.getQueuePath();
|
||||
AbstractLeafQueue source =
|
||||
this.queueManager.getAndCheckLeafQueue(sourceQueueName);
|
||||
String destQueueName = handleMoveToPlanQueue(targetQueueName);
|
||||
|
@ -29,6 +29,7 @@ public final class CapacitySchedulerQueueHelpers {
|
||||
|
||||
public static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
public static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
public static final String A_CHILD = A + ".a";
|
||||
public static final String A1 = A + ".a1";
|
||||
public static final String A2 = A + ".a2";
|
||||
public static final String B1 = B + ".b1";
|
||||
@ -89,6 +90,35 @@ public final class CapacitySchedulerQueueHelpers {
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf, to be modified
|
||||
* @return CS configuration which has deleted all children of queue(b)
|
||||
* root
|
||||
* / \
|
||||
* a b
|
||||
* / \
|
||||
* a1 a2
|
||||
*/
|
||||
public static CapacitySchedulerConfiguration setupQueueConfAmbiguousQueue(
|
||||
CapacitySchedulerConfiguration conf) {
|
||||
|
||||
// Define top-level queues
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[]{"a", "b"});
|
||||
|
||||
conf.setCapacity(A, A_CAPACITY);
|
||||
conf.setCapacity(B, B_CAPACITY);
|
||||
|
||||
// Define 2nd-level queues
|
||||
conf.setQueues(A, new String[]{"a", "a1"});
|
||||
conf.setCapacity(A_CHILD, A1_CAPACITY);
|
||||
conf.setUserLimitFactor(A1, 100.0f);
|
||||
conf.setCapacity(A1, A2_CAPACITY);
|
||||
conf.setUserLimitFactor(A2, 100.0f);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf, to be modified
|
||||
* @return CS configuration which has deleted all childred of queue(b)
|
||||
|
@ -56,6 +56,7 @@ import org.junit.Assert;
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfAmbiguousQueue;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
@ -174,6 +175,16 @@ public final class CapacitySchedulerTestUtilities {
|
||||
return appAttemptId1;
|
||||
}
|
||||
|
||||
public static MockRM setUpMoveAmbiguousQueue() {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfAmbiguousQueue(conf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
return rm;
|
||||
}
|
||||
|
||||
public static MockRM setUpMove() {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
return setUpMove(conf);
|
||||
|
@ -106,6 +106,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.nodeUpdate;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.registerNode;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMove;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.setUpMoveAmbiguousQueue;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerTestUtilities.stopResourceManager;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
@ -347,6 +348,46 @@ public class TestCapacitySchedulerApps {
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAppAmbiguousQueue() throws Exception {
|
||||
MockRM rm = setUpMoveAmbiguousQueue();
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||
QueueMetrics metrics = scheduler.getRootQueueMetrics();
|
||||
Assert.assertEquals(0, metrics.getAppsPending());
|
||||
// submit an app
|
||||
MockRMAppSubmissionData data =
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
|
||||
.withAppName("test-move-1")
|
||||
.withUser("user_0")
|
||||
.withAcls(null)
|
||||
.withQueue("root.a.a")
|
||||
.withUnmanagedAM(false)
|
||||
.build();
|
||||
RMApp app = MockRMAppSubmitter.submit(rm, data);
|
||||
// check preconditions
|
||||
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("root.a.a");
|
||||
assertEquals(1, appsInA.size());
|
||||
String queue =
|
||||
scheduler.getApplicationAttempt(appsInA.get(0)).getQueue()
|
||||
.getQueueName();
|
||||
Assert.assertEquals("a", queue);
|
||||
|
||||
// now move the app
|
||||
scheduler.moveApplication(app.getApplicationId(), "a1");
|
||||
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("root.a.a1");
|
||||
assertEquals(1, appsInA1.size());
|
||||
queue =
|
||||
scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
|
||||
.getQueueName();
|
||||
Assert.assertEquals("a1", queue);
|
||||
|
||||
appsInA = scheduler.getAppsInQueue("root.a.a");
|
||||
assertTrue(appsInA.isEmpty());
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMoveAppBasic() throws Exception {
|
||||
MockRM rm = setUpMove();
|
||||
|
Loading…
x
Reference in New Issue
Block a user