From 76b653a36738a4f420f14c53c7a0a4006dbf066e Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 31 Jan 2012 02:23:38 +0000 Subject: [PATCH] MAPREDUCE-3716. Fixing YARN+MR to allow MR jobs to be able to use java.io.File.createTempFile to create temporary files as part of their tasks. Contributed by Jonathan Eagles. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1238136 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 4 + .../hadoop/mapred/MapReduceChildJVM.java | 4 +- .../hadoop/mapred/TestMiniMRChildTask.java | 193 +++++++++--------- .../hadoop/yarn/conf/YarnConfiguration.java | 3 + .../nodemanager/DefaultContainerExecutor.java | 5 + .../impl/container-executor.c | 28 ++- .../impl/container-executor.h | 1 + .../src/java/mapred-default.xml | 12 -- 8 files changed, 139 insertions(+), 111 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d32baf12d7..03ebbb4d29 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -611,6 +611,10 @@ Release 0.23.1 - Unreleased MAPREDUCE-3703. ResourceManager should provide node lists in JMX output. (Eric Payne via mahadev) + MAPREDUCE-3716. Fixing YARN+MR to allow MR jobs to be able to use + java.io.File.createTempFile to create temporary files as part of their + tasks. (Jonathan Eagles via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java index 6c098259a5..375c69fd0c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.Apps; @SuppressWarnings("deprecation") @@ -201,7 +202,8 @@ public class MapReduceChildJVM { vargs.add(javaOptsSplit[i]); } - String childTmpDir = Environment.PWD.$() + Path.SEPARATOR + "tmp"; + Path childTmpDir = new Path(Environment.PWD.$(), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); vargs.add("-Djava.io.tmpdir=" + childTmpDir); // Setup the log4j prop diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRChildTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRChildTask.java index 7ae63c8c0c..49c899cf1d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRChildTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMiniMRChildTask.java @@ -17,10 +17,17 @@ */ package org.apache.hadoop.mapred; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.*; import java.util.Iterator; -import junit.framework.TestCase; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,20 +36,21 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.Ignore; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; /** * Class to test mapred task's * - temp directory * - child env */ -@Ignore -public class TestMiniMRChildTask extends TestCase { +public class TestMiniMRChildTask { private static final Log LOG = LogFactory.getLog(TestMiniMRChildTask.class.getName()); @@ -51,10 +59,24 @@ public class TestMiniMRChildTask extends TestCase { private final static String MAP_OPTS_VAL = "-Xmx200m"; private final static String REDUCE_OPTS_VAL = "-Xmx300m"; - private MiniMRCluster mr; - private MiniDFSCluster dfs; - private FileSystem fileSys; + private static MiniMRYarnCluster mr; + private static MiniDFSCluster dfs; + private static FileSystem fileSys; + private static Configuration conf = new Configuration(); + private static FileSystem localFs; + static { + try { + localFs = FileSystem.getLocal(conf); + } catch (IOException io) { + throw new RuntimeException("problem getting local fs", io); + } + } + + private static Path TEST_ROOT_DIR = new Path("target", + TestMiniMRChildTask.class.getName() + "-tmpDir").makeQualified(localFs); + static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar"); + /** * Map class which checks whether temp directory exists * and check the value of java.io.tmpdir @@ -62,34 +84,26 @@ public class TestMiniMRChildTask extends TestCase { * temp directory specified. */ public static class MapClass extends MapReduceBase - implements Mapper { - Path tmpDir; - FileSystem localFs; - public void map (LongWritable key, Text value, - OutputCollector output, - Reporter reporter) throws IOException { - String tmp = null; - if (localFs.exists(tmpDir)) { - tmp = tmpDir.makeQualified(localFs).toString(); - - assertEquals(tmp, new Path(System.getProperty("java.io.tmpdir")). - makeQualified(localFs).toString()); - } else { - fail("Temp directory "+tmpDir +" doesnt exist."); - } - File tmpFile = File.createTempFile("test", ".tmp"); - assertEquals(tmp, new Path(tmpFile.getParent()). - makeQualified(localFs).toString()); - } - public void configure(JobConf job) { - tmpDir = new Path(job.get(JobContext.TASK_TEMP_DIR, "./tmp")); - try { - localFs = FileSystem.getLocal(job); - } catch (IOException ioe) { - ioe.printStackTrace(); - fail("IOException in getting localFS"); - } - } + implements Mapper { + Path tmpDir; + public void map (LongWritable key, Text value, + OutputCollector output, + Reporter reporter) throws IOException { + if (localFs.exists(tmpDir)) { + } else { + fail("Temp directory " + tmpDir +" doesnt exist."); + } + File tmpFile = File.createTempFile("test", ".tmp"); + } + public void configure(JobConf job) { + tmpDir = new Path(System.getProperty("java.io.tmpdir")); + try { + localFs = FileSystem.getLocal(job); + } catch (IOException ioe) { + ioe.printStackTrace(); + fail("IOException in getting localFS"); + } + } } // configure a job @@ -136,7 +150,7 @@ public class TestMiniMRChildTask extends TestCase { Path inDir, Path outDir, String input) - throws IOException { + throws IOException, InterruptedException, ClassNotFoundException { configure(conf, inDir, outDir, input, MapClass.class, IdentityReducer.class); @@ -144,48 +158,13 @@ public class TestMiniMRChildTask extends TestCase { // Launch job with default option for temp dir. // i.e. temp dir is ./tmp - JobClient.runJob(conf); - outFs.delete(outDir, true); - - final String DEFAULT_ABS_TMP_PATH = "/tmp"; - final String DEFAULT_REL_TMP_PATH = "../temp"; - - String absoluteTempPath = null; - String relativeTempPath = null; - - for (String key : new String[] { "test.temp.dir", "test.tmp.dir" }) { - String p = conf.get(key); - if (p == null || p.isEmpty()) { - continue; - } - if (new Path(p).isAbsolute()) { - if (absoluteTempPath == null) { - absoluteTempPath = p; - } - } else { - if (relativeTempPath == null) { - relativeTempPath = p; - } - } - } - - if (absoluteTempPath == null) { - absoluteTempPath = DEFAULT_ABS_TMP_PATH; - } - if (relativeTempPath == null) { - relativeTempPath = DEFAULT_REL_TMP_PATH; - } - - // Launch job by giving relative path to temp dir. - LOG.info("Testing with relative temp dir = "+relativeTempPath); - conf.set("mapred.child.tmp", relativeTempPath); - JobClient.runJob(conf); - outFs.delete(outDir, true); - - // Launch job by giving absolute path to temp dir - LOG.info("Testing with absolute temp dir = "+absoluteTempPath); - conf.set("mapred.child.tmp", absoluteTempPath); - JobClient.runJob(conf); + Job job = new Job(conf); + job.addFileToClassPath(APP_JAR); + job.setJarByClass(TestMiniMRChildTask.class); + job.setMaxMapAttempts(1); // speed up failures + job.waitForCompletion(true); + boolean succeeded = job.waitForCompletion(true); + assertTrue(succeeded); outFs.delete(outDir, true); } @@ -311,20 +290,33 @@ public class TestMiniMRChildTask extends TestCase { } - @Override - public void setUp() { - try { - // create configuration, dfs, file system and mapred cluster - dfs = new MiniDFSCluster(new Configuration(), 1, true, null); - fileSys = dfs.getFileSystem(); - mr = new MiniMRCluster(2, fileSys.getUri().toString(), 1); - } catch (IOException ioe) { - tearDown(); + @BeforeClass + public static void setup() throws IOException { + // create configuration, dfs, file system and mapred cluster + dfs = new MiniDFSCluster(conf, 1, true, null); + fileSys = dfs.getFileSystem(); + + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; } + + if (mr == null) { + mr = new MiniMRYarnCluster(TestMiniMRChildTask.class.getName()); + Configuration conf = new Configuration(); + mr.init(conf); + mr.start(); + } + + // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to + // workaround the absent public discache. + localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR); + localFs.setPermission(APP_JAR, new FsPermission("700")); } - @Override - public void tearDown() { + @AfterClass + public static void tearDown() { // close file system and shut down dfs and mapred cluster try { if (fileSys != null) { @@ -334,7 +326,8 @@ public class TestMiniMRChildTask extends TestCase { dfs.shutdown(); } if (mr != null) { - mr.shutdown(); + mr.stop(); + mr = null; } } catch (IOException ioe) { LOG.info("IO exception in closing file system)" ); @@ -351,9 +344,10 @@ public class TestMiniMRChildTask extends TestCase { * the directory specified. We create a temp file and check if is is * created in the directory specified. */ + @Test public void testTaskTempDir(){ try { - JobConf conf = mr.createJobConf(); + JobConf conf = new JobConf(mr.getConfig()); // intialize input, output directories Path inDir = new Path("testing/wc/input"); @@ -375,9 +369,10 @@ public class TestMiniMRChildTask extends TestCase { * - x=y (x can be a already existing env variable or a new variable) * - x=$x:y (replace $x with the current value of x) */ + public void testTaskEnv(){ try { - JobConf conf = mr.createJobConf(); + JobConf conf = new JobConf(mr.getConfig()); // initialize input, output directories Path inDir = new Path("testing/wc/input1"); Path outDir = new Path("testing/wc/output1"); @@ -399,7 +394,7 @@ public class TestMiniMRChildTask extends TestCase { */ public void testTaskOldEnv(){ try { - JobConf conf = mr.createJobConf(); + JobConf conf = new JobConf(mr.getConfig()); // initialize input, output directories Path inDir = new Path("testing/wc/input1"); Path outDir = new Path("testing/wc/output1"); @@ -414,7 +409,7 @@ public class TestMiniMRChildTask extends TestCase { } void runTestTaskEnv(JobConf conf, Path inDir, Path outDir, boolean oldConfigs) - throws IOException { + throws IOException, InterruptedException, ClassNotFoundException { String input = "The input"; configure(conf, inDir, outDir, input, EnvCheckMapper.class, EnvCheckReducer.class); @@ -445,8 +440,14 @@ public class TestMiniMRChildTask extends TestCase { conf.set("path", System.getenv("PATH")); conf.set(mapTaskJavaOptsKey, mapTaskJavaOpts); conf.set(reduceTaskJavaOptsKey, reduceTaskJavaOpts); - RunningJob job = JobClient.runJob(conf); - assertTrue("The environment checker job failed.", job.isSuccessful()); + + Job job = new Job(conf); + job.addFileToClassPath(APP_JAR); + job.setJarByClass(TestMiniMRChildTask.class); + job.setMaxMapAttempts(1); // speed up failures + job.waitForCompletion(true); + boolean succeeded = job.waitForCompletion(true); + assertTrue("The environment checker job failed.", succeeded); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index f4cbf6e65b..9d5f891dd9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -512,6 +512,9 @@ public class YarnConfiguration extends Configuration { public static final String YARN_APPLICATION_CLASSPATH = YARN_PREFIX + "application.classpath"; + /** Container temp directory */ + public static final String DEFAULT_CONTAINER_TEMP_DIR = "./tmp"; + public YarnConfiguration() { super(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 3d46053115..76be5ca097 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Shell.ExitCodeException; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; @@ -128,6 +129,10 @@ public class DefaultContainerExecutor extends ContainerExecutor { // Create the container log-dirs on all disks createContainerLogDirs(appIdStr, containerIdStr, logDirs); + Path tmpDir = new Path(containerWorkDir, + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); + lfs.mkdir(tmpDir, null, false); + // copy launch script to work dir Path launchDst = new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c index c4bde44a26..4725683396 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c @@ -314,6 +314,13 @@ char* get_app_log_directory(const char *log_root, const char* app_id) { app_id); } +/** + * Get the tmp directory under the working directory + */ +char *get_tmp_directory(const char *work_dir) { + return concatenate("%s/%s", "tmp dir", 2, work_dir, TMP_DIR); +} + /** * Ensure that the given path and all of the parent directories are created * with the desired permissions. @@ -357,7 +364,7 @@ int mkdirs(const char* path, mode_t perm) { * It creates the container work and log directories. */ static int create_container_directories(const char* user, const char *app_id, - const char *container_id, char* const* local_dir, char* const* log_dir) { + const char *container_id, char* const* local_dir, char* const* log_dir, const char *work_dir) { // create dirs as 0750 const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP; if (app_id == NULL || container_id == NULL || user == NULL) { @@ -409,6 +416,23 @@ static int create_container_directories(const char* user, const char *app_id, } free(combined_name); } + + if (result != 0) { + return result; + } + + result = -1; + // also make the tmp directory + char *tmp_dir = get_tmp_directory(work_dir); + + if (tmp_dir == NULL) { + return -1; + } + if (mkdirs(tmp_dir, perms) == 0) { + result = 0; + } + free(tmp_dir); + return result; } @@ -823,7 +847,7 @@ int launch_container_as_user(const char *user, const char *app_id, } if (create_container_directories(user, app_id, container_id, local_dirs, - log_dirs) != 0) { + log_dirs, work_dir) != 0) { fprintf(LOGFILE, "Could not create container dirs"); goto cleanup; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h index baf677a319..ac8b07dac7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h @@ -64,6 +64,7 @@ enum errorcodes { #define CREDENTIALS_FILENAME "container_tokens" #define MIN_USERID_KEY "min.user.id" #define BANNED_USERS_KEY "banned.users" +#define TMP_DIR "tmp" extern struct passwd *user_detail; diff --git a/hadoop-mapreduce-project/src/java/mapred-default.xml b/hadoop-mapreduce-project/src/java/mapred-default.xml index 79605eb169..0d0a91d778 100644 --- a/hadoop-mapreduce-project/src/java/mapred-default.xml +++ b/hadoop-mapreduce-project/src/java/mapred-default.xml @@ -433,18 +433,6 @@ - - mapreduce.task.tmp.dir - ./tmp - To set the value of tmp directory for map and reduce tasks. - If the value is an absolute path, it is directly assigned. Otherwise, it is - prepended with task's working directory. The java tasks are executed with - option -Djava.io.tmpdir='the absolute path of the tmp dir'. Pipes and - streaming are set with environment variable, - TMPDIR='the absolute path of the tmp dir' - - - mapreduce.map.log.level INFO