MAPREDUCE-4810. Added new admin command options for MR AM. Contributed by Jerry Chen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1430707 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-01-09 06:31:36 +00:00
parent 2c5c8fdb80
commit 5f7d4d2b45
5 changed files with 95 additions and 0 deletions

View File

@ -175,6 +175,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4520. Added support for MapReduce applications to request for MAPREDUCE-4520. Added support for MapReduce applications to request for
CPU cores along-with memory post YARN-2. (acmurthy) CPU cores along-with memory post YARN-2. (acmurthy)
MAPREDUCE-4810. Added new admin command options for MR AM. (Jerry Chen via
vinodkv)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-3678. The Map tasks logs should have the value of input MAPREDUCE-3678. The Map tasks logs should have the value of input

View File

@ -365,6 +365,11 @@ public interface MRJobConfig {
MR_AM_PREFIX+"command-opts"; MR_AM_PREFIX+"command-opts";
public static final String DEFAULT_MR_AM_COMMAND_OPTS = "-Xmx1024m"; public static final String DEFAULT_MR_AM_COMMAND_OPTS = "-Xmx1024m";
/** Admin command opts passed to the MR app master.*/
public static final String MR_AM_ADMIN_COMMAND_OPTS =
MR_AM_PREFIX+"admin-command-opts";
public static final String DEFAULT_MR_AM_ADMIN_COMMAND_OPTS = "";
/** Root Logging level passed to the MR app master.*/ /** Root Logging level passed to the MR app master.*/
public static final String MR_AM_LOG_LEVEL = public static final String MR_AM_LOG_LEVEL =
MR_AM_PREFIX+"log.level"; MR_AM_PREFIX+"log.level";

View File

@ -874,6 +874,20 @@
</description> </description>
</property> </property>
<property>
<name>yarn.app.mapreduce.am.admin-command-opts</name>
<value></value>
<description>Java opts for the MR App Master processes for admin purposes.
It will appears before the opts set by yarn.app.mapreduce.am.command-opts and
thus its options can be overridden user.
Usage of -Djava.library.path can cause programs to no longer function if
hadoop native libraries are used. These values should instead be set as part
of LD_LIBRARY_PATH in the map / reduce JVM env using the mapreduce.map.env and
mapreduce.reduce.env config settings.
</description>
</property>
<property> <property>
<name>yarn.app.mapreduce.am.job.task.listener.thread-count</name> <name>yarn.app.mapreduce.am.job.task.listener.thread-count</name>
<value>30</value> <value>30</value>

View File

@ -394,6 +394,11 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL); MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
MRApps.addLog4jSystemProperties(logLevel, logSize, vargs); MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
// Add AM admin command opts before user command opts
// so that it can be overridden by user
vargs.add(conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS));
vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS, vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS)); MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));

View File

@ -29,6 +29,7 @@
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -39,11 +40,13 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClientCache; import org.apache.hadoop.mapred.ClientCache;
import org.apache.hadoop.mapred.ClientServiceDelegate; import org.apache.hadoop.mapred.ClientServiceDelegate;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.ResourceMgrDelegate; import org.apache.hadoop.mapred.ResourceMgrDelegate;
import org.apache.hadoop.mapred.YARNRunner; import org.apache.hadoop.mapred.YARNRunner;
import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.ClientRMProtocol;
@ -65,6 +68,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -240,4 +244,68 @@ public synchronized void start() {
delegate.getQueueAclsForCurrentUser(); delegate.getQueueAclsForCurrentUser();
verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)); verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
} }
@Test
public void testAMAdminCommandOpts() throws Exception {
JobConf jobConf = new JobConf();
jobConf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, "-Djava.net.preferIPv4Stack=true");
jobConf.set(MRJobConfig.MR_AM_COMMAND_OPTS, "-Xmx1024m");
YARNRunner yarnRunner = new YARNRunner(jobConf);
File jobxml = new File(testWorkDir, MRJobConfig.JOB_CONF_FILE);
OutputStream out = new FileOutputStream(jobxml);
conf.writeXml(out);
out.close();
File jobsplit = new File(testWorkDir, MRJobConfig.JOB_SPLIT);
out = new FileOutputStream(jobsplit);
out.close();
File jobsplitmetainfo = new File(testWorkDir, MRJobConfig.JOB_SPLIT_METAINFO);
out = new FileOutputStream(jobsplitmetainfo);
out.close();
File appTokens = new File(testWorkDir, MRJobConfig.APPLICATION_TOKENS_FILE);
out = new FileOutputStream(appTokens);
out.close();
ApplicationSubmissionContext submissionContext =
yarnRunner.createApplicationSubmissionContext(jobConf, testWorkDir.toString(), new Credentials());
ContainerLaunchContext containerSpec = submissionContext.getAMContainerSpec();
List<String> commands = containerSpec.getCommands();
int index = 0;
int adminIndex = 0;
int adminPos = -1;
int userIndex = 0;
int userPos = -1;
for(String command : commands) {
if(command != null) {
adminPos = command.indexOf("-Djava.net.preferIPv4Stack=true");
if(adminPos >= 0)
adminIndex = index;
userPos = command.indexOf("-Xmx1024m");
if(userPos >= 0)
userIndex = index;
}
index++;
}
// Check both admin java opts and user java opts are in the commands
assertTrue("AM admin command opts not in the commands.", adminPos > 0);
assertTrue("AM user command opts not in the commands.", userPos > 0);
// Check the admin java opts is before user java opts in the commands
if(adminIndex == userIndex) {
assertTrue("AM admin command opts is after user command opts.", adminPos < userPos);
} else {
assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
}
}
} }