From 47f711eebca315804c80012eea5f31275ac25518 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Wed, 28 Mar 2018 08:47:31 -0700 Subject: [PATCH] YARN-6629. NPE occurred when container allocation proposal is applied but its resource requests are removed before. (Tao Yang via wangda) Change-Id: I805880f90b3f6798ec96ed8e8e75755f390a9ad5 --- .../scheduler/capacity/CapacityScheduler.java | 4 +- .../common/fica/FiCaSchedulerApp.java | 16 ++++-- .../capacity/TestCapacityScheduler.java | 53 +++++++++++++++++++ 3 files changed, 68 insertions(+), 5 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index daf0354611..bf674a8354 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2825,8 +2825,8 @@ public boolean tryCommit(Resource cluster, ResourceCommitRequest r, // proposal might be outdated if AM failover just finished // and proposal queue was not be consumed in time if (app != null && attemptId.equals(app.getApplicationAttemptId())) { - if (app.accept(cluster, request, updatePending)) { - app.apply(cluster, request, updatePending); + if (app.accept(cluster, request, updatePending) + && app.apply(cluster, request, updatePending)) { LOG.info("Allocation proposal accepted"); isSuccess = true; } else{ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index f3da0a36f0..32b2cad0dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -489,7 +489,7 @@ public boolean accept(Resource cluster, return accepted; } - public void apply(Resource cluster, ResourceCommitRequest request, boolean updatePending) { boolean reReservation = false; @@ -502,8 +502,16 @@ public void apply(Resource cluster, ResourceCommitRequest schedulerContainer = allocation.getAllocatedOrReservedContainer(); - RMContainer rmContainer = schedulerContainer.getRmContainer(); + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (updatePending && + getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey()) + <= 0) { + return false; + } + + RMContainer rmContainer = schedulerContainer.getRmContainer(); reReservation = (!schedulerContainer.isAllocated()) && (rmContainer.getState() == RMContainerState.RESERVED); @@ -545,7 +553,8 @@ public void apply(Resource cluster, ResourceCommitRequest() { + public Object answer(InvocationOnMock invocation) throws Exception { + // clear resource request before applying the proposal for container_2 + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 0)), null, + Collections.emptyList(), null, null, + NULL_UPDATE_REQUESTS); + // trigger real apply which can raise NPE before YARN-6629 + try { + FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( + app.getCurrentAppAttempt().getAppAttemptId()); + schedulerApp.apply((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1], + (Boolean) invocation.getArguments()[2]); + // the proposal of removed request should be rejected + Assert.assertEquals(1, schedulerApp.getLiveContainers().size()); + } catch (Throwable e) { + Assert.fail(); + } + return null; + } + }).when(spyCs).tryCommit(Mockito.any(Resource.class), + Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean()); + + // rm allocates container_2 to reproduce the process that can raise NPE + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 1)), null, + Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); + spyCs.handle(new NodeUpdateSchedulerEvent( + spyCs.getNode(nm.getNodeId()).getRMNode())); + } }