YARN-4440. FSAppAttempt#getAllowedLocalityLevelByTime should init the lastScheduler time. Contributed by Lin Yiqun
This commit is contained in:
parent
d8a45425eb
commit
2aaed10327
@ -1135,6 +1135,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
YARN-4402. TestNodeManagerShutdown And TestNodeManagerResync fails with
|
YARN-4402. TestNodeManagerShutdown And TestNodeManagerResync fails with
|
||||||
bind exception. (Brahma Reddy Battula via jianhe)
|
bind exception. (Brahma Reddy Battula via jianhe)
|
||||||
|
|
||||||
|
YARN-4440. FSAppAttempt#getAllowedLocalityLevelByTime should init the
|
||||||
|
lastScheduler time. (Lin Yiqun via zxu)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -286,6 +286,13 @@ public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
|
|||||||
|
|
||||||
// default level is NODE_LOCAL
|
// default level is NODE_LOCAL
|
||||||
if (! allowedLocalityLevel.containsKey(priority)) {
|
if (! allowedLocalityLevel.containsKey(priority)) {
|
||||||
|
// add the initial time of priority to prevent comparing with FsApp
|
||||||
|
// startTime and allowedLocalityLevel degrade
|
||||||
|
lastScheduledContainer.put(priority, currentTimeMs);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Init the lastScheduledContainer time, priority: " + priority
|
||||||
|
+ ", time: " + currentTimeMs);
|
||||||
|
}
|
||||||
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
|
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
|
||||||
return NodeType.NODE_LOCAL;
|
return NodeType.NODE_LOCAL;
|
||||||
}
|
}
|
||||||
|
@ -5151,4 +5151,64 @@ public void testUserAsDefaultQueueWithLeadingTrailingSpaceUserName()
|
|||||||
assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
|
assertEquals("root.user1", resourceManager.getRMContext().getRMApps()
|
||||||
.get(attId3.getApplicationId()).getQueue());
|
.get(attId3.getApplicationId()).getQueue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
|
||||||
|
int DELAY_THRESHOLD_TIME_MS = 1000;
|
||||||
|
conf.set(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, "true");
|
||||||
|
conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS,
|
||||||
|
String.valueOf(DELAY_THRESHOLD_TIME_MS));
|
||||||
|
conf.set(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS,
|
||||||
|
String.valueOf(DELAY_THRESHOLD_TIME_MS));
|
||||||
|
|
||||||
|
ControlledClock clock = new ControlledClock();
|
||||||
|
scheduler.setClock(clock);
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
|
||||||
|
int priorityValue;
|
||||||
|
Priority priority;
|
||||||
|
FSAppAttempt fsAppAttempt;
|
||||||
|
ResourceRequest request1;
|
||||||
|
ResourceRequest request2;
|
||||||
|
ApplicationAttemptId id11;
|
||||||
|
|
||||||
|
priorityValue = 1;
|
||||||
|
id11 = createAppAttemptId(1, 1);
|
||||||
|
createMockRMApp(id11);
|
||||||
|
priority = Priority.newInstance(priorityValue);
|
||||||
|
scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1",
|
||||||
|
false);
|
||||||
|
scheduler.addApplicationAttempt(id11, false, false);
|
||||||
|
fsAppAttempt = scheduler.getApplicationAttempt(id11);
|
||||||
|
|
||||||
|
String hostName = "127.0.0.1";
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(16 * 1024, 16), 1,
|
||||||
|
hostName);
|
||||||
|
List<ResourceRequest> ask1 = new ArrayList<>();
|
||||||
|
request1 =
|
||||||
|
createResourceRequest(1024, 8, node1.getRackName(), priorityValue, 1,
|
||||||
|
true);
|
||||||
|
request2 =
|
||||||
|
createResourceRequest(1024, 8, ResourceRequest.ANY, priorityValue, 1,
|
||||||
|
true);
|
||||||
|
ask1.add(request1);
|
||||||
|
ask1.add(request2);
|
||||||
|
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>(), null, null,
|
||||||
|
null, null);
|
||||||
|
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
FSSchedulerNode node =
|
||||||
|
(FSSchedulerNode) scheduler.getSchedulerNode(node1.getNodeID());
|
||||||
|
// Tick the time and let the fsApp startTime different from initScheduler
|
||||||
|
// time
|
||||||
|
clock.tickSec(DELAY_THRESHOLD_TIME_MS / 1000);
|
||||||
|
scheduler.attemptScheduling(node);
|
||||||
|
Map<Priority, Long> lastScheduledContainer =
|
||||||
|
fsAppAttempt.getLastScheduledContainer();
|
||||||
|
long initSchedulerTime = lastScheduledContainer.get(priority);
|
||||||
|
assertEquals(DELAY_THRESHOLD_TIME_MS, initSchedulerTime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user