From 82567664988b673f1b819a42a4baf31cb0dcb331 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sat, 18 Oct 2014 12:07:40 -0700 Subject: [PATCH] YARN-2504. Enhanced RM Admin CLI to support management of node-labels. Contribyted by Wangda Tan. --- hadoop-yarn-project/CHANGES.txt | 5 +- ...ResourceManagerAdministrationProtocol.java | 40 +++ ...ourcemanager_administration_protocol.proto | 5 + .../hadoop/yarn/client/cli/RMAdminCLI.java | 259 +++++++++++++++++- .../yarn/client/{ => cli}/TestRMAdminCLI.java | 158 ++++++++++- .../nodelabels/CommonNodeLabelsManager.java | 6 +- ...gerAdministrationProtocolPBClientImpl.java | 97 ++++++- ...erAdministrationProtocolPBServiceImpl.java | 107 ++++++++ .../DummyCommonNodeLabelsManager.java | 5 + .../server/resourcemanager/AdminService.java | 112 ++++++++ 10 files changed, 780 insertions(+), 14 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/{ => cli}/TestRMAdminCLI.java (71%) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2207a59702..52485c8fe8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -172,9 +172,12 @@ Release 2.6.0 - UNRELEASED YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating resources based on node-labels. (Wangda Tan via vinodkv) - YARN-2500. Ehnaced ResourceManager to support schedulers allocating resources + YARN-2500. Enhaced ResourceManager to support schedulers allocating resources based on node-labels. (Wangda Tan via vinodkv) + YARN-2504. Enhanced RM Admin CLI to support management of node-labels. + (Wangda Tan via vinodkv) + IMPROVEMENTS YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index 4b777eaac5..36d851ce1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -30,6 +30,12 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -42,6 +48,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; @@ -110,4 +120,34 @@ public RefreshServiceAclsResponse refreshServiceAcls( public UpdateNodeResourceResponse updateNodeResource( UpdateNodeResourceRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request) + throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException; + + @Public + @Evolving + @Idempotent + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto index 47a6cf75c8..99dd4fd385 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto @@ -39,4 +39,9 @@ service ResourceManagerAdministrationProtocolService { rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto); rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto); rpc updateNodeResource (UpdateNodeResourceRequestProto) returns (UpdateNodeResourceResponseProto); + rpc addToClusterNodeLabels(AddToClusterNodeLabelsRequestProto) returns (AddToClusterNodeLabelsResponseProto); + rpc removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto); + rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto); + rpc getNodeToLabels(GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto); + rpc getClusterNodeLabels(GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 50e5825da7..5e3f2a849b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -19,11 +19,17 @@ package org.apache.hadoop.yarn.client.cli; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; -import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -33,6 +39,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.RMHAServiceTarget; import org.apache.hadoop.yarn.conf.HAUtil; @@ -41,13 +48,21 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; + +import com.google.common.collect.ImmutableMap; @Private @Unstable @@ -55,6 +70,8 @@ public class RMAdminCLI extends HAAdmin { private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + private boolean directlyAccessNodeLabelStore = false; + static CommonNodeLabelsManager localNodeLabelsManager = null; protected final static Map ADMIN_USAGE = ImmutableMap.builder() @@ -78,7 +95,30 @@ public class RMAdminCLI extends HAAdmin { .put("-help", new UsageInfo("[cmd]", "Displays help for the given command or all commands if none " + "is specified.")) - .build(); + .put("-addToClusterNodeLabels", + new UsageInfo("[label1,label2,label3] (label splitted by \",\")", + "add to cluster node labels ")) + .put("-removeFromClusterNodeLabels", + new UsageInfo("[label1,label2,label3] (label splitted by \",\")", + "remove from cluster node labels")) + .put("-replaceLabelsOnNode", + new UsageInfo("[node1:port,label1,label2 node2:port,label1,label2]", + "replace labels on nodes")) + .put("-getNodeToLabels", new UsageInfo("", + "Get node to label mappings")) + .put("-getClusterNodeLabels", + new UsageInfo("", "Get node labels in the cluster")) + .put("-directlyAccessNodeLabelStore", + new UsageInfo("", "Directly access node label store, " + + "with this option, all node label related operations" + + " will not connect RM. Instead, they will" + + " access/modify stored node labels directly." + + " By default, it is false (access via RM)." + + " AND PLEASE NOTE: if you configured" + + " yarn.node-labels.fs-store.uri to a local directory" + + " (instead of NFS or HDFS), this option will only work" + + " when the command run on the machine where RM is running.")) + .build(); public RMAdminCLI() { super(); @@ -201,11 +241,13 @@ private static void printUsage(String cmd, boolean isHAEnabled) { ToolRunner.printGenericCommandUsage(System.err); } - - protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { + + protected ResourceManagerAdministrationProtocol createAdminProtocol() + throws IOException { // Get the current configuration final YarnConfiguration conf = new YarnConfiguration(getConf()); - return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class); + return ClientRMProxy.createRMProxy(conf, + ResourceManagerAdministrationProtocol.class); } private int refreshQueues() throws IOException, YarnException { @@ -285,8 +327,187 @@ private int getGroups(String[] usernames) throws IOException { return 0; } + // Make it protected to make unit test can change it. + protected static synchronized CommonNodeLabelsManager + getNodeLabelManagerInstance(Configuration conf) { + if (localNodeLabelsManager == null) { + localNodeLabelsManager = new CommonNodeLabelsManager(); + localNodeLabelsManager.init(conf); + localNodeLabelsManager.start(); + } + return localNodeLabelsManager; + } + + private int addToClusterNodeLabels(String args) throws IOException, + YarnException { + Set labels = new HashSet(); + for (String p : args.split(",")) { + labels.add(p); + } + + return addToClusterNodeLabels(labels); + } + + private int addToClusterNodeLabels(Set labels) throws IOException, + YarnException { + if (directlyAccessNodeLabelStore) { + getNodeLabelManagerInstance(getConf()).addToCluserNodeLabels(labels); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + AddToClusterNodeLabelsRequest request = + AddToClusterNodeLabelsRequest.newInstance(labels); + adminProtocol.addToClusterNodeLabels(request); + } + return 0; + } + + private int removeFromClusterNodeLabels(String args) throws IOException, + YarnException { + Set labels = new HashSet(); + for (String p : args.split(",")) { + labels.add(p); + } + + if (directlyAccessNodeLabelStore) { + getNodeLabelManagerInstance(getConf()).removeFromClusterNodeLabels(labels); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + RemoveFromClusterNodeLabelsRequest request = + RemoveFromClusterNodeLabelsRequest.newInstance(labels); + adminProtocol.removeFromClusterNodeLabels(request); + } + + return 0; + } + + private int getNodeToLabels() throws IOException, YarnException { + Map> nodeToLabels = null; + + if (directlyAccessNodeLabelStore) { + nodeToLabels = getNodeLabelManagerInstance(getConf()).getNodeLabels(); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + + nodeToLabels = + adminProtocol.getNodeToLabels(GetNodesToLabelsRequest.newInstance()) + .getNodeToLabels(); + } + for (NodeId host : sortNodeIdSet(nodeToLabels.keySet())) { + System.out.println(String.format("Host=%s, Node-labels=[%s]", + (host.getPort() == 0 ? host.getHost() : host.toString()), + StringUtils.join(sortStrSet(nodeToLabels.get(host)), ","))); + } + return 0; + } + + private int getClusterNodeLabels() throws IOException, YarnException { + Set labels = null; + if (directlyAccessNodeLabelStore) { + labels = getNodeLabelManagerInstance(getConf()).getClusterNodeLabels(); + } else { + ResourceManagerAdministrationProtocol adminProto = createAdminProtocol(); + labels = + adminProto.getClusterNodeLabels( + GetClusterNodeLabelsRequest.newInstance()).getNodeLabels(); + } + + System.out.println(String.format("Node-labels=%s", + StringUtils.join(sortStrSet(labels).iterator(), ","))); + return 0; + } + + private List sortNodeIdSet(Set nodes) { + List list = new ArrayList(); + list.addAll(nodes); + Collections.sort(list); + return list; + } + + private List sortStrSet(Set labels) { + List list = new ArrayList(); + list.addAll(labels); + Collections.sort(list); + return list; + } + + private Map> buildNodeLabelsFromStr(String args) + throws IOException { + Map> map = new HashMap>(); + + for (String nodeToLabels : args.split("[ \n]")) { + nodeToLabels = nodeToLabels.trim(); + if (nodeToLabels.isEmpty() || nodeToLabels.startsWith("#")) { + continue; + } + + String[] splits = nodeToLabels.split(","); + String nodeIdStr = splits[0]; + + if (nodeIdStr.trim().isEmpty()) { + throw new IOException("node name cannot be empty"); + } + + String nodeName; + int port; + if (nodeIdStr.contains(":")) { + nodeName = nodeIdStr.substring(0, nodeIdStr.indexOf(":")); + port = Integer.valueOf(nodeIdStr.substring(nodeIdStr.indexOf(":"))); + } else { + nodeName = nodeIdStr; + port = 0; + } + + NodeId nodeId = NodeId.newInstance(nodeName, port); + + map.put(nodeId, new HashSet()); + + for (int i = 1; i < splits.length; i++) { + if (!splits[i].trim().isEmpty()) { + map.get(nodeId).add(splits[i].trim().toLowerCase()); + } + } + } + + return map; + } + + private int replaceLabelsOnNodes(String args) throws IOException, + YarnException { + Map> map = buildNodeLabelsFromStr(args); + return replaceLabelsOnNodes(map); + } + + private int replaceLabelsOnNodes(Map> map) + throws IOException, YarnException { + if (directlyAccessNodeLabelStore) { + getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map); + } else { + ResourceManagerAdministrationProtocol adminProtocol = + createAdminProtocol(); + ReplaceLabelsOnNodeRequest request = + ReplaceLabelsOnNodeRequest.newInstance(map); + adminProtocol.replaceLabelsOnNode(request); + } + return 0; + } + @Override public int run(String[] args) throws Exception { + // -directlyAccessNodeLabelStore is a additional option for node label + // access, so just search if we have specified this option, and remove it + List argsList = new ArrayList(); + for (int i = 0; i < args.length; i++) { + if (args[i].equals("-directlyAccessNodeLabelStore")) { + directlyAccessNodeLabelStore = true; + } else { + argsList.add(args[i]); + } + } + args = argsList.toArray(new String[0]); + YarnConfiguration yarnConf = getConf() == null ? new YarnConfiguration() : new YarnConfiguration( getConf()); @@ -351,6 +572,31 @@ public int run(String[] args) throws Exception { } else if ("-getGroups".equals(cmd)) { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); + } else if ("-addToClusterNodeLabels".equals(cmd)) { + if (i >= args.length) { + System.err.println("No cluster node-labels are specified"); + exitCode = -1; + } else { + exitCode = addToClusterNodeLabels(args[i]); + } + } else if ("-removeFromClusterNodeLabels".equals(cmd)) { + if (i >= args.length) { + System.err.println("No cluster node-labels are specified"); + exitCode = -1; + } else { + exitCode = removeFromClusterNodeLabels(args[i]); + } + } else if ("-replaceLabelsOnNode".equals(cmd)) { + if (i >= args.length) { + System.err.println("No cluster node-labels are specified"); + exitCode = -1; + } else { + exitCode = replaceLabelsOnNodes(args[i]); + } + } else if ("-getNodeToLabels".equals(cmd)) { + exitCode = getNodeToLabels(); + } else if ("-getClusterNodeLabels".equals(cmd)) { + exitCode = getClusterNodeLabels(); } else { exitCode = -1; System.err.println(cmd.substring(1) + ": Unknown command"); @@ -380,6 +626,9 @@ public int run(String[] args) throws Exception { System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage()); } + if (null != localNodeLabelsManager) { + localNodeLabelsManager.stop(); + } return exitCode; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java similarity index 71% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index 419b9ae486..b4d29b6820 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -16,18 +16,17 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.client; +package org.apache.hadoop.yarn.client.cli; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.never; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -37,9 +36,16 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.HAServiceTarget; +import org.apache.hadoop.service.Service.STATE; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.client.cli.RMAdminCLI; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.nodelabels.DummyCommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest; @@ -49,6 +55,10 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.collect.ImmutableSet; public class TestRMAdminCLI { @@ -56,10 +66,25 @@ public class TestRMAdminCLI { private HAServiceProtocol haadmin; private RMAdminCLI rmAdminCLI; private RMAdminCLI rmAdminCLIWithHAEnabled; + private CommonNodeLabelsManager dummyNodeLabelsManager; + private boolean remoteAdminServiceAccessed = false; + @SuppressWarnings("static-access") @Before - public void configure() throws IOException { + public void configure() throws IOException, YarnException { + remoteAdminServiceAccessed = false; + dummyNodeLabelsManager = new DummyCommonNodeLabelsManager(); admin = mock(ResourceManagerAdministrationProtocol.class); + when(admin.addToClusterNodeLabels(any(AddToClusterNodeLabelsRequest.class))) + .thenAnswer(new Answer() { + + @Override + public AddToClusterNodeLabelsResponse answer( + InvocationOnMock invocation) throws Throwable { + remoteAdminServiceAccessed = true; + return AddToClusterNodeLabelsResponse.newInstance(); + } + }); haadmin = mock(HAServiceProtocol.class); when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus( @@ -69,7 +94,6 @@ public void configure() throws IOException { when(haServiceTarget.getProxy(any(Configuration.class), anyInt())) .thenReturn(haadmin); rmAdminCLI = new RMAdminCLI(new Configuration()) { - @Override protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException { @@ -81,6 +105,7 @@ protected HAServiceTarget resolveTarget(String rmId) { return haServiceTarget; } }; + rmAdminCLI.localNodeLabelsManager = dummyNodeLabelsManager; YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); @@ -360,6 +385,127 @@ public void testException() throws Exception { System.setErr(oldErrPrintStream); } } + + @Test + public void testAccessLocalNodeLabelManager() throws Exception { + assertFalse(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED); + + String[] args = + { "-addToClusterNodeLabels", "x,y", "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll( + ImmutableSet.of("x", "y"))); + + // reset localNodeLabelsManager + dummyNodeLabelsManager.removeFromClusterNodeLabels(ImmutableSet.of("x", "y")); + + // change the sequence of "-directlyAccessNodeLabelStore" and labels, + // should not matter + args = + new String[] { "-addToClusterNodeLabels", + "-directlyAccessNodeLabelStore", "x,y" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll( + ImmutableSet.of("x", "y"))); + + // local node labels manager will be close after running + assertTrue(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED); + } + + @Test + public void testAccessRemoteNodeLabelManager() throws Exception { + String[] args = + { "-addToClusterNodeLabels", "x,y" }; + assertEquals(0, rmAdminCLI.run(args)); + + // localNodeLabelsManager shouldn't accessed + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty()); + + // remote node labels manager accessed + assertTrue(remoteAdminServiceAccessed); + } + + @Test + public void testAddToClusterNodeLabels() throws Exception { + // successfully add labels + String[] args = + { "-addToClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll( + ImmutableSet.of("x"))); + + // no labels, should fail + args = new String[] { "-addToClusterNodeLabels" }; + assertTrue(0 != rmAdminCLI.run(args)); + + // no labels, should fail + args = + new String[] { "-addToClusterNodeLabels", + "-directlyAccessNodeLabelStore" }; + assertTrue(0 != rmAdminCLI.run(args)); + } + + @Test + public void testRemoveFromClusterNodeLabels() throws Exception { + // Successfully remove labels + dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x")); + String[] args = + { "-removeFromClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty()); + + // no labels, should fail + args = new String[] { "-removeFromClusterNodeLabels" }; + assertTrue(0 != rmAdminCLI.run(args)); + + // no labels, should fail + args = + new String[] { "-removeFromClusterNodeLabels", + "-directlyAccessNodeLabelStore" }; + assertTrue(0 != rmAdminCLI.run(args)); + } + + @Test + public void testReplaceLabelsOnNode() throws Exception { + // Successfully replace labels + dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x", "y")); + String[] args = + { "-replaceLabelsOnNode", "node1,x,y node2,y", + "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey( + NodeId.newInstance("node1", 0))); + assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey( + NodeId.newInstance("node2", 0))); + + // no labels, should fail + args = new String[] { "-replaceLabelsOnNode" }; + assertTrue(0 != rmAdminCLI.run(args)); + + // no labels, should fail + args = + new String[] { "-replaceLabelsOnNode", + "-directlyAccessNodeLabelStore" }; + assertTrue(0 != rmAdminCLI.run(args)); + } + + @Test + public void testGetClusterNodeLabels() throws Exception { + // Successfully get labels + String[] args = + { "-getClusterNodeLabels", + "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + } + + @Test + public void testGetNodeToLabels() throws Exception { + // Successfully get node-to-labels + String[] args = + { "-getNodeToLabels", + "-directlyAccessNodeLabelStore" }; + assertEquals(0, rmAdminCLI.run(args)); + } private void testError(String[] args, String template, ByteArrayOutputStream data, int resultCode) throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index d68503555f..ee1b945c92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -238,7 +238,11 @@ protected void stopDispatcher() { protected void serviceStop() throws Exception { // finalize store stopDispatcher(); - store.close(); + + // only close store when we enabled store persistent + if (null != store) { + store.close(); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java index ccffaed77f..e4828e2020 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java @@ -29,17 +29,28 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -52,8 +63,18 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl; @@ -66,6 +87,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; @@ -205,5 +230,75 @@ public UpdateNodeResourceResponse updateNodeResource( return null; } } - + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels( + AddToClusterNodeLabelsRequest request) throws YarnException, IOException { + AddToClusterNodeLabelsRequestProto requestProto = + ((AddToClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new AddToClusterNodeLabelsResponsePBImpl( + proxy.addToClusterNodeLabels(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, + IOException { + RemoveFromClusterNodeLabelsRequestProto requestProto = + ((RemoveFromClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new RemoveFromClusterNodeLabelsResponsePBImpl( + proxy.removeFromClusterNodeLabels(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + ReplaceLabelsOnNodeRequestProto requestProto = + ((ReplaceLabelsOnNodeRequestPBImpl) request).getProto(); + try { + return new ReplaceLabelsOnNodeResponsePBImpl(proxy.replaceLabelsOnNodes( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request) + throws YarnException, IOException { + GetNodesToLabelsRequestProto requestProto = + ((GetNodesToLabelsRequestPBImpl) request).getProto(); + try { + return new GetNodesToLabelsResponsePBImpl(proxy.getNodeToLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + GetClusterNodeLabelsRequestProto requestProto = + ((GetClusterNodeLabelsRequestPBImpl) request).getProto(); + try { + return new GetClusterNodeLabelsResponsePBImpl(proxy.getClusterNodeLabels( + null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java index d1f71feb27..7cfecadee9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java @@ -22,8 +22,14 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto; @@ -36,17 +42,32 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl; @@ -59,6 +80,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; @@ -204,4 +229,86 @@ public UpdateNodeResourceResponseProto updateNodeResource(RpcController controll } } + @Override + public AddToClusterNodeLabelsResponseProto addToClusterNodeLabels( + RpcController controller, AddToClusterNodeLabelsRequestProto proto) + throws ServiceException { + AddToClusterNodeLabelsRequestPBImpl request = + new AddToClusterNodeLabelsRequestPBImpl(proto); + try { + AddToClusterNodeLabelsResponse response = + real.addToClusterNodeLabels(request); + return ((AddToClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RemoveFromClusterNodeLabelsResponseProto removeFromClusterNodeLabels( + RpcController controller, RemoveFromClusterNodeLabelsRequestProto proto) + throws ServiceException { + RemoveFromClusterNodeLabelsRequestPBImpl request = + new RemoveFromClusterNodeLabelsRequestPBImpl(proto); + try { + RemoveFromClusterNodeLabelsResponse response = + real.removeFromClusterNodeLabels(request); + return ((RemoveFromClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ReplaceLabelsOnNodeResponseProto replaceLabelsOnNodes( + RpcController controller, ReplaceLabelsOnNodeRequestProto proto) + throws ServiceException { + ReplaceLabelsOnNodeRequestPBImpl request = + new ReplaceLabelsOnNodeRequestPBImpl(proto); + try { + ReplaceLabelsOnNodeResponse response = real.replaceLabelsOnNode(request); + return ((ReplaceLabelsOnNodeResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetNodesToLabelsResponseProto getNodeToLabels( + RpcController controller, GetNodesToLabelsRequestProto proto) + throws ServiceException { + GetNodesToLabelsRequestPBImpl request = + new GetNodesToLabelsRequestPBImpl(proto); + try { + GetNodesToLabelsResponse response = real.getNodeToLabels(request); + return ((GetNodesToLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetClusterNodeLabelsResponseProto getClusterNodeLabels( + RpcController controller, GetClusterNodeLabelsRequestProto proto) + throws ServiceException { + GetClusterNodeLabelsRequestPBImpl request = + new GetClusterNodeLabelsRequestPBImpl(proto); + try { + GetClusterNodeLabelsResponse response = + real.getClusterNodeLabels(request); + return ((GetClusterNodeLabelsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java index fcdf96946c..65ea79f9c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java @@ -78,4 +78,9 @@ protected void startDispatcher() { protected void stopDispatcher() { // do nothing } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 2b7797fa2c..ecdcb20925 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -57,6 +57,12 @@ import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest; @@ -69,8 +75,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; @@ -618,4 +630,104 @@ public AccessControlList getAccessControlList() { public Server getServer() { return this.server; } + + @Override + public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request) + throws YarnException, IOException { + String argName = "addToClusterNodeLabels"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not add labels."); + throwStandbyException(); + } + + AddToClusterNodeLabelsResponse response = + recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); + try { + rmContext.getNodeLabelManager().addToCluserNodeLabels(request.getNodeLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception add labels", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", "Exception add label"); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( + RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException { + String argName = "removeFromClusterNodeLabels"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not remove labels."); + throwStandbyException(); + } + + RemoveFromClusterNodeLabelsResponse response = + recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); + try { + rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception remove labels", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", "Exception remove label"); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( + ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + String argName = "replaceLabelsOnNode"; + UserGroupInformation user = checkAcls(argName); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not set node to labels."); + throwStandbyException(); + } + + ReplaceLabelsOnNodeResponse response = + recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class); + try { + rmContext.getNodeLabelManager().replaceLabelsOnNode( + request.getNodeToLabels()); + RMAuditLogger + .logSuccess(user.getShortUserName(), argName, "AdminService"); + return response; + } catch (IOException ioe) { + LOG.info("Exception set node to labels. ", ioe); + RMAuditLogger.logFailure(user.getShortUserName(), argName, + adminAcl.toString(), "AdminService", + "Exception set node to labels."); + throw RPCUtil.getRemoteException(ioe); + } + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request) + throws YarnException, IOException { + return GetNodesToLabelsResponsePBImpl.newInstance(rmContext + .getNodeLabelManager().getNodeLabels()); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest request) + throws YarnException, IOException { + return GetClusterNodeLabelsResponsePBImpl.newInstance(rmContext.getNodeLabelManager() + .getClusterNodeLabels()); + } }