From 74a61438ca01e2191b54000af73b654a2d0b8253 Mon Sep 17 00:00:00 2001 From: Daniel Templeton Date: Wed, 10 May 2017 10:45:02 -0700 Subject: [PATCH] YARN-6475. Fix some long function checkstyle issues (Contributed by Soumabrata Chakraborty via Daniel Templeton) --- .../nodemanager/LinuxContainerExecutor.java | 92 +++-- .../nodemanager/NodeStatusUpdaterImpl.java | 390 +++++++++--------- .../launcher/ContainerLaunch.java | 238 ++++++----- .../runtime/DockerLinuxContainerRuntime.java | 85 ++-- 4 files changed, 429 insertions(+), 376 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index cb1d53deb5..9a3b2d25cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -62,7 +62,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.regex.Pattern; import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntimeConstants.*; @@ -442,24 +441,11 @@ public void prepareContainer(ContainerPrepareContext ctx) throws IOException { public int launchContainer(ContainerStartContext ctx) throws IOException, ConfigurationException { Container container = ctx.getContainer(); - Path nmPrivateContainerScriptPath = ctx.getNmPrivateContainerScriptPath(); - Path nmPrivateTokensPath = ctx.getNmPrivateTokensPath(); String user = ctx.getUser(); - String appId = ctx.getAppId(); - Path containerWorkDir = ctx.getContainerWorkDir(); - List localDirs = ctx.getLocalDirs(); - List logDirs = ctx.getLogDirs(); - List filecacheDirs = ctx.getFilecacheDirs(); - List userLocalDirs = ctx.getUserLocalDirs(); - List containerLocalDirs = ctx.getContainerLocalDirs(); - List containerLogDirs = ctx.getContainerLogDirs(); - Map> localizedResources = ctx.getLocalizedResources(); verifyUsernamePattern(user); - String runAsUser = getRunAsUser(user); ContainerId containerId = container.getContainerId(); - String containerIdStr = containerId.toString(); resourcesHandler.preExecute(containerId, container.getResource()); @@ -514,39 +500,11 @@ public int launchContainer(ContainerStartContext ctx) try { Path pidFilePath = getPidFilePath(containerId); if (pidFilePath != null) { - List prefixCommands = new ArrayList<>(); - ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext - .Builder(container); - addSchedPriorityCommand(prefixCommands); - if (prefixCommands.size() > 0) { - builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS, - prefixCommands); - } + ContainerRuntimeContext runtimeContext = buildContainerRuntimeContext( + ctx, pidFilePath, resourcesOptions, tcCommandFile); - builder.setExecutionAttribute(LOCALIZED_RESOURCES, localizedResources) - .setExecutionAttribute(RUN_AS_USER, runAsUser) - .setExecutionAttribute(USER, user) - .setExecutionAttribute(APPID, appId) - .setExecutionAttribute(CONTAINER_ID_STR, containerIdStr) - .setExecutionAttribute(CONTAINER_WORK_DIR, containerWorkDir) - .setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH, - nmPrivateContainerScriptPath) - .setExecutionAttribute(NM_PRIVATE_TOKENS_PATH, nmPrivateTokensPath) - .setExecutionAttribute(PID_FILE_PATH, pidFilePath) - .setExecutionAttribute(LOCAL_DIRS, localDirs) - .setExecutionAttribute(LOG_DIRS, logDirs) - .setExecutionAttribute(FILECACHE_DIRS, filecacheDirs) - .setExecutionAttribute(USER_LOCAL_DIRS, userLocalDirs) - .setExecutionAttribute(CONTAINER_LOCAL_DIRS, containerLocalDirs) - .setExecutionAttribute(CONTAINER_LOG_DIRS, containerLogDirs) - .setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions); - - if (tcCommandFile != null) { - builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile); - } - - linuxContainerRuntime.launchContainer(builder.build()); + linuxContainerRuntime.launchContainer(runtimeContext); } else { LOG.info( "Container was marked as inactive. Returning terminated error"); @@ -617,6 +575,50 @@ public int launchContainer(ContainerStartContext ctx) return 0; } + private ContainerRuntimeContext buildContainerRuntimeContext( + ContainerStartContext ctx, Path pidFilePath, + String resourcesOptions, String tcCommandFile) { + + List prefixCommands = new ArrayList<>(); + addSchedPriorityCommand(prefixCommands); + + Container container = ctx.getContainer(); + + ContainerRuntimeContext.Builder builder = new ContainerRuntimeContext + .Builder(container); + if (prefixCommands.size() > 0) { + builder.setExecutionAttribute(CONTAINER_LAUNCH_PREFIX_COMMANDS, + prefixCommands); + } + + builder.setExecutionAttribute(LOCALIZED_RESOURCES, + ctx.getLocalizedResources()) + .setExecutionAttribute(RUN_AS_USER, getRunAsUser(ctx.getUser())) + .setExecutionAttribute(USER, ctx.getUser()) + .setExecutionAttribute(APPID, ctx.getAppId()) + .setExecutionAttribute(CONTAINER_ID_STR, + container.getContainerId().toString()) + .setExecutionAttribute(CONTAINER_WORK_DIR, ctx.getContainerWorkDir()) + .setExecutionAttribute(NM_PRIVATE_CONTAINER_SCRIPT_PATH, + ctx.getNmPrivateContainerScriptPath()) + .setExecutionAttribute(NM_PRIVATE_TOKENS_PATH, + ctx.getNmPrivateTokensPath()) + .setExecutionAttribute(PID_FILE_PATH, pidFilePath) + .setExecutionAttribute(LOCAL_DIRS, ctx.getLocalDirs()) + .setExecutionAttribute(LOG_DIRS, ctx.getLogDirs()) + .setExecutionAttribute(FILECACHE_DIRS, ctx.getFilecacheDirs()) + .setExecutionAttribute(USER_LOCAL_DIRS, ctx.getUserLocalDirs()) + .setExecutionAttribute(CONTAINER_LOCAL_DIRS, ctx.getContainerLocalDirs()) + .setExecutionAttribute(CONTAINER_LOG_DIRS, ctx.getContainerLogDirs()) + .setExecutionAttribute(RESOURCES_OPTIONS, resourcesOptions); + + if (tcCommandFile != null) { + builder.setExecutionAttribute(TC_COMMAND_FILE, tcCommandFile); + } + + return builder.build(); + } + @Override public String[] getIpAndHost(Container container) { return linuxContainerRuntime.getIpAndHost(container); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index dd5b279cf9..00073d85aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -761,200 +761,7 @@ private static Map parseCredentials( protected void startStatusUpdater() { - statusUpdaterRunnable = new Runnable() { - @Override - @SuppressWarnings("unchecked") - public void run() { - int lastHeartbeatID = 0; - while (!isStopped) { - // Send heartbeat - try { - NodeHeartbeatResponse response = null; - Set nodeLabelsForHeartbeat = - nodeLabelsHandler.getNodeLabelsForHeartbeat(); - NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); - NodeHeartbeatRequest request = - NodeHeartbeatRequest.newInstance(nodeStatus, - NodeStatusUpdaterImpl.this.context - .getContainerTokenSecretManager().getCurrentKey(), - NodeStatusUpdaterImpl.this.context - .getNMTokenSecretManager().getCurrentKey(), - nodeLabelsForHeartbeat, - NodeStatusUpdaterImpl.this.context - .getRegisteredCollectors()); - - if (logAggregationEnabled) { - // pull log aggregation status for application running in this NM - List logAggregationReports = - getLogAggregationReportsForApps(context - .getLogAggregationStatusForApps()); - if (logAggregationReports != null - && !logAggregationReports.isEmpty()) { - request.setLogAggregationReportsForApps(logAggregationReports); - } - } - - response = resourceTracker.nodeHeartbeat(request); - //get next heartbeat interval from response - nextHeartBeatInterval = response.getNextHeartBeatInterval(); - updateMasterKeys(response); - - if (!handleShutdownOrResyncCommand(response)) { - nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels( - response); - - // Explicitly put this method after checking the resync - // response. We - // don't want to remove the completed containers before resync - // because these completed containers will be reported back to RM - // when NM re-registers with RM. - // Only remove the cleanedup containers that are acked - removeOrTrackCompletedContainersFromContext(response - .getContainersToBeRemovedFromNM()); - - logAggregationReportForAppsTempList.clear(); - lastHeartbeatID = response.getResponseId(); - List containersToCleanup = response - .getContainersToCleanup(); - if (!containersToCleanup.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrCompletedContainersEvent(containersToCleanup, - CMgrCompletedContainersEvent.Reason - .BY_RESOURCEMANAGER)); - } - List appsToCleanup = - response.getApplicationsToCleanup(); - //Only start tracking for keepAlive on FINISH_APP - trackAppsForKeepAlive(appsToCleanup); - if (!appsToCleanup.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrCompletedAppsEvent(appsToCleanup, - CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); - } - Map systemCredentials = - response.getSystemCredentialsForApps(); - if (systemCredentials != null && !systemCredentials.isEmpty()) { - ((NMContext) context).setSystemCrendentialsForApps( - parseCredentials(systemCredentials)); - } - List - containersToDecrease = response.getContainersToDecrease(); - if (!containersToDecrease.isEmpty()) { - dispatcher.getEventHandler().handle( - new CMgrDecreaseContainersResourceEvent( - containersToDecrease) - ); - } - - // SignalContainer request originally comes from end users via - // ClientRMProtocol's SignalContainer. Forward the request to - // ContainerManager which will dispatch the event to - // ContainerLauncher. - List containersToSignal = response - .getContainersToSignalList(); - if (containersToSignal.size() != 0) { - dispatcher.getEventHandler().handle( - new CMgrSignalContainersEvent(containersToSignal)); - } - - // Update QueuingLimits if ContainerManager supports queuing - ContainerQueuingLimit queuingLimit = - response.getContainerQueuingLimit(); - if (queuingLimit != null) { - context.getContainerManager().updateQueuingLimit(queuingLimit); - } - } - // Handling node resource update case. - Resource newResource = response.getResource(); - if (newResource != null) { - updateNMResource(newResource); - if (LOG.isDebugEnabled()) { - LOG.debug("Node's resource is updated to " + - newResource.toString()); - } - } - if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { - updateTimelineClientsAddress(response); - } - - } catch (ConnectException e) { - //catch and throw the exception if tried MAX wait time to connect RM - dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); - // failed to connect to RM. - failedToConnect = true; - throw new YarnRuntimeException(e); - } catch (Throwable e) { - - // TODO Better error handling. Thread can die with the rest of the - // NM still running. - LOG.error("Caught exception in status-updater", e); - } finally { - synchronized (heartbeatMonitor) { - nextHeartBeatInterval = nextHeartBeatInterval <= 0 ? - YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS : - nextHeartBeatInterval; - try { - heartbeatMonitor.wait(nextHeartBeatInterval); - } catch (InterruptedException e) { - // Do Nothing - } - } - } - } - } - - private void updateTimelineClientsAddress( - NodeHeartbeatResponse response) { - Map knownCollectorsMap = - response.getAppCollectorsMap(); - if (knownCollectorsMap == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("No collectors to update RM"); - } - } else { - Set> rmKnownCollectors = - knownCollectorsMap.entrySet(); - for (Map.Entry entry : rmKnownCollectors) { - ApplicationId appId = entry.getKey(); - String collectorAddr = entry.getValue(); - - // Only handle applications running on local node. - // Not include apps with timeline collectors running in local - Application application = context.getApplications().get(appId); - // TODO this logic could be problematic if the collector address - // gets updated due to NM restart or collector service failure - if (application != null && - !context.getRegisteredCollectors().containsKey(appId)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sync a new collector address: " + collectorAddr + - " for application: " + appId + " from RM."); - } - NMTimelinePublisher nmTimelinePublisher = - context.getNMTimelinePublisher(); - if (nmTimelinePublisher != null) { - nmTimelinePublisher.setTimelineServiceAddress( - application.getAppId(), collectorAddr); - } - } - } - } - } - - private void updateMasterKeys(NodeHeartbeatResponse response) { - // See if the master-key has rolled over - MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); - if (updatedMasterKey != null) { - // Will be non-null only on roll-over on RM side - context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey); - } - - updatedMasterKey = response.getNMTokenMasterKey(); - if (updatedMasterKey != null) { - context.getNMTokenSecretManager().setMasterKey(updatedMasterKey); - } - } - }; + statusUpdaterRunnable = new StatusUpdaterRunnable(); statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); statusUpdater.start(); @@ -1215,4 +1022,199 @@ public void verifyRMHeartbeatResponseForNodeLabels( } } } + + private class StatusUpdaterRunnable implements Runnable { + @Override + @SuppressWarnings("unchecked") + public void run() { + int lastHeartbeatID = 0; + while (!isStopped) { + // Send heartbeat + try { + NodeHeartbeatResponse response = null; + Set nodeLabelsForHeartbeat = + nodeLabelsHandler.getNodeLabelsForHeartbeat(); + NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); + NodeHeartbeatRequest request = + NodeHeartbeatRequest.newInstance(nodeStatus, + NodeStatusUpdaterImpl.this.context + .getContainerTokenSecretManager().getCurrentKey(), + NodeStatusUpdaterImpl.this.context + .getNMTokenSecretManager().getCurrentKey(), + nodeLabelsForHeartbeat, + NodeStatusUpdaterImpl.this.context + .getRegisteredCollectors()); + + if (logAggregationEnabled) { + // pull log aggregation status for application running in this NM + List logAggregationReports = + getLogAggregationReportsForApps(context + .getLogAggregationStatusForApps()); + if (logAggregationReports != null + && !logAggregationReports.isEmpty()) { + request.setLogAggregationReportsForApps(logAggregationReports); + } + } + + response = resourceTracker.nodeHeartbeat(request); + //get next heartbeat interval from response + nextHeartBeatInterval = response.getNextHeartBeatInterval(); + updateMasterKeys(response); + + if (!handleShutdownOrResyncCommand(response)) { + nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels( + response); + + // Explicitly put this method after checking the resync + // response. We + // don't want to remove the completed containers before resync + // because these completed containers will be reported back to RM + // when NM re-registers with RM. + // Only remove the cleanedup containers that are acked + removeOrTrackCompletedContainersFromContext(response + .getContainersToBeRemovedFromNM()); + + logAggregationReportForAppsTempList.clear(); + lastHeartbeatID = response.getResponseId(); + List containersToCleanup = response + .getContainersToCleanup(); + if (!containersToCleanup.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrCompletedContainersEvent(containersToCleanup, + CMgrCompletedContainersEvent.Reason + .BY_RESOURCEMANAGER)); + } + List appsToCleanup = + response.getApplicationsToCleanup(); + //Only start tracking for keepAlive on FINISH_APP + trackAppsForKeepAlive(appsToCleanup); + if (!appsToCleanup.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrCompletedAppsEvent(appsToCleanup, + CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER)); + } + Map systemCredentials = + response.getSystemCredentialsForApps(); + if (systemCredentials != null && !systemCredentials.isEmpty()) { + ((NMContext) context).setSystemCrendentialsForApps( + parseCredentials(systemCredentials)); + } + List + containersToDecrease = response.getContainersToDecrease(); + if (!containersToDecrease.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrDecreaseContainersResourceEvent( + containersToDecrease) + ); + } + + // SignalContainer request originally comes from end users via + // ClientRMProtocol's SignalContainer. Forward the request to + // ContainerManager which will dispatch the event to + // ContainerLauncher. + List containersToSignal = response + .getContainersToSignalList(); + if (!containersToSignal.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrSignalContainersEvent(containersToSignal)); + } + + // Update QueuingLimits if ContainerManager supports queuing + ContainerQueuingLimit queuingLimit = + response.getContainerQueuingLimit(); + if (queuingLimit != null) { + context.getContainerManager().updateQueuingLimit(queuingLimit); + } + } + // Handling node resource update case. + Resource newResource = response.getResource(); + if (newResource != null) { + updateNMResource(newResource); + if (LOG.isDebugEnabled()) { + LOG.debug("Node's resource is updated to " + + newResource.toString()); + } + } + if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { + updateTimelineClientsAddress(response); + } + + } catch (ConnectException e) { + //catch and throw the exception if tried MAX wait time to connect RM + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); + // failed to connect to RM. + failedToConnect = true; + throw new YarnRuntimeException(e); + } catch (Exception e) { + + // TODO Better error handling. Thread can die with the rest of the + // NM still running. + LOG.error("Caught exception in status-updater", e); + } finally { + synchronized (heartbeatMonitor) { + nextHeartBeatInterval = nextHeartBeatInterval <= 0 ? + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS : + nextHeartBeatInterval; + try { + heartbeatMonitor.wait(nextHeartBeatInterval); + } catch (InterruptedException e) { + // Do Nothing + } + } + } + } + } + + private void updateTimelineClientsAddress( + NodeHeartbeatResponse response) { + Map knownCollectorsMap = + response.getAppCollectorsMap(); + if (knownCollectorsMap == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No collectors to update RM"); + } + } else { + Set> rmKnownCollectors = + knownCollectorsMap.entrySet(); + for (Map.Entry entry : rmKnownCollectors) { + ApplicationId appId = entry.getKey(); + String collectorAddr = entry.getValue(); + + // Only handle applications running on local node. + // Not include apps with timeline collectors running in local + Application application = context.getApplications().get(appId); + // TODO this logic could be problematic if the collector address + // gets updated due to NM restart or collector service failure + if (application != null && + !context.getRegisteredCollectors().containsKey(appId)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sync a new collector address: " + collectorAddr + + " for application: " + appId + " from RM."); + } + NMTimelinePublisher nmTimelinePublisher = + context.getNMTimelinePublisher(); + if (nmTimelinePublisher != null) { + nmTimelinePublisher.setTimelineServiceAddress( + application.getAppId(), collectorAddr); + } + } + } + } + } + + private void updateMasterKeys(NodeHeartbeatResponse response) { + // See if the master-key has rolled over + MasterKey updatedMasterKey = response.getContainerTokenMasterKey(); + if (updatedMasterKey != null) { + // Will be non-null only on roll-over on RM side + context.getContainerTokenSecretManager().setMasterKey(updatedMasterKey); + } + + updatedMasterKey = response.getNMTokenMasterKey(); + if (updatedMasterKey != null) { + context.getNMTokenSecretManager().setMasterKey(updatedMasterKey); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 1fcccdece6..0b599a873f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -213,12 +213,7 @@ public Integer call() { DataOutputStream tokensOutStream = null; // Select the working directory for the container - Path containerWorkDir = - dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE - + Path.SEPARATOR + user + Path.SEPARATOR - + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr - + Path.SEPARATOR + containerIdStr, - LocalDirAllocator.SIZE_UNKNOWN, false); + Path containerWorkDir = deriveContainerWorkDir(); recordContainerWorkDir(containerID, containerWorkDir.toString()); String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); @@ -259,12 +254,8 @@ public Integer call() { sanitizeEnv(environment, containerWorkDir, appDirs, userLocalDirs, containerLogDirs, localResources, nmPrivateClasspathJarDir); - exec.prepareContainer(new ContainerPrepareContext.Builder() - .setContainer(container) - .setLocalizedResources(localResources) - .setUser(user) - .setContainerLocalDirs(containerLocalDirs) - .setCommands(launchContext.getCommands()).build()); + prepareContainer(localResources, containerLocalDirs); + // Write out the environment exec.writeLaunchEnv(containerScriptOutStream, environment, localResources, launchContext.getCommands(), @@ -317,6 +308,39 @@ public Integer call() { return ret; } + private Path deriveContainerWorkDir() throws IOException { + + final String containerWorkDirPath = + ContainerLocalizer.USERCACHE + + Path.SEPARATOR + + container.getUser() + + Path.SEPARATOR + + ContainerLocalizer.APPCACHE + + Path.SEPARATOR + + app.getAppId().toString() + + Path.SEPARATOR + + container.getContainerId().toString(); + + final Path containerWorkDir = + dirsHandler.getLocalPathForWrite( + containerWorkDirPath, + LocalDirAllocator.SIZE_UNKNOWN, false); + + return containerWorkDir; + } + + private void prepareContainer(Map> localResources, + List containerLocalDirs) throws IOException { + + exec.prepareContainer(new ContainerPrepareContext.Builder() + .setContainer(container) + .setLocalizedResources(localResources) + .setUser(container.getUser()) + .setContainerLocalDirs(containerLocalDirs) + .setCommands(container.getLaunchContext().getCommands()) + .build()); + } + @SuppressWarnings("unchecked") protected boolean validateContainerState() { // CONTAINER_KILLED_ON_REQUEST should not be missed if the container @@ -1116,98 +1140,9 @@ public void sanitizeEnv(Map environment, Path pwd, // TODO: Remove Windows check and use this approach on all platforms after // additional testing. See YARN-358. if (Shell.WINDOWS) { - - String inputClassPath = environment.get(Environment.CLASSPATH.name()); - if (inputClassPath != null && !inputClassPath.isEmpty()) { - - //On non-windows, localized resources - //from distcache are available via the classpath as they were placed - //there but on windows they are not available when the classpath - //jar is created and so they "are lost" and have to be explicitly - //added to the classpath instead. This also means that their position - //is lost relative to other non-distcache classpath entries which will - //break things like mapreduce.job.user.classpath.first. An environment - //variable can be set to indicate that distcache entries should come - //first - - boolean preferLocalizedJars = Boolean.parseBoolean( - environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name()) - ); - - boolean needsSeparator = false; - StringBuilder newClassPath = new StringBuilder(); - if (!preferLocalizedJars) { - newClassPath.append(inputClassPath); - needsSeparator = true; - } - - // Localized resources do not exist at the desired paths yet, because the - // container launch script has not run to create symlinks yet. This - // means that FileUtil.createJarWithClassPath can't automatically expand - // wildcards to separate classpath entries for each file in the manifest. - // To resolve this, append classpath entries explicitly for each - // resource. - for (Map.Entry> entry : resources.entrySet()) { - boolean targetIsDirectory = new File(entry.getKey().toUri().getPath()) - .isDirectory(); - - for (String linkName : entry.getValue()) { - // Append resource. - if (needsSeparator) { - newClassPath.append(File.pathSeparator); - } else { - needsSeparator = true; - } - newClassPath.append(pwd.toString()) - .append(Path.SEPARATOR).append(linkName); - - // FileUtil.createJarWithClassPath must use File.toURI to convert - // each file to a URI to write into the manifest's classpath. For - // directories, the classpath must have a trailing '/', but - // File.toURI only appends the trailing '/' if it is a directory that - // already exists. To resolve this, add the classpath entries with - // explicit trailing '/' here for any localized resource that targets - // a directory. Then, FileUtil.createJarWithClassPath will guarantee - // that the resulting entry in the manifest's classpath will have a - // trailing '/', and thus refer to a directory instead of a file. - if (targetIsDirectory) { - newClassPath.append(Path.SEPARATOR); - } - } - } - if (preferLocalizedJars) { - if (needsSeparator) { - newClassPath.append(File.pathSeparator); - } - newClassPath.append(inputClassPath); - } - - // When the container launches, it takes the parent process's environment - // and then adds/overwrites with the entries from the container launch - // context. Do the same thing here for correct substitution of - // environment variables in the classpath jar manifest. - Map mergedEnv = new HashMap( - System.getenv()); - mergedEnv.putAll(environment); - - // this is hacky and temporary - it's to preserve the windows secure - // behavior but enable non-secure windows to properly build the class - // path for access to job.jar/lib/xyz and friends (see YARN-2803) - Path jarDir; - if (exec instanceof WindowsSecureContainerExecutor) { - jarDir = nmPrivateClasspathJarDir; - } else { - jarDir = pwd; - } - String[] jarCp = FileUtil.createJarWithClassPath( - newClassPath.toString(), jarDir, pwd, mergedEnv); - // In a secure cluster the classpath jar must be localized to grant access - Path localizedClassPathJar = exec.localizeClasspathJar( - new Path(jarCp[0]), pwd, container.getUser()); - String replacementClassPath = localizedClassPathJar.toString() + jarCp[1]; - environment.put(Environment.CLASSPATH.name(), replacementClassPath); - } + sanitizeWindowsEnv(environment, pwd, + resources, nmPrivateClasspathJarDir); } // put AuxiliaryService data to environment for (Map.Entry meta : containerManager @@ -1217,6 +1152,103 @@ public void sanitizeEnv(Map environment, Path pwd, } } + private void sanitizeWindowsEnv(Map environment, Path pwd, + Map> resources, Path nmPrivateClasspathJarDir) + throws IOException { + + String inputClassPath = environment.get(Environment.CLASSPATH.name()); + + if (inputClassPath != null && !inputClassPath.isEmpty()) { + + //On non-windows, localized resources + //from distcache are available via the classpath as they were placed + //there but on windows they are not available when the classpath + //jar is created and so they "are lost" and have to be explicitly + //added to the classpath instead. This also means that their position + //is lost relative to other non-distcache classpath entries which will + //break things like mapreduce.job.user.classpath.first. An environment + //variable can be set to indicate that distcache entries should come + //first + + boolean preferLocalizedJars = Boolean.parseBoolean( + environment.get(Environment.CLASSPATH_PREPEND_DISTCACHE.name()) + ); + + boolean needsSeparator = false; + StringBuilder newClassPath = new StringBuilder(); + if (!preferLocalizedJars) { + newClassPath.append(inputClassPath); + needsSeparator = true; + } + + // Localized resources do not exist at the desired paths yet, because the + // container launch script has not run to create symlinks yet. This + // means that FileUtil.createJarWithClassPath can't automatically expand + // wildcards to separate classpath entries for each file in the manifest. + // To resolve this, append classpath entries explicitly for each + // resource. + for (Map.Entry> entry : resources.entrySet()) { + boolean targetIsDirectory = new File(entry.getKey().toUri().getPath()) + .isDirectory(); + + for (String linkName : entry.getValue()) { + // Append resource. + if (needsSeparator) { + newClassPath.append(File.pathSeparator); + } else { + needsSeparator = true; + } + newClassPath.append(pwd.toString()) + .append(Path.SEPARATOR).append(linkName); + + // FileUtil.createJarWithClassPath must use File.toURI to convert + // each file to a URI to write into the manifest's classpath. For + // directories, the classpath must have a trailing '/', but + // File.toURI only appends the trailing '/' if it is a directory that + // already exists. To resolve this, add the classpath entries with + // explicit trailing '/' here for any localized resource that targets + // a directory. Then, FileUtil.createJarWithClassPath will guarantee + // that the resulting entry in the manifest's classpath will have a + // trailing '/', and thus refer to a directory instead of a file. + if (targetIsDirectory) { + newClassPath.append(Path.SEPARATOR); + } + } + } + if (preferLocalizedJars) { + if (needsSeparator) { + newClassPath.append(File.pathSeparator); + } + newClassPath.append(inputClassPath); + } + + // When the container launches, it takes the parent process's environment + // and then adds/overwrites with the entries from the container launch + // context. Do the same thing here for correct substitution of + // environment variables in the classpath jar manifest. + Map mergedEnv = new HashMap( + System.getenv()); + mergedEnv.putAll(environment); + + // this is hacky and temporary - it's to preserve the windows secure + // behavior but enable non-secure windows to properly build the class + // path for access to job.jar/lib/xyz and friends (see YARN-2803) + Path jarDir; + if (exec instanceof WindowsSecureContainerExecutor) { + jarDir = nmPrivateClasspathJarDir; + } else { + jarDir = pwd; + } + String[] jarCp = FileUtil.createJarWithClassPath( + newClassPath.toString(), jarDir, pwd, mergedEnv); + // In a secure cluster the classpath jar must be localized to grant access + Path localizedClassPathJar = exec.localizeClasspathJar( + new Path(jarCp[0]), pwd, container.getUser()); + String replacementClassPath = localizedClassPathJar.toString() + jarCp[1]; + environment.put(Environment.CLASSPATH.name(), replacementClassPath); + } + } + public static String getExitCodeFile(String pidFile) { return pidFile + EXIT_CODE_FILE_SUFFIX; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java index b70a4e19a6..ed81331679 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/runtime/DockerLinuxContainerRuntime.java @@ -424,10 +424,6 @@ public void launchContainer(ContainerRuntimeContext ctx) //List -> stored as List -> fetched/converted to List //we can't do better here thanks to type-erasure @SuppressWarnings("unchecked") - List localDirs = ctx.getExecutionAttribute(LOCAL_DIRS); - @SuppressWarnings("unchecked") - List logDirs = ctx.getExecutionAttribute(LOG_DIRS); - @SuppressWarnings("unchecked") List filecacheDirs = ctx.getExecutionAttribute(FILECACHE_DIRS); @SuppressWarnings("unchecked") List containerLocalDirs = ctx.getExecutionAttribute( @@ -489,9 +485,6 @@ public void launchContainer(ContainerRuntimeContext ctx) addCGroupParentIfRequired(resourcesOpts, containerIdStr, runCommand); - Path nmPrivateContainerScriptPath = ctx.getExecutionAttribute( - NM_PRIVATE_CONTAINER_SCRIPT_PATH); - String disableOverride = environment.get( ENV_DOCKER_CONTAINER_RUN_OVERRIDE_DISABLE); @@ -511,33 +504,8 @@ public void launchContainer(ContainerRuntimeContext ctx) String commandFile = dockerClient.writeCommandToTempFile(runCommand, containerIdStr); - PrivilegedOperation launchOp = new PrivilegedOperation( - PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER); - - launchOp.appendArgs(runAsUser, ctx.getExecutionAttribute(USER), - Integer.toString(PrivilegedOperation - .RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()), - ctx.getExecutionAttribute(APPID), - containerIdStr, containerWorkDir.toString(), - nmPrivateContainerScriptPath.toUri().getPath(), - ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(), - ctx.getExecutionAttribute(PID_FILE_PATH).toString(), - StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR, - localDirs), - StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR, - logDirs), - commandFile, - resourcesOpts); - - String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE); - - if (tcCommandFile != null) { - launchOp.appendArgs(tcCommandFile); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Launching container with cmd: " + runCommand - .getCommandWithArguments()); - } + PrivilegedOperation launchOp = buildLaunchOp(ctx, + commandFile, runCommand); try { privilegedOperationExecutor.executePrivilegedOperation(null, @@ -635,4 +603,53 @@ public String[] getIpAndHost(Container container) { } return null; } + + + + private PrivilegedOperation buildLaunchOp(ContainerRuntimeContext ctx, + String commandFile, DockerRunCommand runCommand) { + + String runAsUser = ctx.getExecutionAttribute(RUN_AS_USER); + String containerIdStr = ctx.getContainer().getContainerId().toString(); + Path nmPrivateContainerScriptPath = ctx.getExecutionAttribute( + NM_PRIVATE_CONTAINER_SCRIPT_PATH); + Path containerWorkDir = ctx.getExecutionAttribute(CONTAINER_WORK_DIR); + //we can't do better here thanks to type-erasure + @SuppressWarnings("unchecked") + List localDirs = ctx.getExecutionAttribute(LOCAL_DIRS); + @SuppressWarnings("unchecked") + List logDirs = ctx.getExecutionAttribute(LOG_DIRS); + String resourcesOpts = ctx.getExecutionAttribute(RESOURCES_OPTIONS); + + PrivilegedOperation launchOp = new PrivilegedOperation( + PrivilegedOperation.OperationType.LAUNCH_DOCKER_CONTAINER); + + launchOp.appendArgs(runAsUser, ctx.getExecutionAttribute(USER), + Integer.toString(PrivilegedOperation + .RunAsUserCommand.LAUNCH_DOCKER_CONTAINER.getValue()), + ctx.getExecutionAttribute(APPID), + containerIdStr, + containerWorkDir.toString(), + nmPrivateContainerScriptPath.toUri().getPath(), + ctx.getExecutionAttribute(NM_PRIVATE_TOKENS_PATH).toUri().getPath(), + ctx.getExecutionAttribute(PID_FILE_PATH).toString(), + StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR, + localDirs), + StringUtils.join(PrivilegedOperation.LINUX_FILE_PATH_SEPARATOR, + logDirs), + commandFile, + resourcesOpts); + + String tcCommandFile = ctx.getExecutionAttribute(TC_COMMAND_FILE); + + if (tcCommandFile != null) { + launchOp.appendArgs(tcCommandFile); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Launching container with cmd: " + runCommand + .getCommandWithArguments()); + } + + return launchOp; + } }