diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index a04f57b270..3b5896177e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -24,6 +24,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.StringReader; +import java.io.UncheckedIOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.URI; import java.net.URISyntaxException; @@ -57,6 +58,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; @@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -232,6 +235,9 @@ public enum DSEntity { @VisibleForTesting protected ApplicationAttemptId appAttemptID; + private ApplicationId appId; + private String appName; + // TODO // For status update for clients - yet to be implemented // Hostname of the container @@ -316,6 +322,8 @@ public enum DSEntity { private int containrRetryInterval = 0; private long containerFailuresValidityInterval = -1; + private List localizableFiles = new ArrayList<>(); + // Timeline domain ID private String domainId = null; @@ -447,6 +455,8 @@ public ApplicationMaster() { */ public boolean init(String[] args) throws ParseException, IOException { Options opts = new Options(); + opts.addOption("appname", true, + "Application Name. Default value - DistributedShell"); opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes"); opts.addOption("shell_env", true, @@ -493,6 +503,7 @@ public boolean init(String[] args) throws ParseException, IOException { + " application attempt fails and these containers will be " + "retrieved by" + " the new application attempt "); + opts.addOption("localized_files", true, "List of localized files"); opts.addOption("help", false, "Print usage"); CommandLine cliParser = new GnuParser().parse(opts, args); @@ -513,6 +524,8 @@ public boolean init(String[] args) throws ParseException, IOException { } } + appName = cliParser.getOptionValue("appname", "DistributedShell"); + if (cliParser.hasOption("help")) { printUsage(opts); return false; @@ -553,6 +566,7 @@ public boolean init(String[] args) throws ParseException, IOException { ContainerId containerId = ContainerId.fromString(envs .get(Environment.CONTAINER_ID.name())); appAttemptID = containerId.getApplicationAttemptId(); + appId = appAttemptID.getApplicationId(); } if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) { @@ -698,6 +712,16 @@ public boolean init(String[] args) throws ParseException, IOException { LOG.warn("Timeline service is not enabled"); } + if (cliParser.hasOption("localized_files")) { + String localizedFilesArg = cliParser.getOptionValue("localized_files"); + if (localizedFilesArg.contains(",")) { + String[] files = localizedFilesArg.split(","); + localizableFiles = Arrays.asList(files); + } else { + localizableFiles.add(localizedFilesArg); + } + } + return true; } @@ -1002,6 +1026,11 @@ protected boolean finish() { return success; } + public static String getRelativePath(String appName, + String appId, String fileDstPath) { + return appName + "/" + appId + "/" + fileDstPath; + } + @VisibleForTesting class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler { @SuppressWarnings("unchecked") @@ -1422,6 +1451,35 @@ public void run() { shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; } + // Set up localization for the container which runs the command + if (localizableFiles.size() > 0) { + FileSystem fs; + try { + fs = FileSystem.get(conf); + } catch (IOException e) { + throw new UncheckedIOException("Cannot get FileSystem", e); + } + + localizableFiles.stream().forEach(fileName -> { + try { + String relativePath = + getRelativePath(appName, appId.toString(), fileName); + Path dst = + new Path(fs.getHomeDirectory(), relativePath); + FileStatus fileStatus = fs.getFileStatus(dst); + LocalResource localRes = LocalResource.newInstance( + URL.fromURI(dst.toUri()), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + fileStatus.getLen(), fileStatus.getModificationTime()); + LOG.info("Setting up file for localization: " + dst); + localResources.put(fileName, localRes); + } catch (IOException e) { + throw new UncheckedIOException( + "Error during localization setup", e); + } + }); + } + // Set the necessary command to execute on the allocated container Vector vargs = new Vector(5); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index e8b69fe186..369d94b806 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.applications.distributedshell; +import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -32,6 +34,7 @@ import java.util.Base64; import com.google.common.base.Joiner; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -235,6 +238,8 @@ public class Client { // Application tags private Set applicationTags = new HashSet<>(); + private List filesToLocalize = new ArrayList<>(); + // Command line options private Options opts; @@ -392,6 +397,8 @@ public Client(Configuration conf) throws Exception { + " The \"num_containers\" option will be ignored. All requested" + " containers will be of type GUARANTEED" ); opts.addOption("application_tags", true, "Application tags."); + opts.addOption("localize_files", true, "List of files, separated by comma" + + " to be localized for the command"); } /** @@ -621,6 +628,17 @@ public boolean init(String[] args) throws ParseException { this.applicationTags.add(appTag.trim()); } } + + if (cliParser.hasOption("localize_files")) { + String filesStr = cliParser.getOptionValue("localize_files"); + if (filesStr.contains(",")) { + String[] files = filesStr.split(","); + filesToLocalize = Arrays.asList(files); + } else { + filesToLocalize.add(filesStr); + } + } + return true; } @@ -714,7 +732,7 @@ public boolean run() throws IOException, YarnException { + ", specified=" + amMemory + ", max=" + maxMem); amMemory = maxMem; - } + } int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores(); LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores); @@ -776,7 +794,42 @@ public boolean run() throws IOException, YarnException { if (!log4jPropFile.isEmpty()) { addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), localResources, null); - } + } + + // Process local files for localization + // Here we just upload the files, the AM + // will set up localization later. + StringBuilder localizableFiles = new StringBuilder(); + filesToLocalize.stream().forEach(path -> { + File f = new File(path); + + if (!f.exists()) { + throw new UncheckedIOException( + new IOException(path + " does not exist")); + } + + if (!f.canRead()) { + throw new UncheckedIOException( + new IOException(path + " cannot be read")); + } + + if (f.isDirectory()) { + throw new UncheckedIOException( + new IOException(path + " is a directory")); + } + + try { + String fileName = f.getName(); + uploadFile(fs, path, fileName, appId.toString()); + if (localizableFiles.length() == 0) { + localizableFiles.append(fileName); + } else { + localizableFiles.append(",").append(fileName); + } + } catch (IOException e) { + throw new UncheckedIOException("Cannot upload file: " + path, e); + } + }); // The shell script has to be made available on the final container(s) // where it will be executed. @@ -790,7 +843,9 @@ public boolean run() throws IOException, YarnException { if (!shellScriptPath.isEmpty()) { Path shellSrc = new Path(shellScriptPath); String shellPathSuffix = - appName + "/" + appId.toString() + "/" + SCRIPT_PATH; + ApplicationMaster.getRelativePath(appName, + appId.toString(), + SCRIPT_PATH); Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix); fs.copyFromLocalFile(false, true, shellSrc, shellDst); @@ -908,6 +963,10 @@ public boolean run() throws IOException, YarnException { if (debugFlag) { vargs.add("--debug"); } + if (localizableFiles.length() > 0) { + vargs.add("--localized_files " + localizableFiles.toString()); + } + vargs.add("--appname " + appName); vargs.addAll(containerRetryOptions); @@ -1088,7 +1147,7 @@ private void addToLocalResources(FileSystem fs, String fileSrcPath, String fileDstPath, String appId, Map localResources, String resources) throws IOException { String suffix = - appName + "/" + appId + "/" + fileDstPath; + ApplicationMaster.getRelativePath(appName, appId, fileDstPath); Path dst = new Path(fs.getHomeDirectory(), suffix); if (fileSrcPath == null) { @@ -1112,6 +1171,16 @@ private void addToLocalResources(FileSystem fs, String fileSrcPath, localResources.put(fileDstPath, scRsrc); } + private void uploadFile(FileSystem fs, String fileSrcPath, + String fileDstPath, String appId) throws IOException { + String relativePath = + ApplicationMaster.getRelativePath(appName, appId, fileDstPath); + Path dst = + new Path(fs.getHomeDirectory(), relativePath); + LOG.info("Uploading file: " + fileSrcPath + " to " + dst); + fs.copyFromLocalFile(new Path(fileSrcPath), dst); + } + private void prepareTimelineDomain() { TimelineClient timelineClient = null; if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 49d8f3d9db..80c1e208c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.yarn.applications.distributedshell; - +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; +import java.io.UncheckedIOException; import java.net.InetAddress; import java.net.URI; import java.net.URL; @@ -1630,4 +1631,67 @@ public void testDistributedShellAMResourcesWithUnknownResource() client.init(args); client.run(); } + + @Test + public void testDistributedShellWithSingleFileLocalization() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "type" : "cat", + "--localize_files", + "./src/test/resources/a.txt", + "--shell_args", + "a.txt" + }; + + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + assertTrue("Client exited with an error", client.run()); + } + + @Test + public void testDistributedShellWithMultiFileLocalization() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "type" : "cat", + "--localize_files", + "./src/test/resources/a.txt,./src/test/resources/b.txt", + "--shell_args", + "a.txt b.txt" + }; + + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + assertTrue("Client exited with an error", client.run()); + } + + @Test(expected=UncheckedIOException.class) + public void testDistributedShellWithNonExistentFileLocalization() + throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + Shell.WINDOWS ? "type" : "cat", + "--localize_files", + "/non/existing/path/file.txt", + "--shell_args", + "file.txt" + }; + + Client client = new Client(new Configuration(yarnCluster.getConfig())); + client.init(args); + client.run(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/a.txt b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/a.txt new file mode 100644 index 0000000000..231a9db376 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/a.txt @@ -0,0 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Sample file for testing + +aaaaaaaaaaaaaaaaaa \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/b.txt b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/b.txt new file mode 100644 index 0000000000..8646431456 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/resources/b.txt @@ -0,0 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Sample file for testing + +bbbbbbbbbbbbbbbbbbbb \ No newline at end of file