From 1079c371289cd31478ed4bc123c1c4dd846c76ee Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Sat, 1 Feb 2014 04:03:36 +0000 Subject: [PATCH] YARN-1504. RM changes for moving apps between queues (Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1563371 13f79535-47bb-0310-9956-ffa450edef68 --- .../scheduler/ResourceSchedulerWrapper.java | 7 + hadoop-yarn-project/CHANGES.txt | 2 + .../resourcemanager/ClientRMService.java | 71 ++++++- .../server/resourcemanager/RMAuditLogger.java | 1 + .../resourcemanager/rmapp/RMAppEventType.java | 1 + .../resourcemanager/rmapp/RMAppImpl.java | 32 ++++ .../resourcemanager/rmapp/RMAppMoveEvent.java | 44 +++++ .../scheduler/AbstractYarnScheduler.java | 10 +- .../scheduler/YarnScheduler.java | 14 ++ .../scheduler/fair/FairScheduler.java | 5 +- .../scheduler/fifo/FifoScheduler.java | 3 +- .../resourcemanager/TestClientRMService.java | 15 ++ .../resourcemanager/TestMoveApplication.java | 180 ++++++++++++++++++ 13 files changed, 378 insertions(+), 7 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 9a3cb2450e..f923733d5b 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -865,4 +866,10 @@ public class ResourceSchedulerWrapper implements public RMContainer getRMContainer(ContainerId containerId) { return null; } + + @Override + public String moveApplication(ApplicationId appId, String newQueue) + throws YarnException { + return scheduler.moveApplication(appId, newQueue); + } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index abc56d6961..84634df909 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -12,6 +12,8 @@ Trunk - Unreleased YARN-1498. Common scheduler changes for moving apps between queues (Sandy Ryza) + YARN-1504. RM changes for moving apps between queues (Sandy Ryza) + IMPROVEMENTS OPTIMIZATIONS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 1df67f82a8..8800f290cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -94,6 +94,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -104,6 +106,9 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; + /** * The client interface to the Resource Manager. This module handles all the rpc @@ -686,10 +691,74 @@ public class ClientRMService extends AbstractService implements } } + @SuppressWarnings("unchecked") @Override public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( MoveApplicationAcrossQueuesRequest request) throws YarnException { - throw new UnsupportedOperationException("Move not yet supported"); + ApplicationId applicationId = request.getApplicationId(); + + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + RMAuditLogger.logFailure("UNKNOWN", AuditConstants.MOVE_APP_REQUEST, + "UNKNOWN", "ClientRMService" , "Error getting UGI", + applicationId); + throw RPCUtil.getRemoteException(ie); + } + + RMApp application = this.rmContext.getRMApps().get(applicationId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), + AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", + "Trying to move an absent application", applicationId); + throw new ApplicationNotFoundException("Trying to move an absent" + + " application " + applicationId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.MOVE_APP_REQUEST, + "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", + AuditConstants.UNAUTHORIZED_USER, applicationId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + } + + // Moves only allowed when app is in a state that means it is tracked by + // the scheduler + if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppState.FAILED, + RMAppState.FINAL_SAVING, RMAppState.FINISHING, RMAppState.FINISHED, + RMAppState.KILLED, RMAppState.KILLING, RMAppState.FAILED) + .contains(application.getState())) { + String msg = "App in " + application.getState() + " state cannot be moved."; + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg); + throw new YarnException(msg); + } + + SettableFuture future = SettableFuture.create(); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppMoveEvent(applicationId, request.getTargetQueue(), future)); + + try { + Futures.get(future, YarnException.class); + } catch (YarnException ex) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", + ex.getMessage()); + throw ex; + } + + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId); + MoveApplicationAcrossQueuesResponse response = recordFactory + .newRecordInstance(MoveApplicationAcrossQueuesResponse.class); + return response; } private String getRenewerForToken(Token token) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index b9261cac20..f7d12136f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -45,6 +45,7 @@ public class RMAuditLogger { public static final String KILL_APP_REQUEST = "Kill Application Request"; public static final String SUBMIT_APP_REQUEST = "Submit Application Request"; + public static final String MOVE_APP_REQUEST = "Move Application Request"; public static final String FINISH_SUCCESS_APP = "Application Finished - Succeeded"; public static final String FINISH_FAILED_APP = "Application Finished - Failed"; public static final String FINISH_KILLED_APP = "Application Finished - Killed"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java index bb63e6f3f7..3ab5db4bce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java @@ -23,6 +23,7 @@ public enum RMAppEventType { START, RECOVER, KILL, + MOVE, // Move app to a new queue // Source: Scheduler and RMAppManager APP_REJECTED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 59132ce9e8..d4cf416dd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; @@ -166,6 +167,8 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, + RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, RMAppEventType.APP_REJECTED, new FinalSavingTransition( @@ -181,6 +184,8 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from ACCEPTED state .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, RMAppEventType.ATTEMPT_REGISTERED) .addTransition(RMAppState.ACCEPTED, @@ -204,6 +209,8 @@ public class RMAppImpl implements RMApp, Recoverable { // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.MOVE, new RMAppMoveTransition()) .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, RMAppEventType.ATTEMPT_UNREGISTERED, new FinalSavingTransition( @@ -692,6 +699,31 @@ public class RMAppImpl implements RMApp, Recoverable { }; } + /** + * Move an app to a new queue. + * This transition must set the result on the Future in the RMAppMoveEvent, + * either as an exception for failure or null for success, or the client will + * be left waiting forever. + */ + @SuppressWarnings("unchecked") + private static final class RMAppMoveTransition extends RMAppTransition { + public void transition(RMAppImpl app, RMAppEvent event) { + RMAppMoveEvent moveEvent = (RMAppMoveEvent) event; + try { + app.queue = app.scheduler.moveApplication(app.applicationId, + moveEvent.getTargetQueue()); + } catch (YarnException ex) { + moveEvent.getResult().setException(ex); + return; + } + + // TODO: Write out change to state store (YARN-1558) + + moveEvent.getResult().set(null); + } + } + + @SuppressWarnings("unchecked") private static final class RMAppRecoveredTransition implements MultipleArcTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java new file mode 100644 index 0000000000..5fc63c9294 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppMoveEvent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import com.google.common.util.concurrent.SettableFuture; + +public class RMAppMoveEvent extends RMAppEvent { + private String targetQueue; + private SettableFuture result; + + public RMAppMoveEvent(ApplicationId id, String newQueue, + SettableFuture resultFuture) { + super(id, RMAppEventType.MOVE); + this.targetQueue = newQueue; + this.result = resultFuture; + } + + public String getTargetQueue() { + return targetQueue; + } + + public SettableFuture getResult() { + return result; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index e460f1cb5e..4208d1db5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -27,11 +27,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -public class AbstractYarnScheduler { +public abstract class AbstractYarnScheduler implements ResourceScheduler { protected RMContext rmContext; protected Map applications; @@ -61,4 +62,11 @@ public class AbstractYarnScheduler { public Map getSchedulerApplications() { return applications; } + + @Override + public String moveApplication(ApplicationId appId, String newQueue) + throws YarnException { + throw new YarnException(getClass().getSimpleName() + + " does not support moving apps between queues"); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 4f1cb74c8b..2348603465 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; /** @@ -180,4 +182,16 @@ public interface YarnScheduler extends EventHandler { @LimitedPrivate("yarn") @Unstable public RMContainer getRMContainer(ContainerId containerId); + + /** + * Moves the given application to the given queue + * @param appId + * @param newQueue + * @return the name of the queue the application was placed into + * @throws YarnException if the move cannot be carried out + */ + @LimitedPrivate("yarn") + @Evolving + public String moveApplication(ApplicationId appId, String newQueue) + throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3ff3b04e63..b88ad503dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; @@ -75,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; @@ -121,8 +121,7 @@ import com.google.common.annotations.VisibleForTesting; @LimitedPrivate("yarn") @Unstable @SuppressWarnings("unchecked") -public class FairScheduler extends AbstractYarnScheduler implements - ResourceScheduler { +public class FairScheduler extends AbstractYarnScheduler { private boolean initialized; private FairSchedulerConfiguration conf; private Resource minimumAllocation; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 026f22c49c..e33348a10d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -77,7 +77,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -106,7 +105,7 @@ import com.google.common.annotations.VisibleForTesting; @Evolving @SuppressWarnings("unchecked") public class FifoScheduler extends AbstractYarnScheduler implements - ResourceScheduler, Configurable { + Configurable { private static final Log LOG = LogFactory.getLog(FifoScheduler.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 7e3d5fe073..1894a115ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -232,6 +233,20 @@ public class TestClientRMService { "application " + request.getApplicationId()); } } + + @Test (expected = ApplicationNotFoundException.class) + public void testMoveAbsentApplication() throws YarnException { + RMContext rmContext = mock(RMContext.class); + when(rmContext.getRMApps()).thenReturn( + new ConcurrentHashMap()); + ClientRMService rmService = new ClientRMService(rmContext, null, null, + null, null, null); + ApplicationId applicationId = + BuilderUtils.newApplicationId(System.currentTimeMillis(), 0); + MoveApplicationAcrossQueuesRequest request = + MoveApplicationAcrossQueuesRequest.newInstance(applicationId, "newqueue"); + rmService.moveApplicationAcrossQueues(request); + } @Test public void testGetQueueInfo() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java new file mode 100644 index 0000000000..1b60fce4d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java @@ -0,0 +1,180 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.security.AccessControlException; +import java.security.PrivilegedExceptionAction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestMoveApplication { + private ResourceManager resourceManager = null; + private static boolean failMove; + + @Before + public void setUp() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class, + FifoSchedulerWithMove.class); + conf.set(YarnConfiguration.YARN_ADMIN_ACL, " "); + conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + resourceManager = new ResourceManager(); + resourceManager.init(conf); + resourceManager.getRMContainerTokenSecretManager().rollMasterKey(); + resourceManager.getRMNMTokenSecretManager().rollMasterKey(); + resourceManager.start(); + failMove = false; + } + + @After + public void tearDown() { + resourceManager.stop(); + } + + @Test + public void testMoveRejectedByScheduler() throws Exception { + failMove = true; + + // Submit application + Application application = new Application("user1", resourceManager); + application.submit(); + + ClientRMService clientRMService = resourceManager.getClientRMService(); + try { + // FIFO scheduler does not support moves + clientRMService.moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest.newInstance( + application.getApplicationId(), "newqueue")); + fail("Should have hit exception"); + } catch (YarnException ex) { + assertEquals("Move not supported", ex.getCause().getMessage()); + } + } + + @Test (timeout = 10000) + public void testMoveTooLate() throws Exception { + // Submit application + Application application = new Application("user1", resourceManager); + ApplicationId appId = application.getApplicationId(); + application.submit(); + + ClientRMService clientRMService = resourceManager.getClientRMService(); + // Kill the application + clientRMService.forceKillApplication( + KillApplicationRequest.newInstance(appId)); + RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId); + // wait until it's dead + while (rmApp.getState() != RMAppState.KILLED) { + Thread.sleep(100); + } + + try { + clientRMService.moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue")); + fail("Should have hit exception"); + } catch (YarnException ex) { + assertEquals(YarnException.class, + ex.getClass()); + assertEquals("App in KILLED state cannot be moved.", ex.getMessage()); + } + } + + @Test (timeout = 5000) + public void testMoveSuccessful() throws Exception { + // Submit application + Application application = new Application("user1", resourceManager); + ApplicationId appId = application.getApplicationId(); + application.submit(); + + // Wait for app to be accepted + RMApp app = resourceManager.rmContext.getRMApps().get(appId); + while (app.getState() != RMAppState.ACCEPTED) { + Thread.sleep(100); + } + + ClientRMService clientRMService = resourceManager.getClientRMService(); + // FIFO scheduler does not support moves + clientRMService.moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue")); + + RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId); + assertEquals("newqueue", rmApp.getQueue()); + } + + @Test + public void testMoveRejectedByPermissions() throws Exception { + failMove = true; + + // Submit application + final Application application = new Application("user1", resourceManager); + application.submit(); + + final ClientRMService clientRMService = resourceManager.getClientRMService(); + try { + UserGroupInformation.createRemoteUser("otheruser").doAs( + new PrivilegedExceptionAction() { + @Override + public MoveApplicationAcrossQueuesResponse run() throws Exception { + return clientRMService.moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest.newInstance( + application.getApplicationId(), "newqueue")); + } + + }); + fail("Should have hit exception"); + } catch (Exception ex) { + assertEquals(AccessControlException.class, ex.getCause().getCause().getClass()); + } + } + + public static class FifoSchedulerWithMove extends FifoScheduler { + @Override + public String moveApplication(ApplicationId appId, String newQueue) + throws YarnException { + if (failMove) { + throw new YarnException("Move not supported"); + } + return newQueue; + } + + + @Override + public synchronized boolean checkAccess(UserGroupInformation callerUGI, + QueueACL acl, String queueName) { + return acl != QueueACL.ADMINISTER_QUEUE; + } + } +}