From 21101c01f242439ec8ec40fb3a9ab1991ae0adc7 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 6 Mar 2015 14:17:57 -0800 Subject: [PATCH] YARN-2190. Added CPU and memory limit options to the default container executor for Windows containers. Contributed by Chuan Liu --- BUILDING.txt | 9 +- .../hadoop-common/src/main/winutils/task.c | 144 +++++++++++++++--- .../src/main/winutils/win8sdk.props | 28 ++++ .../src/main/winutils/winutils.vcxproj | 3 + .../org/apache/hadoop/util/TestWinUtils.java | 62 ++++++++ hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 12 ++ .../src/main/resources/yarn-default.xml | 14 ++ .../server/nodemanager/ContainerExecutor.java | 49 +++++- .../nodemanager/DefaultContainerExecutor.java | 9 +- .../WindowsSecureContainerExecutor.java | 9 +- .../nodemanager/TestContainerExecutor.java | 53 +++++++ 12 files changed, 360 insertions(+), 35 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props diff --git a/BUILDING.txt b/BUILDING.txt index 6e38ad374a..b60da6c416 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -209,7 +209,8 @@ Requirements: * Findbugs 1.3.9 (if running findbugs) * ProtocolBuffer 2.5.0 * CMake 2.6 or newer -* Windows SDK or Visual Studio 2010 Professional +* Windows SDK 7.1 or Visual Studio 2010 Professional +* Windows SDK 8.1 (if building CPU rate control for the container executor) * zlib headers (if building native code bindings for zlib) * Internet connection for first build (to fetch all Maven and Hadoop dependencies) * Unix command-line tools from GnuWin32: sh, mkdir, rm, cp, tar, gzip. These @@ -220,11 +221,15 @@ can be downloaded from http://git-scm.com/download/win. If using Visual Studio, it must be Visual Studio 2010 Professional (not 2012). Do not use Visual Studio Express. It does not support compiling for 64-bit, -which is problematic if running a 64-bit system. The Windows SDK is free to +which is problematic if running a 64-bit system. The Windows SDK 7.1 is free to download here: http://www.microsoft.com/en-us/download/details.aspx?id=8279 +The Windows SDK 8.1 is available to download at: + +http://msdn.microsoft.com/en-us/windows/bg162891.aspx + Cygwin is neither required nor supported. ---------------------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/task.c b/hadoop-common-project/hadoop-common/src/main/winutils/task.c index 21b1893953..37c6ca136a 100644 --- a/hadoop-common-project/hadoop-common/src/main/winutils/task.c +++ b/hadoop-common-project/hadoop-common/src/main/winutils/task.c @@ -49,6 +49,31 @@ typedef enum TaskCommandOptionType TaskProcessList } TaskCommandOption; + //---------------------------------------------------------------------------- +// Function: GetLimit +// +// Description: +// Get the resource limit value in long type given the command line argument. +// +// Returns: +// TRUE: If successfully get the value +// FALSE: otherwise +static BOOL GetLimit(__in const wchar_t *str, __out long *value) +{ + wchar_t *end = NULL; + if (str == NULL || value == NULL) return FALSE; + *value = wcstol(str, &end, 10); + if (end == NULL || *end != '\0') + { + *value = -1; + return FALSE; + } + else + { + return TRUE; + } +} + //---------------------------------------------------------------------------- // Function: ParseCommandLine // @@ -61,7 +86,9 @@ typedef enum TaskCommandOptionType // FALSE: otherwise static BOOL ParseCommandLine(__in int argc, __in_ecount(argc) wchar_t *argv[], - __out TaskCommandOption *command) + __out TaskCommandOption *command, + __out_opt long *memory, + __out_opt long *vcore) { *command = TaskInvalid; @@ -88,9 +115,44 @@ static BOOL ParseCommandLine(__in int argc, } } - if (argc == 4) { + if (argc >= 4 && argc <= 8) { if (wcscmp(argv[1], L"create") == 0) { + int i; + for (i = 2; i < argc - 3; i++) + { + if (wcscmp(argv[i], L"-c") == 0) + { + if (vcore != NULL && !GetLimit(argv[i + 1], vcore)) + { + return FALSE; + } + else + { + i++; + continue; + } + } + else if (wcscmp(argv[i], L"-m") == 0) + { + if (memory != NULL && !GetLimit(argv[i + 1], memory)) + { + return FALSE; + } + else + { + i++; + continue; + } + } + else + { + break; + } + } + if (argc - i != 2) + return FALSE; + *command = TaskCreate; return TRUE; } @@ -573,7 +635,7 @@ done: // ERROR_SUCCESS: On success // GetLastError: otherwise DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PCWSTR cmdLine, - __in LPCWSTR userName) + __in LPCWSTR userName, __in long memory, __in long cpuRate) { DWORD dwErrorCode = ERROR_SUCCESS; DWORD exitCode = EXIT_FAILURE; @@ -616,6 +678,12 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC return dwErrorCode; } jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; + if (memory > 0) + { + jeli.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_JOB_MEMORY; + jeli.ProcessMemoryLimit = ((SIZE_T) memory) * 1024 * 1024; + jeli.JobMemoryLimit = ((SIZE_T) memory) * 1024 * 1024; + } if(SetInformationJobObject(jobObject, JobObjectExtendedLimitInformation, &jeli, @@ -626,6 +694,24 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC CloseHandle(jobObject); return dwErrorCode; } +#ifdef NTDDI_WIN8 + if (cpuRate > 0) + { + JOBOBJECT_CPU_RATE_CONTROL_INFORMATION jcrci = { 0 }; + SYSTEM_INFO sysinfo; + GetSystemInfo(&sysinfo); + jcrci.ControlFlags = JOB_OBJECT_CPU_RATE_CONTROL_ENABLE | + JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP; + jcrci.CpuRate = min(10000, cpuRate); + if(SetInformationJobObject(jobObject, JobObjectCpuRateControlInformation, + &jcrci, sizeof(jcrci)) == 0) + { + dwErrorCode = GetLastError(); + CloseHandle(jobObject); + return dwErrorCode; + } + } +#endif if (logonHandle != NULL) { dwErrorCode = AddNodeManagerAndUserACEsToObject(jobObject, userName, JOB_OBJECT_ALL_ACCESS); @@ -809,10 +895,10 @@ create_process_done: // Returns: // ERROR_SUCCESS: On success // GetLastError: otherwise -DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) +DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine, __in long memory, __in long cpuRate) { // call with null logon in order to create tasks utilizing the current logon - return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL); + return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL, memory, cpuRate); } //---------------------------------------------------------------------------- @@ -893,7 +979,7 @@ DWORD CreateTaskAsUser(__in PCWSTR jobObjName, goto done; } - err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user); + err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user, -1, -1); done: if( profileIsLoaded ) { @@ -1095,6 +1181,8 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[]) { DWORD dwErrorCode = ERROR_SUCCESS; TaskCommandOption command = TaskInvalid; + long memory = -1; + long cpuRate = -1; wchar_t* cmdLine = NULL; wchar_t buffer[16*1024] = L""; // 32K max command line size_t charCountBufferLeft = sizeof(buffer)/sizeof(wchar_t); @@ -1111,7 +1199,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[]) ARGC_COMMAND_ARGS }; - if (!ParseCommandLine(argc, argv, &command)) { + if (!ParseCommandLine(argc, argv, &command, &memory, &cpuRate)) { dwErrorCode = ERROR_INVALID_COMMAND_LINE; fwprintf(stderr, L"Incorrect command line arguments.\n\n"); @@ -1123,7 +1211,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[]) { // Create the task jobobject // - dwErrorCode = CreateTask(argv[2], argv[3]); + dwErrorCode = CreateTask(argv[argc-2], argv[argc-1], memory, cpuRate); if (dwErrorCode != ERROR_SUCCESS) { ReportErrorCode(L"CreateTask", dwErrorCode); @@ -1238,18 +1326,30 @@ void TaskUsage() // jobobject's are being used. // ProcessTree.isSetsidSupported() fwprintf(stdout, L"\ - Usage: task create [TASKNAME] [COMMAND_LINE] |\n\ - task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE] |\n\ - task isAlive [TASKNAME] |\n\ - task kill [TASKNAME]\n\ - task processList [TASKNAME]\n\ - Creates a new task jobobject with taskname\n\ - Creates a new task jobobject with taskname as the user provided\n\ - Checks if task jobobject is alive\n\ - Kills task jobobject\n\ - Prints to stdout a list of processes in the task\n\ - along with their resource usage. One process per line\n\ - and comma separated info per process\n\ - ProcessId,VirtualMemoryCommitted(bytes),\n\ - WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n"); +Usage: task create [OPTOINS] [TASKNAME] [COMMAND_LINE]\n\ + Creates a new task job object with taskname and options to set CPU\n\ + and memory limits on the job object\n\ +\n\ + OPTIONS: -c [cup rate] set the cpu rate limit on the job object.\n\ + -m [memory] set the memory limit on the job object.\n\ + The cpu limit is an integral value of percentage * 100. The memory\n\ + limit is an integral number of memory in MB. \n\ + The limit will not be set if 0 or negative value is passed in as\n\ + parameter(s).\n\ +\n\ + task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE]\n\ + Creates a new task jobobject with taskname as the user provided\n\ +\n\ + task isAlive [TASKNAME]\n\ + Checks if task job object is alive\n\ +\n\ + task kill [TASKNAME]\n\ + Kills task job object\n\ +\n\ + task processList [TASKNAME]\n\ + Prints to stdout a list of processes in the task\n\ + along with their resource usage. One process per line\n\ + and comma separated info per process\n\ + ProcessId,VirtualMemoryCommitted(bytes),\n\ + WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n"); } diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props b/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props new file mode 100644 index 0000000000..503b37ad5c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props @@ -0,0 +1,28 @@ + + + + + + + $(VCInstallDir)bin\x86_amd64;$(VCInstallDir)bin;$(WindowsSdkDir)bin\NETFX 4.0 Tools;$(MSBuildProgramFiles32)\Windows Kits\8.1\bin\x86;$(VSInstallDir)Common7\Tools\bin;$(VSInstallDir)Common7\tools;$(VSInstallDir)Common7\ide;$(MSBuildProgramFiles32)\HTML Help Workshop;$(FrameworkSDKDir)\bin;$(MSBuildToolsPath32);$(VSInstallDir);$(SystemRoot)\SysWow64;$(FxCopDir);$(PATH) + $(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(FrameworkSDKDir)\include; + $(VCInstallDir)lib\amd64;$(VCInstallDir)atlmfc\lib\amd64;$(MSBuildProgramFiles32)\Windows Kits\8.1\lib\win8\um\x64;$(MSBuildProgramFiles32)\Windows Kits\8.1\Lib\winv6.3\um\x64;$(FrameworkSDKDir)\lib\x64 + $(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(FrameworkSDKDir)\include;$(MSBuildToolsPath32);$(VCInstallDir)atlmfc\lib;$(VCInstallDir)lib; + + + diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj index 9ecba0a87a..76a74147e1 100644 --- a/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj +++ b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj @@ -67,6 +67,9 @@ + + + diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java index 8ac6e40a04..987c7068a8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java @@ -547,4 +547,66 @@ public void testTaskCreate() throws IOException { assertThat(outNumber, containsString(testNumber)); } + + @Test (timeout = 30000) + public void testTaskCreateWithLimits() throws IOException { + // Generate a unique job id + String jobId = String.format("%f", Math.random()); + + // Run a task without any options + String out = Shell.execCommand(Shell.WINUTILS, "task", "create", + "job" + jobId, "cmd /c echo job" + jobId); + assertTrue(out.trim().equals("job" + jobId)); + + // Run a task without any limits + jobId = String.format("%f", Math.random()); + out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m", + "-1", "job" + jobId, "cmd /c echo job" + jobId); + assertTrue(out.trim().equals("job" + jobId)); + + // Run a task with limits (128MB should be enough for a cmd) + jobId = String.format("%f", Math.random()); + out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "10000", "-m", + "128", "job" + jobId, "cmd /c echo job" + jobId); + assertTrue(out.trim().equals("job" + jobId)); + + // Run a task without enough memory + try { + jobId = String.format("%f", Math.random()); + out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-m", "128", "job" + + jobId, "java -Xmx256m -version"); + fail("Failed to get Shell.ExitCodeException with insufficient memory"); + } catch (Shell.ExitCodeException ece) { + assertThat(ece.getExitCode(), is(1)); + } + + // Run tasks with wrong parameters + // + try { + jobId = String.format("%f", Math.random()); + Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m", + "-1", "foo", "job" + jobId, "cmd /c echo job" + jobId); + fail("Failed to get Shell.ExitCodeException with bad parameters"); + } catch (Shell.ExitCodeException ece) { + assertThat(ece.getExitCode(), is(1639)); + } + + try { + jobId = String.format("%f", Math.random()); + Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-m", "-1", + "job" + jobId, "cmd /c echo job" + jobId); + fail("Failed to get Shell.ExitCodeException with bad parameters"); + } catch (Shell.ExitCodeException ece) { + assertThat(ece.getExitCode(), is(1639)); + } + + try { + jobId = String.format("%f", Math.random()); + Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "foo", + "job" + jobId, "cmd /c echo job" + jobId); + fail("Failed to get Shell.ExitCodeException with bad parameters"); + } catch (Shell.ExitCodeException ece) { + assertThat(ece.getExitCode(), is(1639)); + } + } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d073169d38..c2aa2eff40 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -363,6 +363,9 @@ Release 2.7.0 - UNRELEASED YARN-1809. Synchronize RM and TimeLineServer Web-UIs. (Zhijie Shen and Xuan Gong via jianhe) + YARN-2190. Added CPU and memory limit options to the default container + executor for Windows containers. (Chuan Liu via jianhe) + OPTIMIZATIONS YARN-2990. FairScheduler's delay-scheduling always waits for node-local and diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 25b808e782..8c83fea9cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1027,6 +1027,18 @@ private static void addDeprecatedKeys() { public static final long DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY = 20; + /** + * Indicates if memory and CPU limits will be set for the Windows Job + * Object for the containers launched by the default container executor. + */ + public static final String NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED = + NM_PREFIX + "windows-container.memory-limit.enabled"; + public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED = false; + + public static final String NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = + NM_PREFIX + "windows-container.cpu-limit.enabled"; + public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = false; + /** /* The Windows group that the windows-secure-container-executor should run as. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index df730d565c..66400c8831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1074,6 +1074,20 @@ false + + This flag determines whether memory limit will be set for the Windows Job + Object of the containers launched by the default container executor. + yarn.nodemanager.windows-container.memory-limit.enabled + false + + + + This flag determines whether CPU limit will be set for the Windows Job + Object of the containers launched by the default container executor. + yarn.nodemanager.windows-container.cpu-limit.enabled + false + + T-file compression types used to compress aggregated logs. yarn.nodemanager.log-aggregation.compression-type diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 77193df328..248a393719 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; @@ -298,6 +299,11 @@ protected Path getPidFilePath(ContainerId containerId) { readLock.unlock(); } } + + protected String[] getRunCommand(String command, String groupId, + String userName, Path pidFile, Configuration conf) { + return getRunCommand(command, groupId, userName, pidFile, conf, null); + } /** * Return a command to execute the given command in OS shell. @@ -306,7 +312,7 @@ protected Path getPidFilePath(ContainerId containerId) { * non-Windows, groupId is ignored. */ protected String[] getRunCommand(String command, String groupId, - String userName, Path pidFile, Configuration conf) { + String userName, Path pidFile, Configuration conf, Resource resource) { boolean containerSchedPriorityIsSet = false; int containerSchedPriorityAdjustment = YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY; @@ -320,7 +326,46 @@ protected String[] getRunCommand(String command, String groupId, } if (Shell.WINDOWS) { - return new String[] { Shell.WINUTILS, "task", "create", groupId, + int cpuRate = -1; + int memory = -1; + if (resource != null) { + if (conf + .getBoolean( + YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, + YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) { + memory = resource.getMemory(); + } + + if (conf.getBoolean( + YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, + YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) { + int containerVCores = resource.getVirtualCores(); + int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES, + YarnConfiguration.DEFAULT_NM_VCORES); + // cap overall usage to the number of cores allocated to YARN + int nodeCpuPercentage = Math + .min( + conf.getInt( + YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, + YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT), + 100); + nodeCpuPercentage = Math.max(0, nodeCpuPercentage); + if (nodeCpuPercentage == 0) { + String message = "Illegal value for " + + YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT + + ". Value cannot be less than or equal to 0."; + throw new IllegalArgumentException(message); + } + float yarnVCores = (nodeCpuPercentage * nodeVCores) / 100.0f; + // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit + // should be set as 20 * 100. The following setting is equal to: + // 100 * (100 * (vcores / Total # of cores allocated to YARN)) + cpuRate = Math.min(10000, + (int) ((containerVCores * 10000) / yarnVCores)); + } + } + return new String[] { Shell.WINUTILS, "task", "create", "-m", + String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId, "cmd /c " + command }; } else { List retCommand = new ArrayList(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index f3d21210b8..e0ecea322e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -48,6 +48,7 @@ import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -202,7 +203,7 @@ public int launchContainer(Container container, setScriptExecutable(sb.getWrapperScriptPath(), user); shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(), - containerIdStr, user, pidFile, + containerIdStr, user, pidFile, container.getResource(), new File(containerWorkDir.toUri().getPath()), container.getLaunchContext().getEnvironment()); @@ -256,12 +257,12 @@ public int launchContainer(Container container, } protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, - String containerIdStr, String user, Path pidFile, File wordDir, - Map environment) + String containerIdStr, String user, Path pidFile, Resource resource, + File wordDir, Map environment) throws IOException { String[] command = getRunCommand(wrapperScriptPath, - containerIdStr, user, pidFile, this.getConf()); + containerIdStr, user, pidFile, this.getConf(), resource); LOG.info("launchContainer: " + Arrays.toString(command)); return new ShellCommandExecutor( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java index cd3e71a8d6..b7bec5f0cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java @@ -52,6 +52,7 @@ import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.CommandExecutor; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -727,11 +728,9 @@ public void startLocalizer(Path nmPrivateContainerTokens, } @Override - protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, - String containerIdStr, - String userName, Path pidFile,File wordDir, Map environment) - throws IOException { - + protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, + String containerIdStr, String userName, Path pidFile, Resource resource, + File wordDir, Map environment) throws IOException { return new WintuilsProcessStubExecutor( wordDir.toString(), containerIdStr, userName, pidFile.toString(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java index fd3634bd1c..dc3e941803 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java @@ -18,13 +18,21 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.util.Arrays; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; +import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.junit.Assert; import org.junit.Test; + import static org.junit.Assert.*; +import static org.junit.Assume.assumeTrue; public class TestContainerExecutor { @@ -69,4 +77,49 @@ public void testRunCommandwithPriority() throws Exception { } } + @Test (timeout = 5000) + public void testRunCommandWithNoResources() { + // Windows only test + assumeTrue(Shell.WINDOWS); + Configuration conf = new Configuration(); + String[] command = containerExecutor.getRunCommand("echo", "group1", null, null, + conf, Resource.newInstance(1024, 1)); + // Assert the cpu and memory limits are set correctly in the command + String[] expected = { Shell.WINUTILS, "task", "create", "-m", "-1", "-c", + "-1", "group1", "cmd /c " + "echo" }; + Assert.assertTrue(Arrays.equals(expected, command)); + } + + @Test (timeout = 5000) + public void testRunCommandWithMemoryOnlyResources() { + // Windows only test + assumeTrue(Shell.WINDOWS); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true"); + String[] command = containerExecutor.getRunCommand("echo", "group1", null, null, + conf, Resource.newInstance(1024, 1)); + // Assert the cpu and memory limits are set correctly in the command + String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c", + "-1", "group1", "cmd /c " + "echo" }; + Assert.assertTrue(Arrays.equals(expected, command)); + } + + @Test (timeout = 5000) + public void testRunCommandWithCpuAndMemoryResources() { + // Windows only test + assumeTrue(Shell.WINDOWS); + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, "true"); + conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true"); + String[] command = containerExecutor.getRunCommand("echo", "group1", null, null, + conf, Resource.newInstance(1024, 1)); + float yarnProcessors = NodeManagerHardwareUtils.getContainersCores( + ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf), + conf); + int cpuRate = Math.min(10000, (int) ((1 * 10000) / yarnProcessors)); + // Assert the cpu and memory limits are set correctly in the command + String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c", + String.valueOf(cpuRate), "group1", "cmd /c " + "echo" }; + Assert.assertTrue(Arrays.equals(expected, command)); + } }