YARN-5308. FairScheduler: Move continuous scheduling related tests to TestContinuousScheduling (Kai Sasaki via Varun Saxena)
This commit is contained in:
parent
ac35ee9393
commit
79aeddc88f
@ -22,20 +22,32 @@ import org.apache.hadoop.conf.Configuration;
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -43,18 +55,22 @@ import java.util.ArrayList;
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
public class TestContinuousScheduling extends FairSchedulerTestBase {
|
public class TestContinuousScheduling extends FairSchedulerTestBase {
|
||||||
private ControlledClock mockClock;
|
private ControlledClock mockClock;
|
||||||
|
private static int delayThresholdTimeMs = 1000;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Configuration createConfiguration() {
|
public Configuration createConfiguration() {
|
||||||
Configuration conf = super.createConfiguration();
|
Configuration conf = super.createConfiguration();
|
||||||
conf.setBoolean(
|
conf.setBoolean(
|
||||||
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true);
|
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true);
|
||||||
conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, 100);
|
conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS,
|
||||||
conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, 100);
|
delayThresholdTimeMs);
|
||||||
|
conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS,
|
||||||
|
delayThresholdTimeMs);
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,6 +183,175 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
|
|||||||
Assert.assertEquals(2, nodes.size());
|
Assert.assertEquals(2, nodes.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithNodeRemoved() throws Exception {
|
||||||
|
// Disable continuous scheduling, will invoke continuous
|
||||||
|
// scheduling once manually
|
||||||
|
scheduler = new FairScheduler();
|
||||||
|
conf = super.createConfiguration();
|
||||||
|
resourceManager = new MockRM(conf);
|
||||||
|
|
||||||
|
// TODO: This test should really be using MockRM. For now starting stuff
|
||||||
|
// that is needed at a bare minimum.
|
||||||
|
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
|
||||||
|
resourceManager.getRMContext().getStateStore().start();
|
||||||
|
|
||||||
|
// to initialize the master key
|
||||||
|
resourceManager.getRMContext().getContainerTokenSecretManager()
|
||||||
|
.rollMasterKey();
|
||||||
|
|
||||||
|
scheduler.setRMContext(resourceManager.getRMContext());
|
||||||
|
Assert.assertTrue("Continuous scheduling should be disabled.",
|
||||||
|
!scheduler.isContinuousSchedulingEnabled());
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
|
||||||
|
// Add two nodes
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||||
|
"127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
RMNode node2 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
||||||
|
"127.0.0.2");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeEvent2);
|
||||||
|
Assert.assertEquals("We should have two alive nodes.",
|
||||||
|
2, scheduler.getNumClusterNodes());
|
||||||
|
|
||||||
|
// Remove one node
|
||||||
|
NodeRemovedSchedulerEvent removeNode1
|
||||||
|
= new NodeRemovedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(removeNode1);
|
||||||
|
Assert.assertEquals("We should only have one alive node.",
|
||||||
|
1, scheduler.getNumClusterNodes());
|
||||||
|
|
||||||
|
// Invoke the continuous scheduling once
|
||||||
|
try {
|
||||||
|
scheduler.continuousSchedulingAttempt();
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Exception happened when doing continuous scheduling. " +
|
||||||
|
e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInterruptedException()
|
||||||
|
throws Exception {
|
||||||
|
// Disable continuous scheduling, will invoke continuous
|
||||||
|
// scheduling once manually
|
||||||
|
scheduler = new FairScheduler();
|
||||||
|
conf = super.createConfiguration();
|
||||||
|
resourceManager = new MockRM(conf);
|
||||||
|
|
||||||
|
// TODO: This test should really be using MockRM. For now starting stuff
|
||||||
|
// that is needed at a bare minimum.
|
||||||
|
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
|
||||||
|
resourceManager.getRMContext().getStateStore().start();
|
||||||
|
|
||||||
|
// to initialize the master key
|
||||||
|
resourceManager.getRMContext().getContainerTokenSecretManager()
|
||||||
|
.rollMasterKey();
|
||||||
|
|
||||||
|
scheduler.setRMContext(resourceManager.getRMContext());
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
FairScheduler spyScheduler = spy(scheduler);
|
||||||
|
Assert.assertTrue("Continuous scheduling should be disabled.",
|
||||||
|
!spyScheduler.isContinuousSchedulingEnabled());
|
||||||
|
// Add one nodes
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||||
|
"127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
spyScheduler.handle(nodeEvent1);
|
||||||
|
Assert.assertEquals("We should have one alive node.",
|
||||||
|
1, spyScheduler.getNumClusterNodes());
|
||||||
|
InterruptedException ie = new InterruptedException();
|
||||||
|
doThrow(new YarnRuntimeException(ie)).when(spyScheduler).
|
||||||
|
attemptScheduling(isA(FSSchedulerNode.class));
|
||||||
|
// Invoke the continuous scheduling once
|
||||||
|
try {
|
||||||
|
spyScheduler.continuousSchedulingAttempt();
|
||||||
|
fail("Expected InterruptedException to stop schedulingThread");
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Assert.assertEquals(ie, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testThreadLifeCycle() throws InterruptedException {
|
||||||
|
scheduler.start();
|
||||||
|
|
||||||
|
Thread updateThread = scheduler.updateThread;
|
||||||
|
Thread schedulingThread = scheduler.schedulingThread;
|
||||||
|
|
||||||
|
assertTrue(updateThread.isAlive());
|
||||||
|
assertTrue(schedulingThread.isAlive());
|
||||||
|
|
||||||
|
scheduler.stop();
|
||||||
|
|
||||||
|
int numRetries = 100;
|
||||||
|
while (numRetries-- > 0 &&
|
||||||
|
(updateThread.isAlive() || schedulingThread.isAlive())) {
|
||||||
|
Thread.sleep(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertNotEquals("One of the threads is still alive", 0, numRetries);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
|
||||||
|
scheduler.start();
|
||||||
|
|
||||||
|
int priorityValue;
|
||||||
|
Priority priority;
|
||||||
|
FSAppAttempt fsAppAttempt;
|
||||||
|
ResourceRequest request1;
|
||||||
|
ResourceRequest request2;
|
||||||
|
ApplicationAttemptId id11;
|
||||||
|
|
||||||
|
priorityValue = 1;
|
||||||
|
id11 = createAppAttemptId(1, 1);
|
||||||
|
createMockRMApp(id11);
|
||||||
|
priority = Priority.newInstance(priorityValue);
|
||||||
|
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
|
||||||
|
false);
|
||||||
|
scheduler.addApplicationAttempt(id11, false, false);
|
||||||
|
fsAppAttempt = scheduler.getApplicationAttempt(id11);
|
||||||
|
|
||||||
|
String hostName = "127.0.0.1";
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(16 * 1024, 16), 1,
|
||||||
|
hostName);
|
||||||
|
List<ResourceRequest> ask1 = new ArrayList<>();
|
||||||
|
request1 =
|
||||||
|
createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1,
|
||||||
|
true);
|
||||||
|
request2 =
|
||||||
|
createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1,
|
||||||
|
true);
|
||||||
|
ask1.add(request1);
|
||||||
|
ask1.add(request2);
|
||||||
|
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
|
||||||
|
null, null);
|
||||||
|
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
FSSchedulerNode node =
|
||||||
|
(FSSchedulerNode) scheduler.getSchedulerNode(node1.getNodeID());
|
||||||
|
// Tick the time and let the fsApp startTime different from initScheduler
|
||||||
|
// time
|
||||||
|
mockClock.tickSec(delayThresholdTimeMs / 1000);
|
||||||
|
scheduler.attemptScheduling(node);
|
||||||
|
Map<SchedulerRequestKey, Long> lastScheduledContainer =
|
||||||
|
fsAppAttempt.getLastScheduledContainer();
|
||||||
|
long initSchedulerTime =
|
||||||
|
lastScheduledContainer.get(TestUtils.toSchedulerKey(priority));
|
||||||
|
assertEquals(delayThresholdTimeMs, initSchedulerTime);
|
||||||
|
}
|
||||||
|
|
||||||
private void triggerSchedulingAttempt() throws InterruptedException {
|
private void triggerSchedulingAttempt() throws InterruptedException {
|
||||||
Thread.sleep(
|
Thread.sleep(
|
||||||
2 * scheduler.getConf().getContinuousSchedulingSleepMs());
|
2 * scheduler.getConf().getContinuousSchedulingSleepMs());
|
||||||
|
@ -25,10 +25,7 @@ import static org.junit.Assert.assertNotNull;
|
|||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.isA;
|
|
||||||
import static org.mockito.Mockito.doThrow;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@ -62,7 +59,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
|
||||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
@ -99,12 +95,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
|
|
||||||
.TestUtils;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||||
@ -4124,71 +4117,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||||||
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
verifyQueueNumRunnable("queue1.sub3", 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testContinuousSchedulingWithNodeRemoved() throws Exception {
|
|
||||||
// Disable continuous scheduling, will invoke continuous scheduling once manually
|
|
||||||
scheduler.init(conf);
|
|
||||||
scheduler.start();
|
|
||||||
Assert.assertTrue("Continuous scheduling should be disabled.",
|
|
||||||
!scheduler.isContinuousSchedulingEnabled());
|
|
||||||
|
|
||||||
// Add two nodes
|
|
||||||
RMNode node1 =
|
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
|
||||||
"127.0.0.1");
|
|
||||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
||||||
scheduler.handle(nodeEvent1);
|
|
||||||
RMNode node2 =
|
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
|
|
||||||
"127.0.0.2");
|
|
||||||
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
|
||||||
scheduler.handle(nodeEvent2);
|
|
||||||
Assert.assertEquals("We should have two alive nodes.",
|
|
||||||
2, scheduler.getNumClusterNodes());
|
|
||||||
|
|
||||||
// Remove one node
|
|
||||||
NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
|
|
||||||
scheduler.handle(removeNode1);
|
|
||||||
Assert.assertEquals("We should only have one alive node.",
|
|
||||||
1, scheduler.getNumClusterNodes());
|
|
||||||
|
|
||||||
// Invoke the continuous scheduling once
|
|
||||||
try {
|
|
||||||
scheduler.continuousSchedulingAttempt();
|
|
||||||
} catch (Exception e) {
|
|
||||||
fail("Exception happened when doing continuous scheduling. " +
|
|
||||||
e.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testContinuousSchedulingInterruptedException()
|
|
||||||
throws Exception {
|
|
||||||
scheduler.init(conf);
|
|
||||||
scheduler.start();
|
|
||||||
FairScheduler spyScheduler = spy(scheduler);
|
|
||||||
Assert.assertTrue("Continuous scheduling should be disabled.",
|
|
||||||
!spyScheduler.isContinuousSchedulingEnabled());
|
|
||||||
// Add one nodes
|
|
||||||
RMNode node1 =
|
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
|
||||||
"127.0.0.1");
|
|
||||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
||||||
spyScheduler.handle(nodeEvent1);
|
|
||||||
Assert.assertEquals("We should have one alive node.",
|
|
||||||
1, spyScheduler.getNumClusterNodes());
|
|
||||||
InterruptedException ie = new InterruptedException();
|
|
||||||
doThrow(new YarnRuntimeException(ie)).when(spyScheduler).
|
|
||||||
attemptScheduling(isA(FSSchedulerNode.class));
|
|
||||||
// Invoke the continuous scheduling once
|
|
||||||
try {
|
|
||||||
spyScheduler.continuousSchedulingAttempt();
|
|
||||||
fail("Expected InterruptedException to stop schedulingThread");
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Assert.assertEquals(ie, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSchedulingOnRemovedNode() throws Exception {
|
public void testSchedulingOnRemovedNode() throws Exception {
|
||||||
// Disable continuous scheduling, will invoke continuous scheduling manually
|
// Disable continuous scheduling, will invoke continuous scheduling manually
|
||||||
@ -4486,30 +4414,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||||||
assertEquals(ancestorQueue, queue1);
|
assertEquals(ancestorQueue, queue1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testThreadLifeCycle() throws InterruptedException {
|
|
||||||
conf.setBoolean(
|
|
||||||
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true);
|
|
||||||
scheduler.init(conf);
|
|
||||||
scheduler.start();
|
|
||||||
|
|
||||||
Thread updateThread = scheduler.updateThread;
|
|
||||||
Thread schedulingThread = scheduler.schedulingThread;
|
|
||||||
|
|
||||||
assertTrue(updateThread.isAlive());
|
|
||||||
assertTrue(schedulingThread.isAlive());
|
|
||||||
|
|
||||||
scheduler.stop();
|
|
||||||
|
|
||||||
int numRetries = 100;
|
|
||||||
while (numRetries-- > 0 &&
|
|
||||||
(updateThread.isAlive() || schedulingThread.isAlive())) {
|
|
||||||
Thread.sleep(50);
|
|
||||||
}
|
|
||||||
|
|
||||||
assertNotEquals("One of the threads is still alive", 0, numRetries);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPerfMetricsInited() {
|
public void testPerfMetricsInited() {
|
||||||
scheduler.init(conf);
|
scheduler.init(conf);
|
||||||
@ -4644,67 +4548,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||||||
.get(attId3.getApplicationId()).getQueue());
|
.get(attId3.getApplicationId()).getQueue());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
|
|
||||||
int DELAY_THRESHOLD_TIME_MS = 1000;
|
|
||||||
conf.set(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, "true");
|
|
||||||
conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS,
|
|
||||||
String.valueOf(DELAY_THRESHOLD_TIME_MS));
|
|
||||||
conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS,
|
|
||||||
String.valueOf(DELAY_THRESHOLD_TIME_MS));
|
|
||||||
|
|
||||||
ControlledClock clock = new ControlledClock();
|
|
||||||
scheduler.setClock(clock);
|
|
||||||
scheduler.init(conf);
|
|
||||||
scheduler.start();
|
|
||||||
|
|
||||||
int priorityValue;
|
|
||||||
Priority priority;
|
|
||||||
FSAppAttempt fsAppAttempt;
|
|
||||||
ResourceRequest request1;
|
|
||||||
ResourceRequest request2;
|
|
||||||
ApplicationAttemptId id11;
|
|
||||||
|
|
||||||
priorityValue = 1;
|
|
||||||
id11 = createAppAttemptId(1, 1);
|
|
||||||
createMockRMApp(id11);
|
|
||||||
priority = Priority.newInstance(priorityValue);
|
|
||||||
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
|
|
||||||
false);
|
|
||||||
scheduler.addApplicationAttempt(id11, false, false);
|
|
||||||
fsAppAttempt = scheduler.getApplicationAttempt(id11);
|
|
||||||
|
|
||||||
String hostName = "127.0.0.1";
|
|
||||||
RMNode node1 =
|
|
||||||
MockNodes.newNodeInfo(1, Resources.createResource(16 * 1024, 16), 1,
|
|
||||||
hostName);
|
|
||||||
List<ResourceRequest> ask1 = new ArrayList<>();
|
|
||||||
request1 =
|
|
||||||
createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1,
|
|
||||||
true);
|
|
||||||
request2 =
|
|
||||||
createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1,
|
|
||||||
true);
|
|
||||||
ask1.add(request1);
|
|
||||||
ask1.add(request2);
|
|
||||||
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
|
|
||||||
null, null);
|
|
||||||
|
|
||||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
|
||||||
scheduler.handle(nodeEvent1);
|
|
||||||
FSSchedulerNode node =
|
|
||||||
(FSSchedulerNode) scheduler.getSchedulerNode(node1.getNodeID());
|
|
||||||
// Tick the time and let the fsApp startTime different from initScheduler
|
|
||||||
// time
|
|
||||||
clock.tickSec(DELAY_THRESHOLD_TIME_MS / 1000);
|
|
||||||
scheduler.attemptScheduling(node);
|
|
||||||
Map<SchedulerRequestKey, Long> lastScheduledContainer =
|
|
||||||
fsAppAttempt.getLastScheduledContainer();
|
|
||||||
long initSchedulerTime =
|
|
||||||
lastScheduledContainer.get(TestUtils.toSchedulerKey(priority));
|
|
||||||
assertEquals(DELAY_THRESHOLD_TIME_MS, initSchedulerTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testResourceUpdateDecommissioningNode() throws Exception {
|
public void testResourceUpdateDecommissioningNode() throws Exception {
|
||||||
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode
|
// Mock the RMNodeResourceUpdate event handler to update SchedulerNode
|
||||||
|
Loading…
x
Reference in New Issue
Block a user