YARN-2504. Enhanced RM Admin CLI to support management of node-labels. Contribyted by Wangda Tan.
This commit is contained in:
parent
39063cd36f
commit
8256766498
@ -172,9 +172,12 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating
|
||||
resources based on node-labels. (Wangda Tan via vinodkv)
|
||||
|
||||
YARN-2500. Ehnaced ResourceManager to support schedulers allocating resources
|
||||
YARN-2500. Enhaced ResourceManager to support schedulers allocating resources
|
||||
based on node-labels. (Wangda Tan via vinodkv)
|
||||
|
||||
YARN-2504. Enhanced RM Admin CLI to support management of node-labels.
|
||||
(Wangda Tan via vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
|
||||
|
@ -30,6 +30,12 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
@ -42,6 +48,10 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
|
||||
@ -110,4 +120,34 @@ public RefreshServiceAclsResponse refreshServiceAcls(
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Evolving
|
||||
@Idempotent
|
||||
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request)
|
||||
throws YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Evolving
|
||||
@Idempotent
|
||||
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Evolving
|
||||
@Idempotent
|
||||
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Evolving
|
||||
@Idempotent
|
||||
public GetNodesToLabelsResponse getNodeToLabels(
|
||||
GetNodesToLabelsRequest request) throws YarnException, IOException;
|
||||
|
||||
@Public
|
||||
@Evolving
|
||||
@Idempotent
|
||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
||||
GetClusterNodeLabelsRequest request) throws YarnException, IOException;
|
||||
}
|
||||
|
@ -39,4 +39,9 @@ service ResourceManagerAdministrationProtocolService {
|
||||
rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto);
|
||||
rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto);
|
||||
rpc updateNodeResource (UpdateNodeResourceRequestProto) returns (UpdateNodeResourceResponseProto);
|
||||
rpc addToClusterNodeLabels(AddToClusterNodeLabelsRequestProto) returns (AddToClusterNodeLabelsResponseProto);
|
||||
rpc removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto);
|
||||
rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto);
|
||||
rpc getNodeToLabels(GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
|
||||
rpc getClusterNodeLabels(GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
|
||||
}
|
||||
|
@ -19,11 +19,17 @@
|
||||
package org.apache.hadoop.yarn.client.cli;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -33,6 +39,7 @@
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.client.RMHAServiceTarget;
|
||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||
@ -41,13 +48,21 @@
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
@ -55,6 +70,8 @@ public class RMAdminCLI extends HAAdmin {
|
||||
|
||||
private final RecordFactory recordFactory =
|
||||
RecordFactoryProvider.getRecordFactory(null);
|
||||
private boolean directlyAccessNodeLabelStore = false;
|
||||
static CommonNodeLabelsManager localNodeLabelsManager = null;
|
||||
|
||||
protected final static Map<String, UsageInfo> ADMIN_USAGE =
|
||||
ImmutableMap.<String, UsageInfo>builder()
|
||||
@ -78,7 +95,30 @@ public class RMAdminCLI extends HAAdmin {
|
||||
.put("-help", new UsageInfo("[cmd]",
|
||||
"Displays help for the given command or all commands if none " +
|
||||
"is specified."))
|
||||
.build();
|
||||
.put("-addToClusterNodeLabels",
|
||||
new UsageInfo("[label1,label2,label3] (label splitted by \",\")",
|
||||
"add to cluster node labels "))
|
||||
.put("-removeFromClusterNodeLabels",
|
||||
new UsageInfo("[label1,label2,label3] (label splitted by \",\")",
|
||||
"remove from cluster node labels"))
|
||||
.put("-replaceLabelsOnNode",
|
||||
new UsageInfo("[node1:port,label1,label2 node2:port,label1,label2]",
|
||||
"replace labels on nodes"))
|
||||
.put("-getNodeToLabels", new UsageInfo("",
|
||||
"Get node to label mappings"))
|
||||
.put("-getClusterNodeLabels",
|
||||
new UsageInfo("", "Get node labels in the cluster"))
|
||||
.put("-directlyAccessNodeLabelStore",
|
||||
new UsageInfo("", "Directly access node label store, "
|
||||
+ "with this option, all node label related operations"
|
||||
+ " will not connect RM. Instead, they will"
|
||||
+ " access/modify stored node labels directly."
|
||||
+ " By default, it is false (access via RM)."
|
||||
+ " AND PLEASE NOTE: if you configured"
|
||||
+ " yarn.node-labels.fs-store.uri to a local directory"
|
||||
+ " (instead of NFS or HDFS), this option will only work"
|
||||
+ " when the command run on the machine where RM is running."))
|
||||
.build();
|
||||
|
||||
public RMAdminCLI() {
|
||||
super();
|
||||
@ -202,10 +242,12 @@ private static void printUsage(String cmd, boolean isHAEnabled) {
|
||||
|
||||
}
|
||||
|
||||
protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException {
|
||||
protected ResourceManagerAdministrationProtocol createAdminProtocol()
|
||||
throws IOException {
|
||||
// Get the current configuration
|
||||
final YarnConfiguration conf = new YarnConfiguration(getConf());
|
||||
return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class);
|
||||
return ClientRMProxy.createRMProxy(conf,
|
||||
ResourceManagerAdministrationProtocol.class);
|
||||
}
|
||||
|
||||
private int refreshQueues() throws IOException, YarnException {
|
||||
@ -285,8 +327,187 @@ private int getGroups(String[] usernames) throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Make it protected to make unit test can change it.
|
||||
protected static synchronized CommonNodeLabelsManager
|
||||
getNodeLabelManagerInstance(Configuration conf) {
|
||||
if (localNodeLabelsManager == null) {
|
||||
localNodeLabelsManager = new CommonNodeLabelsManager();
|
||||
localNodeLabelsManager.init(conf);
|
||||
localNodeLabelsManager.start();
|
||||
}
|
||||
return localNodeLabelsManager;
|
||||
}
|
||||
|
||||
private int addToClusterNodeLabels(String args) throws IOException,
|
||||
YarnException {
|
||||
Set<String> labels = new HashSet<String>();
|
||||
for (String p : args.split(",")) {
|
||||
labels.add(p);
|
||||
}
|
||||
|
||||
return addToClusterNodeLabels(labels);
|
||||
}
|
||||
|
||||
private int addToClusterNodeLabels(Set<String> labels) throws IOException,
|
||||
YarnException {
|
||||
if (directlyAccessNodeLabelStore) {
|
||||
getNodeLabelManagerInstance(getConf()).addToCluserNodeLabels(labels);
|
||||
} else {
|
||||
ResourceManagerAdministrationProtocol adminProtocol =
|
||||
createAdminProtocol();
|
||||
AddToClusterNodeLabelsRequest request =
|
||||
AddToClusterNodeLabelsRequest.newInstance(labels);
|
||||
adminProtocol.addToClusterNodeLabels(request);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int removeFromClusterNodeLabels(String args) throws IOException,
|
||||
YarnException {
|
||||
Set<String> labels = new HashSet<String>();
|
||||
for (String p : args.split(",")) {
|
||||
labels.add(p);
|
||||
}
|
||||
|
||||
if (directlyAccessNodeLabelStore) {
|
||||
getNodeLabelManagerInstance(getConf()).removeFromClusterNodeLabels(labels);
|
||||
} else {
|
||||
ResourceManagerAdministrationProtocol adminProtocol =
|
||||
createAdminProtocol();
|
||||
RemoveFromClusterNodeLabelsRequest request =
|
||||
RemoveFromClusterNodeLabelsRequest.newInstance(labels);
|
||||
adminProtocol.removeFromClusterNodeLabels(request);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int getNodeToLabels() throws IOException, YarnException {
|
||||
Map<NodeId, Set<String>> nodeToLabels = null;
|
||||
|
||||
if (directlyAccessNodeLabelStore) {
|
||||
nodeToLabels = getNodeLabelManagerInstance(getConf()).getNodeLabels();
|
||||
} else {
|
||||
ResourceManagerAdministrationProtocol adminProtocol =
|
||||
createAdminProtocol();
|
||||
|
||||
nodeToLabels =
|
||||
adminProtocol.getNodeToLabels(GetNodesToLabelsRequest.newInstance())
|
||||
.getNodeToLabels();
|
||||
}
|
||||
for (NodeId host : sortNodeIdSet(nodeToLabels.keySet())) {
|
||||
System.out.println(String.format("Host=%s, Node-labels=[%s]",
|
||||
(host.getPort() == 0 ? host.getHost() : host.toString()),
|
||||
StringUtils.join(sortStrSet(nodeToLabels.get(host)), ",")));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
private int getClusterNodeLabels() throws IOException, YarnException {
|
||||
Set<String> labels = null;
|
||||
if (directlyAccessNodeLabelStore) {
|
||||
labels = getNodeLabelManagerInstance(getConf()).getClusterNodeLabels();
|
||||
} else {
|
||||
ResourceManagerAdministrationProtocol adminProto = createAdminProtocol();
|
||||
labels =
|
||||
adminProto.getClusterNodeLabels(
|
||||
GetClusterNodeLabelsRequest.newInstance()).getNodeLabels();
|
||||
}
|
||||
|
||||
System.out.println(String.format("Node-labels=%s",
|
||||
StringUtils.join(sortStrSet(labels).iterator(), ",")));
|
||||
return 0;
|
||||
}
|
||||
|
||||
private List<NodeId> sortNodeIdSet(Set<NodeId> nodes) {
|
||||
List<NodeId> list = new ArrayList<NodeId>();
|
||||
list.addAll(nodes);
|
||||
Collections.sort(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
private List<String> sortStrSet(Set<String> labels) {
|
||||
List<String> list = new ArrayList<String>();
|
||||
list.addAll(labels);
|
||||
Collections.sort(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
private Map<NodeId, Set<String>> buildNodeLabelsFromStr(String args)
|
||||
throws IOException {
|
||||
Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();
|
||||
|
||||
for (String nodeToLabels : args.split("[ \n]")) {
|
||||
nodeToLabels = nodeToLabels.trim();
|
||||
if (nodeToLabels.isEmpty() || nodeToLabels.startsWith("#")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String[] splits = nodeToLabels.split(",");
|
||||
String nodeIdStr = splits[0];
|
||||
|
||||
if (nodeIdStr.trim().isEmpty()) {
|
||||
throw new IOException("node name cannot be empty");
|
||||
}
|
||||
|
||||
String nodeName;
|
||||
int port;
|
||||
if (nodeIdStr.contains(":")) {
|
||||
nodeName = nodeIdStr.substring(0, nodeIdStr.indexOf(":"));
|
||||
port = Integer.valueOf(nodeIdStr.substring(nodeIdStr.indexOf(":")));
|
||||
} else {
|
||||
nodeName = nodeIdStr;
|
||||
port = 0;
|
||||
}
|
||||
|
||||
NodeId nodeId = NodeId.newInstance(nodeName, port);
|
||||
|
||||
map.put(nodeId, new HashSet<String>());
|
||||
|
||||
for (int i = 1; i < splits.length; i++) {
|
||||
if (!splits[i].trim().isEmpty()) {
|
||||
map.get(nodeId).add(splits[i].trim().toLowerCase());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return map;
|
||||
}
|
||||
|
||||
private int replaceLabelsOnNodes(String args) throws IOException,
|
||||
YarnException {
|
||||
Map<NodeId, Set<String>> map = buildNodeLabelsFromStr(args);
|
||||
return replaceLabelsOnNodes(map);
|
||||
}
|
||||
|
||||
private int replaceLabelsOnNodes(Map<NodeId, Set<String>> map)
|
||||
throws IOException, YarnException {
|
||||
if (directlyAccessNodeLabelStore) {
|
||||
getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map);
|
||||
} else {
|
||||
ResourceManagerAdministrationProtocol adminProtocol =
|
||||
createAdminProtocol();
|
||||
ReplaceLabelsOnNodeRequest request =
|
||||
ReplaceLabelsOnNodeRequest.newInstance(map);
|
||||
adminProtocol.replaceLabelsOnNode(request);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
// -directlyAccessNodeLabelStore is a additional option for node label
|
||||
// access, so just search if we have specified this option, and remove it
|
||||
List<String> argsList = new ArrayList<String>();
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
if (args[i].equals("-directlyAccessNodeLabelStore")) {
|
||||
directlyAccessNodeLabelStore = true;
|
||||
} else {
|
||||
argsList.add(args[i]);
|
||||
}
|
||||
}
|
||||
args = argsList.toArray(new String[0]);
|
||||
|
||||
YarnConfiguration yarnConf =
|
||||
getConf() == null ? new YarnConfiguration() : new YarnConfiguration(
|
||||
getConf());
|
||||
@ -351,6 +572,31 @@ public int run(String[] args) throws Exception {
|
||||
} else if ("-getGroups".equals(cmd)) {
|
||||
String[] usernames = Arrays.copyOfRange(args, i, args.length);
|
||||
exitCode = getGroups(usernames);
|
||||
} else if ("-addToClusterNodeLabels".equals(cmd)) {
|
||||
if (i >= args.length) {
|
||||
System.err.println("No cluster node-labels are specified");
|
||||
exitCode = -1;
|
||||
} else {
|
||||
exitCode = addToClusterNodeLabels(args[i]);
|
||||
}
|
||||
} else if ("-removeFromClusterNodeLabels".equals(cmd)) {
|
||||
if (i >= args.length) {
|
||||
System.err.println("No cluster node-labels are specified");
|
||||
exitCode = -1;
|
||||
} else {
|
||||
exitCode = removeFromClusterNodeLabels(args[i]);
|
||||
}
|
||||
} else if ("-replaceLabelsOnNode".equals(cmd)) {
|
||||
if (i >= args.length) {
|
||||
System.err.println("No cluster node-labels are specified");
|
||||
exitCode = -1;
|
||||
} else {
|
||||
exitCode = replaceLabelsOnNodes(args[i]);
|
||||
}
|
||||
} else if ("-getNodeToLabels".equals(cmd)) {
|
||||
exitCode = getNodeToLabels();
|
||||
} else if ("-getClusterNodeLabels".equals(cmd)) {
|
||||
exitCode = getClusterNodeLabels();
|
||||
} else {
|
||||
exitCode = -1;
|
||||
System.err.println(cmd.substring(1) + ": Unknown command");
|
||||
@ -380,6 +626,9 @@ public int run(String[] args) throws Exception {
|
||||
System.err.println(cmd.substring(1) + ": "
|
||||
+ e.getLocalizedMessage());
|
||||
}
|
||||
if (null != localNodeLabelsManager) {
|
||||
localNodeLabelsManager.stop();
|
||||
}
|
||||
return exitCode;
|
||||
}
|
||||
|
||||
|
@ -16,18 +16,17 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client;
|
||||
package org.apache.hadoop.yarn.client.cli;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.never;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
@ -37,9 +36,16 @@
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
import org.apache.hadoop.ha.HAServiceTarget;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.nodelabels.DummyCommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
|
||||
@ -49,6 +55,10 @@
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
public class TestRMAdminCLI {
|
||||
|
||||
@ -56,10 +66,25 @@ public class TestRMAdminCLI {
|
||||
private HAServiceProtocol haadmin;
|
||||
private RMAdminCLI rmAdminCLI;
|
||||
private RMAdminCLI rmAdminCLIWithHAEnabled;
|
||||
private CommonNodeLabelsManager dummyNodeLabelsManager;
|
||||
private boolean remoteAdminServiceAccessed = false;
|
||||
|
||||
@SuppressWarnings("static-access")
|
||||
@Before
|
||||
public void configure() throws IOException {
|
||||
public void configure() throws IOException, YarnException {
|
||||
remoteAdminServiceAccessed = false;
|
||||
dummyNodeLabelsManager = new DummyCommonNodeLabelsManager();
|
||||
admin = mock(ResourceManagerAdministrationProtocol.class);
|
||||
when(admin.addToClusterNodeLabels(any(AddToClusterNodeLabelsRequest.class)))
|
||||
.thenAnswer(new Answer<AddToClusterNodeLabelsResponse>() {
|
||||
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponse answer(
|
||||
InvocationOnMock invocation) throws Throwable {
|
||||
remoteAdminServiceAccessed = true;
|
||||
return AddToClusterNodeLabelsResponse.newInstance();
|
||||
}
|
||||
});
|
||||
|
||||
haadmin = mock(HAServiceProtocol.class);
|
||||
when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus(
|
||||
@ -69,7 +94,6 @@ public void configure() throws IOException {
|
||||
when(haServiceTarget.getProxy(any(Configuration.class), anyInt()))
|
||||
.thenReturn(haadmin);
|
||||
rmAdminCLI = new RMAdminCLI(new Configuration()) {
|
||||
|
||||
@Override
|
||||
protected ResourceManagerAdministrationProtocol createAdminProtocol()
|
||||
throws IOException {
|
||||
@ -81,6 +105,7 @@ protected HAServiceTarget resolveTarget(String rmId) {
|
||||
return haServiceTarget;
|
||||
}
|
||||
};
|
||||
rmAdminCLI.localNodeLabelsManager = dummyNodeLabelsManager;
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
@ -361,6 +386,127 @@ public void testException() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAccessLocalNodeLabelManager() throws Exception {
|
||||
assertFalse(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED);
|
||||
|
||||
String[] args =
|
||||
{ "-addToClusterNodeLabels", "x,y", "-directlyAccessNodeLabelStore" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll(
|
||||
ImmutableSet.of("x", "y")));
|
||||
|
||||
// reset localNodeLabelsManager
|
||||
dummyNodeLabelsManager.removeFromClusterNodeLabels(ImmutableSet.of("x", "y"));
|
||||
|
||||
// change the sequence of "-directlyAccessNodeLabelStore" and labels,
|
||||
// should not matter
|
||||
args =
|
||||
new String[] { "-addToClusterNodeLabels",
|
||||
"-directlyAccessNodeLabelStore", "x,y" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll(
|
||||
ImmutableSet.of("x", "y")));
|
||||
|
||||
// local node labels manager will be close after running
|
||||
assertTrue(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAccessRemoteNodeLabelManager() throws Exception {
|
||||
String[] args =
|
||||
{ "-addToClusterNodeLabels", "x,y" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
|
||||
// localNodeLabelsManager shouldn't accessed
|
||||
assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty());
|
||||
|
||||
// remote node labels manager accessed
|
||||
assertTrue(remoteAdminServiceAccessed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddToClusterNodeLabels() throws Exception {
|
||||
// successfully add labels
|
||||
String[] args =
|
||||
{ "-addToClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll(
|
||||
ImmutableSet.of("x")));
|
||||
|
||||
// no labels, should fail
|
||||
args = new String[] { "-addToClusterNodeLabels" };
|
||||
assertTrue(0 != rmAdminCLI.run(args));
|
||||
|
||||
// no labels, should fail
|
||||
args =
|
||||
new String[] { "-addToClusterNodeLabels",
|
||||
"-directlyAccessNodeLabelStore" };
|
||||
assertTrue(0 != rmAdminCLI.run(args));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveFromClusterNodeLabels() throws Exception {
|
||||
// Successfully remove labels
|
||||
dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x"));
|
||||
String[] args =
|
||||
{ "-removeFromClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty());
|
||||
|
||||
// no labels, should fail
|
||||
args = new String[] { "-removeFromClusterNodeLabels" };
|
||||
assertTrue(0 != rmAdminCLI.run(args));
|
||||
|
||||
// no labels, should fail
|
||||
args =
|
||||
new String[] { "-removeFromClusterNodeLabels",
|
||||
"-directlyAccessNodeLabelStore" };
|
||||
assertTrue(0 != rmAdminCLI.run(args));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplaceLabelsOnNode() throws Exception {
|
||||
// Successfully replace labels
|
||||
dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
|
||||
String[] args =
|
||||
{ "-replaceLabelsOnNode", "node1,x,y node2,y",
|
||||
"-directlyAccessNodeLabelStore" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey(
|
||||
NodeId.newInstance("node1", 0)));
|
||||
assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey(
|
||||
NodeId.newInstance("node2", 0)));
|
||||
|
||||
// no labels, should fail
|
||||
args = new String[] { "-replaceLabelsOnNode" };
|
||||
assertTrue(0 != rmAdminCLI.run(args));
|
||||
|
||||
// no labels, should fail
|
||||
args =
|
||||
new String[] { "-replaceLabelsOnNode",
|
||||
"-directlyAccessNodeLabelStore" };
|
||||
assertTrue(0 != rmAdminCLI.run(args));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetClusterNodeLabels() throws Exception {
|
||||
// Successfully get labels
|
||||
String[] args =
|
||||
{ "-getClusterNodeLabels",
|
||||
"-directlyAccessNodeLabelStore" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetNodeToLabels() throws Exception {
|
||||
// Successfully get node-to-labels
|
||||
String[] args =
|
||||
{ "-getNodeToLabels",
|
||||
"-directlyAccessNodeLabelStore" };
|
||||
assertEquals(0, rmAdminCLI.run(args));
|
||||
}
|
||||
|
||||
private void testError(String[] args, String template,
|
||||
ByteArrayOutputStream data, int resultCode) throws Exception {
|
||||
assertEquals(resultCode, rmAdminCLI.run(args));
|
@ -238,7 +238,11 @@ protected void stopDispatcher() {
|
||||
protected void serviceStop() throws Exception {
|
||||
// finalize store
|
||||
stopDispatcher();
|
||||
store.close();
|
||||
|
||||
// only close store when we enabled store persistent
|
||||
if (null != store) {
|
||||
store.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -29,17 +29,28 @@
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
@ -52,8 +63,18 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
|
||||
@ -66,6 +87,10 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
|
||||
|
||||
@ -206,4 +231,74 @@ public UpdateNodeResourceResponse updateNodeResource(
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
AddToClusterNodeLabelsRequestProto requestProto =
|
||||
((AddToClusterNodeLabelsRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new AddToClusterNodeLabelsResponsePBImpl(
|
||||
proxy.addToClusterNodeLabels(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
RemoveFromClusterNodeLabelsRequest request) throws YarnException,
|
||||
IOException {
|
||||
RemoveFromClusterNodeLabelsRequestProto requestProto =
|
||||
((RemoveFromClusterNodeLabelsRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new RemoveFromClusterNodeLabelsResponsePBImpl(
|
||||
proxy.removeFromClusterNodeLabels(null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||
ReplaceLabelsOnNodeRequestProto requestProto =
|
||||
((ReplaceLabelsOnNodeRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new ReplaceLabelsOnNodeResponsePBImpl(proxy.replaceLabelsOnNodes(
|
||||
null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request)
|
||||
throws YarnException, IOException {
|
||||
GetNodesToLabelsRequestProto requestProto =
|
||||
((GetNodesToLabelsRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetNodesToLabelsResponsePBImpl(proxy.getNodeToLabels(
|
||||
null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
||||
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
GetClusterNodeLabelsRequestProto requestProto =
|
||||
((GetClusterNodeLabelsRequestPBImpl) request).getProto();
|
||||
try {
|
||||
return new GetClusterNodeLabelsResponsePBImpl(proxy.getClusterNodeLabels(
|
||||
null, requestProto));
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -22,8 +22,14 @@
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
|
||||
@ -36,17 +42,32 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
|
||||
@ -59,6 +80,10 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
|
||||
|
||||
@ -204,4 +229,86 @@ public UpdateNodeResourceResponseProto updateNodeResource(RpcController controll
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponseProto addToClusterNodeLabels(
|
||||
RpcController controller, AddToClusterNodeLabelsRequestProto proto)
|
||||
throws ServiceException {
|
||||
AddToClusterNodeLabelsRequestPBImpl request =
|
||||
new AddToClusterNodeLabelsRequestPBImpl(proto);
|
||||
try {
|
||||
AddToClusterNodeLabelsResponse response =
|
||||
real.addToClusterNodeLabels(request);
|
||||
return ((AddToClusterNodeLabelsResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoveFromClusterNodeLabelsResponseProto removeFromClusterNodeLabels(
|
||||
RpcController controller, RemoveFromClusterNodeLabelsRequestProto proto)
|
||||
throws ServiceException {
|
||||
RemoveFromClusterNodeLabelsRequestPBImpl request =
|
||||
new RemoveFromClusterNodeLabelsRequestPBImpl(proto);
|
||||
try {
|
||||
RemoveFromClusterNodeLabelsResponse response =
|
||||
real.removeFromClusterNodeLabels(request);
|
||||
return ((RemoveFromClusterNodeLabelsResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponseProto replaceLabelsOnNodes(
|
||||
RpcController controller, ReplaceLabelsOnNodeRequestProto proto)
|
||||
throws ServiceException {
|
||||
ReplaceLabelsOnNodeRequestPBImpl request =
|
||||
new ReplaceLabelsOnNodeRequestPBImpl(proto);
|
||||
try {
|
||||
ReplaceLabelsOnNodeResponse response = real.replaceLabelsOnNode(request);
|
||||
return ((ReplaceLabelsOnNodeResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNodesToLabelsResponseProto getNodeToLabels(
|
||||
RpcController controller, GetNodesToLabelsRequestProto proto)
|
||||
throws ServiceException {
|
||||
GetNodesToLabelsRequestPBImpl request =
|
||||
new GetNodesToLabelsRequestPBImpl(proto);
|
||||
try {
|
||||
GetNodesToLabelsResponse response = real.getNodeToLabels(request);
|
||||
return ((GetNodesToLabelsResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodeLabelsResponseProto getClusterNodeLabels(
|
||||
RpcController controller, GetClusterNodeLabelsRequestProto proto)
|
||||
throws ServiceException {
|
||||
GetClusterNodeLabelsRequestPBImpl request =
|
||||
new GetClusterNodeLabelsRequestPBImpl(proto);
|
||||
try {
|
||||
GetClusterNodeLabelsResponse response =
|
||||
real.getClusterNodeLabels(request);
|
||||
return ((GetClusterNodeLabelsResponsePBImpl) response).getProto();
|
||||
} catch (YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -78,4 +78,9 @@ protected void startDispatcher() {
|
||||
protected void stopDispatcher() {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
super.serviceStop();
|
||||
}
|
||||
}
|
||||
|
@ -57,6 +57,12 @@
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
|
||||
@ -69,8 +75,14 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
||||
@ -618,4 +630,104 @@ public AccessControlList getAccessControlList() {
|
||||
public Server getServer() {
|
||||
return this.server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request)
|
||||
throws YarnException, IOException {
|
||||
String argName = "addToClusterNodeLabels";
|
||||
UserGroupInformation user = checkAcls(argName);
|
||||
|
||||
if (!isRMActive()) {
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService",
|
||||
"ResourceManager is not active. Can not add labels.");
|
||||
throwStandbyException();
|
||||
}
|
||||
|
||||
AddToClusterNodeLabelsResponse response =
|
||||
recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class);
|
||||
try {
|
||||
rmContext.getNodeLabelManager().addToCluserNodeLabels(request.getNodeLabels());
|
||||
RMAuditLogger
|
||||
.logSuccess(user.getShortUserName(), argName, "AdminService");
|
||||
return response;
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Exception add labels", ioe);
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService", "Exception add label");
|
||||
throw RPCUtil.getRemoteException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
String argName = "removeFromClusterNodeLabels";
|
||||
UserGroupInformation user = checkAcls(argName);
|
||||
|
||||
if (!isRMActive()) {
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService",
|
||||
"ResourceManager is not active. Can not remove labels.");
|
||||
throwStandbyException();
|
||||
}
|
||||
|
||||
RemoveFromClusterNodeLabelsResponse response =
|
||||
recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
|
||||
try {
|
||||
rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels());
|
||||
RMAuditLogger
|
||||
.logSuccess(user.getShortUserName(), argName, "AdminService");
|
||||
return response;
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Exception remove labels", ioe);
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService", "Exception remove label");
|
||||
throw RPCUtil.getRemoteException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||
String argName = "replaceLabelsOnNode";
|
||||
UserGroupInformation user = checkAcls(argName);
|
||||
|
||||
if (!isRMActive()) {
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService",
|
||||
"ResourceManager is not active. Can not set node to labels.");
|
||||
throwStandbyException();
|
||||
}
|
||||
|
||||
ReplaceLabelsOnNodeResponse response =
|
||||
recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class);
|
||||
try {
|
||||
rmContext.getNodeLabelManager().replaceLabelsOnNode(
|
||||
request.getNodeToLabels());
|
||||
RMAuditLogger
|
||||
.logSuccess(user.getShortUserName(), argName, "AdminService");
|
||||
return response;
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Exception set node to labels. ", ioe);
|
||||
RMAuditLogger.logFailure(user.getShortUserName(), argName,
|
||||
adminAcl.toString(), "AdminService",
|
||||
"Exception set node to labels.");
|
||||
throw RPCUtil.getRemoteException(ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request)
|
||||
throws YarnException, IOException {
|
||||
return GetNodesToLabelsResponsePBImpl.newInstance(rmContext
|
||||
.getNodeLabelManager().getNodeLabels());
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest request)
|
||||
throws YarnException, IOException {
|
||||
return GetClusterNodeLabelsResponsePBImpl.newInstance(rmContext.getNodeLabelManager()
|
||||
.getClusterNodeLabels());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user