diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 0db775f674..1bc40c4dcb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -152,7 +152,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillByClientEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; @@ -174,8 +173,6 @@ import org.apache.hadoop.yarn.util.UTCClock; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -1191,23 +1188,18 @@ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( } // Moves only allowed when app is in a state that means it is tracked by - // the scheduler - if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppState.FAILED, - RMAppState.FINAL_SAVING, RMAppState.FINISHING, RMAppState.FINISHED, - RMAppState.KILLED, RMAppState.KILLING, RMAppState.FAILED) - .contains(application.getState())) { + // the scheduler. Introducing SUBMITTED state also to this list as there + // could be a corner scenario that app may not be in Scheduler in SUBMITTED + // state. + if (!ACTIVE_APP_STATES.contains(application.getState())) { String msg = "App in " + application.getState() + " state cannot be moved."; RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg); throw new YarnException(msg); } - SettableFuture future = SettableFuture.create(); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppMoveEvent(applicationId, request.getTargetQueue(), future)); - try { - Futures.get(future, YarnException.class); + this.rmAppManager.moveApplicationAcrossQueue(applicationId, request.getTargetQueue()); } catch (YarnException ex) { RMAuditLogger.logFailure(callerUGI.getShortUserName(), AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index ce3da06b8d..4d628ee5b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -498,17 +498,26 @@ public void handle(RMAppManagerEvent event) { ApplicationId applicationId = event.getApplicationId(); LOG.debug("RMAppManager processing event for " + applicationId + " of type " + event.getType()); - switch(event.getType()) { - case APP_COMPLETED: - { - finishApplication(applicationId); - logApplicationSummary(applicationId); - checkAppNumCompletedLimit(); - } + switch (event.getType()) { + case APP_COMPLETED : + finishApplication(applicationId); + logApplicationSummary(applicationId); + checkAppNumCompletedLimit(); break; - default: - LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); + case APP_MOVE : + // moveAllApps from scheduler will fire this event for each of + // those applications which needed to be moved to a new queue. + // Use the standard move application api to do the same. + try { + moveApplicationAcrossQueue(applicationId, + event.getTargetQueueForMove()); + } catch (YarnException e) { + LOG.warn("Move Application has failed: " + e.getMessage()); } + break; + default : + LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); + } } // transaction method. @@ -587,4 +596,87 @@ public void updateApplicationPriority(ApplicationId applicationId, rmContext.getSystemMetricsPublisher().appUpdated(app, System.currentTimeMillis()); } + + /** + * moveToQueue will invoke scheduler api to perform move queue operation. + * + * @param applicationId + * Application Id. + * @param targetQueue + * Target queue to which this app has to be moved. + * @throws YarnException + * Handle exceptions. + */ + public void moveApplicationAcrossQueue(ApplicationId applicationId, String targetQueue) + throws YarnException { + RMApp app = this.rmContext.getRMApps().get(applicationId); + + // Capacity scheduler will directly follow below approach. + // 1. Do a pre-validate check to ensure that changes are fine. + // 2. Update this information to state-store + // 3. Perform real move operation and update in-memory data structures. + synchronized (applicationId) { + if (app.isAppInCompletedStates()) { + return; + } + + String sourceQueue = app.getQueue(); + // 1. pre-validate move application request to check for any access + // violations or other errors. If there are any violations, YarnException + // will be thrown. + rmContext.getScheduler().preValidateMoveApplication(applicationId, + targetQueue); + + // 2. Update to state store with new queue and throw exception is failed. + updateAppDataToStateStore(targetQueue, app, false); + + // 3. Perform the real move application + String queue = ""; + try { + queue = rmContext.getScheduler().moveApplication(applicationId, + targetQueue); + } catch (YarnException e) { + // Revert to source queue since in-memory move has failed. Chances + // of this is very rare as we have already done the pre-validation. + updateAppDataToStateStore(sourceQueue, app, true); + throw e; + } + + // update in-memory + if (queue != null && !queue.isEmpty()) { + app.setQueue(queue); + } + } + + rmContext.getSystemMetricsPublisher().appUpdated(app, + System.currentTimeMillis()); + } + + private void updateAppDataToStateStore(String queue, RMApp app, + boolean toSuppressException) throws YarnException { + // Create a future object to capture exceptions from StateStore. + SettableFuture future = SettableFuture.create(); + + // Update new queue in Submission Context to update to StateStore. + app.getApplicationSubmissionContext().setQueue(queue); + + ApplicationStateData appState = ApplicationStateData.newInstance( + app.getSubmitTime(), app.getStartTime(), + app.getApplicationSubmissionContext(), app.getUser(), + app.getCallerContext()); + appState.setApplicationTimeouts(app.getApplicationTimeouts()); + rmContext.getStateStore().updateApplicationStateSynchronously(appState, + false, future); + + try { + Futures.get(future, YarnException.class); + } catch (YarnException ex) { + if (!toSuppressException) { + throw ex; + } + LOG.error("Statestore update failed for move application '" + + app.getApplicationId() + "' to queue '" + queue + + "' with below exception:" + ex.getMessage()); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java index f1a67819bd..0df3cab47c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEvent.java @@ -24,13 +24,24 @@ public class RMAppManagerEvent extends AbstractEvent { private final ApplicationId appId; + private final String targetQueueForMove; public RMAppManagerEvent(ApplicationId appId, RMAppManagerEventType type) { + this(appId, "", type); + } + + public RMAppManagerEvent(ApplicationId appId, String targetQueueForMove, + RMAppManagerEventType type) { super(type); this.appId = appId; + this.targetQueueForMove = targetQueueForMove; } public ApplicationId getApplicationId() { return this.appId; } + + public String getTargetQueueForMove() { + return this.targetQueueForMove; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java index 1b6a44c7ea..7acf7535fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManagerEventType.java @@ -19,5 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; public enum RMAppManagerEventType { - APP_COMPLETED + APP_COMPLETED, + APP_MOVE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index 2b42638a9f..aa5d6f0971 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -23,7 +23,6 @@ public enum RMAppEventType { START, RECOVER, KILL, - MOVE, // Move app to a new queue // Source: Scheduler and RMAppManager APP_REJECTED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0bf5f510d2..1f1586aa8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; @@ -247,14 +246,10 @@ RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) RMAppEventType.APP_REJECTED, new FinalSavingTransition(new AppRejectedTransition(), RMAppState.FAILED)) - .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, - RMAppEventType.MOVE, new RMAppMoveTransition()) // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, - RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, @@ -271,8 +266,6 @@ RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition()) // Transitions from ACCEPTED state .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, @@ -300,8 +293,6 @@ RMAppEventType.KILL, new KillAttemptTransition()) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, - RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, @@ -338,7 +329,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, - RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE)) + RMAppEventType.APP_NEW_SAVED)) // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, @@ -353,7 +344,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) EnumSet.of(RMAppEventType.NODE_UPDATE, // ignore Kill/Move as we have already saved the final Finished state // in state store. - RMAppEventType.KILL, RMAppEventType.MOVE)) + RMAppEventType.KILL)) // Transitions from KILLING state .addTransition(RMAppState.KILLING, RMAppState.KILLING, @@ -383,7 +374,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_REGISTERED, RMAppEventType.APP_UPDATE_SAVED, - RMAppEventType.KILL, RMAppEventType.MOVE)) + RMAppEventType.KILL)) // Transitions from FINISHED state // ignorable transitions @@ -395,7 +386,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.KILL, RMAppEventType.MOVE)) + RMAppEventType.KILL)) // Transitions from FAILED state // ignorable transitions @@ -403,8 +394,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE, - RMAppEventType.MOVE)) + EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) // Transitions from KILLED state // ignorable transitions @@ -417,7 +407,7 @@ RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) EnumSet.of(RMAppEventType.APP_ACCEPTED, RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, - RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE)) + RMAppEventType.NODE_UPDATE)) .installTopology(); @@ -1077,32 +1067,6 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } - /** - * Move an app to a new queue. - * This transition must set the result on the Future in the RMAppMoveEvent, - * either as an exception for failure or null for success, or the client will - * be left waiting forever. - */ - private static final class RMAppMoveTransition extends RMAppTransition { - public void transition(RMAppImpl app, RMAppEvent event) { - RMAppMoveEvent moveEvent = (RMAppMoveEvent) event; - try { - app.queue = app.scheduler.moveApplication(app.applicationId, - moveEvent.getTargetQueue()); - } catch (YarnException ex) { - moveEvent.getResult().setException(ex); - return; - } - - app.rmContext.getSystemMetricsPublisher().appUpdated(app, - app.systemClock.getTime()); - - // TODO: Write out change to state store (YARN-1558) - // Also take care of RM failover - moveEvent.getResult().set(null); - } - } - // synchronously recover attempt to ensure any incoming external events // to be processed after the attempt processes the recover event. private void recoverAppAttempts() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java deleted file mode 100644 index 5fc63c9294..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.rmapp; - -import org.apache.hadoop.yarn.api.records.ApplicationId; - -import com.google.common.util.concurrent.SettableFuture; - -public class RMAppMoveEvent extends RMAppEvent { - private String targetQueue; - private SettableFuture result; - - public RMAppMoveEvent(ApplicationId id, String newQueue, - SettableFuture resultFuture) { - super(id, RMAppEventType.MOVE); - this.targetQueue = newQueue; - this.result = resultFuture; - } - - public String getTargetQueue() { - return targetQueue; - } - - public SettableFuture getResult() { - return result; - } - -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4818ea3568..c0cc6b0a0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -64,7 +66,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -360,6 +361,13 @@ public String moveApplication(ApplicationId appId, String newQueue) + " does not support moving apps between queues"); } + @Override + public void preValidateMoveApplication(ApplicationId appId, + String newQueue) throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support pre-validation of moving apps between queues"); + } + public void removeQueue(String queueName) throws YarnException { throw new YarnException(getClass().getSimpleName() + " does not support removing queues"); @@ -675,10 +683,10 @@ public void moveAllApps(String sourceQueue, String destQueue) throw new YarnException(errMsg); } // generate move events for each pending/running app - for (ApplicationAttemptId app : apps) { - SettableFuture future = SettableFuture.create(); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); + for (ApplicationAttemptId appAttemptId : apps) { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppManagerEvent(appAttemptId.getApplicationId(), + destQueue, RMAppManagerEventType.APP_MOVE)); } } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 7167384ec2..ea1ae60007 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -229,6 +229,17 @@ boolean checkAccess(UserGroupInformation callerUGI, public String moveApplication(ApplicationId appId, String newQueue) throws YarnException; + /** + * + * @param appId Application ID + * @param newQueue Target QueueName + * @throws YarnException if the pre-validation for move cannot be carried out + */ + @LimitedPrivate("yarn") + @Evolving + public void preValidateMoveApplication(ApplicationId appId, + String newQueue) throws YarnException; + /** * Completely drain sourceQueue of applications, by moving all of them to * destQueue. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index dd2f0d9ee4..33723929e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -32,8 +32,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -67,6 +69,7 @@ import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { + private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); volatile CSQueue parent; final String queueName; @@ -837,4 +840,10 @@ public boolean accept(Resource cluster, return true; } + + @Override + public void validateSubmitApplication(ApplicationId applicationId, + String userName, String queue) throws AccessControlException { + // Dummy implementation + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index baf60e4f30..550e206534 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -362,4 +362,14 @@ void apply(Resource cluster, * @return readLock of corresponding queue. */ public ReentrantReadWriteLock.ReadLock getReadLock(); + + /** + * Validate submitApplication api so that moveApplication do a pre-check. + * @param applicationId Application ID + * @param userName User Name + * @param queue Queue Name + * @throws AccessControlException if any acl violation is there. + */ + public void validateSubmitApplication(ApplicationId applicationId, + String userName, String queue) throws AccessControlException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e42b20cb6c..9a73a65182 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2049,9 +2049,8 @@ public String moveApplication(ApplicationId appId, sourceQueueName); String destQueueName = handleMoveToPlanQueue(targetQueueName); LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName); - // Validation check - ACLs, submission limits for user & queue + String user = app.getUser(); - checkQueuePartition(app, dest); try { dest.submitApplication(appId, user, destQueueName); } catch (AccessControlException e) { @@ -2079,6 +2078,30 @@ public String moveApplication(ApplicationId appId, } } + @Override + public void preValidateMoveApplication(ApplicationId appId, + String newQueue) throws YarnException { + try { + writeLock.lock(); + FiCaSchedulerApp app = getApplicationAttempt( + ApplicationAttemptId.newInstance(appId, 0)); + String sourceQueueName = app.getQueue().getQueueName(); + this.queueManager.getAndCheckLeafQueue(sourceQueueName); + String destQueueName = handleMoveToPlanQueue(newQueue); + LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName); + // Validation check - ACLs, submission limits for user & queue + String user = app.getUser(); + checkQueuePartition(app, dest); + try { + dest.validateSubmitApplication(appId, user, destQueueName); + } catch (AccessControlException e) { + throw new YarnException(e); + } + } finally { + writeLock.unlock(); + } + } + /** * Check application can be moved to queue with labels enabled. All labels in * application life time will be checked diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9661206f1e..1c6471f512 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -564,6 +564,21 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, public void submitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { // Careful! Locking order is important! + validateSubmitApplication(applicationId, userName, queue); + + // Inform the parent queue + try { + getParent().submitApplication(applicationId, userName, queue); + } catch (AccessControlException ace) { + LOG.info("Failed to submit application to parent-queue: " + + getParent().getQueuePath(), ace); + throw ace; + } + + } + + public void validateSubmitApplication(ApplicationId applicationId, + String userName, String queue) throws AccessControlException { try { writeLock.lock(); // Check if the queue is accepting jobs @@ -598,15 +613,13 @@ public void submitApplication(ApplicationId applicationId, String userName, writeLock.unlock(); } - // Inform the parent queue try { - getParent().submitApplication(applicationId, userName, queue); + getParent().validateSubmitApplication(applicationId, userName, queue); } catch (AccessControlException ace) { LOG.info("Failed to submit application to parent-queue: " + getParent().getQueuePath(), ace); throw ace; } - } public Resource getAMResourceLimit() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index fd0c68bbcb..0ba4edef2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -340,16 +340,7 @@ public void submitApplication(ApplicationId applicationId, String user, try { writeLock.lock(); // Sanity check - if (queue.equals(queueName)) { - throw new AccessControlException( - "Cannot submit application " + "to non-leaf queue: " + queueName); - } - - if (state != QueueState.RUNNING) { - throw new AccessControlException("Queue " + getQueuePath() - + " is STOPPED. Cannot accept submission of application: " - + applicationId); - } + validateSubmitApplication(applicationId, user, queue); addApplication(applicationId, user); } finally { @@ -369,6 +360,24 @@ public void submitApplication(ApplicationId applicationId, String user, } } + public void validateSubmitApplication(ApplicationId applicationId, + String userName, String queue) throws AccessControlException { + try { + writeLock.lock(); + if (queue.equals(queueName)) { + throw new AccessControlException( + "Cannot submit application " + "to non-leaf queue: " + queueName); + } + + if (state != QueueState.RUNNING) { + throw new AccessControlException("Queue " + getQueuePath() + + " is STOPPED. Cannot accept submission of application: " + + applicationId); + } + } finally { + writeLock.unlock(); + } + } @Override public void submitApplicationAttempt(FiCaSchedulerApp application, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index fbcac76c9a..03df5d4b0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1544,7 +1544,41 @@ public String moveApplication(ApplicationId appId, writeLock.unlock(); } } - + + @Override + public void preValidateMoveApplication(ApplicationId appId, String newQueue) + throws YarnException { + try { + writeLock.lock(); + SchedulerApplication app = applications.get(appId); + if (app == null) { + throw new YarnException("App to be moved " + appId + " not found."); + } + + FSAppAttempt attempt = app.getCurrentAppAttempt(); + // To serialize with FairScheduler#allocate, synchronize on app attempt + + try { + attempt.getWriteLock().lock(); + FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); + String destQueueName = handleMoveToPlanQueue(newQueue); + FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false); + if (targetQueue == null) { + throw new YarnException("Target queue " + newQueue + + " not found or is not a leaf queue."); + } + + if (oldQueue.isRunnableApp(attempt)) { + verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue); + } + } finally { + attempt.getWriteLock().unlock(); + } + } finally { + writeLock.unlock(); + } + } + private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app, FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException { String queueName = targetQueue.getQueueName(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java index d2bde80a7e..05b25df3a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java @@ -87,10 +87,10 @@ public void testMoveRejectedByScheduler() throws Exception { application.getApplicationId(), "newqueue")); fail("Should have hit exception"); } catch (YarnException ex) { - assertEquals("Move not supported", ex.getCause().getMessage()); + assertEquals("Move not supported", ex.getMessage()); } } - + @Test (timeout = 10000) public void testMoveTooLate() throws Exception { // Submit application @@ -178,5 +178,13 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName) { return acl != QueueACL.ADMINISTER_QUEUE; } + + @Override + public void preValidateMoveApplication(ApplicationId appId, String newQueue) + throws YarnException { + if (failMove) { + throw new YarnException("Move not supported"); + } + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 439e9dfed7..0a864fd5a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -457,6 +457,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CapacityScheduler scheduler = ((CapacityScheduler) rm.getResourceScheduler()); try { + scheduler.preValidateMoveApplication(app1.getApplicationId(), "a2"); scheduler.moveApplication(app1.getApplicationId(), "a2"); fail("Should throw exception since target queue doesnt have " + "required labels");