From 6341c3a437489737a9c4bf0911b218b0023d8dd9 Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Thu, 10 May 2018 11:01:01 -0700 Subject: [PATCH] YARN-7715. Support NM promotion/demotion of running containers. (Miklos Szegedi via Haibo Chen) --- .../CGroupsBlkioResourceHandlerImpl.java | 6 ++ .../CGroupsCpuResourceHandlerImpl.java | 75 +++++++++++-------- .../CGroupsMemoryResourceHandlerImpl.java | 73 ++++++++++-------- .../NetworkPacketTaggingHandlerImpl.java | 6 ++ .../linux/resources/ResourceHandler.java | 16 +++- .../linux/resources/ResourceHandlerChain.java | 18 +++++ .../TrafficControlBandwidthHandlerImpl.java | 6 ++ .../fpga/FpgaResourceHandlerImpl.java | 6 ++ .../resources/gpu/GpuResourceHandlerImpl.java | 6 ++ .../numa/NumaResourceHandlerImpl.java | 6 ++ .../scheduler/ContainerScheduler.java | 30 ++++++++ .../TestCGroupsCpuResourceHandlerImpl.java | 1 + .../TestCGroupsMemoryResourceHandlerImpl.java | 1 + .../TestResourcePluginManager.java | 6 ++ .../TestContainerSchedulerQueuing.java | 10 +++ 15 files changed, 202 insertions(+), 64 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java index 42fc63420d..2c402c013d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsBlkioResourceHandlerImpl.java @@ -155,6 +155,12 @@ public List reacquireContainer(ContainerId containerId) return null; } + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + @Override public List postComplete(ContainerId containerId) throws ResourceHandlerException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java index 7ea7be2c06..37221f4555 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsCpuResourceHandlerImpl.java @@ -186,38 +186,8 @@ public static int[] getOverallLimits(float yarnProcessors) { public List preStart(Container container) throws ResourceHandlerException { String cgroupId = container.getContainerId().toString(); - Resource containerResource = container.getResource(); cGroupsHandler.createCGroup(CPU, cgroupId); - try { - int containerVCores = containerResource.getVirtualCores(); - ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); - if (id != null && id.getExecutionType() == - ExecutionType.OPPORTUNISTIC) { - cGroupsHandler - .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES, - String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC)); - } else { - int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; - cGroupsHandler - .updateCGroupParam(CPU, cgroupId, CGroupsHandler.CGROUP_CPU_SHARES, - String.valueOf(cpuShares)); - } - if (strictResourceUsageMode) { - if (nodeVCores != containerVCores) { - float containerCPU = - (containerVCores * yarnProcessors) / (float) nodeVCores; - int[] limits = getOverallLimits(containerCPU); - cGroupsHandler.updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0])); - cGroupsHandler.updateCGroupParam(CPU, cgroupId, - CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1])); - } - } - } catch (ResourceHandlerException re) { - cGroupsHandler.deleteCGroup(CPU, cgroupId); - LOG.warn("Could not update cgroup for container", re); - throw re; - } + updateContainer(container); List ret = new ArrayList<>(); ret.add(new PrivilegedOperation( PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, @@ -232,6 +202,49 @@ public List reacquireContainer(ContainerId containerId) return null; } + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + Resource containerResource = container.getResource(); + String cgroupId = container.getContainerId().toString(); + File cgroup = new File(cGroupsHandler.getPathForCGroup(CPU, cgroupId)); + if (cgroup.exists()) { + try { + int containerVCores = containerResource.getVirtualCores(); + ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); + if (id != null && id.getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + cGroupsHandler + .updateCGroupParam(CPU, cgroupId, + CGroupsHandler.CGROUP_CPU_SHARES, + String.valueOf(CPU_DEFAULT_WEIGHT_OPPORTUNISTIC)); + } else { + int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; + cGroupsHandler + .updateCGroupParam(CPU, cgroupId, + CGroupsHandler.CGROUP_CPU_SHARES, + String.valueOf(cpuShares)); + } + if (strictResourceUsageMode) { + if (nodeVCores != containerVCores) { + float containerCPU = + (containerVCores * yarnProcessors) / (float) nodeVCores; + int[] limits = getOverallLimits(containerCPU); + cGroupsHandler.updateCGroupParam(CPU, cgroupId, + CGroupsHandler.CGROUP_CPU_PERIOD_US, String.valueOf(limits[0])); + cGroupsHandler.updateCGroupParam(CPU, cgroupId, + CGroupsHandler.CGROUP_CPU_QUOTA_US, String.valueOf(limits[1])); + } + } + } catch (ResourceHandlerException re) { + cGroupsHandler.deleteCGroup(CPU, cgroupId); + LOG.warn("Could not update cgroup for container", re); + throw re; + } + } + return null; + } + @Override public List postComplete(ContainerId containerId) throws ResourceHandlerException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java index 558751fb9b..2d1585e95d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import java.io.File; import java.util.ArrayList; import java.util.List; @@ -119,43 +120,53 @@ public List reacquireContainer(ContainerId containerId) } @Override - public List preStart(Container container) + public List updateContainer(Container container) throws ResourceHandlerException { - String cgroupId = container.getContainerId().toString(); - //memory is in MB - long containerSoftLimit = - (long) (container.getResource().getMemorySize() * this.softLimit); - long containerHardLimit = container.getResource().getMemorySize(); - cGroupsHandler.createCGroup(MEMORY, cgroupId); - if (enforce) { - try { - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, - String.valueOf(containerHardLimit) + "M"); - ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); - if (id != null && id.getExecutionType() == - ExecutionType.OPPORTUNISTIC) { + File cgroup = new File(cGroupsHandler.getPathForCGroup(MEMORY, cgroupId)); + if (cgroup.exists()) { + //memory is in MB + long containerSoftLimit = + (long) (container.getResource().getMemorySize() * this.softLimit); + long containerHardLimit = container.getResource().getMemorySize(); + if (enforce) { + try { cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, - String.valueOf(OPPORTUNISTIC_SOFT_LIMIT) + "M"); - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, - String.valueOf(OPPORTUNISTIC_SWAPPINESS)); - } else { - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, - String.valueOf(containerSoftLimit) + "M"); - cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, - CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, - String.valueOf(swappiness)); + CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, + String.valueOf(containerHardLimit) + "M"); + ContainerTokenIdentifier id = container.getContainerTokenIdentifier(); + if (id != null && id.getExecutionType() == + ExecutionType.OPPORTUNISTIC) { + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, + String.valueOf(OPPORTUNISTIC_SOFT_LIMIT) + "M"); + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, + String.valueOf(OPPORTUNISTIC_SWAPPINESS)); + } else { + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES, + String.valueOf(containerSoftLimit) + "M"); + cGroupsHandler.updateCGroupParam(MEMORY, cgroupId, + CGroupsHandler.CGROUP_PARAM_MEMORY_SWAPPINESS, + String.valueOf(swappiness)); + } + } catch (ResourceHandlerException re) { + cGroupsHandler.deleteCGroup(MEMORY, cgroupId); + LOG.warn("Could not update cgroup for container", re); + throw re; } - } catch (ResourceHandlerException re) { - cGroupsHandler.deleteCGroup(MEMORY, cgroupId); - LOG.warn("Could not update cgroup for container", re); - throw re; } } + return null; + } + + @Override + public List preStart(Container container) + throws ResourceHandlerException { + String cgroupId = container.getContainerId().toString(); + cGroupsHandler.createCGroup(MEMORY, cgroupId); + updateContainer(container); List ret = new ArrayList<>(); ret.add(new PrivilegedOperation( PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkPacketTaggingHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkPacketTaggingHandlerImpl.java index 1580e2c4ed..3f6d4b6340 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkPacketTaggingHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkPacketTaggingHandlerImpl.java @@ -128,6 +128,12 @@ public List reacquireContainer(ContainerId containerId) return null; } + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + /** * Cleanup operation once container is completed - deletes cgroup. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java index 3dfc86b3e6..35c64608ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandler.java @@ -61,7 +61,7 @@ List preStart(Container container) /** * Require state for container that was already launched * - * @param containerId if of the container being reacquired. + * @param containerId id of the container being reacquired. * @return (possibly empty) list of operations that require elevated * privileges * @throws ResourceHandlerException @@ -71,7 +71,19 @@ List reacquireContainer(ContainerId containerId) throws ResourceHandlerException; /** - * Perform any tasks necessary after container completion + * Update state for container that was already launched + * + * @param container the container being updated. + * @return (possibly empty) list of operations that require elevated + * privileges + * @throws ResourceHandlerException + */ + + List updateContainer(Container container) + throws ResourceHandlerException; + + /** + * Perform any tasks necessary after container completion. * @param containerId of the container that was completed. * @return (possibly empty) list of operations that require elevated * privileges diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java index 72bf30ce87..2fc301a4ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerChain.java @@ -100,6 +100,24 @@ public List reacquireContainer(ContainerId containerId) return allOperations; } + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + List allOperations = new + ArrayList(); + + for (ResourceHandler resourceHandler : resourceHandlers) { + List handlerOperations = + resourceHandler.updateContainer(container); + + if (handlerOperations != null) { + allOperations.addAll(handlerOperations); + } + + } + return allOperations; + } + @Override public List postComplete(ContainerId containerId) throws ResourceHandlerException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java index 126685f338..c04e935c78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TrafficControlBandwidthHandlerImpl.java @@ -203,6 +203,12 @@ public List reacquireContainer(ContainerId containerId) return null; } + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + /** * Returns total bytes sent per container to be used for metrics tracking * purposes. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java index bf3d9b072d..11f7114a40 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java @@ -205,6 +205,12 @@ public List reacquireContainer(ContainerId containerId) thr return null; } + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + @Override public List postComplete(ContainerId containerId) throws ResourceHandlerException { allocator.cleanupAssignFpgas(containerId.toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java index 8ddc227316..587fcb4983 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/gpu/GpuResourceHandlerImpl.java @@ -164,6 +164,12 @@ public List reacquireContainer(ContainerId containerId) return null; } + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + @Override public synchronized List postComplete( ContainerId containerId) throws ResourceHandlerException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java index 128dacaea4..8ffba24235 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/numa/NumaResourceHandlerImpl.java @@ -94,6 +94,12 @@ public List reacquireContainer(ContainerId containerId) return null; } + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + @Override public List postComplete(ContainerId containerId) throws ResourceHandlerException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 57368ab5ca..5cdcf414b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -34,6 +34,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerModule; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor .ChangeMonitoringContainerResourceEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; @@ -105,6 +108,9 @@ public class ContainerScheduler extends AbstractService implements private Boolean usePauseEventForPreemption = false; + @VisibleForTesting + ResourceHandlerChain resourceHandlerChain = null; + /** * Instantiate a Container Scheduler. * @param context NodeManager Context. @@ -123,6 +129,24 @@ public ContainerScheduler(Context context, AsyncDispatcher dispatcher, @Override public void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + try { + if (resourceHandlerChain == null) { + resourceHandlerChain = ResourceHandlerModule + .getConfiguredResourceHandlerChain(conf, context); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Resource handler chain enabled = " + (resourceHandlerChain + != null)); + } + if (resourceHandlerChain != null) { + LOG.debug("Bootstrapping resource handler chain"); + resourceHandlerChain.bootstrap(conf); + } + } catch (ResourceHandlerException e) { + LOG.error("Failed to bootstrap configured resource subsystems! ", e); + throw new IOException( + "Failed to bootstrap configured resource subsystems!"); + } this.usePauseEventForPreemption = conf.getBoolean( YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION, @@ -218,6 +242,12 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { updateEvent.getContainer()); } } + try { + resourceHandlerChain.updateContainer(updateEvent.getContainer()); + } catch (Exception ex) { + LOG.warn(String.format("Could not update resources on " + + "continer update of %s", containerId), ex); + } startPendingContainers(maxOppQueueLength <= 0); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java index 006b0601ed..842fc6b437 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsCpuResourceHandlerImpl.java @@ -49,6 +49,7 @@ public class TestCGroupsCpuResourceHandlerImpl { @Before public void setup() { mockCGroupsHandler = mock(CGroupsHandler.class); + when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn("."); cGroupsCpuResourceHandler = new CGroupsCpuResourceHandlerImpl(mockCGroupsHandler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java index 78ccc617a0..416b4fd491 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestCGroupsMemoryResourceHandlerImpl.java @@ -45,6 +45,7 @@ public class TestCGroupsMemoryResourceHandlerImpl { @Before public void setup() { mockCGroupsHandler = mock(CGroupsHandler.class); + when(mockCGroupsHandler.getPathForCGroup(any(), any())).thenReturn("."); cGroupsMemoryResourceHandler = new CGroupsMemoryResourceHandlerImpl(mockCGroupsHandler); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java index bcadf76e4b..6ed7c56889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java @@ -116,6 +116,12 @@ public List reacquireContainer(ContainerId containerId) return null; } + @Override + public List updateContainer(Container container) + throws ResourceHandlerException { + return null; + } + @Override public List postComplete(ContainerId containerId) throws ResourceHandlerException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java index 5c72e7e8b4..1da7e4afa8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; @@ -72,7 +73,11 @@ import org.junit.Test; import org.slf4j.LoggerFactory; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; /** * Tests to verify that the {@link ContainerScheduler} is able to queue and @@ -1183,6 +1188,8 @@ public void testPromotionOfOpportunisticContainers() throws Exception { ContainerScheduler containerScheduler = containerManager.getContainerScheduler(); + containerScheduler.resourceHandlerChain = + mock(ResourceHandlerChain.class); // Ensure two containers are properly queued. Assert.assertEquals(1, containerScheduler.getNumQueuedContainers()); Assert.assertEquals(0, @@ -1246,6 +1253,9 @@ public void testPromotionOfOpportunisticContainers() throws Exception { ContainerEventType.INIT_CONTAINER, ContainerEventType.UPDATE_CONTAINER_TOKEN, ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes); + verify(containerScheduler.resourceHandlerChain, + times(1)) + .updateContainer(any()); } @Test