YARN-1504. RM changes for moving apps between queues (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1563371 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
140246bad8
commit
1079c37128
@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
@ -865,4 +866,10 @@ public class ResourceSchedulerWrapper implements
|
|||||||
public RMContainer getRMContainer(ContainerId containerId) {
|
public RMContainer getRMContainer(ContainerId containerId) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String moveApplication(ApplicationId appId, String newQueue)
|
||||||
|
throws YarnException {
|
||||||
|
return scheduler.moveApplication(appId, newQueue);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -12,6 +12,8 @@ Trunk - Unreleased
|
|||||||
YARN-1498. Common scheduler changes for moving apps between queues (Sandy
|
YARN-1498. Common scheduler changes for moving apps between queues (Sandy
|
||||||
Ryza)
|
Ryza)
|
||||||
|
|
||||||
|
YARN-1504. RM changes for moving apps between queues (Sandy Ryza)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
@ -94,6 +94,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
@ -104,6 +106,9 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The client interface to the Resource Manager. This module handles all the rpc
|
* The client interface to the Resource Manager. This module handles all the rpc
|
||||||
@ -686,10 +691,74 @@ public class ClientRMService extends AbstractService implements
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
||||||
MoveApplicationAcrossQueuesRequest request) throws YarnException {
|
MoveApplicationAcrossQueuesRequest request) throws YarnException {
|
||||||
throw new UnsupportedOperationException("Move not yet supported");
|
ApplicationId applicationId = request.getApplicationId();
|
||||||
|
|
||||||
|
UserGroupInformation callerUGI;
|
||||||
|
try {
|
||||||
|
callerUGI = UserGroupInformation.getCurrentUser();
|
||||||
|
} catch (IOException ie) {
|
||||||
|
LOG.info("Error getting UGI ", ie);
|
||||||
|
RMAuditLogger.logFailure("UNKNOWN", AuditConstants.MOVE_APP_REQUEST,
|
||||||
|
"UNKNOWN", "ClientRMService" , "Error getting UGI",
|
||||||
|
applicationId);
|
||||||
|
throw RPCUtil.getRemoteException(ie);
|
||||||
|
}
|
||||||
|
|
||||||
|
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||||
|
if (application == null) {
|
||||||
|
RMAuditLogger.logFailure(callerUGI.getUserName(),
|
||||||
|
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
|
||||||
|
"Trying to move an absent application", applicationId);
|
||||||
|
throw new ApplicationNotFoundException("Trying to move an absent"
|
||||||
|
+ " application " + applicationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!checkAccess(callerUGI, application.getUser(),
|
||||||
|
ApplicationAccessType.MODIFY_APP, application)) {
|
||||||
|
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||||
|
AuditConstants.MOVE_APP_REQUEST,
|
||||||
|
"User doesn't have permissions to "
|
||||||
|
+ ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService",
|
||||||
|
AuditConstants.UNAUTHORIZED_USER, applicationId);
|
||||||
|
throw RPCUtil.getRemoteException(new AccessControlException("User "
|
||||||
|
+ callerUGI.getShortUserName() + " cannot perform operation "
|
||||||
|
+ ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Moves only allowed when app is in a state that means it is tracked by
|
||||||
|
// the scheduler
|
||||||
|
if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppState.FAILED,
|
||||||
|
RMAppState.FINAL_SAVING, RMAppState.FINISHING, RMAppState.FINISHED,
|
||||||
|
RMAppState.KILLED, RMAppState.KILLING, RMAppState.FAILED)
|
||||||
|
.contains(application.getState())) {
|
||||||
|
String msg = "App in " + application.getState() + " state cannot be moved.";
|
||||||
|
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||||
|
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService", msg);
|
||||||
|
throw new YarnException(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SettableFuture<Object> future = SettableFuture.create();
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMAppMoveEvent(applicationId, request.getTargetQueue(), future));
|
||||||
|
|
||||||
|
try {
|
||||||
|
Futures.get(future, YarnException.class);
|
||||||
|
} catch (YarnException ex) {
|
||||||
|
RMAuditLogger.logFailure(callerUGI.getShortUserName(),
|
||||||
|
AuditConstants.MOVE_APP_REQUEST, "UNKNOWN", "ClientRMService",
|
||||||
|
ex.getMessage());
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
|
||||||
|
RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
|
||||||
|
AuditConstants.MOVE_APP_REQUEST, "ClientRMService" , applicationId);
|
||||||
|
MoveApplicationAcrossQueuesResponse response = recordFactory
|
||||||
|
.newRecordInstance(MoveApplicationAcrossQueuesResponse.class);
|
||||||
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
|
private String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
|
||||||
|
@ -45,6 +45,7 @@ public class RMAuditLogger {
|
|||||||
|
|
||||||
public static final String KILL_APP_REQUEST = "Kill Application Request";
|
public static final String KILL_APP_REQUEST = "Kill Application Request";
|
||||||
public static final String SUBMIT_APP_REQUEST = "Submit Application Request";
|
public static final String SUBMIT_APP_REQUEST = "Submit Application Request";
|
||||||
|
public static final String MOVE_APP_REQUEST = "Move Application Request";
|
||||||
public static final String FINISH_SUCCESS_APP = "Application Finished - Succeeded";
|
public static final String FINISH_SUCCESS_APP = "Application Finished - Succeeded";
|
||||||
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
|
public static final String FINISH_FAILED_APP = "Application Finished - Failed";
|
||||||
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
|
public static final String FINISH_KILLED_APP = "Application Finished - Killed";
|
||||||
|
@ -23,6 +23,7 @@ public enum RMAppEventType {
|
|||||||
START,
|
START,
|
||||||
RECOVER,
|
RECOVER,
|
||||||
KILL,
|
KILL,
|
||||||
|
MOVE, // Move app to a new queue
|
||||||
|
|
||||||
// Source: Scheduler and RMAppManager
|
// Source: Scheduler and RMAppManager
|
||||||
APP_REJECTED,
|
APP_REJECTED,
|
||||||
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
||||||
@ -166,6 +167,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
// Transitions from SUBMITTED state
|
// Transitions from SUBMITTED state
|
||||||
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
|
.addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
|
||||||
|
RMAppEventType.MOVE, new RMAppMoveTransition())
|
||||||
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
|
||||||
RMAppEventType.APP_REJECTED,
|
RMAppEventType.APP_REJECTED,
|
||||||
new FinalSavingTransition(
|
new FinalSavingTransition(
|
||||||
@ -181,6 +184,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
// Transitions from ACCEPTED state
|
// Transitions from ACCEPTED state
|
||||||
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
|
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
|
||||||
|
RMAppEventType.MOVE, new RMAppMoveTransition())
|
||||||
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
|
.addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
|
||||||
RMAppEventType.ATTEMPT_REGISTERED)
|
RMAppEventType.ATTEMPT_REGISTERED)
|
||||||
.addTransition(RMAppState.ACCEPTED,
|
.addTransition(RMAppState.ACCEPTED,
|
||||||
@ -204,6 +209,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
// Transitions from RUNNING state
|
// Transitions from RUNNING state
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||||
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
|
||||||
|
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
|
||||||
|
RMAppEventType.MOVE, new RMAppMoveTransition())
|
||||||
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
|
.addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
|
||||||
RMAppEventType.ATTEMPT_UNREGISTERED,
|
RMAppEventType.ATTEMPT_UNREGISTERED,
|
||||||
new FinalSavingTransition(
|
new FinalSavingTransition(
|
||||||
@ -692,6 +699,31 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Move an app to a new queue.
|
||||||
|
* This transition must set the result on the Future in the RMAppMoveEvent,
|
||||||
|
* either as an exception for failure or null for success, or the client will
|
||||||
|
* be left waiting forever.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static final class RMAppMoveTransition extends RMAppTransition {
|
||||||
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
||||||
|
RMAppMoveEvent moveEvent = (RMAppMoveEvent) event;
|
||||||
|
try {
|
||||||
|
app.queue = app.scheduler.moveApplication(app.applicationId,
|
||||||
|
moveEvent.getTargetQueue());
|
||||||
|
} catch (YarnException ex) {
|
||||||
|
moveEvent.getResult().setException(ex);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Write out change to state store (YARN-1558)
|
||||||
|
|
||||||
|
moveEvent.getResult().set(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
private static final class RMAppRecoveredTransition implements
|
private static final class RMAppRecoveredTransition implements
|
||||||
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
|
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
|
||||||
|
|
||||||
|
@ -0,0 +1,44 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
|
||||||
|
public class RMAppMoveEvent extends RMAppEvent {
|
||||||
|
private String targetQueue;
|
||||||
|
private SettableFuture<Object> result;
|
||||||
|
|
||||||
|
public RMAppMoveEvent(ApplicationId id, String newQueue,
|
||||||
|
SettableFuture<Object> resultFuture) {
|
||||||
|
super(id, RMAppEventType.MOVE);
|
||||||
|
this.targetQueue = newQueue;
|
||||||
|
this.result = resultFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetQueue() {
|
||||||
|
return targetQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SettableFuture<Object> getResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -27,11 +27,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
|
||||||
public class AbstractYarnScheduler {
|
public abstract class AbstractYarnScheduler implements ResourceScheduler {
|
||||||
|
|
||||||
protected RMContext rmContext;
|
protected RMContext rmContext;
|
||||||
protected Map<ApplicationId, SchedulerApplication> applications;
|
protected Map<ApplicationId, SchedulerApplication> applications;
|
||||||
@ -61,4 +62,11 @@ public class AbstractYarnScheduler {
|
|||||||
public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
|
public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() {
|
||||||
return applications;
|
return applications;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String moveApplication(ApplicationId appId, String newQueue)
|
||||||
|
throws YarnException {
|
||||||
|
throw new YarnException(getClass().getSimpleName()
|
||||||
|
+ " does not support moving apps between queues");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
|
|||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -180,4 +182,16 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||||||
@LimitedPrivate("yarn")
|
@LimitedPrivate("yarn")
|
||||||
@Unstable
|
@Unstable
|
||||||
public RMContainer getRMContainer(ContainerId containerId);
|
public RMContainer getRMContainer(ContainerId containerId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Moves the given application to the given queue
|
||||||
|
* @param appId
|
||||||
|
* @param newQueue
|
||||||
|
* @return the name of the queue the application was placed into
|
||||||
|
* @throws YarnException if the move cannot be carried out
|
||||||
|
*/
|
||||||
|
@LimitedPrivate("yarn")
|
||||||
|
@Evolving
|
||||||
|
public String moveApplication(ApplicationId appId, String newQueue)
|
||||||
|
throws YarnException;
|
||||||
}
|
}
|
||||||
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||||
@ -75,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
@ -121,8 +121,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||||||
@LimitedPrivate("yarn")
|
@LimitedPrivate("yarn")
|
||||||
@Unstable
|
@Unstable
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public class FairScheduler extends AbstractYarnScheduler implements
|
public class FairScheduler extends AbstractYarnScheduler {
|
||||||
ResourceScheduler {
|
|
||||||
private boolean initialized;
|
private boolean initialized;
|
||||||
private FairSchedulerConfiguration conf;
|
private FairSchedulerConfiguration conf;
|
||||||
private Resource minimumAllocation;
|
private Resource minimumAllocation;
|
||||||
|
@ -77,7 +77,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||||
@ -106,7 +105,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||||||
@Evolving
|
@Evolving
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public class FifoScheduler extends AbstractYarnScheduler implements
|
public class FifoScheduler extends AbstractYarnScheduler implements
|
||||||
ResourceScheduler, Configurable {
|
Configurable {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
|
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
|
||||||
|
|
||||||
|
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
|||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
@ -232,6 +233,20 @@ public class TestClientRMService {
|
|||||||
"application " + request.getApplicationId());
|
"application " + request.getApplicationId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (expected = ApplicationNotFoundException.class)
|
||||||
|
public void testMoveAbsentApplication() throws YarnException {
|
||||||
|
RMContext rmContext = mock(RMContext.class);
|
||||||
|
when(rmContext.getRMApps()).thenReturn(
|
||||||
|
new ConcurrentHashMap<ApplicationId, RMApp>());
|
||||||
|
ClientRMService rmService = new ClientRMService(rmContext, null, null,
|
||||||
|
null, null, null);
|
||||||
|
ApplicationId applicationId =
|
||||||
|
BuilderUtils.newApplicationId(System.currentTimeMillis(), 0);
|
||||||
|
MoveApplicationAcrossQueuesRequest request =
|
||||||
|
MoveApplicationAcrossQueuesRequest.newInstance(applicationId, "newqueue");
|
||||||
|
rmService.moveApplicationAcrossQueues(request);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetQueueInfo() throws Exception {
|
public void testGetQueueInfo() throws Exception {
|
||||||
|
@ -0,0 +1,180 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.security.AccessControlException;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestMoveApplication {
|
||||||
|
private ResourceManager resourceManager = null;
|
||||||
|
private static boolean failMove;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
|
||||||
|
FifoSchedulerWithMove.class);
|
||||||
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
|
resourceManager = new ResourceManager();
|
||||||
|
resourceManager.init(conf);
|
||||||
|
resourceManager.getRMContainerTokenSecretManager().rollMasterKey();
|
||||||
|
resourceManager.getRMNMTokenSecretManager().rollMasterKey();
|
||||||
|
resourceManager.start();
|
||||||
|
failMove = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
resourceManager.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoveRejectedByScheduler() throws Exception {
|
||||||
|
failMove = true;
|
||||||
|
|
||||||
|
// Submit application
|
||||||
|
Application application = new Application("user1", resourceManager);
|
||||||
|
application.submit();
|
||||||
|
|
||||||
|
ClientRMService clientRMService = resourceManager.getClientRMService();
|
||||||
|
try {
|
||||||
|
// FIFO scheduler does not support moves
|
||||||
|
clientRMService.moveApplicationAcrossQueues(
|
||||||
|
MoveApplicationAcrossQueuesRequest.newInstance(
|
||||||
|
application.getApplicationId(), "newqueue"));
|
||||||
|
fail("Should have hit exception");
|
||||||
|
} catch (YarnException ex) {
|
||||||
|
assertEquals("Move not supported", ex.getCause().getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 10000)
|
||||||
|
public void testMoveTooLate() throws Exception {
|
||||||
|
// Submit application
|
||||||
|
Application application = new Application("user1", resourceManager);
|
||||||
|
ApplicationId appId = application.getApplicationId();
|
||||||
|
application.submit();
|
||||||
|
|
||||||
|
ClientRMService clientRMService = resourceManager.getClientRMService();
|
||||||
|
// Kill the application
|
||||||
|
clientRMService.forceKillApplication(
|
||||||
|
KillApplicationRequest.newInstance(appId));
|
||||||
|
RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
|
||||||
|
// wait until it's dead
|
||||||
|
while (rmApp.getState() != RMAppState.KILLED) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
clientRMService.moveApplicationAcrossQueues(
|
||||||
|
MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
|
||||||
|
fail("Should have hit exception");
|
||||||
|
} catch (YarnException ex) {
|
||||||
|
assertEquals(YarnException.class,
|
||||||
|
ex.getClass());
|
||||||
|
assertEquals("App in KILLED state cannot be moved.", ex.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 5000)
|
||||||
|
public void testMoveSuccessful() throws Exception {
|
||||||
|
// Submit application
|
||||||
|
Application application = new Application("user1", resourceManager);
|
||||||
|
ApplicationId appId = application.getApplicationId();
|
||||||
|
application.submit();
|
||||||
|
|
||||||
|
// Wait for app to be accepted
|
||||||
|
RMApp app = resourceManager.rmContext.getRMApps().get(appId);
|
||||||
|
while (app.getState() != RMAppState.ACCEPTED) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientRMService clientRMService = resourceManager.getClientRMService();
|
||||||
|
// FIFO scheduler does not support moves
|
||||||
|
clientRMService.moveApplicationAcrossQueues(
|
||||||
|
MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
|
||||||
|
|
||||||
|
RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
|
||||||
|
assertEquals("newqueue", rmApp.getQueue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoveRejectedByPermissions() throws Exception {
|
||||||
|
failMove = true;
|
||||||
|
|
||||||
|
// Submit application
|
||||||
|
final Application application = new Application("user1", resourceManager);
|
||||||
|
application.submit();
|
||||||
|
|
||||||
|
final ClientRMService clientRMService = resourceManager.getClientRMService();
|
||||||
|
try {
|
||||||
|
UserGroupInformation.createRemoteUser("otheruser").doAs(
|
||||||
|
new PrivilegedExceptionAction<MoveApplicationAcrossQueuesResponse>() {
|
||||||
|
@Override
|
||||||
|
public MoveApplicationAcrossQueuesResponse run() throws Exception {
|
||||||
|
return clientRMService.moveApplicationAcrossQueues(
|
||||||
|
MoveApplicationAcrossQueuesRequest.newInstance(
|
||||||
|
application.getApplicationId(), "newqueue"));
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
fail("Should have hit exception");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
assertEquals(AccessControlException.class, ex.getCause().getCause().getClass());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class FifoSchedulerWithMove extends FifoScheduler {
|
||||||
|
@Override
|
||||||
|
public String moveApplication(ApplicationId appId, String newQueue)
|
||||||
|
throws YarnException {
|
||||||
|
if (failMove) {
|
||||||
|
throw new YarnException("Move not supported");
|
||||||
|
}
|
||||||
|
return newQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean checkAccess(UserGroupInformation callerUGI,
|
||||||
|
QueueACL acl, String queueName) {
|
||||||
|
return acl != QueueACL.ADMINISTER_QUEUE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user