YARN-2883. Queuing of container requests in the NM. (Konstantinos Karanasos and Arun Suresh via kasha)

This commit is contained in:
Karthik Kambatla 2016-04-20 09:50:37 -07:00
parent af9bdbe447
commit c8172f5f14
20 changed files with 1271 additions and 144 deletions

View File

@ -34,5 +34,8 @@ public enum ContainerState {
RUNNING, RUNNING,
/** Completed container */ /** Completed container */
COMPLETE COMPLETE,
/** Queued at the NM. */
QUEUED
} }

View File

@ -659,6 +659,11 @@ public static boolean isAclEnabled(Configuration conf) {
/** Prefix for all node manager configs.*/ /** Prefix for all node manager configs.*/
public static final String NM_PREFIX = "yarn.nodemanager."; public static final String NM_PREFIX = "yarn.nodemanager.";
/** Enable Queuing of <code>OPPORTUNISTIC</code> containers. */
public static final String NM_CONTAINER_QUEUING_ENABLED = NM_PREFIX
+ "container-queuing-enabled";
public static final boolean NM_CONTAINER_QUEUING_ENABLED_DEFAULT = false;
/** Environment variables that will be sent to containers.*/ /** Environment variables that will be sent to containers.*/
public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env"; public static final String NM_ADMIN_USER_ENV = NM_PREFIX + "admin-env";
public static final String DEFAULT_NM_ADMIN_USER_ENV = "MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX"; public static final String DEFAULT_NM_ADMIN_USER_ENV = "MALLOC_ARENA_MAX=$MALLOC_ARENA_MAX";

View File

@ -82,6 +82,7 @@ enum ContainerStateProto {
C_NEW = 1; C_NEW = 1;
C_RUNNING = 2; C_RUNNING = 2;
C_COMPLETE = 3; C_COMPLETE = 3;
C_QUEUED = 4;
} }
message ContainerProto { message ContainerProto {

View File

@ -964,6 +964,13 @@
<value>4</value> <value>4</value>
</property> </property>
<property>
<description>Enable Queuing of OPPORTUNISTIC containers on the
nodemanager.</description>
<name>yarn.nodemanager.container-queuing-enabled</name>
<value>false</value>
</property>
<property> <property>
<description> <description>
Number of seconds after an application finishes before the nodemanager's Number of seconds after an application finishes before the nodemanager's

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
@ -215,6 +216,13 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
public static ContainerStatus newContainerStatus(ContainerId containerId, public static ContainerStatus newContainerStatus(ContainerId containerId,
ContainerState containerState, String diagnostics, int exitStatus, ContainerState containerState, String diagnostics, int exitStatus,
Resource capability) { Resource capability) {
return newContainerStatus(containerId, containerState, diagnostics,
exitStatus, capability, ExecutionType.GUARANTEED);
}
public static ContainerStatus newContainerStatus(ContainerId containerId,
ContainerState containerState, String diagnostics, int exitStatus,
Resource capability, ExecutionType executionType) {
ContainerStatus containerStatus = recordFactory ContainerStatus containerStatus = recordFactory
.newRecordInstance(ContainerStatus.class); .newRecordInstance(ContainerStatus.class);
containerStatus.setState(containerState); containerStatus.setState(containerState);
@ -222,6 +230,7 @@ public static ContainerStatus newContainerStatus(ContainerId containerId,
containerStatus.setDiagnostics(diagnostics); containerStatus.setDiagnostics(diagnostics);
containerStatus.setExitStatus(exitStatus); containerStatus.setExitStatus(exitStatus);
containerStatus.setCapability(capability); containerStatus.setCapability(capability);
containerStatus.setExecutionType(executionType);
return containerStatus; return containerStatus;
} }

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@ -42,6 +43,15 @@
*/ */
public interface Context { public interface Context {
/**
* Interface exposing methods related to the queuing of containers in the NM.
*/
interface QueuingContext {
ConcurrentMap<ContainerId, ContainerTokenIdentifier> getQueuedContainers();
ConcurrentMap<ContainerTokenIdentifier, String> getKilledQueuedContainers();
}
/** /**
* Return the nodeId. Usable only when the ContainerManager is started. * Return the nodeId. Usable only when the ContainerManager is started.
* *
@ -89,4 +99,11 @@ public interface Context {
getLogAggregationStatusForApps(); getLogAggregationStatusForApps();
NodeStatusUpdater getNodeStatusUpdater(); NodeStatusUpdater getNodeStatusUpdater();
/**
* Returns a <code>QueuingContext</code> that provides information about the
* number of Containers Queued as well as the number of Containers that were
* queued and killed.
*/
QueuingContext getQueuingContext();
} }

View File

@ -57,11 +57,13 @@
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
@ -170,8 +172,14 @@ protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del, ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) { LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
metrics, dirsHandler); YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
return new QueuingContainerManagerImpl(context, exec, del,
nodeStatusUpdater, metrics, dirsHandler);
} else {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler);
}
} }
protected WebServer createWebServer(Context nmContext, protected WebServer createWebServer(Context nmContext,
@ -461,6 +469,8 @@ public static class NMContext implements Context {
logAggregationReportForApps; logAggregationReportForApps;
private NodeStatusUpdater nodeStatusUpdater; private NodeStatusUpdater nodeStatusUpdater;
private final QueuingContext queuingContext;
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager,
LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@ -475,6 +485,7 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
this.stateStore = stateStore; this.stateStore = stateStore;
this.logAggregationReportForApps = new ConcurrentLinkedQueue< this.logAggregationReportForApps = new ConcurrentLinkedQueue<
LogAggregationReport>(); LogAggregationReport>();
this.queuingContext = new QueuingNMContext();
} }
/** /**
@ -595,8 +606,35 @@ public NodeStatusUpdater getNodeStatusUpdater() {
public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) { public void setNodeStatusUpdater(NodeStatusUpdater nodeStatusUpdater) {
this.nodeStatusUpdater = nodeStatusUpdater; this.nodeStatusUpdater = nodeStatusUpdater;
} }
@Override
public QueuingContext getQueuingContext() {
return this.queuingContext;
}
} }
/**
* Class that keeps the context for containers queued at the NM.
*/
public static class QueuingNMContext implements Context.QueuingContext {
protected final ConcurrentMap<ContainerId, ContainerTokenIdentifier>
queuedContainers = new ConcurrentSkipListMap<>();
protected final ConcurrentMap<ContainerTokenIdentifier, String>
killedQueuedContainers = new ConcurrentHashMap<>();
@Override
public ConcurrentMap<ContainerId, ContainerTokenIdentifier>
getQueuedContainers() {
return this.queuedContainers;
}
@Override
public ConcurrentMap<ContainerTokenIdentifier, String>
getKilledQueuedContainers() {
return this.killedQueuedContainers;
}
}
/** /**
* @return the node health checker * @return the node health checker

View File

@ -160,11 +160,11 @@ public class ContainerManagerImpl extends CompositeService implements
private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class); private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
static final String INVALID_NMTOKEN_MSG = "Invalid NMToken"; public static final String INVALID_NMTOKEN_MSG = "Invalid NMToken";
static final String INVALID_CONTAINERTOKEN_MSG = static final String INVALID_CONTAINERTOKEN_MSG =
"Invalid ContainerToken"; "Invalid ContainerToken";
final Context context; protected final Context context;
private final ContainersMonitor containersMonitor; private final ContainersMonitor containersMonitor;
private Server server; private Server server;
private final ResourceLocalizationService rsrcLocalizationSrvc; private final ResourceLocalizationService rsrcLocalizationSrvc;
@ -172,7 +172,7 @@ public class ContainerManagerImpl extends CompositeService implements
private final AuxServices auxiliaryServices; private final AuxServices auxiliaryServices;
private final NodeManagerMetrics metrics; private final NodeManagerMetrics metrics;
private final NodeStatusUpdater nodeStatusUpdater; protected final NodeStatusUpdater nodeStatusUpdater;
protected LocalDirsHandlerService dirsHandler; protected LocalDirsHandlerService dirsHandler;
protected final AsyncDispatcher dispatcher; protected final AsyncDispatcher dispatcher;
@ -213,14 +213,13 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
auxiliaryServices.registerServiceListener(this); auxiliaryServices.registerServiceListener(this);
addService(auxiliaryServices); addService(auxiliaryServices);
this.containersMonitor = this.containersMonitor = createContainersMonitor(exec);
new ContainersMonitorImpl(exec, dispatcher, this.context);
addService(this.containersMonitor); addService(this.containersMonitor);
dispatcher.register(ContainerEventType.class, dispatcher.register(ContainerEventType.class,
new ContainerEventDispatcher()); new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class, dispatcher.register(ApplicationEventType.class,
new ApplicationEventDispatcher()); createApplicationEventDispatcher());
dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc); dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
dispatcher.register(AuxServicesEventType.class, auxiliaryServices); dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor); dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
@ -235,6 +234,7 @@ public ContainerManagerImpl(Context context, ContainerExecutor exec,
@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
LogHandler logHandler = LogHandler logHandler =
createLogHandler(conf, this.context, this.deletionService); createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler); addIfService(logHandler);
@ -276,6 +276,10 @@ protected void createAMRMProxyService(Configuration conf) {
} }
} }
protected ContainersMonitor createContainersMonitor(ContainerExecutor exec) {
return new ContainersMonitorImpl(exec, dispatcher, this.context);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void recover() throws IOException, URISyntaxException { private void recover() throws IOException, URISyntaxException {
NMStateStoreService stateStore = context.getNMStateStore(); NMStateStoreService stateStore = context.getNMStateStore();
@ -418,6 +422,10 @@ protected ContainersLauncher createContainersLauncher(Context context,
return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
} }
protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() {
return new ApplicationEventDispatcher();
}
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
@ -802,7 +810,8 @@ public StartContainersResponse startContainers(
.equals(ContainerType.APPLICATION_MASTER)) { .equals(ContainerType.APPLICATION_MASTER)) {
this.getAMRMProxyService().processApplicationStartRequest(request); this.getAMRMProxyService().processApplicationStartRequest(request);
} }
performContainerPreStartChecks(nmTokenIdentifier, request,
containerTokenIdentifier);
startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
request); request);
succeededContainers.add(containerId); succeededContainers.add(containerId);
@ -822,6 +831,42 @@ public StartContainersResponse startContainers(
} }
} }
private void performContainerPreStartChecks(
NMTokenIdentifier nmTokenIdentifier, StartContainerRequest request,
ContainerTokenIdentifier containerTokenIdentifier)
throws YarnException, InvalidToken {
/*
* 1) It should save the NMToken into NMTokenSecretManager. This is done
* here instead of RPC layer because at the time of opening/authenticating
* the connection it doesn't know what all RPC calls user will make on it.
* Also new NMToken is issued only at startContainer (once it gets
* renewed).
*
* 2) It should validate containerToken. Need to check below things. a) It
* is signed by correct master key (part of retrieve password). b) It
* belongs to correct Node Manager (part of retrieve password). c) It has
* correct RMIdentifier. d) It is not expired.
*/
authorizeStartAndResourceIncreaseRequest(
nmTokenIdentifier, containerTokenIdentifier, true);
// update NMToken
updateNMTokenIdentifier(nmTokenIdentifier);
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
if (launchContext.getServiceData()!=null &&
!launchContext.getServiceData().isEmpty()) {
for (Entry<String, ByteBuffer> meta : launchContext.getServiceData()
.entrySet()) {
if (null == serviceData.get(meta.getKey())) {
throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
+ " does not exist");
}
}
}
}
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
String user, Credentials credentials, String user, Credentials credentials,
Map<ApplicationAccessType, String> appAcls, Map<ApplicationAccessType, String> appAcls,
@ -864,26 +909,10 @@ private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier, protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier, ContainerTokenIdentifier containerTokenIdentifier,
StartContainerRequest request) throws YarnException, IOException { StartContainerRequest request) throws YarnException, IOException {
/*
* 1) It should save the NMToken into NMTokenSecretManager. This is done
* here instead of RPC layer because at the time of opening/authenticating
* the connection it doesn't know what all RPC calls user will make on it.
* Also new NMToken is issued only at startContainer (once it gets renewed).
*
* 2) It should validate containerToken. Need to check below things. a) It
* is signed by correct master key (part of retrieve password). b) It
* belongs to correct Node Manager (part of retrieve password). c) It has
* correct RMIdentifier. d) It is not expired.
*/
authorizeStartAndResourceIncreaseRequest(
nmTokenIdentifier, containerTokenIdentifier, true);
// update NMToken
updateNMTokenIdentifier(nmTokenIdentifier);
ContainerId containerId = containerTokenIdentifier.getContainerID(); ContainerId containerId = containerTokenIdentifier.getContainerID();
String containerIdStr = containerId.toString(); String containerIdStr = containerId.toString();
String user = containerTokenIdentifier.getApplicationSubmitter(); String user = containerTokenIdentifier.getApplicationSubmitter();
@ -892,18 +921,6 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
ContainerLaunchContext launchContext = request.getContainerLaunchContext(); ContainerLaunchContext launchContext = request.getContainerLaunchContext();
Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
if (launchContext.getServiceData()!=null &&
!launchContext.getServiceData().isEmpty()) {
for (Map.Entry<String, ByteBuffer> meta : launchContext.getServiceData()
.entrySet()) {
if (null == serviceData.get(meta.getKey())) {
throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
+ " does not exist");
}
}
}
Credentials credentials = Credentials credentials =
YarnServerSecurityUtils.parseCredentials(launchContext); YarnServerSecurityUtils.parseCredentials(launchContext);
@ -923,13 +940,14 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
this.readLock.lock(); this.readLock.lock();
try { try {
if (!serviceStopped) { if (!isServiceStopped()) {
// Create the application // Create the application
Application application = Application application = new ApplicationImpl(dispatcher, user,
new ApplicationImpl(dispatcher, user, applicationID, credentials, context); applicationID, credentials, context);
if (null == context.getApplications().putIfAbsent(applicationID, if (null == context.getApplications().putIfAbsent(applicationID,
application)) { application)) {
LOG.info("Creating a new application reference for app " + applicationID); LOG.info("Creating a new application reference for app "
+ applicationID);
LogAggregationContext logAggregationContext = LogAggregationContext logAggregationContext =
containerTokenIdentifier.getLogAggregationContext(); containerTokenIdentifier.getLogAggregationContext();
Map<ApplicationAccessType, String> appAcls = Map<ApplicationAccessType, String> appAcls =
@ -1147,7 +1165,9 @@ public StopContainersResponse stopContainers(StopContainersRequest requests)
} }
for (ContainerId id : requests.getContainerIds()) { for (ContainerId id : requests.getContainerIds()) {
try { try {
stopContainerInternal(identifier, id); Container container = this.context.getContainers().get(id);
authorizeGetAndStopContainerRequest(id, container, true, identifier);
stopContainerInternal(id);
succeededRequests.add(id); succeededRequests.add(id);
} catch (YarnException e) { } catch (YarnException e) {
failedRequests.put(id, SerializedException.newInstance(e)); failedRequests.put(id, SerializedException.newInstance(e));
@ -1158,13 +1178,11 @@ public StopContainersResponse stopContainers(StopContainersRequest requests)
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier, protected void stopContainerInternal(ContainerId containerID)
ContainerId containerID) throws YarnException, IOException { throws YarnException, IOException {
String containerIDStr = containerID.toString(); String containerIDStr = containerID.toString();
Container container = this.context.getContainers().get(containerID); Container container = this.context.getContainers().get(containerID);
LOG.info("Stopping container with container Id: " + containerIDStr); LOG.info("Stopping container with container Id: " + containerIDStr);
authorizeGetAndStopContainerRequest(containerID, container, true,
nmTokenIdentifier);
if (container == null) { if (container == null) {
if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) { if (!nodeStatusUpdater.isContainerRecentlyStopped(containerID)) {
@ -1211,7 +1229,7 @@ public GetContainerStatusesResponse getContainerStatuses(
failedRequests); failedRequests);
} }
private ContainerStatus getContainerStatusInternal(ContainerId containerID, protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
NMTokenIdentifier nmTokenIdentifier) throws YarnException { NMTokenIdentifier nmTokenIdentifier) throws YarnException {
String containerIDStr = containerID.toString(); String containerIDStr = containerID.toString();
Container container = this.context.getContainers().get(containerID); Container container = this.context.getContainers().get(containerID);
@ -1407,4 +1425,7 @@ protected void setAMRMProxyService(AMRMProxyService amrmProxyService) {
this.amrmProxyService = amrmProxyService; this.amrmProxyService = amrmProxyService;
} }
protected boolean isServiceStopped() {
return serviceStopped;
}
} }

View File

@ -453,7 +453,8 @@ public ContainerStatus cloneAndGetContainerStatus() {
this.readLock.lock(); this.readLock.lock();
try { try {
return BuilderUtils.newContainerStatus(this.containerId, return BuilderUtils.newContainerStatus(this.containerId,
getCurrentState(), diagnostics.toString(), exitCode, getResource()); getCurrentState(), diagnostics.toString(), exitCode, getResource(),
this.containerTokenIdentifier.getExecutionType());
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }

View File

@ -22,8 +22,26 @@
import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ResourceView; import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
public interface ContainersMonitor extends Service, public interface ContainersMonitor extends Service,
EventHandler<ContainersMonitorEvent>, ResourceView { EventHandler<ContainersMonitorEvent>, ResourceView {
public ResourceUtilization getContainersUtilization(); public ResourceUtilization getContainersUtilization();
ResourceUtilization getContainersAllocation();
boolean hasResourcesAvailable(ProcessTreeInfo pti);
void increaseContainersAllocation(ProcessTreeInfo pti);
void decreaseContainersAllocation(ProcessTreeInfo pti);
void increaseResourceUtilization(ResourceUtilization resourceUtil,
ProcessTreeInfo pti);
void decreaseResourceUtilization(ResourceUtilization resourceUtil,
ProcessTreeInfo pti);
void subtractNodeResourcesFromResourceUtilization(
ResourceUtilization resourceUtil);
} }

View File

@ -63,7 +63,7 @@ public class ContainersMonitorImpl extends AbstractService implements
private final ContainerExecutor containerExecutor; private final ContainerExecutor containerExecutor;
private final Dispatcher eventDispatcher; private final Dispatcher eventDispatcher;
private final Context context; protected final Context context;
private ResourceCalculatorPlugin resourceCalculatorPlugin; private ResourceCalculatorPlugin resourceCalculatorPlugin;
private Configuration conf; private Configuration conf;
private static float vmemRatio; private static float vmemRatio;
@ -82,6 +82,9 @@ public class ContainersMonitorImpl extends AbstractService implements
private int nodeCpuPercentageForYARN; private int nodeCpuPercentageForYARN;
private ResourceUtilization containersUtilization; private ResourceUtilization containersUtilization;
// Tracks the aggregated allocation of the currently allocated containers
// when queuing of containers at the NMs is enabled.
private ResourceUtilization containersAllocation;
private volatile boolean stopped = false; private volatile boolean stopped = false;
@ -96,6 +99,7 @@ public ContainersMonitorImpl(ContainerExecutor exec,
this.monitoringThread = new MonitoringThread(); this.monitoringThread = new MonitoringThread();
this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f); this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
this.containersAllocation = ResourceUtilization.newInstance(0, 0, 0.0f);
} }
@Override @Override
@ -132,10 +136,11 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS); YarnConfiguration.DEFAULT_NM_CONTAINER_METRICS_UNREGISTER_DELAY_MS);
long configuredPMemForContainers = long configuredPMemForContainers =
NodeManagerHardwareUtils.getContainerMemoryMB(conf) * 1024 * 1024L; NodeManagerHardwareUtils.getContainerMemoryMB(
this.resourceCalculatorPlugin, conf) * 1024 * 1024L;
long configuredVCoresForContainers = long configuredVCoresForContainers =
NodeManagerHardwareUtils.getVCores(conf); NodeManagerHardwareUtils.getVCores(this.resourceCalculatorPlugin, conf);
// Setting these irrespective of whether checks are enabled. Required in // Setting these irrespective of whether checks are enabled. Required in
// the UI. // the UI.
@ -233,8 +238,7 @@ protected void serviceStop() throws Exception {
super.serviceStop(); super.serviceStop();
} }
@VisibleForTesting public static class ProcessTreeInfo {
static class ProcessTreeInfo {
private ContainerId containerId; private ContainerId containerId;
private String pid; private String pid;
private ResourceCalculatorProcessTree pTree; private ResourceCalculatorProcessTree pTree;
@ -697,6 +701,82 @@ public void setContainersUtilization(ResourceUtilization utilization) {
this.containersUtilization = utilization; this.containersUtilization = utilization;
} }
public ResourceUtilization getContainersAllocation() {
return this.containersAllocation;
}
/**
* @return true if there are available allocated resources for the given
* container to start.
*/
@Override
public boolean hasResourcesAvailable(ProcessTreeInfo pti) {
synchronized (this.containersAllocation) {
// Check physical memory.
if (this.containersAllocation.getPhysicalMemory() +
(int) (pti.getPmemLimit() >> 20) >
(int) (getPmemAllocatedForContainers() >> 20)) {
return false;
}
// Check virtual memory.
if (isVmemCheckEnabled() &&
this.containersAllocation.getVirtualMemory() +
(int) (pti.getVmemLimit() >> 20) >
(int) (getVmemAllocatedForContainers() >> 20)) {
return false;
}
// Check CPU.
if (this.containersAllocation.getCPU()
+ allocatedCpuUsage(pti) > 1.0f) {
return false;
}
}
return true;
}
@Override
public void increaseContainersAllocation(ProcessTreeInfo pti) {
synchronized (this.containersAllocation) {
increaseResourceUtilization(this.containersAllocation, pti);
}
}
@Override
public void decreaseContainersAllocation(ProcessTreeInfo pti) {
synchronized (this.containersAllocation) {
decreaseResourceUtilization(this.containersAllocation, pti);
}
}
@Override
public void increaseResourceUtilization(ResourceUtilization resourceUtil,
ProcessTreeInfo pti) {
resourceUtil.addTo((int) (pti.getPmemLimit() >> 20),
(int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
}
@Override
public void decreaseResourceUtilization(ResourceUtilization resourceUtil,
ProcessTreeInfo pti) {
resourceUtil.subtractFrom((int) (pti.getPmemLimit() >> 20),
(int) (pti.getVmemLimit() >> 20), allocatedCpuUsage(pti));
}
@Override
public void subtractNodeResourcesFromResourceUtilization(
ResourceUtilization resourceUtil) {
resourceUtil.subtractFrom((int) (getPmemAllocatedForContainers() >> 20),
(int) (getVmemAllocatedForContainers() >> 20), 1.0f);
}
private float allocatedCpuUsage(ProcessTreeInfo pti) {
float cpuUsagePercentPerCore = pti.getCpuVcores() * 100.0f;
float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore
/ resourceCalculatorPlugin.getNumProcessors();
return (cpuUsageTotalCoresPercentage * 1000 *
maxVCoresAllottedForContainers / nodeCpuPercentageForYARN) / 1000.0f;
}
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void handle(ContainersMonitorEvent monitoringEvent) { public void handle(ContainersMonitorEvent monitoringEvent) {
@ -714,40 +794,56 @@ public void handle(ContainersMonitorEvent monitoringEvent) {
switch (monitoringEvent.getType()) { switch (monitoringEvent.getType()) {
case START_MONITORING_CONTAINER: case START_MONITORING_CONTAINER:
ContainerStartMonitoringEvent startEvent = onStartMonitoringContainer(monitoringEvent, containerId);
(ContainerStartMonitoringEvent) monitoringEvent;
LOG.info("Starting resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
trackingContainers.put(containerId,
new ProcessTreeInfo(containerId, null, null,
startEvent.getVmemLimit(), startEvent.getPmemLimit(),
startEvent.getCpuVcores()));
break; break;
case STOP_MONITORING_CONTAINER: case STOP_MONITORING_CONTAINER:
LOG.info("Stopping resource-monitoring for " + containerId); onStopMonitoringContainer(monitoringEvent, containerId);
updateContainerMetrics(monitoringEvent);
trackingContainers.remove(containerId);
break; break;
case CHANGE_MONITORING_CONTAINER_RESOURCE: case CHANGE_MONITORING_CONTAINER_RESOURCE:
ChangeMonitoringContainerResourceEvent changeEvent = onChangeMonitoringContainerResource(monitoringEvent, containerId);
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
if (processTreeInfo == null) {
LOG.warn("Failed to track container "
+ containerId.toString()
+ ". It may have already completed.");
break;
}
LOG.info("Changing resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
long vmemLimit = (long) (pmemLimit * vmemRatio);
int cpuVcores = changeEvent.getResource().getVirtualCores();
processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
changeContainerResource(containerId, changeEvent.getResource());
break; break;
default: default:
// TODO: Wrong event. // TODO: Wrong event.
} }
} }
protected void onChangeMonitoringContainerResource(
ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
if (processTreeInfo == null) {
LOG.warn("Failed to track container "
+ containerId.toString()
+ ". It may have already completed.");
return;
}
LOG.info("Changing resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
long vmemLimit = (long) (pmemLimit * vmemRatio);
int cpuVcores = changeEvent.getResource().getVirtualCores();
processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
changeContainerResource(containerId, changeEvent.getResource());
}
protected void onStopMonitoringContainer(
ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
LOG.info("Stopping resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
trackingContainers.remove(containerId);
}
protected void onStartMonitoringContainer(
ContainersMonitorEvent monitoringEvent, ContainerId containerId) {
ContainerStartMonitoringEvent startEvent =
(ContainerStartMonitoringEvent) monitoringEvent;
LOG.info("Starting resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
trackingContainers.put(containerId,
new ProcessTreeInfo(containerId, null, null,
startEvent.getVmemLimit(), startEvent.getPmemLimit(),
startEvent.getCpuVcores()));
}
} }

View File

@ -0,0 +1,556 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Class extending {@link ContainerManagerImpl} and is used when queuing at the
* NM is enabled.
*/
public class QueuingContainerManagerImpl extends ContainerManagerImpl {
private static final Logger LOG = LoggerFactory
.getLogger(QueuingContainerManagerImpl.class);
private ConcurrentMap<ContainerId, AllocatedContainerInfo>
allocatedGuaranteedContainers;
private ConcurrentMap<ContainerId, AllocatedContainerInfo>
allocatedOpportunisticContainers;
private Queue<AllocatedContainerInfo> queuedGuaranteedContainers;
private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
private Set<ContainerId> opportunisticContainersToKill;
public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
super(context, exec, deletionContext, nodeStatusUpdater, metrics,
dirsHandler);
this.allocatedGuaranteedContainers = new ConcurrentHashMap<>();
this.allocatedOpportunisticContainers = new ConcurrentHashMap<>();
this.queuedGuaranteedContainers = new ConcurrentLinkedQueue<>();
this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
this.opportunisticContainersToKill = Collections.synchronizedSet(
new HashSet<ContainerId>());
}
@Override
protected EventHandler<ApplicationEvent> createApplicationEventDispatcher() {
return new QueuingApplicationEventDispatcher(
super.createApplicationEventDispatcher());
}
@Override
protected void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
StartContainerRequest request) throws YarnException, IOException {
this.context.getQueuingContext().getQueuedContainers().put(
containerTokenIdentifier.getContainerID(), containerTokenIdentifier);
AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
containerTokenIdentifier, nmTokenIdentifier, request,
containerTokenIdentifier.getExecutionType(), containerTokenIdentifier
.getResource(), getConfig());
// If there are already free resources for the container to start, and
// there are no queued containers waiting to be executed, start this
// container immediately.
if (queuedGuaranteedContainers.isEmpty() &&
queuedOpportunisticContainers.isEmpty() &&
getContainersMonitor().
hasResourcesAvailable(allocatedContInfo.getPti())) {
startAllocatedContainer(allocatedContInfo);
} else {
if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.add(allocatedContInfo);
// Kill running opportunistic containers to make space for
// guaranteed container.
killOpportunisticContainers(allocatedContInfo);
} else {
queuedOpportunisticContainers.add(allocatedContInfo);
}
}
}
@Override
protected void stopContainerInternal(ContainerId containerID)
throws YarnException, IOException {
Container container = this.context.getContainers().get(containerID);
// If container is null and distributed scheduling is enabled, container
// might be queued. Otherwise, container might not be handled by this NM.
if (container == null && this.context.getQueuingContext()
.getQueuedContainers().containsKey(containerID)) {
ContainerTokenIdentifier containerTokenId = this.context
.getQueuingContext().getQueuedContainers().remove(containerID);
boolean foundInQueue = removeQueuedContainer(containerID,
containerTokenId.getExecutionType());
if (foundInQueue) {
this.context.getQueuingContext().getKilledQueuedContainers().put(
containerTokenId,
"Queued container request removed by ApplicationMaster.");
} else {
// The container started execution in the meanwhile.
try {
stopContainerInternalIfRunning(containerID);
} catch (YarnException | IOException e) {
LOG.error("Container did not get removed successfully.", e);
}
}
nodeStatusUpdater.sendOutofBandHeartBeat();
}
super.stopContainerInternal(containerID);
}
/**
* Start the execution of the given container. Also add it to the allocated
* containers, and update allocated resource utilization.
*/
private void startAllocatedContainer(
AllocatedContainerInfo allocatedContainerInfo) {
ProcessTreeInfo pti = allocatedContainerInfo.getPti();
if (allocatedContainerInfo.getExecutionType() ==
ExecutionType.GUARANTEED) {
allocatedGuaranteedContainers.put(pti.getContainerId(),
allocatedContainerInfo);
} else {
allocatedOpportunisticContainers.put(pti.getContainerId(),
allocatedContainerInfo);
}
getContainersMonitor().increaseContainersAllocation(pti);
// Start execution of container.
ContainerId containerId = allocatedContainerInfo
.getContainerTokenIdentifier().getContainerID();
this.context.getQueuingContext().getQueuedContainers().remove(containerId);
try {
super.startContainerInternal(
allocatedContainerInfo.getNMTokenIdentifier(),
allocatedContainerInfo.getContainerTokenIdentifier(),
allocatedContainerInfo.getStartRequest());
} catch (YarnException | IOException e) {
containerFailedToStart(pti.getContainerId(),
allocatedContainerInfo.getContainerTokenIdentifier());
LOG.error("Container failed to start.", e);
}
}
private void containerFailedToStart(ContainerId containerId,
ContainerTokenIdentifier containerTokenId) {
this.context.getQueuingContext().getQueuedContainers().remove(containerId);
removeAllocatedContainer(containerId);
this.context.getQueuingContext().getKilledQueuedContainers().put(
containerTokenId,
"Container removed from queue as it failed to start.");
}
/**
* Remove the given container from the container queues.
*
* @return true if the container was found in one of the queues.
*/
private boolean removeQueuedContainer(ContainerId containerId,
ExecutionType executionType) {
Queue<AllocatedContainerInfo> queue =
(executionType == ExecutionType.GUARANTEED) ?
queuedGuaranteedContainers : queuedOpportunisticContainers;
boolean foundInQueue = false;
Iterator<AllocatedContainerInfo> iter = queue.iterator();
while (iter.hasNext() && !foundInQueue) {
if (iter.next().getPti().getContainerId().equals(containerId)) {
iter.remove();
foundInQueue = true;
}
}
return foundInQueue;
}
/**
* Remove the given container from the allocated containers, and update
* allocated container utilization accordingly.
*/
private void removeAllocatedContainer(ContainerId containerId) {
AllocatedContainerInfo contToRemove = null;
contToRemove = allocatedGuaranteedContainers.remove(containerId);
if (contToRemove == null) {
contToRemove = allocatedOpportunisticContainers.remove(containerId);
}
// If container was indeed running, update allocated resource utilization.
if (contToRemove != null) {
getContainersMonitor().decreaseContainersAllocation(contToRemove
.getPti());
}
}
/**
* Stop a container only if it is currently running. If queued, do not stop
* it.
*/
private void stopContainerInternalIfRunning(ContainerId containerID)
throws YarnException, IOException {
if (this.context.getContainers().containsKey(containerID)) {
stopContainerInternal(containerID);
}
}
/**
* Kill opportunistic containers to free up resources for running the given
* container.
*
* @param allocatedContInfo
* the container whose execution needs to start by freeing up
* resources occupied by opportunistic containers.
*/
private void killOpportunisticContainers(
AllocatedContainerInfo allocatedContInfo) {
ContainerId containerToStartId = allocatedContInfo.getPti()
.getContainerId();
List<ContainerId> extraOpportContainersToKill =
pickOpportunisticContainersToKill(containerToStartId);
// Kill the opportunistic containers that were chosen.
for (ContainerId contIdToKill : extraOpportContainersToKill) {
try {
stopContainerInternalIfRunning(contIdToKill);
} catch (YarnException | IOException e) {
LOG.error("Container did not get removed successfully.", e);
}
LOG.info(
"Opportunistic container {} will be killed in order to start the "
+ "execution of guaranteed container {}.",
contIdToKill, containerToStartId);
}
}
/**
* Choose the opportunistic containers to kill in order to free up resources
* for running the given container.
*
* @param containerToStartId
* the container whose execution needs to start by freeing up
* resources occupied by opportunistic containers.
* @return the additional opportunistic containers that need to be killed.
*/
protected List<ContainerId> pickOpportunisticContainersToKill(
ContainerId containerToStartId) {
// The additional opportunistic containers that need to be killed for the
// given container to start.
List<ContainerId> extraOpportContainersToKill = new ArrayList<>();
// Track resources that need to be freed.
ResourceUtilization resourcesToFreeUp = resourcesToFreeUp(
containerToStartId);
// Go over the running opportunistic containers. Avoid containers that have
// already been marked for killing.
boolean hasSufficientResources = false;
for (Map.Entry<ContainerId, AllocatedContainerInfo> runningOpportCont :
allocatedOpportunisticContainers.entrySet()) {
ContainerId runningOpportContId = runningOpportCont.getKey();
// If there are sufficient resources to execute the given container, do
// not kill more opportunistic containers.
if (resourcesToFreeUp.getPhysicalMemory() <= 0 &&
resourcesToFreeUp.getVirtualMemory() <= 0 &&
resourcesToFreeUp.getCPU() <= 0.0f) {
hasSufficientResources = true;
break;
}
if (!opportunisticContainersToKill.contains(runningOpportContId)) {
extraOpportContainersToKill.add(runningOpportContId);
opportunisticContainersToKill.add(runningOpportContId);
getContainersMonitor().decreaseResourceUtilization(resourcesToFreeUp,
runningOpportCont.getValue().getPti());
}
}
if (!hasSufficientResources) {
LOG.info(
"There are no sufficient resources to start guaranteed {} even after "
+ "attempting to kill any running opportunistic containers.",
containerToStartId);
}
return extraOpportContainersToKill;
}
/**
* Calculates the amount of resources that need to be freed up (by killing
* opportunistic containers) in order for the given guaranteed container to
* start its execution. Resource allocation to be freed up =
* <code>containersAllocation</code> -
* allocation of <code>opportunisticContainersToKill</code> +
* allocation of <code>queuedGuaranteedContainers</code> that will start
* before the given container +
* allocation of given container -
* total resources of node.
*
* @param containerToStartId
* the ContainerId of the guaranteed container for which we need to
* free resources, so that its execution can start.
* @return the resources that need to be freed up for the given guaranteed
* container to start.
*/
private ResourceUtilization resourcesToFreeUp(
ContainerId containerToStartId) {
// Get allocation of currently allocated containers.
ResourceUtilization resourceAllocationToFreeUp = ResourceUtilization
.newInstance(getContainersMonitor().getContainersAllocation());
// Subtract from the allocation the allocation of the opportunistic
// containers that are marked for killing.
for (ContainerId opportContId : opportunisticContainersToKill) {
if (allocatedOpportunisticContainers.containsKey(opportContId)) {
getContainersMonitor().decreaseResourceUtilization(
resourceAllocationToFreeUp,
allocatedOpportunisticContainers.get(opportContId).getPti());
}
}
// Add to the allocation the allocation of the pending guaranteed
// containers that will start before the current container will be started.
for (AllocatedContainerInfo guarContInfo : queuedGuaranteedContainers) {
getContainersMonitor().increaseResourceUtilization(
resourceAllocationToFreeUp, guarContInfo.getPti());
if (guarContInfo.getPti().getContainerId().equals(containerToStartId)) {
break;
}
}
// Subtract the overall node resources.
getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
resourceAllocationToFreeUp);
return resourceAllocationToFreeUp;
}
/**
* If there are available resources, try to start as many pending containers
* as possible.
*/
private void startPendingContainers() {
// Start pending guaranteed containers, if resources available.
boolean resourcesAvailable =
startContainersFromQueue(queuedGuaranteedContainers);
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainersFromQueue(queuedOpportunisticContainers);
}
}
private boolean startContainersFromQueue(
Queue<AllocatedContainerInfo> queuedContainers) {
Iterator<AllocatedContainerInfo> guarIter = queuedContainers.iterator();
boolean resourcesAvailable = true;
while (guarIter.hasNext() && resourcesAvailable) {
AllocatedContainerInfo allocatedContInfo = guarIter.next();
if (getContainersMonitor().hasResourcesAvailable(
allocatedContInfo.getPti())) {
startAllocatedContainer(allocatedContInfo);
guarIter.remove();
} else {
resourcesAvailable = false;
}
}
return resourcesAvailable;
}
@Override
protected ContainerStatus getContainerStatusInternal(ContainerId containerID,
NMTokenIdentifier nmTokenIdentifier) throws YarnException {
Container container = this.context.getContainers().get(containerID);
if (container == null) {
ContainerTokenIdentifier containerTokenId = this.context
.getQueuingContext().getQueuedContainers().get(containerID);
if (containerTokenId != null) {
ExecutionType executionType = this.context.getQueuingContext()
.getQueuedContainers().get(containerID).getExecutionType();
return BuilderUtils.newContainerStatus(containerID,
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, "",
ContainerExitStatus.INVALID, this.context.getQueuingContext()
.getQueuedContainers().get(containerID).getResource(),
executionType);
}
}
return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
}
@VisibleForTesting
public int getNumAllocatedGuaranteedContainers() {
return allocatedGuaranteedContainers.size();
}
@VisibleForTesting
public int getNumAllocatedOpportunisticContainers() {
return allocatedOpportunisticContainers.size();
}
class QueuingApplicationEventDispatcher implements
EventHandler<ApplicationEvent> {
private EventHandler<ApplicationEvent> applicationEventDispatcher;
public QueuingApplicationEventDispatcher(
EventHandler<ApplicationEvent> applicationEventDispatcher) {
this.applicationEventDispatcher = applicationEventDispatcher;
}
@Override
@SuppressWarnings("unchecked")
public void handle(ApplicationEvent event) {
if (event.getType() ==
ApplicationEventType.APPLICATION_CONTAINER_FINISHED) {
if (!(event instanceof ApplicationContainerFinishedEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
ApplicationContainerFinishedEvent finishEvent =
(ApplicationContainerFinishedEvent) event;
// Remove finished container from the allocated containers, and
// attempt to start new containers.
ContainerId contIdToRemove = finishEvent.getContainerID();
removeAllocatedContainer(contIdToRemove);
opportunisticContainersToKill.remove(contIdToRemove);
startPendingContainers();
}
this.applicationEventDispatcher.handle(event);
}
}
static class AllocatedContainerInfo {
private final ContainerTokenIdentifier containerTokenIdentifier;
private final NMTokenIdentifier nmTokenIdentifier;
private final StartContainerRequest startRequest;
private final ExecutionType executionType;
private final ProcessTreeInfo pti;
AllocatedContainerInfo(ContainerTokenIdentifier containerTokenIdentifier,
NMTokenIdentifier nmTokenIdentifier, StartContainerRequest startRequest,
ExecutionType executionType, Resource resource, Configuration conf) {
this.containerTokenIdentifier = containerTokenIdentifier;
this.nmTokenIdentifier = nmTokenIdentifier;
this.startRequest = startRequest;
this.executionType = executionType;
this.pti = createProcessTreeInfo(containerTokenIdentifier
.getContainerID(), resource, conf);
}
private ContainerTokenIdentifier getContainerTokenIdentifier() {
return this.containerTokenIdentifier;
}
private NMTokenIdentifier getNMTokenIdentifier() {
return this.nmTokenIdentifier;
}
private StartContainerRequest getStartRequest() {
return this.startRequest;
}
private ExecutionType getExecutionType() {
return this.executionType;
}
protected ProcessTreeInfo getPti() {
return this.pti;
}
private ProcessTreeInfo createProcessTreeInfo(ContainerId containerId,
Resource resource, Configuration conf) {
long pmemBytes = resource.getMemory() * 1024 * 1024L;
float pmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
long vmemBytes = (long) (pmemRatio * pmemBytes);
int cpuVcores = resource.getVirtualCores();
return new ProcessTreeInfo(containerId, null, null, vmemBytes, pmemBytes,
cpuVcores);
}
@Override
public boolean equals(Object obj) {
boolean equal = false;
if (obj instanceof AllocatedContainerInfo) {
AllocatedContainerInfo otherContInfo = (AllocatedContainerInfo) obj;
equal = this.getPti().getContainerId()
.equals(otherContInfo.getPti().getContainerId());
}
return equal;
}
@Override
public int hashCode() {
return this.getPti().getContainerId().hashCode();
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* This package contains classes related to the queuing of containers at
* the NM.
*
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;

View File

@ -41,7 +41,6 @@
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;

View File

@ -683,5 +683,10 @@ public NodeResourceMonitor getNodeResourceMonitor() {
public NodeStatusUpdater getNodeStatusUpdater() { public NodeStatusUpdater getNodeStatusUpdater() {
return null; return null;
} }
@Override
public QueuingContext getQueuingContext() {
return null;
}
} }
} }

View File

@ -280,21 +280,22 @@ public static void waitForContainerState(ContainerManagementProtocol containerMa
list.add(containerID); list.add(containerID);
GetContainerStatusesRequest request = GetContainerStatusesRequest request =
GetContainerStatusesRequest.newInstance(list); GetContainerStatusesRequest.newInstance(list);
ContainerStatus containerStatus = ContainerStatus containerStatus = null;
containerManager.getContainerStatuses(request).getContainerStatuses()
.get(0);
int timeoutSecs = 0; int timeoutSecs = 0;
while (!containerStatus.getState().equals(finalState) do {
&& timeoutSecs++ < timeOutMax) { Thread.sleep(2000);
Thread.sleep(1000); containerStatus =
LOG.info("Waiting for container to get into state " + finalState containerManager.getContainerStatuses(request)
+ ". Current state is " + containerStatus.getState()); .getContainerStatuses().get(0);
containerStatus = containerManager.getContainerStatuses(request).getContainerStatuses().get(0); LOG.info("Waiting for container to get into state " + finalState
} + ". Current state is " + containerStatus.getState());
LOG.info("Container state is " + containerStatus.getState()); timeoutSecs += 2;
Assert.assertEquals("ContainerState is not correct (timedout)", } while (!containerStatus.getState().equals(finalState)
finalState, containerStatus.getState()); && timeoutSecs < timeOutMax);
} LOG.info("Container state is " + containerStatus.getState());
Assert.assertEquals("ContainerState is not correct (timedout)",
finalState, containerStatus.getState());
}
static void waitForApplicationState(ContainerManagerImpl containerManager, static void waitForApplicationState(ContainerManagerImpl containerManager,
ApplicationId appID, ApplicationState finalState) ApplicationId appID, ApplicationState finalState)
@ -328,19 +329,24 @@ public static void waitForNMContainerState(ContainerManagerImpl
org.apache.hadoop.yarn.server.nodemanager.containermanager org.apache.hadoop.yarn.server.nodemanager.containermanager
.container.ContainerState finalState, int timeOutMax) .container.ContainerState finalState, int timeOutMax)
throws InterruptedException, YarnException, IOException { throws InterruptedException, YarnException, IOException {
Container container = Container container = null;
containerManager.getContext().getContainers().get(containerID);
org.apache.hadoop.yarn.server.nodemanager org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.ContainerState currentState = .containermanager.container.ContainerState currentState = null;
container.getContainerState();
int timeoutSecs = 0; int timeoutSecs = 0;
while (!currentState.equals(finalState) do {
&& timeoutSecs++ < timeOutMax) { Thread.sleep(2000);
Thread.sleep(1000); container =
LOG.info("Waiting for NM container to get into state " + finalState containerManager.getContext().getContainers().get(containerID);
+ ". Current state is " + currentState); if (container != null) {
currentState = container.getContainerState(); currentState = container.getContainerState();
} }
if (currentState != null) {
LOG.info("Waiting for NM container to get into state " + finalState
+ ". Current state is " + currentState);
}
timeoutSecs += 2;
} while (!currentState.equals(finalState)
&& timeoutSecs++ < timeOutMax);
LOG.info("Container state is " + currentState); LOG.info("Container state is " + currentState);
Assert.assertEquals("ContainerState is not correct (timedout)", Assert.assertEquals("ContainerState is not correct (timedout)",
finalState, currentState); finalState, currentState);

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -74,6 +75,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
@ -176,7 +178,7 @@ public void testContainerManagerInitialization() throws IOException {
// Just do a query for a non-existing container. // Just do a query for a non-existing container.
boolean throwsException = false; boolean throwsException = false;
try { try {
List<ContainerId> containerIds = new ArrayList<ContainerId>(); List<ContainerId> containerIds = new ArrayList<>();
ContainerId id =createContainerId(0); ContainerId id =createContainerId(0);
containerIds.add(id); containerIds.add(id);
GetContainerStatusesRequest request = GetContainerStatusesRequest request =
@ -231,14 +233,14 @@ public void testContainerSetup() throws Exception {
containerLaunchContext, containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager())); user, context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest); list.add(scRequest);
StartContainersRequest allRequests = StartContainersRequest allRequests =
StartContainersRequest.newInstance(list); StartContainersRequest.newInstance(list);
containerManager.startContainers(allRequests); containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForContainerState(containerManager, cId, BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE); ContainerState.COMPLETE, 40);
// Now ascertain that the resources are localised correctly. // Now ascertain that the resources are localised correctly.
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId(); ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
@ -323,7 +325,7 @@ public void testContainerLaunchAndStop() throws IOException,
createContainerToken(cId, createContainerToken(cId,
DUMMY_RM_IDENTIFIER, context.getNodeId(), user, DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
context.getContainerTokenSecretManager())); context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest); list.add(scRequest);
StartContainersRequest allRequests = StartContainersRequest allRequests =
StartContainersRequest.newInstance(list); StartContainersRequest.newInstance(list);
@ -355,7 +357,7 @@ public void testContainerLaunchAndStop() throws IOException,
Assert.assertTrue("Process is not alive!", Assert.assertTrue("Process is not alive!",
DefaultContainerExecutor.containerIsAlive(pid)); DefaultContainerExecutor.containerIsAlive(pid));
List<ContainerId> containerIds = new ArrayList<ContainerId>(); List<ContainerId> containerIds = new ArrayList<>();
containerIds.add(cId); containerIds.add(cId);
StopContainersRequest stopRequest = StopContainersRequest stopRequest =
StopContainersRequest.newInstance(containerIds); StopContainersRequest.newInstance(containerIds);
@ -375,7 +377,7 @@ public void testContainerLaunchAndStop() throws IOException,
DefaultContainerExecutor.containerIsAlive(pid)); DefaultContainerExecutor.containerIsAlive(pid));
} }
private void testContainerLaunchAndExit(int exitCode) throws IOException, protected void testContainerLaunchAndExit(int exitCode) throws IOException,
InterruptedException, YarnException { InterruptedException, YarnException {
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
@ -430,7 +432,7 @@ private void testContainerLaunchAndExit(int exitCode) throws IOException,
containerLaunchContext, containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager())); user, context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest); list.add(scRequest);
StartContainersRequest allRequests = StartContainersRequest allRequests =
StartContainersRequest.newInstance(list); StartContainersRequest.newInstance(list);
@ -439,12 +441,12 @@ private void testContainerLaunchAndExit(int exitCode) throws IOException,
BaseContainerManagerTest.waitForContainerState(containerManager, cId, BaseContainerManagerTest.waitForContainerState(containerManager, cId,
ContainerState.COMPLETE); ContainerState.COMPLETE);
List<ContainerId> containerIds = new ArrayList<ContainerId>(); List<ContainerId> containerIds = new ArrayList<>();
containerIds.add(cId); containerIds.add(cId);
GetContainerStatusesRequest gcsRequest = GetContainerStatusesRequest gcsRequest =
GetContainerStatusesRequest.newInstance(containerIds); GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus = ContainerStatus containerStatus = containerManager.
containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0); getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
// Verify exit status matches exit state of script // Verify exit status matches exit state of script
Assert.assertEquals(exitCode, Assert.assertEquals(exitCode,
@ -520,7 +522,7 @@ public void testLocalFilesCleanup() throws InterruptedException,
containerLaunchContext, containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager())); user, context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest); list.add(scRequest);
StartContainersRequest allRequests = StartContainersRequest allRequests =
StartContainersRequest.newInstance(list); StartContainersRequest.newInstance(list);
@ -605,7 +607,7 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
createContainerToken(cId1, createContainerToken(cId1,
ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(), ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager())); user, context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list = new ArrayList<>();
list.add(startRequest1); list.add(startRequest1);
StartContainersRequest allRequests = StartContainersRequest allRequests =
StartContainersRequest.newInstance(list); StartContainersRequest.newInstance(list);
@ -635,7 +637,7 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
createContainerToken(cId2, createContainerToken(cId2,
DUMMY_RM_IDENTIFIER, context.getNodeId(), user, DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
context.getContainerTokenSecretManager())); context.getContainerTokenSecretManager()));
List<StartContainerRequest> list2 = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list2 = new ArrayList<>();
list.add(startRequest2); list.add(startRequest2);
StartContainersRequest allRequests2 = StartContainersRequest allRequests2 =
StartContainersRequest.newInstance(list2); StartContainersRequest.newInstance(list2);
@ -655,7 +657,7 @@ public void testContainerLaunchFromPreviousRM() throws IOException,
public void testMultipleContainersLaunch() throws Exception { public void testMultipleContainersLaunch() throws Exception {
containerManager.start(); containerManager.start();
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list = new ArrayList<>();
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class); recordFactory.newRecordInstance(ContainerLaunchContext.class);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
@ -679,6 +681,7 @@ public void testMultipleContainersLaunch() throws Exception {
StartContainersResponse response = StartContainersResponse response =
containerManager.startContainers(requestList); containerManager.startContainers(requestList);
Thread.sleep(5000);
Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size()); Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size());
for (ContainerId id : response.getSuccessfullyStartedContainers()) { for (ContainerId id : response.getSuccessfullyStartedContainers()) {
@ -699,12 +702,11 @@ public void testMultipleContainersLaunch() throws Exception {
@Test @Test
public void testMultipleContainersStopAndGetStatus() throws Exception { public void testMultipleContainersStopAndGetStatus() throws Exception {
containerManager.start(); containerManager.start();
List<StartContainerRequest> startRequest = List<StartContainerRequest> startRequest = new ArrayList<>();
new ArrayList<StartContainerRequest>();
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class); recordFactory.newRecordInstance(ContainerLaunchContext.class);
List<ContainerId> containerIds = new ArrayList<ContainerId>(); List<ContainerId> containerIds = new ArrayList<>();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
ContainerId cId = createContainerId(i); ContainerId cId = createContainerId(i);
String user = null; String user = null;
@ -727,6 +729,7 @@ public void testMultipleContainersStopAndGetStatus() throws Exception {
StartContainersRequest requestList = StartContainersRequest requestList =
StartContainersRequest.newInstance(startRequest); StartContainersRequest.newInstance(startRequest);
containerManager.startContainers(requestList); containerManager.startContainers(requestList);
Thread.sleep(5000);
// Get container statuses // Get container statuses
GetContainerStatusesRequest statusRequest = GetContainerStatusesRequest statusRequest =
@ -777,8 +780,7 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception {
ServiceA.class, Service.class); ServiceA.class, Service.class);
containerManager.start(); containerManager.start();
List<StartContainerRequest> startRequest = List<StartContainerRequest> startRequest = new ArrayList<>();
new ArrayList<StartContainerRequest>();
ContainerLaunchContext containerLaunchContext = ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class); recordFactory.newRecordInstance(ContainerLaunchContext.class);
@ -803,8 +805,8 @@ public void testStartContainerFailureWithUnknownAuxService() throws Exception {
StartContainersResponse response = StartContainersResponse response =
containerManager.startContainers(requestList); containerManager.startContainers(requestList);
Assert.assertTrue(response.getFailedRequests().size() == 1); Assert.assertEquals(1, response.getFailedRequests().size());
Assert.assertTrue(response.getSuccessfullyStartedContainers().size() == 0); Assert.assertEquals(0, response.getSuccessfullyStartedContainers().size());
Assert.assertTrue(response.getFailedRequests().containsKey(cId)); Assert.assertTrue(response.getFailedRequests().containsKey(cId));
Assert.assertTrue(response.getFailedRequests().get(cId).getMessage() Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
.contains("The auxService:" + serviceName + " does not exist")); .contains("The auxService:" + serviceName + " does not exist"));
@ -880,8 +882,7 @@ public void testNullTokens() throws Exception {
ContainerManagerImpl.INVALID_NMTOKEN_MSG); ContainerManagerImpl.INVALID_NMTOKEN_MSG);
Mockito.doNothing().when(spyContainerMgr).authorizeUser(ugInfo, null); Mockito.doNothing().when(spyContainerMgr).authorizeUser(ugInfo, null);
List<StartContainerRequest> reqList List<StartContainerRequest> reqList = new ArrayList<>();
= new ArrayList<StartContainerRequest>();
reqList.add(StartContainerRequest.newInstance(null, null)); reqList.add(StartContainerRequest.newInstance(null, null));
StartContainersRequest reqs = new StartContainersRequestPBImpl(); StartContainersRequest reqs = new StartContainersRequestPBImpl();
reqs.setStartContainerRequests(reqList); reqs.setStartContainerRequests(reqList);
@ -925,7 +926,7 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception
Thread.sleep(2000); Thread.sleep(2000);
// Construct container resource increase request, // Construct container resource increase request,
List<Token> increaseTokens = new ArrayList<Token>(); List<Token> increaseTokens = new ArrayList<>();
// Add increase request for container-0, the request will fail as the // Add increase request for container-0, the request will fail as the
// container will have exited, and won't be in RUNNING state // container will have exited, and won't be in RUNNING state
ContainerId cId0 = createContainerId(0); ContainerId cId0 = createContainerId(0);
@ -1012,7 +1013,7 @@ public void testIncreaseContainerResourceWithInvalidResource() throws Exception
containerLaunchContext, containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager())); user, context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest); list.add(scRequest);
StartContainersRequest allRequests = StartContainersRequest allRequests =
StartContainersRequest.newInstance(list); StartContainersRequest.newInstance(list);
@ -1022,7 +1023,7 @@ public void testIncreaseContainerResourceWithInvalidResource() throws Exception
org.apache.hadoop.yarn.server.nodemanager. org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING); containermanager.container.ContainerState.RUNNING);
// Construct container resource increase request, // Construct container resource increase request,
List<Token> increaseTokens = new ArrayList<Token>(); List<Token> increaseTokens = new ArrayList<>();
// Add increase request. The increase request should fail // Add increase request. The increase request should fail
// as the current resource does not fit in the target resource // as the current resource does not fit in the target resource
Token containerToken = Token containerToken =
@ -1096,7 +1097,7 @@ public void testChangeContainerResource() throws Exception {
createContainerToken(cId, DUMMY_RM_IDENTIFIER, createContainerToken(cId, DUMMY_RM_IDENTIFIER,
context.getNodeId(), user, context.getNodeId(), user,
context.getContainerTokenSecretManager())); context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest); list.add(scRequest);
StartContainersRequest allRequests = StartContainersRequest allRequests =
StartContainersRequest.newInstance(list); StartContainersRequest.newInstance(list);
@ -1106,7 +1107,7 @@ public void testChangeContainerResource() throws Exception {
org.apache.hadoop.yarn.server.nodemanager. org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING); containermanager.container.ContainerState.RUNNING);
// Construct container resource increase request, // Construct container resource increase request,
List<Token> increaseTokens = new ArrayList<Token>(); List<Token> increaseTokens = new ArrayList<>();
// Add increase request. // Add increase request.
Resource targetResource = Resource.newInstance(4096, 2); Resource targetResource = Resource.newInstance(4096, 2);
Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER, Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
@ -1184,6 +1185,21 @@ public static Token createContainerToken(ContainerId cId, long rmIdentifier,
containerTokenIdentifier); containerTokenIdentifier);
} }
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
NodeId nodeId, String user, Resource resource,
NMContainerTokenSecretManager containerTokenSecretManager,
LogAggregationContext logAggregationContext, ExecutionType executionType)
throws IOException {
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
Priority.newInstance(0), 0, logAggregationContext, null,
ContainerType.TASK, executionType);
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
.retrievePassword(containerTokenIdentifier),
containerTokenIdentifier);
}
@Test @Test
public void testOutputThreadDumpSignal() throws IOException, public void testOutputThreadDumpSignal() throws IOException,
InterruptedException, YarnException { InterruptedException, YarnException {
@ -1241,7 +1257,7 @@ private void testContainerLaunchAndSignal(SignalContainerCommand command)
new HashMap<String, LocalResource>(); new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha); localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources); containerLaunchContext.setLocalResources(localResources);
List<String> commands = new ArrayList<String>(); List<String> commands = new ArrayList<>();
commands.add("/bin/bash"); commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath()); commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands); containerLaunchContext.setCommands(commands);
@ -1250,7 +1266,7 @@ private void testContainerLaunchAndSignal(SignalContainerCommand command)
containerLaunchContext, containerLaunchContext,
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(), createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
user, context.getContainerTokenSecretManager())); user, context.getContainerTokenSecretManager()));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); List<StartContainerRequest> list = new ArrayList<>();
list.add(scRequest); list.add(scRequest);
StartContainersRequest allRequests = StartContainersRequest allRequests =
StartContainersRequest.newInstance(list); StartContainersRequest.newInstance(list);
@ -1267,7 +1283,7 @@ private void testContainerLaunchAndSignal(SignalContainerCommand command)
// Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent // Simulate NodeStatusUpdaterImpl sending CMgrSignalContainersEvent
SignalContainerRequest signalReq = SignalContainerRequest signalReq =
SignalContainerRequest.newInstance(cId, command); SignalContainerRequest.newInstance(cId, command);
List<SignalContainerRequest> reqs = new ArrayList<SignalContainerRequest>(); List<SignalContainerRequest> reqs = new ArrayList<>();
reqs.add(signalReq); reqs.add(signalReq);
containerManager.handle(new CMgrSignalContainersEvent(reqs)); containerManager.handle(new CMgrSignalContainersEvent(reqs));

