YARN-11377. [Federation] Support addToClusterNodeLabels, removeFromClusterNodeLabels, replaceLabelsOnNode APIs for Federation. (#5525)

This commit is contained in:
slfan1989 2023-04-12 00:47:58 +08:00 committed by GitHub
parent 74ddf69f80
commit bffa49a64f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 368 additions and 9 deletions

View File

@ -21,6 +21,7 @@
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.util.Records;
@ -37,6 +38,17 @@ public static AddToClusterNodeLabelsRequest newInstance(List<NodeLabel> nodeLabe
return request;
}
/**
 * Create a request that carries the node labels together with the id of the
 * subCluster the request is addressed to.
 *
 * @param subClusterId subCluster Id.
 * @param nodeLabels node labels to store in the request.
 * @return a newly created AddToClusterNodeLabelsRequest.
 */
@Public
@Unstable
public static AddToClusterNodeLabelsRequest newInstance(String subClusterId,
    List<NodeLabel> nodeLabels) {
  AddToClusterNodeLabelsRequest created =
      Records.newRecord(AddToClusterNodeLabelsRequest.class);
  created.setNodeLabels(nodeLabels);
  created.setSubClusterId(subClusterId);
  return created;
}
/**
 * Set the node labels carried by this request.
 *
 * @param nodeLabels node labels.
 */
@Public
@Unstable
public abstract void setNodeLabels(List<NodeLabel> nodeLabels);
@ -44,4 +56,22 @@ public static AddToClusterNodeLabelsRequest newInstance(List<NodeLabel> nodeLabe
/**
 * Get the node labels carried by this request.
 *
 * @return node labels.
 */
@Public
@Unstable
public abstract List<NodeLabel> getNodeLabels();
/**
 * Get the subClusterId this request is addressed to.
 *
 * The Router's federation admin interceptor reads this value and rejects the
 * request when it is blank.
 *
 * @return subClusterId; the protobuf-backed implementation returns
 *         {@code null} when no id has been set.
 */
@Public
@InterfaceStability.Evolving
public abstract String getSubClusterId();
/**
 * Set the subClusterId this request is addressed to.
 *
 * @param subClusterId subCluster Id; the protobuf-backed implementation
 *        clears the field when {@code null} is passed.
 */
@Public
@InterfaceStability.Evolving
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -21,6 +21,7 @@
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
@ -35,6 +36,15 @@ public static RemoveFromClusterNodeLabelsRequest newInstance(
return request;
}
/**
 * Create a request that carries the labels to remove together with the id of
 * the subCluster the request is addressed to.
 *
 * @param subClusterId subCluster Id.
 * @param labels names of the node labels stored in the request.
 * @return a newly created RemoveFromClusterNodeLabelsRequest.
 */
public static RemoveFromClusterNodeLabelsRequest newInstance(String subClusterId,
    Set<String> labels) {
  RemoveFromClusterNodeLabelsRequest created =
      Records.newRecord(RemoveFromClusterNodeLabelsRequest.class);
  created.setNodeLabels(labels);
  created.setSubClusterId(subClusterId);
  return created;
}
/**
 * Set the node label names carried by this request.
 *
 * @param labels node label names.
 */
@Public
@Evolving
public abstract void setNodeLabels(Set<String> labels);
@ -42,4 +52,22 @@ public static RemoveFromClusterNodeLabelsRequest newInstance(
/**
 * Get the node label names carried by this request.
 *
 * @return node label names.
 */
@Public
@Evolving
public abstract Set<String> getNodeLabels();
/**
 * Get the subClusterId this request is addressed to.
 *
 * The Router's federation admin interceptor reads this value and rejects the
 * request when it is blank.
 *
 * @return subClusterId; the protobuf-backed implementation returns
 *         {@code null} when no id has been set.
 */
@Public
@InterfaceStability.Evolving
public abstract String getSubClusterId();
/**
 * Set the subClusterId this request is addressed to.
 *
 * @param subClusterId subCluster Id; the protobuf-backed implementation
 *        clears the field when {@code null} is passed.
 */
@Public
@InterfaceStability.Evolving
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
@ -37,6 +38,15 @@ public static ReplaceLabelsOnNodeRequest newInstance(
return request;
}
/**
 * Create a request that carries the node-to-labels mapping together with the
 * id of the subCluster the request is addressed to.
 *
 * Note that, unlike the sibling request factories, the subClusterId is the
 * second argument here.
 *
 * @param map mapping from node id to the label names stored in the request.
 * @param subClusterId subCluster Id.
 * @return a newly created ReplaceLabelsOnNodeRequest.
 */
public static ReplaceLabelsOnNodeRequest newInstance(Map<NodeId, Set<String>> map,
    String subClusterId) {
  ReplaceLabelsOnNodeRequest created =
      Records.newRecord(ReplaceLabelsOnNodeRequest.class);
  created.setNodeToLabels(map);
  created.setSubClusterId(subClusterId);
  return created;
}
/**
 * Set the node-to-labels mapping carried by this request.
 *
 * @param map mapping from node id to label names.
 */
@Public
@Evolving
public abstract void setNodeToLabels(Map<NodeId, Set<String>> map);
@ -52,4 +62,22 @@ public static ReplaceLabelsOnNodeRequest newInstance(
/**
 * Get the failOnUnknownNodes flag of this request.
 *
 * NOTE(review): presumably tells the receiver to fail the call when a node in
 * the mapping is unknown - confirm against the code that consumes the flag.
 *
 * @return the failOnUnknownNodes flag.
 */
@Public
@Evolving
public abstract boolean getFailOnUnknownNodes();
/**
 * Get the subClusterId this request is addressed to.
 *
 * The Router's federation admin interceptor reads this value and rejects the
 * request when it is blank.
 *
 * @return subClusterId; the protobuf-backed implementation returns
 *         {@code null} when no id has been set.
 */
@Public
@InterfaceStability.Evolving
public abstract String getSubClusterId();
/**
 * Set the subClusterId this request is addressed to.
 *
 * @param subClusterId subCluster Id; the protobuf-backed implementation
 *        clears the field when {@code null} is passed.
 */
@Public
@InterfaceStability.Evolving
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -95,6 +95,7 @@ message RefreshNodesResourcesResponseProto {
// Request message for the addToClusterNodeLabels admin call.
message AddToClusterNodeLabelsRequestProto {
// NOTE(review): judging by the name, the legacy string form of the labels - confirm.
repeated string deprecatedNodeLabels = 1;
repeated NodeLabelProto nodeLabels = 2;
// Id of the subCluster the request is addressed to; the Router's federation
// admin interceptor rejects the request when this is blank.
optional string sub_cluster_id = 3;
}
message AddToClusterNodeLabelsResponseProto {
@ -102,6 +103,7 @@ message AddToClusterNodeLabelsResponseProto {
// Request message for the removeFromClusterNodeLabels admin call.
message RemoveFromClusterNodeLabelsRequestProto {
repeated string nodeLabels = 1;
// Id of the subCluster the request is addressed to; the Router's federation
// admin interceptor rejects the request when this is blank.
optional string sub_cluster_id = 2;
}
message RemoveFromClusterNodeLabelsResponseProto {
@ -110,6 +112,7 @@ message RemoveFromClusterNodeLabelsResponseProto {
// Request message for the replaceLabelsOnNode admin call.
message ReplaceLabelsOnNodeRequestProto {
repeated NodeIdToLabelsProto nodeToLabels = 1;
optional bool failOnUnknownNodes = 2;
// Id of the subCluster the request is addressed to; the Router's federation
// admin interceptor rejects the request when this is blank.
optional string sub_cluster_id = 3;
}
message ReplaceLabelsOnNodeResponseProto {

View File

@ -152,4 +152,20 @@ public List<NodeLabel> getNodeLabels() {
initLocalNodeLabels();
return this.updatedNodeLabels;
}
/**
 * Read the subClusterId from whichever of proto/builder currently holds the
 * state.
 *
 * @return the subClusterId, or {@code null} when the field is not set.
 */
@Override
public String getSubClusterId() {
  AddToClusterNodeLabelsRequestProtoOrBuilder source = viaProto ? proto : builder;
  if (!source.hasSubClusterId()) {
    return null;
  }
  return source.getSubClusterId();
}
/**
 * Store the subClusterId in the builder; a {@code null} value clears the field.
 *
 * @param subClusterId subCluster Id, may be {@code null}.
 */
@Override
public void setSubClusterId(String subClusterId) {
  maybeInitBuilder();
  if (subClusterId != null) {
    builder.setSubClusterId(subClusterId);
  } else {
    builder.clearSubClusterId();
  }
}
}

View File

@ -103,10 +103,25 @@ public Set<String> getNodeLabels() {
return this.labels;
}
/**
 * Read the subClusterId from whichever of proto/builder currently holds the
 * state.
 *
 * @return the subClusterId, or {@code null} when the field is not set.
 */
@Override
public String getSubClusterId() {
  RemoveFromClusterNodeLabelsRequestProtoOrBuilder source = viaProto ? proto : builder;
  if (!source.hasSubClusterId()) {
    return null;
  }
  return source.getSubClusterId();
}
/**
 * Store the subClusterId in the builder; a {@code null} value clears the field.
 *
 * @param subClusterId subCluster Id, may be {@code null}.
 */
@Override
public void setSubClusterId(String subClusterId) {
  maybeInitBuilder();
  if (subClusterId != null) {
    builder.setSubClusterId(subClusterId);
  } else {
    builder.clearSubClusterId();
  }
}
/**
 * Hash code of this request, delegated to the underlying protobuf message.
 *
 * The former placeholder ({@code assert false} followed by {@code return 0})
 * is dropped: it made the proto-based return unreachable and would fail
 * whenever assertions are enabled.
 *
 * @return the hash code of {@code getProto()}.
 */
@Override
public int hashCode() {
  return getProto().hashCode();
}
@Override

View File

@ -151,6 +151,22 @@ public boolean getFailOnUnknownNodes() {
return p.getFailOnUnknownNodes();
}
/**
 * Read the subClusterId from whichever of proto/builder currently holds the
 * state.
 *
 * @return the subClusterId, or {@code null} when the field is not set.
 */
@Override
public String getSubClusterId() {
  ReplaceLabelsOnNodeRequestProtoOrBuilder source = viaProto ? proto : builder;
  if (!source.hasSubClusterId()) {
    return null;
  }
  return source.getSubClusterId();
}
/**
 * Store the subClusterId in the builder; a {@code null} value clears the field.
 *
 * @param subClusterId subCluster Id, may be {@code null}.
 */
@Override
public void setSubClusterId(String subClusterId) {
  maybeInitBuilder();
  if (subClusterId != null) {
    builder.setSubClusterId(subClusterId);
  } else {
    builder.clearSubClusterId();
  }
}
@Override
public void setFailOnUnknownNodes(boolean failOnUnknownNodes) {
maybeInitBuilder();
@ -163,8 +179,7 @@ private NodeIdProto convertToProtoFormat(NodeId t) {
/**
 * Hash code of this request, delegated to the underlying protobuf message.
 *
 * The former placeholder ({@code assert false} followed by {@code return 0})
 * is dropped: it made the proto-based return unreachable and would fail
 * whenever assertions are enabled.
 *
 * @return the hash code of {@code getProto()}.
 */
@Override
public int hashCode() {
  return getProto().hashCode();
}
@Override

View File

@ -512,22 +512,113 @@ public RefreshNodesResourcesResponse refreshNodesResources(RefreshNodesResources
@Override
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
AddToClusterNodeLabelsRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
// parameter verification.
if (request == null) {
routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing AddToClusterNodeLabels request.", null);
}
String subClusterId = request.getSubClusterId();
if (StringUtils.isBlank(subClusterId)) {
routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing AddToClusterNodeLabels SubClusterId.", null);
}
try {
long startTime = clock.getTime();
RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
new Class[]{AddToClusterNodeLabelsRequest.class}, new Object[]{request});
Collection<AddToClusterNodeLabelsResponse> addToClusterNodeLabelsResps =
remoteMethod.invokeConcurrent(this, AddToClusterNodeLabelsResponse.class, subClusterId);
if (CollectionUtils.isNotEmpty(addToClusterNodeLabelsResps)) {
long stopTime = clock.getTime();
routerMetrics.succeededAddToClusterNodeLabelsRetrieved(stopTime - startTime);
return AddToClusterNodeLabelsResponse.newInstance();
}
} catch (YarnException e) {
routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
RouterServerUtil.logAndThrowException(e,
"Unable to addToClusterNodeLabels due to exception. " + e.getMessage());
}
routerMetrics.incrAddToClusterNodeLabelsFailedRetrieved();
throw new YarnException("Unable to addToClusterNodeLabels.");
}
/**
 * Remove node labels from the subCluster named in the request.
 *
 * The request is validated (non-null, non-blank subClusterId) and then passed
 * to {@code RMAdminProtocolMethod#invokeConcurrent} together with the
 * subClusterId. The failure metric is incremented on every error path; the
 * success latency is recorded when at least one response comes back.
 *
 * @param request request carrying the label names and the target subClusterId.
 * @return a new RemoveFromClusterNodeLabelsResponse when the call produced a response.
 * @throws YarnException if the request or its subClusterId is missing, or the
 *         call fails or yields no response.
 * @throws IOException declared in the method signature.
 */
@Override
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
    RemoveFromClusterNodeLabelsRequest request)
    throws YarnException, IOException {
  // parameter verification.
  if (request == null) {
    routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
    RouterServerUtil.logAndThrowException("Missing RemoveFromClusterNodeLabels request.", null);
  }

  String subClusterId = request.getSubClusterId();
  if (StringUtils.isBlank(subClusterId)) {
    routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
    RouterServerUtil.logAndThrowException("Missing RemoveFromClusterNodeLabels SubClusterId.",
        null);
  }

  try {
    long startTime = clock.getTime();
    RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
        new Class[]{RemoveFromClusterNodeLabelsRequest.class}, new Object[]{request});
    Collection<RemoveFromClusterNodeLabelsResponse> removeFromClusterNodeLabelsResps =
        remoteMethod.invokeConcurrent(this, RemoveFromClusterNodeLabelsResponse.class,
        subClusterId);
    if (CollectionUtils.isNotEmpty(removeFromClusterNodeLabelsResps)) {
      long stopTime = clock.getTime();
      routerMetrics.succeededRemoveFromClusterNodeLabelsRetrieved(stopTime - startTime);
      return RemoveFromClusterNodeLabelsResponse.newInstance();
    }
  } catch (YarnException e) {
    routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
    RouterServerUtil.logAndThrowException(e,
        "Unable to removeFromClusterNodeLabels due to exception. " + e.getMessage());
  }

  // No response came back: count it as a failure and report it.
  routerMetrics.incrRemoveFromClusterNodeLabelsFailedRetrieved();
  throw new YarnException("Unable to removeFromClusterNodeLabels.");
}
/**
 * Replace the labels on nodes of the subCluster named in the request.
 *
 * The request is validated (non-null, non-blank subClusterId) and then passed
 * to {@code RMAdminProtocolMethod#invokeConcurrent} together with the
 * subClusterId. The failure metric is incremented on every error path; the
 * success latency is recorded when at least one response comes back.
 *
 * @param request request carrying the node-to-labels mapping and the target
 *        subClusterId.
 * @return a new ReplaceLabelsOnNodeResponse when the call produced a response.
 * @throws YarnException if the request or its subClusterId is missing, or the
 *         call fails or yields no response.
 * @throws IOException declared in the method signature.
 */
@Override
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(ReplaceLabelsOnNodeRequest request)
    throws YarnException, IOException {
  // parameter verification.
  if (request == null) {
    routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
    RouterServerUtil.logAndThrowException("Missing ReplaceLabelsOnNode request.", null);
  }

  String subClusterId = request.getSubClusterId();
  if (StringUtils.isBlank(subClusterId)) {
    routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
    RouterServerUtil.logAndThrowException("Missing ReplaceLabelsOnNode SubClusterId.", null);
  }

  try {
    long startTime = clock.getTime();
    RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
        new Class[]{ReplaceLabelsOnNodeRequest.class}, new Object[]{request});
    Collection<ReplaceLabelsOnNodeResponse> replaceLabelsOnNodeResps =
        remoteMethod.invokeConcurrent(this, ReplaceLabelsOnNodeResponse.class, subClusterId);
    if (CollectionUtils.isNotEmpty(replaceLabelsOnNodeResps)) {
      long stopTime = clock.getTime();
      // Record success under this operation's own metric, not the
      // removeFromClusterNodeLabels one.
      routerMetrics.succeededReplaceLabelsOnNodeRetrieved(stopTime - startTime);
      return ReplaceLabelsOnNodeResponse.newInstance();
    }
  } catch (YarnException e) {
    routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
    RouterServerUtil.logAndThrowException(e,
        "Unable to replaceLabelsOnNode due to exception. " + e.getMessage());
  }

  // No response came back: count it as a failure and report it.
  routerMetrics.incrReplaceLabelsOnNodeFailedRetrieved();
  throw new YarnException("Unable to replaceLabelsOnNode.");
}
@Override

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@ -42,6 +43,12 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -55,6 +62,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import static org.junit.Assert.assertNotNull;
@ -388,4 +397,128 @@ public void testRefreshNodesResourcesNormalRequest() throws Exception {
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshNodesResources(request1));
}
/**
 * addToClusterNodeLabels must reject a null request and a request that
 * carries no subClusterId.
 */
@Test
public void testAddToClusterNodeLabelsEmptyRequest() throws Exception {
  // The request itself is null.
  LambdaTestUtils.intercept(YarnException.class, "Missing AddToClusterNodeLabels request.",
      () -> interceptor.addToClusterNodeLabels(null));

  // The request exists but its subClusterId is null.
  AddToClusterNodeLabelsRequest noSubClusterRequest =
      AddToClusterNodeLabelsRequest.newInstance(null, null);
  LambdaTestUtils.intercept(YarnException.class, "Missing AddToClusterNodeLabels SubClusterId.",
      () -> interceptor.addToClusterNodeLabels(noSubClusterRequest));
}
/**
 * addToClusterNodeLabels must succeed for subCluster SC-1 and fail for a
 * subCluster that is not active.
 */
@Test
public void testAddToClusterNodeLabelsNormalRequest() throws Exception {
  // case1, add labels "a" and "b" to subCluster SC-1.
  List<NodeLabel> labelsToAdd = new ArrayList<>();
  labelsToAdd.add(NodeLabel.newInstance("a"));
  labelsToAdd.add(NodeLabel.newInstance("b"));
  AddToClusterNodeLabelsRequest activeRequest =
      AddToClusterNodeLabelsRequest.newInstance("SC-1", labelsToAdd);
  AddToClusterNodeLabelsResponse activeResponse =
      interceptor.addToClusterNodeLabels(activeRequest);
  assertNotNull(activeResponse);

  // case2, the same labels addressed to a non-existent subCluster.
  AddToClusterNodeLabelsRequest unknownRequest =
      AddToClusterNodeLabelsRequest.newInstance("SC-NON", labelsToAdd);
  LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
      () -> interceptor.addToClusterNodeLabels(unknownRequest));
}
/**
 * removeFromClusterNodeLabels must reject a null request and a request that
 * carries no subClusterId.
 */
@Test
public void testRemoveFromClusterNodeLabelsEmptyRequest() throws Exception {
  // The request itself is null.
  LambdaTestUtils.intercept(YarnException.class, "Missing RemoveFromClusterNodeLabels request.",
      () -> interceptor.removeFromClusterNodeLabels(null));

  // The request exists but its subClusterId is null.
  RemoveFromClusterNodeLabelsRequest noSubClusterRequest =
      RemoveFromClusterNodeLabelsRequest.newInstance(null, null);
  LambdaTestUtils.intercept(YarnException.class,
      "Missing RemoveFromClusterNodeLabels SubClusterId.",
      () -> interceptor.removeFromClusterNodeLabels(noSubClusterRequest));
}
/**
 * Labels added to SC-1 can be removed again; removal addressed to a
 * subCluster that is not active must fail.
 */
@Test
public void testRemoveFromClusterNodeLabelsNormalRequest() throws Exception {
  // case1, Step1. Add labels "a" and "b" to subCluster SC-1.
  List<NodeLabel> labelsToAdd = new ArrayList<>();
  labelsToAdd.add(NodeLabel.newInstance("a"));
  labelsToAdd.add(NodeLabel.newInstance("b"));
  AddToClusterNodeLabelsRequest addRequest =
      AddToClusterNodeLabelsRequest.newInstance("SC-1", labelsToAdd);
  interceptor.addToClusterNodeLabels(addRequest);

  // case1, Step2. Delete label "a" from subCluster SC-1.
  Set<String> labelsToRemove = new HashSet<>();
  labelsToRemove.add("a");
  RemoveFromClusterNodeLabelsRequest removeRequest =
      RemoveFromClusterNodeLabelsRequest.newInstance("SC-1", labelsToRemove);
  RemoveFromClusterNodeLabelsResponse removeResponse =
      interceptor.removeFromClusterNodeLabels(removeRequest);
  assertNotNull(removeResponse);

  // case2, the same removal addressed to a non-existent subCluster.
  RemoveFromClusterNodeLabelsRequest unknownRequest =
      RemoveFromClusterNodeLabelsRequest.newInstance("SC-NON", labelsToRemove);
  LambdaTestUtils.intercept(YarnException.class,
      "subClusterId = SC-NON is not an active subCluster.",
      () -> interceptor.removeFromClusterNodeLabels(unknownRequest));
}
/**
 * replaceLabelsOnNode must reject a null request and a request that carries
 * no subClusterId.
 */
@Test
public void testReplaceLabelsOnNodeEmptyRequest() throws Exception {
  // The request itself is null.
  LambdaTestUtils.intercept(YarnException.class, "Missing ReplaceLabelsOnNode request.",
      () -> interceptor.replaceLabelsOnNode(null));

  // The request exists (with an empty mapping) but its subClusterId is null.
  Map<NodeId, Set<String>> emptyMapping = new HashMap<>();
  ReplaceLabelsOnNodeRequest noSubClusterRequest =
      ReplaceLabelsOnNodeRequest.newInstance(emptyMapping, null);
  LambdaTestUtils.intercept(YarnException.class, "Missing ReplaceLabelsOnNode SubClusterId.",
      () -> interceptor.replaceLabelsOnNode(noSubClusterRequest));
}
/**
 * After labels are added to SC-1, replacing the labels of a node in SC-1 must
 * succeed; the same replacement addressed to a subCluster that is not active
 * must fail.
 */
@Test
public void tesReplaceLabelsOnNodeEmptyNormalRequest() throws Exception {
  // case1, add labels "a" and "b" to SC-1 first.
  List<NodeLabel> labelsToAdd = new ArrayList<>();
  labelsToAdd.add(NodeLabel.newInstance("a"));
  labelsToAdd.add(NodeLabel.newInstance("b"));
  AddToClusterNodeLabelsRequest addRequest =
      AddToClusterNodeLabelsRequest.newInstance("SC-1", labelsToAdd);
  interceptor.addToClusterNodeLabels(addRequest);

  // Then replace the labels of one specific node with {"a"}.
  Set<String> replacementLabels = new HashSet<>();
  replacementLabels.add("a");
  Map<NodeId, Set<String>> nodeToLabels = new HashMap<>();
  nodeToLabels.put(NodeId.newInstance("127.0.0.1", 0), replacementLabels);
  ReplaceLabelsOnNodeRequest replaceRequest =
      ReplaceLabelsOnNodeRequest.newInstance(nodeToLabels, "SC-1");
  ReplaceLabelsOnNodeResponse replaceResponse = interceptor.replaceLabelsOnNode(replaceRequest);
  assertNotNull(replaceResponse);

  // case2, the same replacement addressed to a non-existent subCluster.
  ReplaceLabelsOnNodeRequest unknownRequest =
      ReplaceLabelsOnNodeRequest.newInstance(nodeToLabels, "SC-NON");
  LambdaTestUtils.intercept(YarnException.class,
      "subClusterId = SC-NON is not an active subCluster.",
      () -> interceptor.replaceLabelsOnNode(unknownRequest));
}
}