YARN-8103. Add CLI interface to query node attributes. Contributed by Bibin A Chundatt.
This commit is contained in:
parent
76183428b7
commit
eb08543c7a
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.sls.nodemanager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -220,16 +221,9 @@ public Map<String, Long> getAllocationTagsWithCount() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void setNodeAttributes(String prefix,
|
||||
Set<NodeAttribute> nodeAttributes) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Set<NodeAttribute>> getAllNodeAttributes() {
|
||||
return null;
|
||||
public Set<NodeAttribute> getAllNodeAttributes() {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -209,13 +209,7 @@ public Map<String, Long> getAllocationTagsWithCount() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeAttributes(String prefix,
|
||||
Set<NodeAttribute> nodeAttributes) {
|
||||
node.setNodeAttributes(prefix, nodeAttributes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Set<NodeAttribute>> getAllNodeAttributes() {
|
||||
public Set<NodeAttribute> getAllNodeAttributes() {
|
||||
return node.getAllNodeAttributes();
|
||||
}
|
||||
|
||||
|
@ -55,7 +55,7 @@ function hadoop_usage
|
||||
hadoop_add_subcommand "timelinereader" client "run the timeline reader server"
|
||||
hadoop_add_subcommand "timelineserver" daemon "run the timeline server"
|
||||
hadoop_add_subcommand "top" client "view cluster information"
|
||||
hadoop_add_subcommand "node-attributes" "map node to attibutes"
|
||||
hadoop_add_subcommand "nodeattributes" client "node attributes cli client"
|
||||
hadoop_add_subcommand "version" client "print the version"
|
||||
hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" true
|
||||
}
|
||||
@ -187,7 +187,7 @@ ${HADOOP_COMMON_HOME}/${HADOOP_COMMON_LIB_JARS_DIR}"
|
||||
hadoop_add_classpath "$HADOOP_YARN_HOME/$YARN_DIR/timelineservice/lib/*"
|
||||
HADOOP_CLASSNAME='org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer'
|
||||
;;
|
||||
node-attributes)
|
||||
nodeattributes)
|
||||
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="false"
|
||||
HADOOP_CLASSNAME='org.apache.hadoop.yarn.client.cli.NodeAttributesCLI'
|
||||
;;
|
||||
|
@ -258,4 +258,17 @@ public NodeUpdateType getNodeUpdateType() {
|
||||
* Set the node update type (null indicates absent node update type).
|
||||
* */
|
||||
public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {}
|
||||
|
||||
/**
|
||||
* Set the node attributes of node.
|
||||
*
|
||||
* @param nodeAttributes set of node attributes.
|
||||
*/
|
||||
public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes);
|
||||
|
||||
/**
|
||||
* Get node attributes of node.
|
||||
* @return the set of node attributes.
|
||||
*/
|
||||
public abstract Set<NodeAttribute> getNodeAttributes();
|
||||
}
|
||||
|
@ -355,6 +355,7 @@ message NodeReportProto {
|
||||
optional ResourceUtilizationProto node_utilization = 12;
|
||||
optional uint32 decommissioning_timeout = 13;
|
||||
optional NodeUpdateTypeProto node_update_type = 14;
|
||||
repeated NodeAttributeProto node_attributes = 15;
|
||||
}
|
||||
|
||||
message NodeIdToLabelsProto {
|
||||
|
@ -36,6 +36,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
@ -52,6 +53,7 @@ public class ClusterCLI extends YarnCLI {
|
||||
public static final String LIST_LABELS_CMD = "list-node-labels";
|
||||
public static final String DIRECTLY_ACCESS_NODE_LABEL_STORE =
|
||||
"directly-access-node-label-store";
|
||||
public static final String LIST_CLUSTER_ATTRIBUTES="list-node-attributes";
|
||||
public static final String CMD = "cluster";
|
||||
private boolean accessLocal = false;
|
||||
static CommonNodeLabelsManager localNodeLabelsManager = null;
|
||||
@ -71,6 +73,8 @@ public int run(String[] args) throws Exception {
|
||||
|
||||
opts.addOption("lnl", LIST_LABELS_CMD, false,
|
||||
"List cluster node-label collection");
|
||||
opts.addOption("lna", LIST_CLUSTER_ATTRIBUTES, false,
|
||||
"List cluster node-attribute collection");
|
||||
opts.addOption("h", HELP_CMD, false, "Displays help for all commands.");
|
||||
opts.addOption("dnl", DIRECTLY_ACCESS_NODE_LABEL_STORE, false,
|
||||
"This is DEPRECATED, will be removed in future releases. Directly access node label store, "
|
||||
@ -102,6 +106,8 @@ public int run(String[] args) throws Exception {
|
||||
|
||||
if (parsedCli.hasOption(LIST_LABELS_CMD)) {
|
||||
printClusterNodeLabels();
|
||||
} else if(parsedCli.hasOption(LIST_CLUSTER_ATTRIBUTES)){
|
||||
printClusterNodeAttributes();
|
||||
} else if (parsedCli.hasOption(HELP_CMD)) {
|
||||
printUsage(opts);
|
||||
return 0;
|
||||
@ -112,6 +118,17 @@ public int run(String[] args) throws Exception {
|
||||
return 0;
|
||||
}
|
||||
|
||||
private void printClusterNodeAttributes() throws IOException, YarnException {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
PrintWriter pw = new PrintWriter(
|
||||
new OutputStreamWriter(baos, Charset.forName("UTF-8")));
|
||||
for (NodeAttributeInfo attribute : client.getClusterAttributes()) {
|
||||
pw.println(attribute.toString());
|
||||
}
|
||||
pw.close();
|
||||
sysout.println(baos.toString("UTF-8"));
|
||||
}
|
||||
|
||||
void printClusterNodeLabels() throws YarnException, IOException {
|
||||
List<NodeLabel> nodeLabels = null;
|
||||
if (accessLocal) {
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -44,7 +44,6 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
@ -307,6 +306,18 @@ private void printNodeStatus(String nodeIdStr) throws YarnException,
|
||||
Collections.sort(nodeLabelsList);
|
||||
nodeReportStr.println(StringUtils.join(nodeLabelsList.iterator(), ','));
|
||||
|
||||
if (nodeReport.getNodeAttributes().size() > 0) {
|
||||
ArrayList nodeAtrs = new ArrayList<>(nodeReport.getNodeAttributes());
|
||||
nodeReportStr.print("\tNode Attributes : ");
|
||||
nodeReportStr.println(nodeAtrs.get(0).toString());
|
||||
for (int index = 1; index < nodeAtrs.size(); index++) {
|
||||
nodeReportStr.println(
|
||||
String.format("\t%18s%s", "", nodeAtrs.get(index).toString()));
|
||||
}
|
||||
} else {
|
||||
nodeReportStr.println("\tNode Attributes : ");
|
||||
}
|
||||
|
||||
nodeReportStr.print("\tResource Utilization by Node : ");
|
||||
if (nodeReport.getNodeUtilization() != null) {
|
||||
nodeReportStr.print("PMem:"
|
||||
|
@ -18,6 +18,9 @@
|
||||
|
||||
package org.apache.hadoop.yarn.client.cli;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
@ -74,7 +77,32 @@ public void testGetClusterNodeLabels() throws Exception {
|
||||
pw.close();
|
||||
verify(sysOut).println(baos.toString("UTF-8"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testGetClusterNodeAttributes() throws Exception {
|
||||
YarnClient client = mock(YarnClient.class);
|
||||
when(client.getClusterAttributes()).thenReturn(ImmutableSet
|
||||
.of(NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"),
|
||||
NodeAttributeType.STRING), NodeAttributeInfo
|
||||
.newInstance(NodeAttributeKey.newInstance("CPU"),
|
||||
NodeAttributeType.STRING)));
|
||||
ClusterCLI cli = new ClusterCLI();
|
||||
cli.setClient(client);
|
||||
cli.setSysOutPrintStream(sysOut);
|
||||
cli.setSysErrPrintStream(sysErr);
|
||||
|
||||
int rc = cli.run(new String[] {ClusterCLI.CMD,
|
||||
"-" + ClusterCLI.LIST_CLUSTER_ATTRIBUTES});
|
||||
assertEquals(0, rc);
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
PrintWriter pw = new PrintWriter(baos);
|
||||
pw.println("rm.yarn.io/GPU(STRING)");
|
||||
pw.println("rm.yarn.io/CPU(STRING)");
|
||||
pw.close();
|
||||
verify(sysOut).println(baos.toString("UTF-8"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetClusterNodeLabelsWithLocalAccess() throws Exception {
|
||||
YarnClient client = mock(YarnClient.class);
|
||||
@ -157,6 +185,8 @@ public void testHelp() throws Exception {
|
||||
pw.println(" option is UNSTABLE, could be");
|
||||
pw.println(" removed in future releases.");
|
||||
pw.println(" -h,--help Displays help for all commands.");
|
||||
pw.println(" -lna,--list-node-attributes List cluster node-attribute");
|
||||
pw.println(" collection");
|
||||
pw.println(" -lnl,--list-node-labels List cluster node-label");
|
||||
pw.println(" collection");
|
||||
pw.close();
|
||||
|
@ -18,6 +18,20 @@
|
||||
|
||||
package org.apache.hadoop.yarn.client.cli;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
|
||||
import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
@ -29,8 +43,8 @@
|
||||
import java.io.PrintStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
@ -56,75 +70,122 @@ public class TestNodeAttributesCLI {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestNodeAttributesCLI.class);
|
||||
private ResourceManagerAdministrationProtocol admin;
|
||||
private NodesToAttributesMappingRequest request;
|
||||
private ApplicationClientProtocol client;
|
||||
private NodesToAttributesMappingRequest nodeToAttrRequest;
|
||||
private NodeAttributesCLI nodeAttributesCLI;
|
||||
private ByteArrayOutputStream errOutBytes = new ByteArrayOutputStream();
|
||||
private ByteArrayOutputStream sysOutBytes = new ByteArrayOutputStream();
|
||||
private String errOutput;
|
||||
private String sysOutput;
|
||||
|
||||
@Before
|
||||
public void configure() throws IOException, YarnException {
|
||||
|
||||
admin = mock(ResourceManagerAdministrationProtocol.class);
|
||||
client = mock(ApplicationClientProtocol.class);
|
||||
|
||||
when(admin.mapAttributesToNodes(any(NodesToAttributesMappingRequest.class)))
|
||||
.thenAnswer(new Answer<NodesToAttributesMappingResponse>() {
|
||||
@Override
|
||||
public NodesToAttributesMappingResponse answer(
|
||||
InvocationOnMock invocation) throws Throwable {
|
||||
request =
|
||||
nodeToAttrRequest =
|
||||
(NodesToAttributesMappingRequest) invocation.getArguments()[0];
|
||||
return NodesToAttributesMappingResponse.newInstance();
|
||||
}
|
||||
});
|
||||
|
||||
nodeAttributesCLI = new NodeAttributesCLI(new Configuration()) {
|
||||
nodeAttributesCLI = new NodeAttributesCLI() {
|
||||
@Override
|
||||
protected ResourceManagerAdministrationProtocol createAdminProtocol()
|
||||
throws IOException {
|
||||
return admin;
|
||||
protected AdminCommandHandler getAdminCommandHandler() {
|
||||
return new AdminCommandHandler() {
|
||||
@Override
|
||||
protected ResourceManagerAdministrationProtocol createAdminProtocol()
|
||||
throws IOException {
|
||||
return admin;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClientCommandHandler getClientCommandHandler() {
|
||||
ClientCommandHandler handler = new ClientCommandHandler() {
|
||||
@Override
|
||||
protected ApplicationClientProtocol createApplicationProtocol()
|
||||
throws IOException {
|
||||
return client;
|
||||
}
|
||||
};
|
||||
handler.setSysOut(new PrintStream(sysOutBytes));
|
||||
return handler;
|
||||
}
|
||||
};
|
||||
|
||||
nodeAttributesCLI.setErrOut(new PrintStream(errOutBytes));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHelp() throws Exception {
|
||||
String[] args = new String[] { "-help", "-replace" };
|
||||
String[] args = new String[] {"-help", "-replace"};
|
||||
assertTrue("It should have succeeded help for replace", 0 == runTool(args));
|
||||
assertOutputContains(
|
||||
"-replace <\"node1:attribute[(type)][=value],attribute1"
|
||||
+ "[=value],attribute2 node2:attribute2[=value],attribute3\"> :");
|
||||
assertOutputContains("Replace the node to attributes mapping information at"
|
||||
assertErrorContains("-replace <\"node1:attribute[(type)][=value],attribute1"
|
||||
+ "[=value],attribute2 node2:attribute2[=value],attribute3\">");
|
||||
assertErrorContains("Replace the node to attributes mapping information at"
|
||||
+ " the ResourceManager with the new mapping. Currently supported"
|
||||
+ " attribute type. And string is the default type too. Attribute value"
|
||||
+ " if not specified for string type value will be considered as empty"
|
||||
+ " string. Replaced node-attributes should not violate the existing"
|
||||
+ " attribute to attribute type mapping.");
|
||||
|
||||
args = new String[] { "-help", "-remove" };
|
||||
args = new String[] {"-help", "-remove"};
|
||||
assertTrue("It should have succeeded help for replace", 0 == runTool(args));
|
||||
assertOutputContains(
|
||||
"-remove <\"node1:attribute,attribute1" + " node2:attribute2\"> :");
|
||||
assertOutputContains("Removes the specified node to attributes mapping"
|
||||
assertErrorContains(
|
||||
"-remove <\"node1:attribute,attribute1" + " node2:attribute2\">");
|
||||
assertErrorContains("Removes the specified node to attributes mapping"
|
||||
+ " information at the ResourceManager");
|
||||
|
||||
args = new String[] { "-help", "-add" };
|
||||
args = new String[] {"-help", "-add"};
|
||||
assertTrue("It should have succeeded help for replace", 0 == runTool(args));
|
||||
assertOutputContains("-add <\"node1:attribute[(type)][=value],"
|
||||
+ "attribute1[=value],attribute2 node2:attribute2[=value],attribute3\">"
|
||||
+ " :");
|
||||
assertOutputContains("Adds or updates the node to attributes mapping"
|
||||
assertErrorContains("-add <\"node1:attribute[(type)][=value],"
|
||||
+ "attribute1[=value],attribute2 node2:attribute2[=value],"
|
||||
+ "attribute3\">");
|
||||
assertErrorContains("Adds or updates the node to attributes mapping"
|
||||
+ " information at the ResourceManager. Currently supported attribute"
|
||||
+ " type is string. And string is the default type too. Attribute value"
|
||||
+ " if not specified for string type value will be considered as empty"
|
||||
+ " string. Added or updated node-attributes should not violate the"
|
||||
+ " existing attribute to attribute type mapping.");
|
||||
|
||||
args = new String[] { "-help", "-failOnUnknownNodes" };
|
||||
args = new String[] {"-help", "-failOnUnknownNodes"};
|
||||
assertTrue("It should have succeeded help for replace", 0 == runTool(args));
|
||||
assertOutputContains("-failOnUnknownNodes :");
|
||||
assertOutputContains("Can be used optionally along with other options. When"
|
||||
+ " its set, it will fail if specified nodes are unknown.");
|
||||
assertErrorContains("-failOnUnknownNodes");
|
||||
assertErrorContains("Can be used optionally along with [add,remove,"
|
||||
+ "replace] options. When set, command will fail if specified nodes "
|
||||
+ "are unknown.");
|
||||
|
||||
args = new String[] {"-help", "-list"};
|
||||
assertTrue("It should have succeeded help for replace", 0 == runTool(args));
|
||||
assertErrorContains("-list");
|
||||
assertErrorContains("List all attributes in cluster");
|
||||
|
||||
args = new String[] {"-help", "-nodes"};
|
||||
assertTrue("It should have succeeded help for replace", 0 == runTool(args));
|
||||
assertErrorContains("-nodes");
|
||||
assertErrorContains(
|
||||
"Works with [list] to specify node hostnames whose mappings "
|
||||
+ "are required to be displayed.");
|
||||
|
||||
args = new String[] {"-help", "-attributes"};
|
||||
assertTrue("It should have succeeded help for replace", 0 == runTool(args));
|
||||
assertErrorContains("-attributes");
|
||||
assertErrorContains(
|
||||
"Works with [attributestonodes] to specify attributes whose mapping "
|
||||
+ "are required to be displayed.");
|
||||
|
||||
args = new String[] {"-help", "-attributestonodes"};
|
||||
assertTrue("It should have succeeded help for replace", 0 == runTool(args));
|
||||
assertErrorContains("-attributestonodes");
|
||||
assertErrorContains("Displays mapping of attributes to nodes and attribute "
|
||||
+ "values grouped by attributes");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -133,62 +194,62 @@ public void testReplace() throws Exception {
|
||||
// failure scenarios
|
||||
// --------------------------------
|
||||
// parenthesis not match
|
||||
String[] args = new String[] { "-replace", "x(" };
|
||||
String[] args = new String[] {"-replace", "x("};
|
||||
assertTrue("It should have failed as no node is specified",
|
||||
0 != runTool(args));
|
||||
assertFailureMessageContains(NodeAttributesCLI.INVALID_MAPPING_ERR_MSG);
|
||||
|
||||
// parenthesis not match
|
||||
args = new String[] { "-replace", "x:(=abc" };
|
||||
args = new String[] {"-replace", "x:(=abc"};
|
||||
assertTrue(
|
||||
"It should have failed as no closing parenthesis is not specified",
|
||||
0 != runTool(args));
|
||||
assertFailureMessageContains(
|
||||
"Attribute for node x is not properly configured : (=abc");
|
||||
|
||||
args = new String[] { "-replace", "x:()=abc" };
|
||||
args = new String[] {"-replace", "x:()=abc"};
|
||||
assertTrue("It should have failed as no type specified inside parenthesis",
|
||||
0 != runTool(args));
|
||||
assertFailureMessageContains(
|
||||
"Attribute for node x is not properly configured : ()=abc");
|
||||
|
||||
args = new String[] { "-replace", ":x(string)" };
|
||||
args = new String[] {"-replace", ":x(string)"};
|
||||
assertTrue("It should have failed as no node is specified",
|
||||
0 != runTool(args));
|
||||
assertFailureMessageContains("Node name cannot be empty");
|
||||
|
||||
// Not expected key=value specifying inner parenthesis
|
||||
args = new String[] { "-replace", "x:(key=value)" };
|
||||
args = new String[] {"-replace", "x:(key=value)"};
|
||||
assertTrue(0 != runTool(args));
|
||||
assertFailureMessageContains(
|
||||
"Attribute for node x is not properly configured : (key=value)");
|
||||
|
||||
// Should fail as no attributes specified
|
||||
args = new String[] { "-replace" };
|
||||
args = new String[] {"-replace"};
|
||||
assertTrue("Should fail as no attribute mappings specified",
|
||||
0 != runTool(args));
|
||||
assertFailureMessageContains(NodeAttributesCLI.NO_MAPPING_ERR_MSG);
|
||||
assertFailureMessageContains(NodeAttributesCLI.MISSING_ARGUMENT);
|
||||
|
||||
// no labels, should fail
|
||||
args = new String[] { "-replace", "-failOnUnknownNodes",
|
||||
"x:key(string)=value,key2=val2" };
|
||||
args = new String[] {"-replace", "-failOnUnknownNodes",
|
||||
"x:key(string)=value,key2=val2"};
|
||||
assertTrue("Should fail as no attribute mappings specified for replace",
|
||||
0 != runTool(args));
|
||||
assertFailureMessageContains(NodeAttributesCLI.NO_MAPPING_ERR_MSG);
|
||||
assertFailureMessageContains(NodeAttributesCLI.MISSING_ARGUMENT);
|
||||
|
||||
// no labels, should fail
|
||||
args = new String[] { "-replace", " " };
|
||||
args = new String[] {"-replace", " "};
|
||||
assertTrue(0 != runTool(args));
|
||||
assertFailureMessageContains(NodeAttributesCLI.NO_MAPPING_ERR_MSG);
|
||||
|
||||
args = new String[] { "-replace", ", " };
|
||||
args = new String[] {"-replace", ", "};
|
||||
assertTrue(0 != runTool(args));
|
||||
assertFailureMessageContains(NodeAttributesCLI.INVALID_MAPPING_ERR_MSG);
|
||||
// --------------------------------
|
||||
// success scenarios
|
||||
// --------------------------------
|
||||
args = new String[] { "-replace",
|
||||
"x:key(string)=value,key2=val2 y:key2=val23,key3 z:key4" };
|
||||
args = new String[] {"-replace",
|
||||
"x:key(string)=value,key2=val2 y:key2=val23,key3 z:key4"};
|
||||
assertTrue("Should not fail as attribute has been properly mapped",
|
||||
0 == runTool(args));
|
||||
List<NodeToAttributes> nodeAttributesList = new ArrayList<>();
|
||||
@ -221,10 +282,10 @@ public void testReplace() throws Exception {
|
||||
.add(NodeAttribute.newInstance("key4", NodeAttributeType.STRING, ""));
|
||||
nodeAttributesList.add(NodeToAttributes.newInstance("z", attributes));
|
||||
|
||||
NodesToAttributesMappingRequest expected =
|
||||
NodesToAttributesMappingRequest.newInstance(
|
||||
AttributeMappingOperationType.REPLACE, nodeAttributesList, false);
|
||||
assertTrue(request.equals(expected));
|
||||
NodesToAttributesMappingRequest expected = NodesToAttributesMappingRequest
|
||||
.newInstance(AttributeMappingOperationType.REPLACE, nodeAttributesList,
|
||||
false);
|
||||
assertTrue(nodeToAttrRequest.equals(expected));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -233,16 +294,17 @@ public void testRemove() throws Exception {
|
||||
// failure scenarios
|
||||
// --------------------------------
|
||||
// parenthesis not match
|
||||
String[] args = new String[] { "-remove", "x:" };
|
||||
String[] args = new String[] {"-remove", "x:"};
|
||||
assertTrue("It should have failed as no node is specified",
|
||||
0 != runTool(args));
|
||||
assertFailureMessageContains(
|
||||
"Attributes cannot be null or empty for Operation REMOVE on the node x");
|
||||
"Attributes cannot be null or empty for Operation [remove] on the "
|
||||
+ "node x");
|
||||
// --------------------------------
|
||||
// success scenarios
|
||||
// --------------------------------
|
||||
args =
|
||||
new String[] { "-remove", "x:key2,key3 z:key4", "-failOnUnknownNodes" };
|
||||
new String[] {"-remove", "x:key2,key3 z:key4", "-failOnUnknownNodes"};
|
||||
assertTrue("Should not fail as attribute has been properly mapped",
|
||||
0 == runTool(args));
|
||||
List<NodeToAttributes> nodeAttributesList = new ArrayList<>();
|
||||
@ -259,10 +321,10 @@ public void testRemove() throws Exception {
|
||||
.add(NodeAttribute.newInstance("key4", NodeAttributeType.STRING, ""));
|
||||
nodeAttributesList.add(NodeToAttributes.newInstance("z", attributes));
|
||||
|
||||
NodesToAttributesMappingRequest expected =
|
||||
NodesToAttributesMappingRequest.newInstance(
|
||||
AttributeMappingOperationType.REMOVE, nodeAttributesList, true);
|
||||
assertTrue(request.equals(expected));
|
||||
NodesToAttributesMappingRequest expected = NodesToAttributesMappingRequest
|
||||
.newInstance(AttributeMappingOperationType.REMOVE, nodeAttributesList,
|
||||
true);
|
||||
assertTrue(nodeToAttrRequest.equals(expected));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -271,16 +333,16 @@ public void testAdd() throws Exception {
|
||||
// failure scenarios
|
||||
// --------------------------------
|
||||
// parenthesis not match
|
||||
String[] args = new String[] { "-add", "x:" };
|
||||
String[] args = new String[] {"-add", "x:"};
|
||||
assertTrue("It should have failed as no node is specified",
|
||||
0 != runTool(args));
|
||||
assertFailureMessageContains(
|
||||
"Attributes cannot be null or empty for Operation ADD on the node x");
|
||||
"Attributes cannot be null or empty for Operation [add] on the node x");
|
||||
// --------------------------------
|
||||
// success scenarios
|
||||
// --------------------------------
|
||||
args = new String[] { "-add", "x:key2=123,key3=abc z:key4(string)",
|
||||
"-failOnUnknownNodes" };
|
||||
args = new String[] {"-add", "x:key2=123,key3=abc z:key4(string)",
|
||||
"-failOnUnknownNodes"};
|
||||
assertTrue("Should not fail as attribute has been properly mapped",
|
||||
0 == runTool(args));
|
||||
List<NodeToAttributes> nodeAttributesList = new ArrayList<>();
|
||||
@ -297,16 +359,16 @@ public void testAdd() throws Exception {
|
||||
.add(NodeAttribute.newInstance("key4", NodeAttributeType.STRING, ""));
|
||||
nodeAttributesList.add(NodeToAttributes.newInstance("z", attributes));
|
||||
|
||||
NodesToAttributesMappingRequest expected =
|
||||
NodesToAttributesMappingRequest.newInstance(
|
||||
AttributeMappingOperationType.ADD, nodeAttributesList, true);
|
||||
assertTrue(request.equals(expected));
|
||||
NodesToAttributesMappingRequest expected = NodesToAttributesMappingRequest
|
||||
.newInstance(AttributeMappingOperationType.ADD, nodeAttributesList,
|
||||
true);
|
||||
assertTrue(nodeToAttrRequest.equals(expected));
|
||||
|
||||
// --------------------------------
|
||||
// with Duplicate mappings for a host
|
||||
// --------------------------------
|
||||
args = new String[] { "-add", "x:key2=123,key3=abc x:key4(string)",
|
||||
"-failOnUnknownNodes" };
|
||||
args = new String[] {"-add", "x:key2=123,key3=abc x:key4(string)",
|
||||
"-failOnUnknownNodes"};
|
||||
assertTrue("Should not fail as attribute has been properly mapped",
|
||||
0 == runTool(args));
|
||||
nodeAttributesList = new ArrayList<>();
|
||||
@ -315,32 +377,161 @@ public void testAdd() throws Exception {
|
||||
.add(NodeAttribute.newInstance("key4", NodeAttributeType.STRING, ""));
|
||||
nodeAttributesList.add(NodeToAttributes.newInstance("x", attributes));
|
||||
|
||||
expected =
|
||||
NodesToAttributesMappingRequest.newInstance(
|
||||
AttributeMappingOperationType.ADD, nodeAttributesList, true);
|
||||
assertTrue(request.equals(expected));
|
||||
expected = NodesToAttributesMappingRequest
|
||||
.newInstance(AttributeMappingOperationType.ADD, nodeAttributesList,
|
||||
true);
|
||||
assertTrue(nodeToAttrRequest.equals(expected));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListAttributes() throws Exception {
|
||||
|
||||
// GetClusterNodeAttributesRequest
|
||||
when(client
|
||||
.getClusterNodeAttributes(any(GetClusterNodeAttributesRequest.class)))
|
||||
.thenAnswer(new Answer<GetClusterNodeAttributesResponse>() {
|
||||
@Override
|
||||
public GetClusterNodeAttributesResponse answer(
|
||||
InvocationOnMock invocation) throws Throwable {
|
||||
GetClusterNodeAttributesRequest nodeAttrReq =
|
||||
(GetClusterNodeAttributesRequest) invocation.getArguments()[0];
|
||||
return GetClusterNodeAttributesResponse.newInstance(ImmutableSet
|
||||
.of(NodeAttributeInfo
|
||||
.newInstance(NodeAttributeKey.newInstance("GPU"),
|
||||
NodeAttributeType.STRING)));
|
||||
}
|
||||
});
|
||||
|
||||
// --------------------------------
|
||||
// Success scenarios
|
||||
// --------------------------------
|
||||
String[] args = new String[] {"-list"};
|
||||
assertTrue("It should be success since it list all attributes",
|
||||
0 == runTool(args));
|
||||
assertSysOutContains("Attribute\t Type",
|
||||
"rm.yarn.io/GPU\t STRING");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeToAttributes() throws Exception {
|
||||
// GetNodesToAttributesRequest response
|
||||
when(client.getNodesToAttributes(any(GetNodesToAttributesRequest.class)))
|
||||
.thenAnswer(new Answer<GetNodesToAttributesResponse>() {
|
||||
@Override
|
||||
public GetNodesToAttributesResponse answer(
|
||||
InvocationOnMock invocation) throws Throwable {
|
||||
GetNodesToAttributesRequest nodeToAttributes =
|
||||
(GetNodesToAttributesRequest) invocation.getArguments()[0];
|
||||
return GetNodesToAttributesResponse.newInstance(
|
||||
ImmutableMap.<String, Set<NodeAttribute>>builder()
|
||||
.put("hostname", ImmutableSet.of(NodeAttribute
|
||||
.newInstance("GPU", NodeAttributeType.STRING, "ARM")))
|
||||
.build());
|
||||
}
|
||||
});
|
||||
// --------------------------------
|
||||
// Failure scenarios
|
||||
// --------------------------------
|
||||
String[] args = new String[] {"-nodetoattributes", "-nodes"};
|
||||
assertTrue("It should not success since nodes are not specified",
|
||||
0 != runTool(args));
|
||||
assertErrorContains(NodeAttributesCLI.INVALID_COMMAND_USAGE);
|
||||
|
||||
// Missing argument for nodes
|
||||
args = new String[] {"-nodestoattributes", "-nodes"};
|
||||
assertTrue("It should not success since nodes are not specified",
|
||||
0 != runTool(args));
|
||||
assertErrorContains(NodeAttributesCLI.MISSING_ARGUMENT);
|
||||
|
||||
// --------------------------------
|
||||
// Success with hostname param
|
||||
// --------------------------------
|
||||
args = new String[] {"-nodestoattributes", "-nodes", "hostname"};
|
||||
assertTrue("Should return hostname to attributed list", 0 == runTool(args));
|
||||
assertSysOutContains("hostname");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAttributesToNodes() throws Exception {
|
||||
// GetAttributesToNodesResponse response
|
||||
when(client.getAttributesToNodes(any(GetAttributesToNodesRequest.class)))
|
||||
.thenAnswer(new Answer<GetAttributesToNodesResponse>() {
|
||||
@Override
|
||||
public GetAttributesToNodesResponse answer(
|
||||
InvocationOnMock invocation) throws Throwable {
|
||||
GetAttributesToNodesRequest attrToNodes =
|
||||
(GetAttributesToNodesRequest) invocation.getArguments()[0];
|
||||
return GetAttributesToNodesResponse.newInstance(
|
||||
ImmutableMap.<NodeAttributeKey,
|
||||
List<NodeToAttributeValue>>builder()
|
||||
.put(NodeAttributeKey.newInstance("GPU"), ImmutableList
|
||||
.of(NodeToAttributeValue.newInstance("host1", "ARM")))
|
||||
.build());
|
||||
}
|
||||
});
|
||||
// --------------------------------
|
||||
// Success scenarios
|
||||
// --------------------------------
|
||||
String[] args = new String[] {"-attributestonodes"};
|
||||
assertTrue("It should be success since it list all attributes",
|
||||
0 == runTool(args));
|
||||
assertSysOutContains("Hostname\tAttribute-value", "rm.yarn.io/GPU :",
|
||||
"host1\t ARM");
|
||||
|
||||
// --------------------------------
|
||||
// fail scenario argument filter missing
|
||||
// --------------------------------
|
||||
args = new String[] {"-attributestonodes", "-attributes"};
|
||||
assertTrue(
|
||||
"It should not success since attributes for filter are not specified",
|
||||
0 != runTool(args));
|
||||
assertErrorContains(NodeAttributesCLI.MISSING_ARGUMENT);
|
||||
|
||||
// --------------------------------
|
||||
// fail scenario argument filter missing
|
||||
// --------------------------------
|
||||
args = new String[] {"-attributestonodes", "-attributes", "fail/da/fail"};
|
||||
assertTrue("It should not success since attributes format is not correct",
|
||||
0 != runTool(args));
|
||||
assertErrorContains(
|
||||
"Attribute format not correct. Should be <[prefix]/[name]> "
|
||||
+ ":fail/da/fail");
|
||||
}
|
||||
|
||||
private void assertFailureMessageContains(String... messages) {
|
||||
assertOutputContains(messages);
|
||||
assertOutputContains(NodeAttributesCLI.USAGE_YARN_NODE_ATTRIBUTES);
|
||||
assertErrorContains(messages);
|
||||
assertErrorContains(NodeAttributesCLI.USAGE_YARN_NODE_ATTRIBUTES);
|
||||
}
|
||||
|
||||
private void assertOutputContains(String... messages) {
|
||||
private void assertErrorContains(String... messages) {
|
||||
for (String message : messages) {
|
||||
if (!errOutput.contains(message)) {
|
||||
fail("Expected output to contain '" + message
|
||||
+ "' but err_output was:\n" + errOutput);
|
||||
fail(
|
||||
"Expected output to contain '" + message + "' but err_output was:\n"
|
||||
+ errOutput);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void assertSysOutContains(String... messages) {
|
||||
for (String message : messages) {
|
||||
if (!sysOutput.contains(message)) {
|
||||
fail(
|
||||
"Expected output to contain '" + message + "' but sys_output was:\n"
|
||||
+ sysOutput);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int runTool(String... args) throws Exception {
|
||||
errOutBytes.reset();
|
||||
sysOutBytes.reset();
|
||||
LOG.info("Running: NodeAttributesCLI " + Joiner.on(" ").join(args));
|
||||
int ret = nodeAttributesCLI.run(args);
|
||||
errOutput = new String(errOutBytes.toByteArray(), Charsets.UTF_8);
|
||||
sysOutput = new String(sysOutBytes.toByteArray(), Charsets.UTF_8);
|
||||
LOG.info("Err_output:\n" + errOutput);
|
||||
LOG.info("Sys_output:\n" + sysOutput);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.yarn.client.cli;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
@ -1544,8 +1546,8 @@ private List<NodeReport> getNodeReports(
|
||||
public void testNodeStatus() throws Exception {
|
||||
NodeId nodeId = NodeId.newInstance("host0", 0);
|
||||
NodeCLI cli = new NodeCLI();
|
||||
when(client.getNodeReports()).thenReturn(
|
||||
getNodeReports(3, NodeState.RUNNING, false));
|
||||
when(client.getNodeReports())
|
||||
.thenReturn(getNodeReports(3, NodeState.RUNNING, false, false, false));
|
||||
cli.setClient(client);
|
||||
cli.setSysOutPrintStream(sysOut);
|
||||
cli.setSysErrPrintStream(sysErr);
|
||||
@ -1568,6 +1570,8 @@ public void testNodeStatus() throws Exception {
|
||||
pw.println("\tCPU-Used : 0 vcores");
|
||||
pw.println("\tCPU-Capacity : 0 vcores");
|
||||
pw.println("\tNode-Labels : a,b,c,x,y,z");
|
||||
pw.println("\tNode Attributes : rm.yarn.io/GPU(STRING)=ARM");
|
||||
pw.println("\t rm.yarn.io/CPU(STRING)=ARM");
|
||||
pw.println("\tResource Utilization by Node : PMem:2048 MB, VMem:4096 MB, VCores:8.0");
|
||||
pw.println("\tResource Utilization by Containers : PMem:1024 MB, VMem:2048 MB, VCores:4.0");
|
||||
pw.close();
|
||||
@ -1604,6 +1608,7 @@ public void testNodeStatusWithEmptyNodeLabels() throws Exception {
|
||||
pw.println("\tCPU-Used : 0 vcores");
|
||||
pw.println("\tCPU-Capacity : 0 vcores");
|
||||
pw.println("\tNode-Labels : ");
|
||||
pw.println("\tNode Attributes : ");
|
||||
pw.println("\tResource Utilization by Node : PMem:2048 MB, VMem:4096 MB, VCores:8.0");
|
||||
pw.println("\tResource Utilization by Containers : PMem:1024 MB, VMem:2048 MB, VCores:4.0");
|
||||
pw.close();
|
||||
@ -1616,8 +1621,8 @@ public void testNodeStatusWithEmptyNodeLabels() throws Exception {
|
||||
public void testNodeStatusWithEmptyResourceUtilization() throws Exception {
|
||||
NodeId nodeId = NodeId.newInstance("host0", 0);
|
||||
NodeCLI cli = new NodeCLI();
|
||||
when(client.getNodeReports()).thenReturn(
|
||||
getNodeReports(3, NodeState.RUNNING, false, true));
|
||||
when(client.getNodeReports())
|
||||
.thenReturn(getNodeReports(3, NodeState.RUNNING, false, true, true));
|
||||
cli.setClient(client);
|
||||
cli.setSysOutPrintStream(sysOut);
|
||||
cli.setSysErrPrintStream(sysErr);
|
||||
@ -1640,6 +1645,7 @@ public void testNodeStatusWithEmptyResourceUtilization() throws Exception {
|
||||
pw.println("\tCPU-Used : 0 vcores");
|
||||
pw.println("\tCPU-Capacity : 0 vcores");
|
||||
pw.println("\tNode-Labels : a,b,c,x,y,z");
|
||||
pw.println("\tNode Attributes : ");
|
||||
pw.println("\tResource Utilization by Node : ");
|
||||
pw.println("\tResource Utilization by Containers : ");
|
||||
pw.close();
|
||||
@ -2049,18 +2055,20 @@ private void verifyUsageInfo(YarnCLI cli) throws Exception {
|
||||
cli.run(new String[] { "application" });
|
||||
verify(sysErr).println("Invalid Command Usage : ");
|
||||
}
|
||||
|
||||
|
||||
private List<NodeReport> getNodeReports(int noOfNodes, NodeState state) {
|
||||
return getNodeReports(noOfNodes, state, true, false);
|
||||
return getNodeReports(noOfNodes, state, true, false, true);
|
||||
}
|
||||
|
||||
private List<NodeReport> getNodeReports(int noOfNodes, NodeState state,
|
||||
boolean emptyNodeLabel) {
|
||||
return getNodeReports(noOfNodes, state, emptyNodeLabel, false);
|
||||
boolean emptyNodeLabel, boolean emptyAttributes) {
|
||||
return getNodeReports(noOfNodes, state, emptyNodeLabel, false,
|
||||
emptyAttributes);
|
||||
}
|
||||
|
||||
private List<NodeReport> getNodeReports(int noOfNodes, NodeState state,
|
||||
boolean emptyNodeLabel, boolean emptyResourceUtilization) {
|
||||
boolean emptyNodeLabel, boolean emptyResourceUtilization,
|
||||
boolean emptyAttributes) {
|
||||
List<NodeReport> nodeReports = new ArrayList<NodeReport>();
|
||||
|
||||
for (int i = 0; i < noOfNodes; i++) {
|
||||
@ -2082,6 +2090,11 @@ private List<NodeReport> getNodeReports(int noOfNodes, NodeState state,
|
||||
nodeReport.setAggregatedContainersUtilization(containersUtilization);
|
||||
nodeReport.setNodeUtilization(nodeUtilization);
|
||||
}
|
||||
if (!emptyAttributes) {
|
||||
nodeReport.setNodeAttributes(ImmutableSet.of(NodeAttribute
|
||||
.newInstance("GPU", NodeAttributeType.STRING, "ARM"),
|
||||
NodeAttribute.newInstance("CPU", NodeAttributeType.STRING, "ARM")));
|
||||
}
|
||||
nodeReports.add(nodeReport);
|
||||
}
|
||||
return nodeReports;
|
||||
|
@ -130,14 +130,18 @@ public boolean equals(Object obj) {
|
||||
}
|
||||
if (obj instanceof NodeAttributeInfo) {
|
||||
NodeAttributeInfo other = (NodeAttributeInfo) obj;
|
||||
getAttributeKey().equals(other.getAttributeKey());
|
||||
return true;
|
||||
return getAttributeKey().equals(other.getAttributeKey());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getAttributeKey().toString() + ":Type-" + getAttributeType();
|
||||
StringBuilder strBuilder = new StringBuilder();
|
||||
NodeAttributeKey key = this.getAttributeKey();
|
||||
strBuilder.append(key.getAttributePrefix()).append("/")
|
||||
.append(key.getAttributeName()).append("(")
|
||||
.append(this.getAttributeType()).append(")");
|
||||
return strBuilder.toString();
|
||||
}
|
||||
}
|
||||
|
@ -152,15 +152,19 @@ public boolean equals(Object obj) {
|
||||
}
|
||||
if (obj instanceof NodeAttribute) {
|
||||
NodeAttribute other = (NodeAttribute) obj;
|
||||
getAttributeKey().equals(other.getAttributeKey());
|
||||
return true;
|
||||
return getAttributeKey().equals(other.getAttributeKey());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return getAttributeKey().toString() + ":Value-" + getAttributeValue()
|
||||
+ ":Type-" + getAttributeType();
|
||||
StringBuilder strBuilder = new StringBuilder();
|
||||
NodeAttributeKey key = this.getAttributeKey();
|
||||
strBuilder.append(key.getAttributePrefix()).append("/")
|
||||
.append(key.getAttributeName()).append("(")
|
||||
.append(this.getAttributeType()).append(")=")
|
||||
.append(this.getAttributeValue());
|
||||
return strBuilder.toString();
|
||||
}
|
||||
}
|
||||
|
@ -18,17 +18,21 @@
|
||||
|
||||
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.NodeUpdateType;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProtoOrBuilder;
|
||||
@ -50,6 +54,7 @@ public class NodeReportPBImpl extends NodeReport {
|
||||
private ResourceUtilization containersUtilization = null;
|
||||
private ResourceUtilization nodeUtilization = null;
|
||||
Set<String> labels;
|
||||
private Set<NodeAttribute> nodeAttributes;
|
||||
|
||||
public NodeReportPBImpl() {
|
||||
builder = NodeReportProto.newBuilder();
|
||||
@ -268,6 +273,14 @@ private void mergeLocalToBuilder() {
|
||||
builder.clearNodeLabels();
|
||||
builder.addAllNodeLabels(this.labels);
|
||||
}
|
||||
if (this.nodeAttributes != null) {
|
||||
builder.clearNodeAttributes();
|
||||
List<NodeAttributeProto> attrList = new ArrayList<>();
|
||||
for (NodeAttribute attr : this.nodeAttributes) {
|
||||
attrList.add(convertToProtoFormat(attr));
|
||||
}
|
||||
builder.addAllNodeAttributes(attrList);
|
||||
}
|
||||
if (this.nodeUtilization != null
|
||||
&& !((ResourceUtilizationPBImpl) this.nodeUtilization).getProto()
|
||||
.equals(builder.getNodeUtilization())) {
|
||||
@ -306,7 +319,16 @@ private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
|
||||
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
|
||||
return ((NodeIdPBImpl) nodeId).getProto();
|
||||
}
|
||||
|
||||
|
||||
private NodeAttributeProto convertToProtoFormat(NodeAttribute nodeAttr) {
|
||||
return ((NodeAttributePBImpl) nodeAttr).getProto();
|
||||
}
|
||||
|
||||
private NodeAttributePBImpl convertFromProtoFormat(
|
||||
NodeAttributeProto nodeAttr) {
|
||||
return new NodeAttributePBImpl(nodeAttr);
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
return new ResourcePBImpl(p);
|
||||
}
|
||||
@ -427,4 +449,24 @@ public void setNodeUpdateType(NodeUpdateType nodeUpdateType) {
|
||||
}
|
||||
builder.setNodeUpdateType(ProtoUtils.convertToProtoFormat(nodeUpdateType));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeAttributes(Set<NodeAttribute> nodeAttrs) {
|
||||
maybeInitBuilder();
|
||||
builder.clearNodeAttributes();
|
||||
this.nodeAttributes = nodeAttrs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<NodeAttribute> getNodeAttributes() {
|
||||
if (nodeAttributes != null) {
|
||||
return nodeAttributes;
|
||||
}
|
||||
NodeReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
this.nodeAttributes = new HashSet<>();
|
||||
for (NodeAttributeProto nattrProto : p.getNodeAttributesList()) {
|
||||
nodeAttributes.add(convertFromProtoFormat(nattrProto));
|
||||
}
|
||||
return nodeAttributes;
|
||||
}
|
||||
}
|
||||
|
@ -54,6 +54,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
@ -199,7 +200,7 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
|
||||
NodeUpdateType nodeUpdateType) {
|
||||
return newNodeReport(nodeId, nodeState, httpAddress, rackName, used,
|
||||
capability, numContainers, healthReport, lastHealthReportTime,
|
||||
nodeLabels, null, null, decommissioningTimeout, nodeUpdateType);
|
||||
nodeLabels, null, null, decommissioningTimeout, nodeUpdateType, null);
|
||||
}
|
||||
|
||||
public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
|
||||
@ -207,7 +208,7 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
|
||||
int numContainers, String healthReport, long lastHealthReportTime,
|
||||
Set<String> nodeLabels, ResourceUtilization containersUtilization,
|
||||
ResourceUtilization nodeUtilization, Integer decommissioningTimeout,
|
||||
NodeUpdateType nodeUpdateType) {
|
||||
NodeUpdateType nodeUpdateType, Set<NodeAttribute> attrs) {
|
||||
NodeReport nodeReport = recordFactory.newRecordInstance(NodeReport.class);
|
||||
nodeReport.setNodeId(nodeId);
|
||||
nodeReport.setNodeState(nodeState);
|
||||
@ -223,6 +224,7 @@ public static NodeReport newNodeReport(NodeId nodeId, NodeState nodeState,
|
||||
nodeReport.setNodeUtilization(nodeUtilization);
|
||||
nodeReport.setDecommissioningTimeout(decommissioningTimeout);
|
||||
nodeReport.setNodeUpdateType(nodeUpdateType);
|
||||
nodeReport.setNodeAttributes(attrs);
|
||||
return nodeReport;
|
||||
}
|
||||
|
||||
|
@ -983,12 +983,11 @@ public NodesToAttributesMappingResponse mapAttributesToNodes(
|
||||
List<NodeToAttributes> nodesToAttributes = request.getNodesToAttributes();
|
||||
boolean failOnUnknownNodes = request.getFailOnUnknownNodes();
|
||||
|
||||
Map<String, Set<NodeAttribute>> nodeAttributeMapping =
|
||||
validateAndFetch(nodesToAttributes, failOnUnknownNodes);
|
||||
|
||||
NodeAttributesManager nodeAttributesManager =
|
||||
rm.getRMContext().getNodeAttributesManager();
|
||||
try {
|
||||
Map<String, Set<NodeAttribute>> nodeAttributeMapping =
|
||||
validateAndFetch(nodesToAttributes, failOnUnknownNodes);
|
||||
switch (request.getOperation()) {
|
||||
case ADD:
|
||||
nodeAttributesManager.addNodeAttributes(nodeAttributeMapping);
|
||||
|
@ -1050,8 +1050,9 @@ private NodeReport createNodeReports(RMNode rmNode) {
|
||||
if (schedulerNodeReport != null) {
|
||||
used = schedulerNodeReport.getUsedResource();
|
||||
numContainers = schedulerNodeReport.getNumContainers();
|
||||
}
|
||||
}
|
||||
|
||||
Set<NodeAttribute> attrs = rmNode.getAllNodeAttributes();
|
||||
NodeReport report =
|
||||
BuilderUtils.newNodeReport(rmNode.getNodeID(), rmNode.getState(),
|
||||
rmNode.getHttpAddress(), rmNode.getRackName(), used,
|
||||
@ -1059,7 +1060,7 @@ private NodeReport createNodeReports(RMNode rmNode) {
|
||||
rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
|
||||
rmNode.getNodeLabels(), rmNode.getAggregatedContainersUtilization(),
|
||||
rmNode.getNodeUtilization(), rmNode.getDecommissioningTimeout(),
|
||||
null);
|
||||
null, attrs);
|
||||
|
||||
return report;
|
||||
}
|
||||
|
@ -673,10 +673,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||
this.rmContext.getNodeAttributesManager()
|
||||
.replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
|
||||
ImmutableMap.of(nodeId.getHost(), nodeAttributes));
|
||||
|
||||
// Update node attributes to RMNode
|
||||
rmNode.setNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
|
||||
nodeAttributes);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -197,15 +197,8 @@ public interface RMNode {
|
||||
*/
|
||||
RMContext getRMContext();
|
||||
|
||||
/**
|
||||
* Sets node attributes per prefix.
|
||||
* @param prefix node attribute prefix
|
||||
* @param nodeAttributes node attributes
|
||||
*/
|
||||
void setNodeAttributes(String prefix, Set<NodeAttribute> nodeAttributes);
|
||||
|
||||
/**
|
||||
* @return all node attributes grouped by their prefix as a map.
|
||||
* @return all node attributes as a Set.
|
||||
*/
|
||||
Map<String, Set<NodeAttribute>> getAllNodeAttributes();
|
||||
Set<NodeAttribute> getAllNodeAttributes();
|
||||
}
|
||||
|
@ -59,7 +59,9 @@
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.nodelabels.AttributeValue;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
@ -186,9 +188,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||
private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
|
||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
|
||||
// Node attributes, store by prefix
|
||||
private Map<String, Set<NodeAttribute>> nodeAttributes = new HashMap<>();
|
||||
|
||||
private static final StateMachineFactory<RMNodeImpl,
|
||||
NodeState,
|
||||
RMNodeEventType,
|
||||
@ -1552,13 +1551,10 @@ public RMContext getRMContext() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeAttributes(String prefix,
|
||||
Set<NodeAttribute> nodeAttributeSet) {
|
||||
this.nodeAttributes.put(prefix, nodeAttributeSet);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Set<NodeAttribute>> getAllNodeAttributes() {
|
||||
return this.nodeAttributes;
|
||||
public Set<NodeAttribute> getAllNodeAttributes() {
|
||||
NodeAttributesManager attrMgr = context.getNodeAttributesManager();
|
||||
Map<NodeAttribute, AttributeValue> nodeattrs =
|
||||
attrMgr.getAttributesForNode(hostName);
|
||||
return nodeattrs.keySet();
|
||||
}
|
||||
}
|
||||
|
@ -116,16 +116,11 @@ public NodeInfo(RMNode ni, ResourceScheduler sched) {
|
||||
}
|
||||
|
||||
// add attributes
|
||||
Map<String, Set<NodeAttribute>> nodeAttributes =
|
||||
ni.getAllNodeAttributes();
|
||||
Set<NodeAttribute> attrs = ni.getAllNodeAttributes();
|
||||
nodeAttributesInfo = new NodeAttributesInfo();
|
||||
if (nodeAttributes != null) {
|
||||
for (Set<NodeAttribute> attrs : nodeAttributes.values()) {
|
||||
for (NodeAttribute attribute : attrs) {
|
||||
NodeAttributeInfo info = new NodeAttributeInfo(attribute);
|
||||
this.nodeAttributesInfo.addNodeAttributeInfo(info);
|
||||
}
|
||||
}
|
||||
for (NodeAttribute attribute : attrs) {
|
||||
NodeAttributeInfo info = new NodeAttributeInfo(attribute);
|
||||
this.nodeAttributesInfo.addNodeAttributeInfo(info);
|
||||
}
|
||||
|
||||
// add allocation tags
|
||||
|
@ -292,8 +292,8 @@ public void setNodeAttributes(String prefix,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Set<NodeAttribute>> getAllNodeAttributes() {
|
||||
return null;
|
||||
public Set<NodeAttribute> getAllNodeAttributes() {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1611,9 +1611,10 @@ public void testMapAttributesToNodes() throws Exception, YarnException {
|
||||
try {
|
||||
rm.adminService.mapAttributesToNodes(request);
|
||||
fail("host5 is not a valid node, It should have failed");
|
||||
} catch (Exception ex) {
|
||||
} catch (YarnException ex) {
|
||||
Assert.assertEquals("Exception Message is not as desired",
|
||||
" Following nodes does not exist : [host5]", ex.getMessage());
|
||||
" Following nodes does not exist : [host5]",
|
||||
ex.getCause().getMessage());
|
||||
}
|
||||
|
||||
request =
|
||||
@ -1633,10 +1634,10 @@ public void testMapAttributesToNodes() throws Exception, YarnException {
|
||||
// against hostname hence the message as : nodes does not exist.
|
||||
rm.adminService.mapAttributesToNodes(request);
|
||||
fail("host with the port should fail as only hostnames are validated");
|
||||
} catch (Exception ex) {
|
||||
} catch (YarnException ex) {
|
||||
Assert.assertEquals("Exception Message is not as desired",
|
||||
" Following nodes does not exist : [host4:8889, host2:8889]",
|
||||
ex.getMessage());
|
||||
ex.getCause().getMessage());
|
||||
}
|
||||
|
||||
request =
|
||||
@ -1669,11 +1670,10 @@ public void testMapAttributesToNodes() throws Exception, YarnException {
|
||||
try {
|
||||
rm.adminService.mapAttributesToNodes(request);
|
||||
fail("This operation should fail as prefix should be \"nm.yarn.io\".");
|
||||
} catch (Exception ex) {
|
||||
} catch (YarnException ex) {
|
||||
Assert.assertEquals("Exception Message is not as desired",
|
||||
"Invalid Attribute Mapping for the node host5. Prefix should be "
|
||||
+ "rm.yarn.io",
|
||||
ex.getMessage());
|
||||
+ "rm.yarn.io", ex.getCause().getMessage());
|
||||
}
|
||||
|
||||
rm.close();
|
||||
|
Loading…
Reference in New Issue
Block a user