Merging r1536573 through r1536889 from trunk to branch HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1536890 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-10-29 21:05:15 +00:00
commit 73cf16ce48
22 changed files with 574 additions and 160 deletions

View File

@ -63,7 +63,7 @@ public abstract class HAAdmin extends Configured implements Tool {
private int rpcTimeoutForChecks = -1;
private static Map<String, UsageInfo> USAGE =
protected final static Map<String, UsageInfo> USAGE =
ImmutableMap.<String, UsageInfo>builder()
.put("-transitionToActive",
new UsageInfo("<serviceId>", "Transitions the service into Active state"))
@ -91,6 +91,14 @@ public abstract class HAAdmin extends Configured implements Tool {
protected PrintStream out = System.out;
private RequestSource requestSource = RequestSource.REQUEST_BY_USER;
protected HAAdmin() {
super();
}
protected HAAdmin(Configuration conf) {
super(conf);
}
protected abstract HAServiceTarget resolveTarget(String string);
protected String getUsageString() {
@ -461,9 +469,9 @@ private int help(String[] argv) {
return 0;
}
private static class UsageInfo {
private final String args;
private final String help;
protected static class UsageInfo {
public final String args;
public final String help;
public UsageInfo(String args, String help) {
this.args = args;

View File

@ -381,10 +381,6 @@ Release 2.3.0 - UNRELEASED
HDFS-5267. Remove volatile from LightWeightHashSet. (Junping Du via llu)
HDFS-4657. Limit the number of blocks logged by the NN after a block
report to a configurable value. (Aaron T. Myers via Colin Patrick
McCabe)
HDFS-4278. Log an ERROR when DFS_BLOCK_ACCESS_TOKEN_ENABLE config is
disabled but security is turned on. (Kousuke Saruta via harsh)
@ -514,6 +510,10 @@ Release 2.2.1 - UNRELEASED
HDFS-5331. make SnapshotDiff.java to a o.a.h.util.Tool interface implementation.
(Vinayakumar B via umamahesh)
HDFS-4657. Limit the number of blocks logged by the NN after a block
report to a configurable value. (Aaron T. Myers via Colin Patrick
McCabe)
OPTIMIZATIONS
BUG FIXES

View File

@ -212,6 +212,9 @@ Release 2.2.1 - UNRELEASED
MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write
out text files without separators (Sandy Ryza)
MAPREDUCE-5596. Allow configuring the number of threads used to serve
shuffle connections (Sandy Ryza via jlowe)
OPTIMIZATIONS
MAPREDUCE-4680. Job history cleaner should only check timestamps of files in
@ -231,6 +234,9 @@ Release 2.2.1 - UNRELEASED
MAPREDUCE-5561. org.apache.hadoop.mapreduce.v2.app.job.impl.TestJobImpl
testcase failing on trunk (Karthik Kambatla via jlowe)
MAPREDUCE-5598. TestUserDefinedCounters.testMapReduceJob is flakey
(Robert Kanter via jlowe)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -304,6 +304,16 @@
</description>
</property>
<property>
<name>mapreduce.shuffle.max.threads</name>
<value>0</value>
<description>Max allowed threads for serving shuffle connections. Set to zero
to indicate the default of 2 times the number of available
processors (as reported by Runtime.availableProcessors()). Netty is used to
serve requests, so a thread is not needed for each connection.
</description>
</property>
<property>
<name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value>

View File

@ -40,7 +40,8 @@ public class TestUserDefinedCounters extends TestCase {
private static String TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp")).toURI()
.toString().replace(' ', '+');
.toString().replace(' ', '+')
+ "/" + TestUserDefinedCounters.class.getName();
private final Path INPUT_DIR = new Path(TEST_ROOT_DIR + "/input");
private final Path OUTPUT_DIR = new Path(TEST_ROOT_DIR + "/out");
@ -61,7 +62,7 @@ public void map(K key, V value,
}
private void cleanAndCreateInput(FileSystem fs) throws IOException {
fs.delete(INPUT_FILE, true);
fs.delete(INPUT_DIR, true);
fs.delete(OUTPUT_DIR, true);
OutputStream os = fs.create(INPUT_FILE);

View File

@ -72,7 +72,7 @@ public class TestCombineTextInputFormat {
new Path(new Path(System.getProperty("test.build.data", "."), "data"),
"TestCombineTextInputFormat");
@Test(timeout=10000)
@Test//(timeout=10000)
public void testFormat() throws Exception {
Job job = Job.getInstance(new Configuration(defaultConf));

View File

@ -164,6 +164,10 @@ public class ShuffleHandler extends AuxiliaryService {
public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
// 0 implies Netty default of 2 * number of available processors
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
@Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener {
@Metric("Shuffle output in bytes")
@ -282,6 +286,11 @@ protected void serviceInit(Configuration conf) throws Exception {
maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
DEFAULT_MAX_SHUFFLE_CONNECTIONS);
int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS,
DEFAULT_MAX_SHUFFLE_THREADS);
if (maxShuffleThreads == 0) {
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
}
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d")
@ -292,7 +301,8 @@ protected void serviceInit(Configuration conf) throws Exception {
selector = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(bossFactory),
Executors.newCachedThreadPool(workerFactory));
Executors.newCachedThreadPool(workerFactory),
maxShuffleThreads);
super.serviceInit(new Configuration(conf));
}

View File

@ -32,6 +32,11 @@ Release 2.3.0 - UNRELEASED
YARN-1253. Changes to LinuxContainerExecutor to run containers as a single
dedicated user in non-secure mode. (rvs via tucu)
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
YARN-1068. Add admin support for HA operations (Karthik Kambatla via
bikas)
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
@ -41,8 +46,6 @@ Release 2.3.0 - UNRELEASED
YARN-1098. Separate out RM services into Always On and Active (Karthik
Kambatla via bikas)
YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
YARN-353. Add Zookeeper-based store implementation for RMStateStore.
(Bikas Saha, Jian He and Karthik Kambatla via hitesh)
@ -114,6 +117,9 @@ Release 2.2.1 - UNRELEASED
YARN-1333. Support blacklisting in the Fair Scheduler (Tsuyoshi Ozawa via
Sandy Ryza)
YARN-1109. Demote NodeManager "Sending out status for container" logs to
debug (haosdent via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES

View File

@ -41,7 +41,9 @@ public class HAUtil {
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.RM_WEBAPP_ADDRESS));
YarnConfiguration.RM_WEBAPP_ADDRESS,
// TODO Remove after YARN-1318
YarnConfiguration.RM_HA_ADMIN_ADDRESS));
public static final String BAD_CONFIG_MESSAGE_PREFIX =
"Invalid configuration! ";

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@ -282,6 +283,18 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
public static final String RM_HA_ID = RM_HA_PREFIX + "id";
@org.apache.hadoop.classification.InterfaceAudience.Private
// TODO Remove after YARN-1318
public static final String RM_HA_ADMIN_ADDRESS =
RM_HA_PREFIX + "admin.address";
public static final int DEFAULT_RM_HA_ADMIN_PORT = 8034;
public static String DEFAULT_RM_HA_ADMIN_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_HA_ADMIN_PORT;
public static final String RM_HA_ADMIN_CLIENT_THREAD_COUNT =
RM_HA_PREFIX + "admin.client.thread-count";
public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1;
// end @Private
////////////////////////////////
// RM state store configs
////////////////////////////////
@ -753,6 +766,11 @@ public class YarnConfiguration extends Configuration {
public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER =
"security.resourcelocalizer.protocol.acl";
@org.apache.hadoop.classification.InterfaceAudience.Private
// TODO Remove after YARN-1318
public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL =
CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL;
/** No. of milliseconds to wait between sending a SIGTERM and SIGKILL
* to a running container */
@ -911,4 +929,14 @@ public InetSocketAddress getSocketAddr(
}
return NetUtils.createSocketAddr(address, defaultPort, name);
}
@Override
public InetSocketAddress updateConnectAddr(String name,
InetSocketAddress addr) {
String prefix = name;
if (HAUtil.isHAEnabled(this)) {
prefix = HAUtil.addSuffix(prefix, HAUtil.getRMHAId(this));
}
return super.updateConnectAddr(prefix, addr);
}
}

View File

@ -20,18 +20,26 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ha.HAAdmin;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
@ -44,11 +52,35 @@
@Private
@Unstable
public class RMAdminCLI extends Configured implements Tool {
public class RMAdminCLI extends HAAdmin {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
protected final static Map<String, UsageInfo> ADMIN_USAGE =
ImmutableMap.<String, UsageInfo>builder()
.put("-refreshQueues", new UsageInfo("",
"Reload the queues' acls, states and scheduler specific " +
"properties. \n\t\tResourceManager will reload the " +
"mapred-queues configuration file."))
.put("-refreshNodes", new UsageInfo("",
"Refresh the hosts information at the ResourceManager."))
.put("-refreshSuperUserGroupsConfiguration", new UsageInfo("",
"Refresh superuser proxy groups mappings"))
.put("-refreshUserToGroupsMappings", new UsageInfo("",
"Refresh user-to-groups mappings"))
.put("-refreshAdminAcls", new UsageInfo("",
"Refresh acls for administration of ResourceManager"))
.put("-refreshServiceAcl", new UsageInfo("",
"Reload the service-level authorization policy file. \n\t\t" +
"ResoureceManager will reload the authorization policy file."))
.put("-getGroups", new UsageInfo("[username]",
"Get the groups which given user belongs to."))
.put("-help", new UsageInfo("[cmd]",
"Displays help for the given command or all commands if none " +
"is specified."))
.build();
public RMAdminCLI() {
super();
}
@ -57,10 +89,64 @@ public RMAdminCLI(Configuration conf) {
super(conf);
}
private static void appendHAUsage(final StringBuilder usageBuilder) {
for (String cmdKey : USAGE.keySet()) {
if (cmdKey.equals("-help")) {
continue;
}
UsageInfo usageInfo = USAGE.get(cmdKey);
usageBuilder.append(" [" + cmdKey + " " + usageInfo.args + "]");
}
}
private static void buildHelpMsg(String cmd, StringBuilder builder) {
UsageInfo usageInfo = ADMIN_USAGE.get(cmd);
if (usageInfo == null) {
usageInfo = USAGE.get(cmd);
if (usageInfo == null) {
return;
}
}
String space = (usageInfo.args == "") ? "" : " ";
builder.append(" " + cmd + space + usageInfo.args + ": " +
usageInfo.help);
}
private static void buildIndividualUsageMsg(String cmd,
StringBuilder builder ) {
UsageInfo usageInfo = ADMIN_USAGE.get(cmd);
if (usageInfo == null) {
usageInfo = USAGE.get(cmd);
if (usageInfo == null) {
return;
}
}
String space = (usageInfo.args == "") ? "" : " ";
builder.append("Usage: java RMAdmin ["
+ cmd + space + usageInfo.args
+ "]\n");
}
private static void buildUsageMsg(StringBuilder builder) {
builder.append("Usage: java RMAdmin");
for (String cmdKey : ADMIN_USAGE.keySet()) {
UsageInfo usageInfo = ADMIN_USAGE.get(cmdKey);
builder.append(" " + cmdKey + " " + usageInfo.args + "\n");
}
for (String cmdKey : USAGE.keySet()) {
if (!cmdKey.equals("-help")) {
UsageInfo usageInfo = USAGE.get(cmdKey);
builder.append(" " + cmdKey + " " + usageInfo.args + "\n");
}
}
}
private static void printHelp(String cmd) {
String summary = "rmadmin is the command to execute Map-Reduce administrative commands.\n" +
"The full syntax is: \n\n" +
"hadoop rmadmin" +
StringBuilder summary = new StringBuilder();
summary.append("rmadmin is the command to execute YARN administrative " +
"commands.\n");
summary.append("The full syntax is: \n\n" +
"yarn rmadmin" +
" [-refreshQueues]" +
" [-refreshNodes]" +
" [-refreshSuperUserGroupsConfiguration]" +
@ -68,64 +154,25 @@ private static void printHelp(String cmd) {
" [-refreshAdminAcls]" +
" [-refreshServiceAcl]" +
" [-getGroup [username]]" +
" [-help [cmd]]\n";
" [-help [cmd]]");
appendHAUsage(summary);
summary.append("\n");
String refreshQueues =
"-refreshQueues: Reload the queues' acls, states and "
+ "scheduler specific properties.\n"
+ "\t\tResourceManager will reload the mapred-queues configuration file.\n";
String refreshNodes =
"-refreshNodes: Refresh the hosts information at the ResourceManager.\n";
String refreshUserToGroupsMappings =
"-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
String refreshSuperUserGroupsConfiguration =
"-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups mappings\n";
String refreshAdminAcls =
"-refreshAdminAcls: Refresh acls for administration of ResourceManager\n";
String refreshServiceAcl =
"-refreshServiceAcl: Reload the service-level authorization policy file\n" +
"\t\tResoureceManager will reload the authorization policy file.\n";
String getGroups =
"-getGroups [username]: Get the groups which given user belongs to\n";
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
"\t\tis specified.\n";
if ("refreshQueues".equals(cmd)) {
System.out.println(refreshQueues);
} else if ("refreshNodes".equals(cmd)) {
System.out.println(refreshNodes);
} else if ("refreshUserToGroupsMappings".equals(cmd)) {
System.out.println(refreshUserToGroupsMappings);
} else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) {
System.out.println(refreshSuperUserGroupsConfiguration);
} else if ("refreshAdminAcls".equals(cmd)) {
System.out.println(refreshAdminAcls);
} else if ("refreshServiceAcl".equals(cmd)) {
System.out.println(refreshServiceAcl);
} else if ("getGroups".equals(cmd)) {
System.out.println(getGroups);
} else if ("help".equals(cmd)) {
System.out.println(help);
} else {
System.out.println(summary);
System.out.println(refreshQueues);
System.out.println(refreshNodes);
System.out.println(refreshUserToGroupsMappings);
System.out.println(refreshSuperUserGroupsConfiguration);
System.out.println(refreshAdminAcls);
System.out.println(refreshServiceAcl);
System.out.println(getGroups);
System.out.println(help);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
StringBuilder helpBuilder = new StringBuilder();
System.out.println(summary);
for (String cmdKey : ADMIN_USAGE.keySet()) {
buildHelpMsg(cmdKey, helpBuilder);
helpBuilder.append("\n");
}
for (String cmdKey : USAGE.keySet()) {
if (!cmdKey.equals("-help")) {
buildHelpMsg(cmdKey, helpBuilder);
helpBuilder.append("\n");
}
}
System.out.println(helpBuilder);
System.out.println();
ToolRunner.printGenericCommandUsage(System.out);
}
/**
@ -133,33 +180,15 @@ private static void printHelp(String cmd) {
* @param cmd The command that is being executed.
*/
private static void printUsage(String cmd) {
if ("-refreshQueues".equals(cmd)) {
System.err.println("Usage: java RMAdmin" + " [-refreshQueues]");
} else if ("-refreshNodes".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshNodes]");
} else if ("-refreshUserToGroupsMappings".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshUserToGroupsMappings]");
} else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshSuperUserGroupsConfiguration]");
} else if ("-refreshAdminAcls".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshAdminAcls]");
} else if ("-refreshService".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-refreshServiceAcl]");
} else if ("-getGroups".equals(cmd)){
System.err.println("Usage: java RMAdmin" + " [-getGroups [username]]");
StringBuilder usageBuilder = new StringBuilder();
if (ADMIN_USAGE.containsKey(cmd) || USAGE.containsKey(cmd)) {
buildIndividualUsageMsg(cmd, usageBuilder);
} else {
System.err.println("Usage: java RMAdmin");
System.err.println(" [-refreshQueues]");
System.err.println(" [-refreshNodes]");
System.err.println(" [-refreshUserToGroupsMappings]");
System.err.println(" [-refreshSuperUserGroupsConfiguration]");
System.err.println(" [-refreshAdminAcls]");
System.err.println(" [-refreshServiceAcl]");
System.err.println(" [-getGroups [username]]");
System.err.println(" [-help [cmd]]");
System.err.println();
ToolRunner.printGenericCommandUsage(System.err);
buildUsageMsg(usageBuilder);
}
System.err.println(usageBuilder);
ToolRunner.printGenericCommandUsage(System.err);
}
protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException {
@ -255,6 +284,21 @@ public int run(String[] args) throws Exception {
int exitCode = -1;
int i = 0;
String cmd = args[i++];
exitCode = 0;
if ("-help".equals(cmd)) {
if (i < args.length) {
printUsage(args[i]);
} else {
printHelp("");
}
return exitCode;
}
if (USAGE.containsKey(cmd)) {
return super.run(args);
}
//
// verify that we have enough command line parameters
//
@ -268,7 +312,6 @@ public int run(String[] args) throws Exception {
}
}
exitCode = 0;
try {
if ("-refreshQueues".equals(cmd)) {
exitCode = refreshQueues();
@ -285,12 +328,6 @@ public int run(String[] args) throws Exception {
} else if ("-getGroups".equals(cmd)) {
String[] usernames = Arrays.copyOfRange(args, i, args.length);
exitCode = getGroups(usernames);
} else if ("-help".equals(cmd)) {
if (i < args.length) {
printUsage(args[i]);
} else {
printHelp("");
}
} else {
exitCode = -1;
System.err.println(cmd.substring(1) + ": Unknown command");
@ -324,6 +361,40 @@ public int run(String[] args) throws Exception {
return exitCode;
}
@Override
public void setConf(Configuration conf) {
if (conf != null) {
if (!(conf instanceof YarnConfiguration)) {
conf = new YarnConfiguration(conf);
}
}
super.setConf(conf);
}
@Override
protected HAServiceTarget resolveTarget(String rmId) {
Collection<String> rmIds = HAUtil.getRMHAIds(getConf());
if (!rmIds.contains(rmId)) {
StringBuilder msg = new StringBuilder();
msg.append(rmId + " is not a valid serviceId. It should be one of ");
for (String id : rmIds) {
msg.append(id + " ");
}
throw new IllegalArgumentException(msg.toString());
}
try {
YarnConfiguration conf = new YarnConfiguration(getConf());
conf.set(YarnConfiguration.RM_HA_ID, rmId);
return new RMHAServiceTarget(conf);
} catch (IllegalArgumentException iae) {
throw new YarnRuntimeException("Could not connect to " + rmId +
"; the configuration for it might be missing");
} catch (IOException ioe) {
throw new YarnRuntimeException(
"Could not connect to RM HA Admin for node " + rmId);
}
}
public static void main(String[] args) throws Exception {
int result = ToolRunner.run(new RMAdminCLI(), args);
System.exit(result);

View File

@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@ -31,7 +32,12 @@
import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@ -46,11 +52,20 @@
public class TestRMAdminCLI {
private ResourceManagerAdministrationProtocol admin;
private HAServiceProtocol haadmin;
private RMAdminCLI rmAdminCLI;
@Before
public void configure() {
public void configure() throws IOException {
admin = mock(ResourceManagerAdministrationProtocol.class);
haadmin = mock(HAServiceProtocol.class);
when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus(
HAServiceProtocol.HAServiceState.INITIALIZING));
final HAServiceTarget haServiceTarget = mock(HAServiceTarget.class);
when(haServiceTarget.getProxy(any(Configuration.class), anyInt()))
.thenReturn(haadmin);
rmAdminCLI = new RMAdminCLI() {
@Override
@ -59,6 +74,10 @@ protected ResourceManagerAdministrationProtocol createAdminProtocol()
return admin;
}
@Override
protected HAServiceTarget resolveTarget(String rmId) {
return haServiceTarget;
}
};
}
@ -128,6 +147,36 @@ public boolean matches(Object argument) {
}
}
@Test(timeout = 500)
public void testTransitionToActive() throws Exception {
String[] args = {"-transitionToActive", "rm1"};
assertEquals(0, rmAdminCLI.run(args));
verify(haadmin).transitionToActive(
any(HAServiceProtocol.StateChangeRequestInfo.class));
}
@Test(timeout = 500)
public void testTransitionToStandby() throws Exception {
String[] args = {"-transitionToStandby", "rm1"};
assertEquals(0, rmAdminCLI.run(args));
verify(haadmin).transitionToStandby(
any(HAServiceProtocol.StateChangeRequestInfo.class));
}
@Test(timeout = 500)
public void testGetServiceState() throws Exception {
String[] args = {"-getServiceState", "rm1"};
assertEquals(0, rmAdminCLI.run(args));
verify(haadmin).getServiceStatus();
}
@Test(timeout = 500)
public void testCheckHealth() throws Exception {
String[] args = {"-checkHealth", "rm1"};
assertEquals(0, rmAdminCLI.run(args));
verify(haadmin).monitorHealth();
}
/**
* Test printing of help messages
*/
@ -142,18 +191,22 @@ public void testHelp() throws Exception {
try {
String[] args = { "-help" };
assertEquals(0, rmAdminCLI.run(args));
oldOutPrintStream.println(dataOut);
assertTrue(dataOut
.toString()
.contains(
"rmadmin is the command to execute Map-Reduce" +
" administrative commands."));
"rmadmin is the command to execute YARN administrative commands."));
assertTrue(dataOut
.toString()
.contains(
"hadoop rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" +
"yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" +
"UserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
"[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" +
" [username]] [-help [cmd]]"));
" [username]] [-help [cmd]] [-transitionToActive <serviceId>]" +
" [-transitionToStandby <serviceId>] [-failover [--forcefence] " +
"[--forceactive] <serviceId> <serviceId>] " +
"[-getServiceState <serviceId>] [-checkHealth <serviceId>]"
));
assertTrue(dataOut
.toString()
.contains(
@ -184,7 +237,7 @@ public void testHelp() throws Exception {
assertTrue(dataOut
.toString()
.contains(
"-help [cmd]: \tDisplays help for the given command or all " +
"-help [cmd]: Displays help for the given command or all " +
"commands if none"));
testError(new String[] { "-help", "-refreshQueues" },
@ -199,11 +252,23 @@ public void testHelp() throws Exception {
dataErr, 0);
testError(new String[] { "-help", "-refreshAdminAcls" },
"Usage: java RMAdmin [-refreshAdminAcls]", dataErr, 0);
testError(new String[] { "-help", "-refreshService" },
testError(new String[] { "-help", "-refreshServiceAcl" },
"Usage: java RMAdmin [-refreshServiceAcl]", dataErr, 0);
testError(new String[] { "-help", "-getGroups" },
"Usage: java RMAdmin [-getGroups [username]]", dataErr, 0);
testError(new String[] { "-help", "-transitionToActive" },
"Usage: java RMAdmin [-transitionToActive <serviceId>]", dataErr, 0);
testError(new String[] { "-help", "-transitionToStandby" },
"Usage: java RMAdmin [-transitionToStandby <serviceId>]", dataErr, 0);
testError(new String[] { "-help", "-getServiceState" },
"Usage: java RMAdmin [-getServiceState <serviceId>]", dataErr, 0);
testError(new String[] { "-help", "-checkHealth" },
"Usage: java RMAdmin [-checkHealth <serviceId>]", dataErr, 0);
testError(new String[] { "-help", "-failover" },
"Usage: java RMAdmin " +
"[-failover [--forcefence] [--forceactive] " +
"<serviceId> <serviceId>]",
dataErr, 0);
testError(new String[] { "-help", "-badParameter" },
"Usage: java RMAdmin", dataErr, 0);

View File

@ -74,7 +74,14 @@ public boolean equals(Object other) {
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
StringBuilder sb = new StringBuilder();
sb.append("ContainerStatus: [");
sb.append("ContainerId: ").append(getContainerId()).append(", ");
sb.append("State: ").append(getState()).append(", ");
sb.append("Diagnostics: ").append(getDiagnostics()).append(", ");
sb.append("ExitStatus: ").append(getExitStatus()).append(", ");
sb.append("]");
return sb.toString();
}
private void mergeLocalToBuilder() {

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import org.apache.hadoop.ha.BadFencingConfigurationException;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException;
import java.net.InetSocketAddress;
public class RMHAServiceTarget extends HAServiceTarget {
private InetSocketAddress haAdminServiceAddress;
public RMHAServiceTarget(YarnConfiguration conf)
throws IOException {
haAdminServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
}
@Override
public InetSocketAddress getAddress() {
return haAdminServiceAddress;
}
@Override
public InetSocketAddress getZKFCAddress() {
// TODO (YARN-1177): Hook up ZKFC information
return null;
}
@Override
public NodeFencer getFencer() {
// TODO (YARN-1026): Hook up fencing implementation
return null;
}
@Override
public void checkFencingConfigured()
throws BadFencingConfigurationException {
// TODO (YARN-1026): Based on fencing implementation
}
}

View File

@ -341,7 +341,9 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() {
container.cloneAndGetContainerStatus();
containersStatuses.add(containerStatus);
++numActiveContainers;
LOG.info("Sending out status for container: " + containerStatus);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out status for container: " + containerStatus);
}
if (containerStatus.getState() == ContainerState.COMPLETE) {
// Remove

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@ -52,7 +51,6 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
@ -135,36 +133,11 @@ protected void serviceStop() throws Exception {
}
private UserGroupInformation checkAcls(String method) throws YarnException {
UserGroupInformation user;
try {
user = UserGroupInformation.getCurrentUser();
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
} catch (IOException ioe) {
LOG.warn("Couldn't get current user", ioe);
RMAuditLogger.logFailure("UNKNOWN", method,
adminAcl.toString(), "AdminService",
"Couldn't get current user");
throw RPCUtil.getRemoteException(ioe);
}
if (!adminAcl.isUserAllowed(user)) {
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
" to call '" + method + "'");
RMAuditLogger.logFailure(user.getShortUserName(), method,
adminAcl.toString(), "AdminService",
AuditConstants.UNAUTHORIZED_USER);
throw RPCUtil.getRemoteException(
new AccessControlException("User " + user.getShortUserName() +
" doesn't have permission" +
" to call '" + method + "'")
);
}
LOG.info("RM Admin: " + method + " invoked by user " +
user.getShortUserName());
return user;
}
@Override

View File

@ -20,20 +20,41 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* Internal class to handle HA related aspects of the {@link ResourceManager}.
*
* TODO (YARN-1318): Some/ all of this functionality should be merged with
* {@link AdminService}. Currently, marking this as Private and Unstable for
* those reasons.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RMHAProtocolService extends AbstractService implements
@ -44,6 +65,8 @@ public class RMHAProtocolService extends AbstractService implements
private ResourceManager rm;
@VisibleForTesting
protected HAServiceState haState = HAServiceState.INITIALIZING;
private AccessControlList adminAcl;
private Server haAdminServer;
private boolean haEnabled;
public RMHAProtocolService(ResourceManager resourceManager) {
@ -59,6 +82,9 @@ protected synchronized void serviceInit(Configuration conf) throws
if (haEnabled) {
HAUtil.verifyAndSetConfiguration(conf);
rm.setConf(this.conf);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
}
rm.createAndInitActiveServices();
super.serviceInit(this.conf);
@ -68,6 +94,7 @@ protected synchronized void serviceInit(Configuration conf) throws
protected synchronized void serviceStart() throws Exception {
if (haEnabled) {
transitionToStandby(true);
startHAAdminServer();
} else {
transitionToActive();
}
@ -77,13 +104,70 @@ protected synchronized void serviceStart() throws Exception {
@Override
protected synchronized void serviceStop() throws Exception {
if (haEnabled) {
stopHAAdminServer();
}
transitionToStandby(false);
haState = HAServiceState.STOPPING;
super.serviceStop();
}
protected void startHAAdminServer() throws Exception {
InetSocketAddress haAdminServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService =
HAServiceProtocolProtos.HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
WritableRpcEngine.ensureInitialized();
String bindHost = haAdminServiceAddress.getHostName();
int serviceHandlerCount = conf.getInt(
YarnConfiguration.RM_HA_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT);
haAdminServer = new RPC.Builder(conf)
.setProtocol(HAServiceProtocolPB.class)
.setInstance(haPbService)
.setBindAddress(bindHost)
.setPort(haAdminServiceAddress.getPort())
.setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.build();
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
haAdminServer.refreshServiceAcl(conf, new RMPolicyProvider());
}
haAdminServer.start();
conf.updateConnectAddr(YarnConfiguration.RM_HA_ADMIN_ADDRESS,
haAdminServer.getListenerAddress());
}
private void stopHAAdminServer() throws Exception {
if (haAdminServer != null) {
haAdminServer.stop();
haAdminServer.join();
haAdminServer = null;
}
}
@Override
public synchronized void monitorHealth() throws HealthCheckFailedException {
public synchronized void monitorHealth()
throws IOException {
checkAccess("monitorHealth");
if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
throw new HealthCheckFailedException(
"Active ResourceManager services are not running!");
@ -103,14 +187,21 @@ private synchronized void transitionToActive() throws Exception {
}
@Override
public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) {
public synchronized void transitionToActive(StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
LOG.error("Error when transitioning to Active mode", e);
throw new YarnRuntimeException(e);
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
}
@ -133,19 +224,27 @@ private synchronized void transitionToStandby(boolean initialize)
}
@Override
public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo) {
public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) {
LOG.error("Error when transitioning to Standby mode", e);
throw new YarnRuntimeException(e);
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
}
}
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceStatus ret = new HAServiceStatus(haState);
if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
@ -154,4 +253,8 @@ public synchronized HAServiceStatus getServiceStatus() throws IOException {
}
return ret;
}
private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
}
}

View File

@ -18,22 +18,24 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@ -115,4 +117,44 @@ public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRe
}
}
}
/**
* Utility method to verify if the current user has access based on the
* passed {@link AccessControlList}
* @param acl the {@link AccessControlList} to check against
* @param method the method name to be logged
* @param LOG the logger to use
* @return {@link UserGroupInformation} of the current user
* @throws IOException
*/
public static UserGroupInformation verifyAccess(
AccessControlList acl, String method, final Log LOG)
throws IOException {
UserGroupInformation user;
try {
user = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
LOG.warn("Couldn't get current user", ioe);
RMAuditLogger.logFailure("UNKNOWN", method, acl.toString(),
"AdminService", "Couldn't get current user");
throw ioe;
}
if (!acl.isUserAllowed(user)) {
LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
" to call '" + method + "'");
RMAuditLogger.logFailure(user.getShortUserName(), method,
acl.toString(), "AdminService",
RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
throw new AccessControlException("User " + user.getShortUserName() +
" doesn't have permission" +
" to call '" + method + "'");
}
if (LOG.isTraceEnabled()) {
LOG.trace(method + " invoked by user " + user.getShortUserName());
}
return user;
}
}

View File

@ -182,7 +182,7 @@ protected void serviceInit(Configuration conf) throws Exception {
validateConfigs(conf);
this.conf = conf;
haService = new RMHAProtocolService(this);
haService = createRMHAProtocolService();
addService(haService);
super.serviceInit(conf);
}
@ -198,6 +198,10 @@ protected void setRMStateStore(RMStateStore rmStore) {
((RMContextImpl) rmContext).setStateStore(rmStore);
}
protected RMHAProtocolService createRMHAProtocolService() {
return new RMHAProtocolService(this);
}
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
Configuration conf) {
return new RMContainerTokenSecretManager(conf);

View File

@ -19,6 +19,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
@ -52,6 +53,9 @@ public class RMPolicyProvider extends PolicyProvider {
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL,
ContainerManagementProtocolPB.class),
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL,
HAServiceProtocol.class),
};
@Override

View File

@ -296,6 +296,16 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId)
.handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed"));
}
@Override
protected RMHAProtocolService createRMHAProtocolService() {
return new RMHAProtocolService(this) {
@Override
protected void startHAAdminServer() {
// do nothing
}
};
}
@Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),

View File

@ -60,7 +60,7 @@ public void setUp() throws Exception {
rm.init(conf);
}
private void checkMonitorHealth() {
private void checkMonitorHealth() throws IOException {
try {
rm.haService.monitorHealth();
} catch (HealthCheckFailedException e) {