YARN-1986. In Fifo Scheduler, node heartbeat in between creating app and attempt causes NPE (Hong Zhiguo via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1594476 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cf7dddb603
commit
84dfae2f8a
@ -222,6 +222,9 @@ Release 2.4.1 - UNRELEASED
|
|||||||
YARN-1957. Consider the max capacity of the queue when computing the ideal
|
YARN-1957. Consider the max capacity of the queue when computing the ideal
|
||||||
capacity for preemption. (Carlo Curino via cdouglas)
|
capacity for preemption. (Carlo Curino via cdouglas)
|
||||||
|
|
||||||
|
YARN-1986. In Fifo Scheduler, node heartbeat in between creating app and
|
||||||
|
attempt causes NPE (Hong Zhiguo via Sandy Ryza)
|
||||||
|
|
||||||
Release 2.4.0 - 2014-04-07
|
Release 2.4.0 - 2014-04-07
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -360,7 +360,8 @@ private FiCaSchedulerNode getNode(NodeId nodeId) {
|
|||||||
return nodes.get(nodeId);
|
return nodes.get(nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addApplication(ApplicationId applicationId,
|
@VisibleForTesting
|
||||||
|
public synchronized void addApplication(ApplicationId applicationId,
|
||||||
String queue, String user) {
|
String queue, String user) {
|
||||||
SchedulerApplication application =
|
SchedulerApplication application =
|
||||||
new SchedulerApplication(DEFAULT_QUEUE, user);
|
new SchedulerApplication(DEFAULT_QUEUE, user);
|
||||||
@ -372,7 +373,8 @@ private synchronized void addApplication(ApplicationId applicationId,
|
|||||||
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void
|
@VisibleForTesting
|
||||||
|
public synchronized void
|
||||||
addApplicationAttempt(ApplicationAttemptId appAttemptId,
|
addApplicationAttempt(ApplicationAttemptId appAttemptId,
|
||||||
boolean transferStateFromPreviousAttempt) {
|
boolean transferStateFromPreviousAttempt) {
|
||||||
SchedulerApplication application =
|
SchedulerApplication application =
|
||||||
@ -458,6 +460,9 @@ private void assignContainers(FiCaSchedulerNode node) {
|
|||||||
.entrySet()) {
|
.entrySet()) {
|
||||||
FiCaSchedulerApp application =
|
FiCaSchedulerApp application =
|
||||||
(FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
|
(FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
|
||||||
|
if (application == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
LOG.debug("pre-assignContainers");
|
LOG.debug("pre-assignContainers");
|
||||||
application.showRequests();
|
application.showRequests();
|
||||||
synchronized (application) {
|
synchronized (application) {
|
||||||
@ -497,6 +502,9 @@ private void assignContainers(FiCaSchedulerNode node) {
|
|||||||
for (SchedulerApplication application : applications.values()) {
|
for (SchedulerApplication application : applications.values()) {
|
||||||
FiCaSchedulerApp attempt =
|
FiCaSchedulerApp attempt =
|
||||||
(FiCaSchedulerApp) application.getCurrentAppAttempt();
|
(FiCaSchedulerApp) application.getCurrentAppAttempt();
|
||||||
|
if (attempt == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
|
attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,6 +52,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
@ -213,6 +214,32 @@ public void test() throws Exception {
|
|||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
|
||||||
|
FifoScheduler scheduler = new FifoScheduler();
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
scheduler.reinitialize(conf, rm.getRMContext());
|
||||||
|
|
||||||
|
RMNode node = MockNodes.newNodeInfo(1,
|
||||||
|
Resources.createResource(1024, 4), 1, "127.0.0.1");
|
||||||
|
scheduler.handle(new NodeAddedSchedulerEvent(node));
|
||||||
|
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
scheduler.addApplication(appId, "queue1", "user1");
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
|
||||||
|
try {
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
} catch (NullPointerException e) {
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
scheduler.addApplicationAttempt(attId, false);
|
||||||
|
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
private void testMinimumAllocation(YarnConfiguration conf, int testAlloc)
|
private void testMinimumAllocation(YarnConfiguration conf, int testAlloc)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
MockRM rm = new MockRM(conf);
|
MockRM rm = new MockRM(conf);
|
||||||
|
Loading…
Reference in New Issue
Block a user