diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 42cbc0e3ae..338a160b16 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -334,6 +334,9 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
     (Chuan Liu via cnauroth)
 
+    MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params
+    is specified (Gera Shegalov via Sandy Ryza)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
index 2302490aaa..20de915356 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
@@ -212,19 +212,11 @@ public static List<String> getVMCommand(
     if (conf.getProfileEnabled()) {
       if (conf.getProfileTaskRange(task.isMapTask()
                                    ).isIncluded(task.getPartition())) {
-        vargs.add(
-            String.format(
-                conf.getProfileParams(),
-                getTaskLogFile(TaskLog.LogName.PROFILE)
-                )
-            );
-        if (task.isMapTask()) {
-          vargs.add(conf.get(MRJobConfig.TASK_MAP_PROFILE_PARAMS, ""));
-        }
-        else {
-          vargs.add(conf.get(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, ""));
-        }
-
+        final String profileParams = conf.get(task.isMapTask()
+            ? MRJobConfig.TASK_MAP_PROFILE_PARAMS
+            : MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
+        vargs.add(String.format(profileParams,
+            getTaskLogFile(TaskLog.LogName.PROFILE)));
       }
     }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 4f1b20cfea..dea2adf135 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -602,6 +602,31 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.task.profile.params</name>
+  <value></value>
+  <description>JVM profiler parameters used to profile map and reduce task
+  attempts. This string may contain a single format specifier %s that will
+  be replaced by the path to profile.out in the task attempt log directory.
+  To specify different profiling options for map tasks and reduce tasks,
+  more specific parameters mapreduce.task.profile.map.params and
+  mapreduce.task.profile.reduce.params should be used.</description>
+</property>
+
+<property>
+  <name>mapreduce.task.profile.map.params</name>
+  <value>${mapreduce.task.profile.params}</value>
+  <description>Map-task-specific JVM profiler parameters. See
+  mapreduce.task.profile.params</description>
+</property>
+
+<property>
+  <name>mapreduce.task.profile.reduce.params</name>
+  <value>${mapreduce.task.profile.params}</value>
+  <description>Reduce-task-specific JVM profiler parameters. See
+  mapreduce.task.profile.params</description>
+</property>
+
 <property>
   <name>mapreduce.task.skip.start.attempts</name>
   <value>2</value>
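For illustration only (not part of the patch): the MapReduceChildJVM.java hunk above now consults the map- or reduce-specific key first and falls back to conf.getProfileParams(), i.e. mapreduce.task.profile.params, expanding a single %s to the profile output path. A minimal sketch of that lookup follows; the class name ProfileParamsResolution is hypothetical.

// Hypothetical sketch, not part of this patch: mirrors the per-task-type
// lookup introduced in MapReduceChildJVM.getVMCommand() above.
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class ProfileParamsResolution {
  /** Pick map- or reduce-specific params, falling back to the generic key. */
  static String resolve(JobConf conf, boolean isMapTask, String profileOut) {
    final String params = conf.get(isMapTask
        ? MRJobConfig.TASK_MAP_PROFILE_PARAMS
        : MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
    // A single %s in the chosen string becomes the profile output path,
    // e.g. the task attempt's profile.out file.
    return String.format(params, profileOut);
  }
}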
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
new file mode 100644
index 0000000000..de17528d7f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithProfiler.java
@@ -0,0 +1,244 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.*;
+import java.util.*;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMRJobsWithProfiler {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestMRJobsWithProfiler.class);
+
+  private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
+    EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
+
+  private static MiniMRYarnCluster mrCluster;
+
+  private static final Configuration CONF = new Configuration();
+  private static final FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(CONF);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static final Path TEST_ROOT_DIR =
+    new Path("target", TestMRJobs.class.getName() + "-tmpDir").
+      makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+
+  private static final Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+
+  @Before
+  public void setup() throws InterruptedException, IOException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(getClass().getName());
+      mrCluster.init(CONF);
+      mrCluster.start();
+    }
+
+    // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to
+    // workaround the absent public discache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
+  }
+
+  @After
+  public void tearDown() {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster != null) {
+      mrCluster.stop();
+    }
+  }
+
+
+  @Test (timeout = 120000)
+  public void testProfiler() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    final SleepJob sleepJob = new SleepJob();
+    final JobConf sleepConf = new JobConf(mrCluster.getConfig());
+
+    sleepConf.setProfileEnabled(true);
+    // profile map split 1
+    sleepConf.setProfileTaskRange(true, "1");
+    // profile reduce of map output partitions 1
+    sleepConf.setProfileTaskRange(false, "1");
+
+    // use hprof for map to profile.out
+    sleepConf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS,
+        "-agentlib:hprof=cpu=times,heap=sites,force=n,thread=y,verbose=n,"
+        + "file=%s");
+
+    // use Xprof for reduce to stdout
+    sleepConf.set(MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, "-Xprof");
+    sleepJob.setConf(sleepConf);
+
+    // 2-map-2-reduce SleepJob
+    final Job job = sleepJob.createJob(2, 2, 500, 1, 500, 1);
+    job.setJarByClass(SleepJob.class);
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.waitForCompletion(true);
+    final JobId jobId = TypeConverter.toYarn(job.getJobID());
+    final ApplicationId appID = jobId.getAppId();
+    int pollElapsed = 0;
+    while (true) {
+      Thread.sleep(1000);
+      pollElapsed += 1000;
+
+      if (TERMINAL_RM_APP_STATES.contains(
+        mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
+        .getState())) {
+        break;
+      }
+
+      if (pollElapsed >= 60000) {
+        LOG.warn("application did not reach terminal state within 60 seconds");
+        break;
+      }
+    }
+    Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
+      .getRMContext().getRMApps().get(appID).getState());
+
+    // Job finished, verify logs
+    //
+    final Configuration nmConf = mrCluster.getNodeManager(0).getConfig();
+
+    final String appIdStr = appID.toString();
+    final String appIdSuffix = appIdStr.substring(
+      "application_".length(), appIdStr.length());
+    final String containerGlob = "container_" + appIdSuffix + "_*_*";
+
+    final Map<TaskAttemptID, Path> taLogDirs =
+      new HashMap<TaskAttemptID, Path>();
+    final Pattern taskPattern = Pattern.compile(
+      ".*Task:(attempt_"
+        + appIdSuffix + "_[rm]_" + "[0-9]+_[0-9]+).*");
+    for (String logDir :
+         nmConf.getTrimmedStrings(YarnConfiguration.NM_LOG_DIRS))
+    {
+      // filter out MRAppMaster and create attemptId->logDir map
+      //
+      for (FileStatus fileStatus :
+          localFs.globStatus(new Path(logDir
+            + Path.SEPARATOR + appIdStr
+            + Path.SEPARATOR + containerGlob
+            + Path.SEPARATOR + TaskLog.LogName.SYSLOG)))
+      {
+        final BufferedReader br = new BufferedReader(
+          new InputStreamReader(localFs.open(fileStatus.getPath())));
+        String line;
+        while ((line = br.readLine()) != null) {
+          final Matcher m = taskPattern.matcher(line);
+          if (m.matches()) {
+            // found Task done message
+            taLogDirs.put(TaskAttemptID.forName(m.group(1)),
+              fileStatus.getPath().getParent());
+            break;
+          }
+        }
+        br.close();
+      }
+    }
+
+    Assert.assertEquals(4, taLogDirs.size()); // all 4 attempts found
+
+    for (Map.Entry<TaskAttemptID, Path> dirEntry : taLogDirs.entrySet()) {
+      final TaskAttemptID tid = dirEntry.getKey();
+      final Path profilePath = new Path(dirEntry.getValue(),
+        TaskLog.LogName.PROFILE.toString());
+      final Path stdoutPath = new Path(dirEntry.getValue(),
+        TaskLog.LogName.STDOUT.toString());
+      if (tid.getTaskType() == TaskType.MAP) {
+        if (tid.getTaskID().getId() == 1) {
+          // verify profile.out
+          final BufferedReader br = new BufferedReader(new InputStreamReader(
+            localFs.open(profilePath)));
+          final String line = br.readLine();
+          Assert.assertTrue("No hprof content found!",
+            line != null && line.startsWith("JAVA PROFILE"));
+          br.close();
+          Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
+        } else {
+          Assert.assertFalse("hprof file should not exist",
+            localFs.exists(profilePath));
+        }
+      } else {
+        Assert.assertFalse("hprof file should not exist",
+          localFs.exists(profilePath));
+        if (tid.getTaskID().getId() == 1) {
+          final BufferedReader br = new BufferedReader(new InputStreamReader(
+            localFs.open(stdoutPath)));
+          boolean flatProfFound = false;
+          String line;
+          while ((line = br.readLine()) != null) {
+            if (line.startsWith("Flat profile")) {
+              flatProfFound = true;
+              break;
+            }
+          }
+          br.close();
+          Assert.assertTrue("Xprof flat profile not found!", flatProfFound);
+        } else {
+          Assert.assertEquals(0L, localFs.getFileStatus(stdoutPath).getLen());
+        }
+      }
+    }
+  }
+}
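For illustration only (not part of the patch): a minimal client-side sketch, in the spirit of the test above, of driving the new keys through the public JobConf API. The class name ProfilerConfigSketch and the chosen task ranges are hypothetical; here map attempts get an -Xprof override while reduce attempts fall back to the generic mapreduce.task.profile.params string, whose single %s expands to the attempt's profile.out path.

// Hypothetical usage sketch, not part of this patch.
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class ProfilerConfigSketch {
  public static JobConf configureProfiling(JobConf conf) {
    // Turn profiling on and pick which task attempts to profile.
    conf.setProfileEnabled(true);
    conf.setProfileTaskRange(true, "0-1");  // map attempts 0 and 1
    conf.setProfileTaskRange(false, "0");   // reduce attempt 0

    // Generic parameters (mapreduce.task.profile.params); the single %s is
    // replaced with the task attempt's profile.out log path.
    conf.setProfileParams(
        "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s");

    // Map-only override; reduce attempts keep the generic value above.
    conf.set(MRJobConfig.TASK_MAP_PROFILE_PARAMS, "-Xprof");
    return conf;
  }
}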