diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java index 1ce81fd6a2..d3a291c4a4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java @@ -63,7 +63,7 @@ public abstract class HAAdmin extends Configured implements Tool { private int rpcTimeoutForChecks = -1; - private static Map USAGE = + protected final static Map USAGE = ImmutableMap.builder() .put("-transitionToActive", new UsageInfo("", "Transitions the service into Active state")) @@ -91,6 +91,14 @@ public abstract class HAAdmin extends Configured implements Tool { protected PrintStream out = System.out; private RequestSource requestSource = RequestSource.REQUEST_BY_USER; + protected HAAdmin() { + super(); + } + + protected HAAdmin(Configuration conf) { + super(conf); + } + protected abstract HAServiceTarget resolveTarget(String string); protected String getUsageString() { @@ -461,9 +469,9 @@ private int help(String[] argv) { return 0; } - private static class UsageInfo { - private final String args; - private final String help; + protected static class UsageInfo { + public final String args; + public final String help; public UsageInfo(String args, String help) { this.args = args; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 352f050080..f7a828a9e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -381,10 +381,6 @@ Release 2.3.0 - UNRELEASED HDFS-5267. Remove volatile from LightWeightHashSet. (Junping Du via llu) - HDFS-4657. Limit the number of blocks logged by the NN after a block - report to a configurable value. (Aaron T. Myers via Colin Patrick - McCabe) - HDFS-4278. Log an ERROR when DFS_BLOCK_ACCESS_TOKEN_ENABLE config is disabled but security is turned on. (Kousuke Saruta via harsh) @@ -514,6 +510,10 @@ Release 2.2.1 - UNRELEASED HDFS-5331. make SnapshotDiff.java to a o.a.h.util.Tool interface implementation. (Vinayakumar B via umamahesh) + HDFS-4657. Limit the number of blocks logged by the NN after a block + report to a configurable value. (Aaron T. Myers via Colin Patrick + McCabe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index d81fc50246..08d1e35697 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -212,6 +212,9 @@ Release 2.2.1 - UNRELEASED MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write out text files without separators (Sandy Ryza) + MAPREDUCE-5596. Allow configuring the number of threads used to serve + shuffle connections (Sandy Ryza via jlowe) + OPTIMIZATIONS MAPREDUCE-4680. Job history cleaner should only check timestamps of files in @@ -231,6 +234,9 @@ Release 2.2.1 - UNRELEASED MAPREDUCE-5561. org.apache.hadoop.mapreduce.v2.app.job.impl.TestJobImpl testcase failing on trunk (Karthik Kambatla via jlowe) + MAPREDUCE-5598. TestUserDefinedCounters.testMapReduceJob is flakey + (Robert Kanter via jlowe) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 598d106ce9..29facecb6b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -304,6 +304,16 @@ + + mapreduce.shuffle.max.threads + 0 + Max allowed threads for serving shuffle connections. Set to zero + to indicate the default of 2 times the number of available + processors (as reported by Runtime.availableProcessors()). Netty is used to + serve requests, so a thread is not needed for each connection. + + + mapreduce.reduce.markreset.buffer.percent 0.0 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestUserDefinedCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestUserDefinedCounters.java index e1f1a40897..3c2cf215fb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestUserDefinedCounters.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestUserDefinedCounters.java @@ -40,7 +40,8 @@ public class TestUserDefinedCounters extends TestCase { private static String TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp")).toURI() - .toString().replace(' ', '+'); + .toString().replace(' ', '+') + + "/" + TestUserDefinedCounters.class.getName(); private final Path INPUT_DIR = new Path(TEST_ROOT_DIR + "/input"); private final Path OUTPUT_DIR = new Path(TEST_ROOT_DIR + "/out"); @@ -61,7 +62,7 @@ public void map(K key, V value, } private void cleanAndCreateInput(FileSystem fs) throws IOException { - fs.delete(INPUT_FILE, true); + fs.delete(INPUT_DIR, true); fs.delete(OUTPUT_DIR, true); OutputStream os = fs.create(INPUT_FILE); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java index 0b44ca610a..d36b30f863 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineTextInputFormat.java @@ -72,7 +72,7 @@ public class TestCombineTextInputFormat { new Path(new Path(System.getProperty("test.build.data", "."), "data"), "TestCombineTextInputFormat"); - @Test(timeout=10000) + @Test//(timeout=10000) public void testFormat() throws Exception { Job job = Job.getInstance(new Configuration(defaultConf)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 82fd59e551..9f377e23ac 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -163,6 +163,10 @@ public class ShuffleHandler extends AuxiliaryService { public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections"; public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit + + public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads"; + // 0 implies Netty default of 2 * number of available processors + public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0; @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @@ -282,6 +286,11 @@ protected void serviceInit(Configuration conf) throws Exception { maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, DEFAULT_MAX_SHUFFLE_CONNECTIONS); + int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS, + DEFAULT_MAX_SHUFFLE_THREADS); + if (maxShuffleThreads == 0) { + maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors(); + } ThreadFactory bossFactory = new ThreadFactoryBuilder() .setNameFormat("ShuffleHandler Netty Boss #%d") @@ -292,7 +301,8 @@ protected void serviceInit(Configuration conf) throws Exception { selector = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory)); + Executors.newCachedThreadPool(workerFactory), + maxShuffleThreads); super.serviceInit(new Configuration(conf)); } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0874e75d77..e29873162e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -32,6 +32,11 @@ Release 2.3.0 - UNRELEASED YARN-1253. Changes to LinuxContainerExecutor to run containers as a single dedicated user in non-secure mode. (rvs via tucu) + YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas) + + YARN-1068. Add admin support for HA operations (Karthik Kambatla via + bikas) + IMPROVEMENTS YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) @@ -41,8 +46,6 @@ Release 2.3.0 - UNRELEASED YARN-1098. Separate out RM services into Always On and Active (Karthik Kambatla via bikas) - YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas) - YARN-353. Add Zookeeper-based store implementation for RMStateStore. (Bikas Saha, Jian He and Karthik Kambatla via hitesh) @@ -114,6 +117,9 @@ Release 2.2.1 - UNRELEASED YARN-1333. Support blacklisting in the Fair Scheduler (Tsuyoshi Ozawa via Sandy Ryza) + YARN-1109. Demote NodeManager "Sending out status for container" logs to + debug (haosdent via Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java index 4678082e3e..9afc5cd698 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java @@ -41,7 +41,9 @@ public class HAUtil { YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.RM_WEBAPP_ADDRESS)); + YarnConfiguration.RM_WEBAPP_ADDRESS, + // TODO Remove after YARN-1318 + YarnConfiguration.RM_HA_ADMIN_ADDRESS)); public static final String BAD_CONFIG_MESSAGE_PREFIX = "Invalid configuration! "; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index be9e301b91..09f6b6e6dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -278,10 +279,22 @@ public class YarnConfiguration extends Configuration { public static final String RM_HA_PREFIX = RM_PREFIX + "ha."; public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final boolean DEFAULT_RM_HA_ENABLED = false; - + public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; + @org.apache.hadoop.classification.InterfaceAudience.Private + // TODO Remove after YARN-1318 + public static final String RM_HA_ADMIN_ADDRESS = + RM_HA_PREFIX + "admin.address"; + public static final int DEFAULT_RM_HA_ADMIN_PORT = 8034; + public static String DEFAULT_RM_HA_ADMIN_ADDRESS = + "0.0.0.0:" + DEFAULT_RM_HA_ADMIN_PORT; + public static final String RM_HA_ADMIN_CLIENT_THREAD_COUNT = + RM_HA_PREFIX + "admin.client.thread-count"; + public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1; + // end @Private + //////////////////////////////// // RM state store configs //////////////////////////////// @@ -753,6 +766,11 @@ public class YarnConfiguration extends Configuration { public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = "security.resourcelocalizer.protocol.acl"; + @org.apache.hadoop.classification.InterfaceAudience.Private + // TODO Remove after YARN-1318 + public static final String + YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL = + CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL; /** No. of milliseconds to wait between sending a SIGTERM and SIGKILL * to a running container */ @@ -911,4 +929,14 @@ public InetSocketAddress getSocketAddr( } return NetUtils.createSocketAddr(address, defaultPort, name); } + + @Override + public InetSocketAddress updateConnectAddr(String name, + InetSocketAddress addr) { + String prefix = name; + if (HAUtil.isHAEnabled(this)) { + prefix = HAUtil.addSuffix(prefix, HAUtil.getRMHAId(this)); + } + return super.updateConnectAddr(prefix, addr); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 5d1b5d66b5..807841b2cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -20,18 +20,26 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.ha.HAAdmin; +import org.apache.hadoop.ha.HAServiceTarget; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.RMHAServiceTarget; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; @@ -44,11 +52,35 @@ @Private @Unstable -public class RMAdminCLI extends Configured implements Tool { +public class RMAdminCLI extends HAAdmin { private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + protected final static Map ADMIN_USAGE = + ImmutableMap.builder() + .put("-refreshQueues", new UsageInfo("", + "Reload the queues' acls, states and scheduler specific " + + "properties. \n\t\tResourceManager will reload the " + + "mapred-queues configuration file.")) + .put("-refreshNodes", new UsageInfo("", + "Refresh the hosts information at the ResourceManager.")) + .put("-refreshSuperUserGroupsConfiguration", new UsageInfo("", + "Refresh superuser proxy groups mappings")) + .put("-refreshUserToGroupsMappings", new UsageInfo("", + "Refresh user-to-groups mappings")) + .put("-refreshAdminAcls", new UsageInfo("", + "Refresh acls for administration of ResourceManager")) + .put("-refreshServiceAcl", new UsageInfo("", + "Reload the service-level authorization policy file. \n\t\t" + + "ResoureceManager will reload the authorization policy file.")) + .put("-getGroups", new UsageInfo("[username]", + "Get the groups which given user belongs to.")) + .put("-help", new UsageInfo("[cmd]", + "Displays help for the given command or all commands if none " + + "is specified.")) + .build(); + public RMAdminCLI() { super(); } @@ -57,10 +89,64 @@ public RMAdminCLI(Configuration conf) { super(conf); } + private static void appendHAUsage(final StringBuilder usageBuilder) { + for (String cmdKey : USAGE.keySet()) { + if (cmdKey.equals("-help")) { + continue; + } + UsageInfo usageInfo = USAGE.get(cmdKey); + usageBuilder.append(" [" + cmdKey + " " + usageInfo.args + "]"); + } + } + + private static void buildHelpMsg(String cmd, StringBuilder builder) { + UsageInfo usageInfo = ADMIN_USAGE.get(cmd); + if (usageInfo == null) { + usageInfo = USAGE.get(cmd); + if (usageInfo == null) { + return; + } + } + String space = (usageInfo.args == "") ? "" : " "; + builder.append(" " + cmd + space + usageInfo.args + ": " + + usageInfo.help); + } + + private static void buildIndividualUsageMsg(String cmd, + StringBuilder builder ) { + UsageInfo usageInfo = ADMIN_USAGE.get(cmd); + if (usageInfo == null) { + usageInfo = USAGE.get(cmd); + if (usageInfo == null) { + return; + } + } + String space = (usageInfo.args == "") ? "" : " "; + builder.append("Usage: java RMAdmin [" + + cmd + space + usageInfo.args + + "]\n"); + } + + private static void buildUsageMsg(StringBuilder builder) { + builder.append("Usage: java RMAdmin"); + for (String cmdKey : ADMIN_USAGE.keySet()) { + UsageInfo usageInfo = ADMIN_USAGE.get(cmdKey); + builder.append(" " + cmdKey + " " + usageInfo.args + "\n"); + } + for (String cmdKey : USAGE.keySet()) { + if (!cmdKey.equals("-help")) { + UsageInfo usageInfo = USAGE.get(cmdKey); + builder.append(" " + cmdKey + " " + usageInfo.args + "\n"); + } + } + } + private static void printHelp(String cmd) { - String summary = "rmadmin is the command to execute Map-Reduce administrative commands.\n" + - "The full syntax is: \n\n" + - "hadoop rmadmin" + + StringBuilder summary = new StringBuilder(); + summary.append("rmadmin is the command to execute YARN administrative " + + "commands.\n"); + summary.append("The full syntax is: \n\n" + + "yarn rmadmin" + " [-refreshQueues]" + " [-refreshNodes]" + " [-refreshSuperUserGroupsConfiguration]" + @@ -68,64 +154,25 @@ private static void printHelp(String cmd) { " [-refreshAdminAcls]" + " [-refreshServiceAcl]" + " [-getGroup [username]]" + - " [-help [cmd]]\n"; + " [-help [cmd]]"); + appendHAUsage(summary); + summary.append("\n"); - String refreshQueues = - "-refreshQueues: Reload the queues' acls, states and " - + "scheduler specific properties.\n" - + "\t\tResourceManager will reload the mapred-queues configuration file.\n"; - - String refreshNodes = - "-refreshNodes: Refresh the hosts information at the ResourceManager.\n"; - - String refreshUserToGroupsMappings = - "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n"; - - String refreshSuperUserGroupsConfiguration = - "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups mappings\n"; - - String refreshAdminAcls = - "-refreshAdminAcls: Refresh acls for administration of ResourceManager\n"; - - String refreshServiceAcl = - "-refreshServiceAcl: Reload the service-level authorization policy file\n" + - "\t\tResoureceManager will reload the authorization policy file.\n"; - - String getGroups = - "-getGroups [username]: Get the groups which given user belongs to\n"; - - String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" + - "\t\tis specified.\n"; - - if ("refreshQueues".equals(cmd)) { - System.out.println(refreshQueues); - } else if ("refreshNodes".equals(cmd)) { - System.out.println(refreshNodes); - } else if ("refreshUserToGroupsMappings".equals(cmd)) { - System.out.println(refreshUserToGroupsMappings); - } else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) { - System.out.println(refreshSuperUserGroupsConfiguration); - } else if ("refreshAdminAcls".equals(cmd)) { - System.out.println(refreshAdminAcls); - } else if ("refreshServiceAcl".equals(cmd)) { - System.out.println(refreshServiceAcl); - } else if ("getGroups".equals(cmd)) { - System.out.println(getGroups); - } else if ("help".equals(cmd)) { - System.out.println(help); - } else { - System.out.println(summary); - System.out.println(refreshQueues); - System.out.println(refreshNodes); - System.out.println(refreshUserToGroupsMappings); - System.out.println(refreshSuperUserGroupsConfiguration); - System.out.println(refreshAdminAcls); - System.out.println(refreshServiceAcl); - System.out.println(getGroups); - System.out.println(help); - System.out.println(); - ToolRunner.printGenericCommandUsage(System.out); + StringBuilder helpBuilder = new StringBuilder(); + System.out.println(summary); + for (String cmdKey : ADMIN_USAGE.keySet()) { + buildHelpMsg(cmdKey, helpBuilder); + helpBuilder.append("\n"); } + for (String cmdKey : USAGE.keySet()) { + if (!cmdKey.equals("-help")) { + buildHelpMsg(cmdKey, helpBuilder); + helpBuilder.append("\n"); + } + } + System.out.println(helpBuilder); + System.out.println(); + ToolRunner.printGenericCommandUsage(System.out); } /** @@ -133,33 +180,15 @@ private static void printHelp(String cmd) { * @param cmd The command that is being executed. */ private static void printUsage(String cmd) { - if ("-refreshQueues".equals(cmd)) { - System.err.println("Usage: java RMAdmin" + " [-refreshQueues]"); - } else if ("-refreshNodes".equals(cmd)){ - System.err.println("Usage: java RMAdmin" + " [-refreshNodes]"); - } else if ("-refreshUserToGroupsMappings".equals(cmd)){ - System.err.println("Usage: java RMAdmin" + " [-refreshUserToGroupsMappings]"); - } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)){ - System.err.println("Usage: java RMAdmin" + " [-refreshSuperUserGroupsConfiguration]"); - } else if ("-refreshAdminAcls".equals(cmd)){ - System.err.println("Usage: java RMAdmin" + " [-refreshAdminAcls]"); - } else if ("-refreshService".equals(cmd)){ - System.err.println("Usage: java RMAdmin" + " [-refreshServiceAcl]"); - } else if ("-getGroups".equals(cmd)){ - System.err.println("Usage: java RMAdmin" + " [-getGroups [username]]"); + StringBuilder usageBuilder = new StringBuilder(); + if (ADMIN_USAGE.containsKey(cmd) || USAGE.containsKey(cmd)) { + buildIndividualUsageMsg(cmd, usageBuilder); } else { - System.err.println("Usage: java RMAdmin"); - System.err.println(" [-refreshQueues]"); - System.err.println(" [-refreshNodes]"); - System.err.println(" [-refreshUserToGroupsMappings]"); - System.err.println(" [-refreshSuperUserGroupsConfiguration]"); - System.err.println(" [-refreshAdminAcls]"); - System.err.println(" [-refreshServiceAcl]"); - System.err.println(" [-getGroups [username]]"); - System.err.println(" [-help [cmd]]"); - System.err.println(); - ToolRunner.printGenericCommandUsage(System.err); + buildUsageMsg(usageBuilder); } + System.err.println(usageBuilder); + ToolRunner.printGenericCommandUsage(System.err); + } protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { @@ -255,6 +284,21 @@ public int run(String[] args) throws Exception { int exitCode = -1; int i = 0; String cmd = args[i++]; + + exitCode = 0; + if ("-help".equals(cmd)) { + if (i < args.length) { + printUsage(args[i]); + } else { + printHelp(""); + } + return exitCode; + } + + if (USAGE.containsKey(cmd)) { + return super.run(args); + } + // // verify that we have enough command line parameters // @@ -268,7 +312,6 @@ public int run(String[] args) throws Exception { } } - exitCode = 0; try { if ("-refreshQueues".equals(cmd)) { exitCode = refreshQueues(); @@ -285,12 +328,6 @@ public int run(String[] args) throws Exception { } else if ("-getGroups".equals(cmd)) { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); - } else if ("-help".equals(cmd)) { - if (i < args.length) { - printUsage(args[i]); - } else { - printHelp(""); - } } else { exitCode = -1; System.err.println(cmd.substring(1) + ": Unknown command"); @@ -324,6 +361,40 @@ public int run(String[] args) throws Exception { return exitCode; } + @Override + public void setConf(Configuration conf) { + if (conf != null) { + if (!(conf instanceof YarnConfiguration)) { + conf = new YarnConfiguration(conf); + } + } + super.setConf(conf); + } + + @Override + protected HAServiceTarget resolveTarget(String rmId) { + Collection rmIds = HAUtil.getRMHAIds(getConf()); + if (!rmIds.contains(rmId)) { + StringBuilder msg = new StringBuilder(); + msg.append(rmId + " is not a valid serviceId. It should be one of "); + for (String id : rmIds) { + msg.append(id + " "); + } + throw new IllegalArgumentException(msg.toString()); + } + try { + YarnConfiguration conf = new YarnConfiguration(getConf()); + conf.set(YarnConfiguration.RM_HA_ID, rmId); + return new RMHAServiceTarget(conf); + } catch (IllegalArgumentException iae) { + throw new YarnRuntimeException("Could not connect to " + rmId + + "; the configuration for it might be missing"); + } catch (IOException ioe) { + throw new YarnRuntimeException( + "Could not connect to RM HA Admin for node " + rmId); + } + } + public static void main(String[] args) throws Exception { int result = ToolRunner.run(new RMAdminCLI(), args); System.exit(result); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java index 675a9be0ee..94c87515f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -31,7 +32,12 @@ import java.io.IOException; import java.io.PrintStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceStatus; +import org.apache.hadoop.ha.HAServiceTarget; import org.apache.hadoop.yarn.client.cli.RMAdminCLI; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -46,11 +52,20 @@ public class TestRMAdminCLI { private ResourceManagerAdministrationProtocol admin; + private HAServiceProtocol haadmin; private RMAdminCLI rmAdminCLI; @Before - public void configure() { + public void configure() throws IOException { admin = mock(ResourceManagerAdministrationProtocol.class); + + haadmin = mock(HAServiceProtocol.class); + when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus( + HAServiceProtocol.HAServiceState.INITIALIZING)); + + final HAServiceTarget haServiceTarget = mock(HAServiceTarget.class); + when(haServiceTarget.getProxy(any(Configuration.class), anyInt())) + .thenReturn(haadmin); rmAdminCLI = new RMAdminCLI() { @Override @@ -58,7 +73,11 @@ protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { return admin; } - + + @Override + protected HAServiceTarget resolveTarget(String rmId) { + return haServiceTarget; + } }; } @@ -128,6 +147,36 @@ public boolean matches(Object argument) { } } + @Test(timeout = 500) + public void testTransitionToActive() throws Exception { + String[] args = {"-transitionToActive", "rm1"}; + assertEquals(0, rmAdminCLI.run(args)); + verify(haadmin).transitionToActive( + any(HAServiceProtocol.StateChangeRequestInfo.class)); + } + + @Test(timeout = 500) + public void testTransitionToStandby() throws Exception { + String[] args = {"-transitionToStandby", "rm1"}; + assertEquals(0, rmAdminCLI.run(args)); + verify(haadmin).transitionToStandby( + any(HAServiceProtocol.StateChangeRequestInfo.class)); + } + + @Test(timeout = 500) + public void testGetServiceState() throws Exception { + String[] args = {"-getServiceState", "rm1"}; + assertEquals(0, rmAdminCLI.run(args)); + verify(haadmin).getServiceStatus(); + } + + @Test(timeout = 500) + public void testCheckHealth() throws Exception { + String[] args = {"-checkHealth", "rm1"}; + assertEquals(0, rmAdminCLI.run(args)); + verify(haadmin).monitorHealth(); + } + /** * Test printing of help messages */ @@ -142,18 +191,22 @@ public void testHelp() throws Exception { try { String[] args = { "-help" }; assertEquals(0, rmAdminCLI.run(args)); + oldOutPrintStream.println(dataOut); assertTrue(dataOut .toString() .contains( - "rmadmin is the command to execute Map-Reduce" + - " administrative commands.")); + "rmadmin is the command to execute YARN administrative commands.")); assertTrue(dataOut .toString() .contains( - "hadoop rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" + + "yarn rmadmin [-refreshQueues] [-refreshNodes] [-refreshSuper" + "UserGroupsConfiguration] [-refreshUserToGroupsMappings] " + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup" + - " [username]] [-help [cmd]]")); + " [username]] [-help [cmd]] [-transitionToActive ]" + + " [-transitionToStandby ] [-failover [--forcefence] " + + "[--forceactive] ] " + + "[-getServiceState ] [-checkHealth ]" + )); assertTrue(dataOut .toString() .contains( @@ -184,7 +237,7 @@ public void testHelp() throws Exception { assertTrue(dataOut .toString() .contains( - "-help [cmd]: \tDisplays help for the given command or all " + + "-help [cmd]: Displays help for the given command or all " + "commands if none")); testError(new String[] { "-help", "-refreshQueues" }, @@ -199,12 +252,24 @@ public void testHelp() throws Exception { dataErr, 0); testError(new String[] { "-help", "-refreshAdminAcls" }, "Usage: java RMAdmin [-refreshAdminAcls]", dataErr, 0); - testError(new String[] { "-help", "-refreshService" }, + testError(new String[] { "-help", "-refreshServiceAcl" }, "Usage: java RMAdmin [-refreshServiceAcl]", dataErr, 0); testError(new String[] { "-help", "-getGroups" }, "Usage: java RMAdmin [-getGroups [username]]", dataErr, 0); + testError(new String[] { "-help", "-transitionToActive" }, + "Usage: java RMAdmin [-transitionToActive ]", dataErr, 0); + testError(new String[] { "-help", "-transitionToStandby" }, + "Usage: java RMAdmin [-transitionToStandby ]", dataErr, 0); + testError(new String[] { "-help", "-getServiceState" }, + "Usage: java RMAdmin [-getServiceState ]", dataErr, 0); + testError(new String[] { "-help", "-checkHealth" }, + "Usage: java RMAdmin [-checkHealth ]", dataErr, 0); + testError(new String[] { "-help", "-failover" }, + "Usage: java RMAdmin " + + "[-failover [--forcefence] [--forceactive] " + + " ]", + dataErr, 0); - testError(new String[] { "-help", "-badParameter" }, "Usage: java RMAdmin", dataErr, 0); testError(new String[] { "-badParameter" }, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerStatusPBImpl.java index 9cb28f4d0d..8e3e2cca38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerStatusPBImpl.java @@ -74,7 +74,14 @@ public boolean equals(Object other) { @Override public String toString() { - return TextFormat.shortDebugString(getProto()); + StringBuilder sb = new StringBuilder(); + sb.append("ContainerStatus: ["); + sb.append("ContainerId: ").append(getContainerId()).append(", "); + sb.append("State: ").append(getState()).append(", "); + sb.append("Diagnostics: ").append(getDiagnostics()).append(", "); + sb.append("ExitStatus: ").append(getExitStatus()).append(", "); + sb.append("]"); + return sb.toString(); } private void mergeLocalToBuilder() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java new file mode 100644 index 0000000000..bb07bf8108 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client; + +import org.apache.hadoop.ha.BadFencingConfigurationException; +import org.apache.hadoop.ha.HAServiceTarget; +import org.apache.hadoop.ha.NodeFencer; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import java.io.IOException; +import java.net.InetSocketAddress; + +public class RMHAServiceTarget extends HAServiceTarget { + private InetSocketAddress haAdminServiceAddress; + + public RMHAServiceTarget(YarnConfiguration conf) + throws IOException { + haAdminServiceAddress = conf.getSocketAddr( + YarnConfiguration.RM_HA_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT); + } + + @Override + public InetSocketAddress getAddress() { + return haAdminServiceAddress; + } + + @Override + public InetSocketAddress getZKFCAddress() { + // TODO (YARN-1177): Hook up ZKFC information + return null; + } + + @Override + public NodeFencer getFencer() { + // TODO (YARN-1026): Hook up fencing implementation + return null; + } + + @Override + public void checkFencingConfigured() + throws BadFencingConfigurationException { + // TODO (YARN-1026): Based on fencing implementation + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index d6af3fe7bd..8a06418846 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -341,7 +341,9 @@ public NodeStatus getNodeStatusAndUpdateContainersInContext() { container.cloneAndGetContainerStatus(); containersStatuses.add(containerStatus); ++numActiveContainers; - LOG.info("Sending out status for container: " + containerStatus); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending out status for container: " + containerStatus); + } if (containerStatus.getState() == ContainerState.COMPLETE) { // Remove diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index b1ac7ac446..f5ae5e8243 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; @@ -52,7 +51,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; @@ -135,36 +133,11 @@ protected void serviceStop() throws Exception { } private UserGroupInformation checkAcls(String method) throws YarnException { - UserGroupInformation user; try { - user = UserGroupInformation.getCurrentUser(); + return RMServerUtils.verifyAccess(adminAcl, method, LOG); } catch (IOException ioe) { - LOG.warn("Couldn't get current user", ioe); - - RMAuditLogger.logFailure("UNKNOWN", method, - adminAcl.toString(), "AdminService", - "Couldn't get current user"); throw RPCUtil.getRemoteException(ioe); } - - if (!adminAcl.isUserAllowed(user)) { - LOG.warn("User " + user.getShortUserName() + " doesn't have permission" + - " to call '" + method + "'"); - - RMAuditLogger.logFailure(user.getShortUserName(), method, - adminAcl.toString(), "AdminService", - AuditConstants.UNAUTHORIZED_USER); - - throw RPCUtil.getRemoteException( - new AccessControlException("User " + user.getShortUserName() + - " doesn't have permission" + - " to call '" + method + "'") - ); - } - LOG.info("RM Admin: " + method + " invoked by user " + - user.getShortUserName()); - - return user; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java index 82e5ea254a..c74b2826c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java @@ -20,20 +20,41 @@ import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.HealthCheckFailedException; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.WritableRpcEngine; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import java.io.IOException; +import java.net.InetSocketAddress; +/** + * Internal class to handle HA related aspects of the {@link ResourceManager}. + * + * TODO (YARN-1318): Some/ all of this functionality should be merged with + * {@link AdminService}. Currently, marking this as Private and Unstable for + * those reasons. + */ @InterfaceAudience.Private @InterfaceStability.Unstable public class RMHAProtocolService extends AbstractService implements @@ -44,6 +65,8 @@ public class RMHAProtocolService extends AbstractService implements private ResourceManager rm; @VisibleForTesting protected HAServiceState haState = HAServiceState.INITIALIZING; + private AccessControlList adminAcl; + private Server haAdminServer; private boolean haEnabled; public RMHAProtocolService(ResourceManager resourceManager) { @@ -59,6 +82,9 @@ protected synchronized void serviceInit(Configuration conf) throws if (haEnabled) { HAUtil.verifyAndSetConfiguration(conf); rm.setConf(this.conf); + adminAcl = new AccessControlList(conf.get( + YarnConfiguration.YARN_ADMIN_ACL, + YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); } rm.createAndInitActiveServices(); super.serviceInit(this.conf); @@ -68,6 +94,7 @@ protected synchronized void serviceInit(Configuration conf) throws protected synchronized void serviceStart() throws Exception { if (haEnabled) { transitionToStandby(true); + startHAAdminServer(); } else { transitionToActive(); } @@ -77,13 +104,70 @@ protected synchronized void serviceStart() throws Exception { @Override protected synchronized void serviceStop() throws Exception { + if (haEnabled) { + stopHAAdminServer(); + } transitionToStandby(false); haState = HAServiceState.STOPPING; super.serviceStop(); } + + protected void startHAAdminServer() throws Exception { + InetSocketAddress haAdminServiceAddress = conf.getSocketAddr( + YarnConfiguration.RM_HA_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT); + + RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, + ProtobufRpcEngine.class); + + HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = + new HAServiceProtocolServerSideTranslatorPB(this); + BlockingService haPbService = + HAServiceProtocolProtos.HAServiceProtocolService + .newReflectiveBlockingService(haServiceProtocolXlator); + + WritableRpcEngine.ensureInitialized(); + + String bindHost = haAdminServiceAddress.getHostName(); + + int serviceHandlerCount = conf.getInt( + YarnConfiguration.RM_HA_ADMIN_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT); + + haAdminServer = new RPC.Builder(conf) + .setProtocol(HAServiceProtocolPB.class) + .setInstance(haPbService) + .setBindAddress(bindHost) + .setPort(haAdminServiceAddress.getPort()) + .setNumHandlers(serviceHandlerCount) + .setVerbose(false) + .build(); + + // Enable service authorization? + if (conf.getBoolean( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { + haAdminServer.refreshServiceAcl(conf, new RMPolicyProvider()); + } + + haAdminServer.start(); + conf.updateConnectAddr(YarnConfiguration.RM_HA_ADMIN_ADDRESS, + haAdminServer.getListenerAddress()); + } + + private void stopHAAdminServer() throws Exception { + if (haAdminServer != null) { + haAdminServer.stop(); + haAdminServer.join(); + haAdminServer = null; + } + } + @Override - public synchronized void monitorHealth() throws HealthCheckFailedException { + public synchronized void monitorHealth() + throws IOException { + checkAccess("monitorHealth"); if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) { throw new HealthCheckFailedException( "Active ResourceManager services are not running!"); @@ -103,14 +187,21 @@ private synchronized void transitionToActive() throws Exception { } @Override - public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) { + public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) + throws IOException { + UserGroupInformation user = checkAccess("transitionToActive"); // TODO (YARN-1177): When automatic failover is enabled, // check if transition should be allowed for this request try { transitionToActive(); + RMAuditLogger.logSuccess(user.getShortUserName(), + "transitionToActive", "RMHAProtocolService"); } catch (Exception e) { - LOG.error("Error when transitioning to Active mode", e); - throw new YarnRuntimeException(e); + RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", + adminAcl.toString(), "RMHAProtocolService", + "Exception transitioning to active"); + throw new ServiceFailedException( + "Error when transitioning to Active mode", e); } } @@ -133,19 +224,27 @@ private synchronized void transitionToStandby(boolean initialize) } @Override - public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo) { + public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo) + throws IOException { + UserGroupInformation user = checkAccess("transitionToStandby"); // TODO (YARN-1177): When automatic failover is enabled, // check if transition should be allowed for this request try { transitionToStandby(true); + RMAuditLogger.logSuccess(user.getShortUserName(), + "transitionToStandby", "RMHAProtocolService"); } catch (Exception e) { - LOG.error("Error when transitioning to Standby mode", e); - throw new YarnRuntimeException(e); + RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby", + adminAcl.toString(), "RMHAProtocolService", + "Exception transitioning to standby"); + throw new ServiceFailedException( + "Error when transitioning to Standby mode", e); } } @Override public synchronized HAServiceStatus getServiceStatus() throws IOException { + checkAccess("getServiceState"); HAServiceStatus ret = new HAServiceStatus(haState); if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) { ret.setReadyToBecomeActive(); @@ -154,4 +253,8 @@ public synchronized HAServiceStatus getServiceStatus() throws IOException { } return ret; } + + private UserGroupInformation checkAccess(String method) throws IOException { + return RMServerUtils.verifyAccess(adminAcl, method, LOG); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 370040ae74..8c734435a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -18,22 +18,24 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -115,4 +117,44 @@ public static void validateBlacklistRequest(ResourceBlacklistRequest blacklistRe } } } + + /** + * Utility method to verify if the current user has access based on the + * passed {@link AccessControlList} + * @param acl the {@link AccessControlList} to check against + * @param method the method name to be logged + * @param LOG the logger to use + * @return {@link UserGroupInformation} of the current user + * @throws IOException + */ + public static UserGroupInformation verifyAccess( + AccessControlList acl, String method, final Log LOG) + throws IOException { + UserGroupInformation user; + try { + user = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + LOG.warn("Couldn't get current user", ioe); + RMAuditLogger.logFailure("UNKNOWN", method, acl.toString(), + "AdminService", "Couldn't get current user"); + throw ioe; + } + + if (!acl.isUserAllowed(user)) { + LOG.warn("User " + user.getShortUserName() + " doesn't have permission" + + " to call '" + method + "'"); + + RMAuditLogger.logFailure(user.getShortUserName(), method, + acl.toString(), "AdminService", + RMAuditLogger.AuditConstants.UNAUTHORIZED_USER); + + throw new AccessControlException("User " + user.getShortUserName() + + " doesn't have permission" + + " to call '" + method + "'"); + } + if (LOG.isTraceEnabled()) { + LOG.trace(method + " invoked by user " + user.getShortUserName()); + } + return user; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index e46c2bf1d5..f9ee0974a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -182,7 +182,7 @@ protected void serviceInit(Configuration conf) throws Exception { validateConfigs(conf); this.conf = conf; - haService = new RMHAProtocolService(this); + haService = createRMHAProtocolService(); addService(haService); super.serviceInit(conf); } @@ -198,6 +198,10 @@ protected void setRMStateStore(RMStateStore rmStore) { ((RMContextImpl) rmContext).setStateStore(rmStore); } + protected RMHAProtocolService createRMHAProtocolService() { + return new RMHAProtocolService(this); + } + protected RMContainerTokenSecretManager createContainerTokenSecretManager( Configuration conf) { return new RMContainerTokenSecretManager(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java index c5df91ff06..b5b590bffe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java @@ -19,6 +19,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; @@ -52,6 +53,9 @@ public class RMPolicyProvider extends PolicyProvider { new Service( YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL, ContainerManagementProtocolPB.class), + new Service( + YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL, + HAServiceProtocol.class), }; @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index d9ff1b0d49..aba334ad94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -296,6 +296,16 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) .handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed")); } + @Override + protected RMHAProtocolService createRMHAProtocolService() { + return new RMHAProtocolService(this) { + @Override + protected void startHAAdminServer() { + // do nothing + } + }; + } + @Override protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), getResourceScheduler(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 869526e97c..c783c74bb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -60,7 +60,7 @@ public void setUp() throws Exception { rm.init(conf); } - private void checkMonitorHealth() { + private void checkMonitorHealth() throws IOException { try { rm.haService.monitorHealth(); } catch (HealthCheckFailedException e) {