View File

@ -22,6 +22,10 @@
public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin { public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin {
public MockResourceCalculatorPlugin() {
super(null);
}
@Override @Override
public long getVirtualMemorySize() { public long getVirtualMemorySize() {
return 0; return 0;

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -173,8 +174,8 @@ public void testContainersResourceChange() throws Exception {
assertTrue(containerEventHandler assertTrue(containerEventHandler
.isContainerKilled(getContainerId(1))); .isContainerKilled(getContainerId(1)));
// create container 2 // create container 2
containersMonitor.handle(new ContainerStartMonitoringEvent( containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId(
getContainerId(2), 2202009L, 1048576L, 1, 0, 0)); 2), 2202009L, 1048576L, 1, 0, 0));
// verify that this container is properly tracked // verify that this container is properly tracked
assertNotNull(getProcessTreeInfo(getContainerId(2))); assertNotNull(getProcessTreeInfo(getContainerId(2)));
assertEquals(1048576L, getProcessTreeInfo(getContainerId(2)) assertEquals(1048576L, getProcessTreeInfo(getContainerId(2))
@ -215,8 +216,8 @@ public void testContainersResourceChangeIsTriggeredImmediately()
// now waiting for the next monitor cycle // now waiting for the next monitor cycle
Thread.sleep(1000); Thread.sleep(1000);
// create a container with id 3 // create a container with id 3
containersMonitor.handle(new ContainerStartMonitoringEvent( containersMonitor.handle(new ContainerStartMonitoringEvent(getContainerId(
getContainerId(3), 2202009L, 1048576L, 1, 0, 0)); 3), 2202009L, 1048576L, 1, 0, 0));
// Verify that this container has been tracked // Verify that this container has been tracked
assertNotNull(getProcessTreeInfo(getContainerId(3))); assertNotNull(getProcessTreeInfo(getContainerId(3)));
// trigger a change resource event, check limit after change // trigger a change resource event, check limit after change

View File

@ -0,0 +1,301 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TestQueuingContainerManager extends TestContainerManager {
interface HasResources {
boolean decide(Context context, ContainerId cId);
}
public TestQueuingContainerManager() throws UnsupportedFileSystemException {
super();
}
static {
LOG = LogFactory.getLog(TestQueuingContainerManager.class);
}
HasResources hasResources = null;
boolean shouldDeleteWait = false;
@Override
protected ContainerManagerImpl
createContainerManager(DeletionService delSrvc) {
return new QueuingContainerManagerImpl(context, exec, delSrvc,
nodeStatusUpdater, metrics, dirsHandler) {
@Override
public void serviceInit(Configuration conf) throws Exception {
conf.set(
YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
MockResourceCalculatorPlugin.class.getCanonicalName());
conf.set(
YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
MockResourceCalculatorProcessTree.class.getCanonicalName());
super.serviceInit(conf);
}
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected UserGroupInformation getRemoteUgi() throws YarnException {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString());
ugi.addTokenIdentifier(new NMTokenIdentifier(appAttemptId, context
.getNodeId(), user, context.getNMTokenSecretManager().getCurrentKey()
.getKeyId()));
return ugi;
}
@Override
protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
if(container == null || container.getUser().equals("Fail")){
throw new YarnException("Reject this container");
}
}
@Override
protected ContainersMonitor createContainersMonitor(ContainerExecutor
exec) {
return new ContainersMonitorImpl(exec, dispatcher, this.context) {
@Override
public boolean hasResourcesAvailable(
ContainersMonitorImpl.ProcessTreeInfo pti) {
return hasResources.decide(this.context, pti.getContainerId());
}
};
}
};
}
@Override
protected DeletionService createDeletionService() {
return new DeletionService(exec) {
@Override
public void delete(String user, Path subDir, Path... baseDirs) {
// Don't do any deletions.
if (shouldDeleteWait) {
try {
Thread.sleep(10000);
LOG.info("\n\nSleeping Pseudo delete : user - " + user + ", " +
"subDir - " + subDir + ", " +
"baseDirs - " + Arrays.asList(baseDirs));
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
LOG.info("\n\nPseudo delete : user - " + user + ", " +
"subDir - " + subDir + ", " +
"baseDirs - " + Arrays.asList(baseDirs));
}
}
};
}
@Override
public void setup() throws IOException {
super.setup();
shouldDeleteWait = false;
hasResources = new HasResources() {
@Override
public boolean decide(Context context, ContainerId cId) {
return true;
}
};
}
/**
* Test to verify that an OPPORTUNISTIC container is killed when
* a GUARANTEED container arrives and all the Node Resources are used up
*
* For this specific test case, 4 containers are requested (last one being
* guaranteed). Assumptions :
* 1) The first OPPORTUNISTIC Container will start running
* 2) The second and third OPP containers will be queued
* 3) When the GUARANTEED container comes in, the running OPP container
* will be killed to make room
* 4) After the GUARANTEED container finishes, the remaining 2 OPP
* containers will be dequeued and run.
* 5) Only the first OPP container will be killed.
*
* @throws Exception
*/
@Test
public void testSimpleOpportunisticContainer() throws Exception {
shouldDeleteWait = true;
containerManager.start();
// ////// Create the resources for the container
File dir = new File(tmpDir, "dir");
dir.mkdirs();
File file = new File(dir, "file");
PrintWriter fileWriter = new PrintWriter(file);
fileWriter.write("Hello World!");
fileWriter.close();
// ////// Construct the container-spec.
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
URL resource_alpha =
ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(file.getAbsolutePath())));
LocalResource rsrc_alpha =
recordFactory.newRecordInstance(LocalResource.class);
rsrc_alpha.setResource(resource_alpha);
rsrc_alpha.setSize(-1);
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(file.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
// Start 3 OPPORTUNISTIC containers and 1 GUARANTEED container
List<StartContainerRequest> list = new ArrayList<>();
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(1024, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(1024, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, BuilderUtils.newResource(1024, 1),
context.getContainerTokenSecretManager(), null,
ExecutionType.OPPORTUNISTIC)));
// GUARANTEED
list.add(StartContainerRequest.newInstance(
containerLaunchContext,
createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
context.getNodeId(),
user, context.getContainerTokenSecretManager())));
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
// Plugin to simulate that the Node is full
// It only allows 1 container to run at a time.
hasResources = new HasResources() {
@Override
public boolean decide(Context context, ContainerId cId) {
int nOpp = ((QueuingContainerManagerImpl) containerManager)
.getNumAllocatedOpportunisticContainers();
int nGuar = ((QueuingContainerManagerImpl) containerManager)
.getNumAllocatedGuaranteedContainers();
boolean val = (nOpp + nGuar < 1);
System.out.println("\nHasResources : [" + cId + "]," +
"Opp[" + nOpp + "], Guar[" + nGuar + "], [" + val + "]\n");
return val;
}
};
containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForContainerState(containerManager,
createContainerId(3),
ContainerState.COMPLETE, 40);
List<ContainerId> statList = new ArrayList<ContainerId>();
for (int i = 0; i < 4; i++) {
statList.add(createContainerId(i));
}
GetContainerStatusesRequest statRequest =
GetContainerStatusesRequest.newInstance(statList);
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
// Ensure that the first opportunistic container is killed
if (status.getContainerId().equals(createContainerId(0))) {
Assert.assertTrue(status.getDiagnostics()
.contains("Container killed by the ApplicationMaster"));
}
System.out.println("\nStatus : [" + status + "]\n");
}
}
}