YARN-2699. Fixed a bug in CommonNodeLabelsManager that caused tests to fail when using ephemeral ports on NodeIDs. Contributed by Wangda Tan.
This commit is contained in:
parent
368743140d
commit
abae63caf9
@ -594,6 +594,9 @@ Release 2.6.0 - UNRELEASED
|
|||||||
tracking per label when a host runs multiple node-managers. (Wangda Tan via
|
tracking per label when a host runs multiple node-managers. (Wangda Tan via
|
||||||
vinodkv)
|
vinodkv)
|
||||||
|
|
||||||
|
YARN-2699. Fixed a bug in CommonNodeLabelsManager that caused tests to fail
|
||||||
|
when using ephemeral ports on NodeIDs. (Wangda Tan via vinodkv)
|
||||||
|
|
||||||
BREAKDOWN OF YARN-1051 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF YARN-1051 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
YARN-1707. Introduce APIs to add/remove/resize queues in the
|
YARN-1707. Introduce APIs to add/remove/resize queues in the
|
||||||
|
@ -299,14 +299,14 @@ protected void internalAddLabelsToNode(
|
|||||||
for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
|
for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
|
||||||
NodeId nodeId = entry.getKey();
|
NodeId nodeId = entry.getKey();
|
||||||
Set<String> labels = entry.getValue();
|
Set<String> labels = entry.getValue();
|
||||||
|
|
||||||
createNodeIfNonExisted(entry.getKey());
|
createHostIfNonExisted(nodeId.getHost());
|
||||||
|
|
||||||
if (nodeId.getPort() == WILDCARD_PORT) {
|
if (nodeId.getPort() == WILDCARD_PORT) {
|
||||||
Host host = nodeCollections.get(nodeId.getHost());
|
Host host = nodeCollections.get(nodeId.getHost());
|
||||||
host.labels.addAll(labels);
|
host.labels.addAll(labels);
|
||||||
newNMToLabels.put(nodeId, host.labels);
|
newNMToLabels.put(nodeId, host.labels);
|
||||||
} else {
|
} else {
|
||||||
|
createNodeIfNonExisted(nodeId);
|
||||||
Node nm = getNMInNodeSet(nodeId);
|
Node nm = getNMInNodeSet(nodeId);
|
||||||
if (nm.labels == null) {
|
if (nm.labels == null) {
|
||||||
nm.labels = new HashSet<String>();
|
nm.labels = new HashSet<String>();
|
||||||
@ -534,21 +534,21 @@ protected void checkReplaceLabelsOnNode(
|
|||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected void internalReplaceLabelsOnNode(
|
protected void internalReplaceLabelsOnNode(
|
||||||
Map<NodeId, Set<String>> replaceLabelsToNode) {
|
Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
|
||||||
// do replace labels to nodes
|
// do replace labels to nodes
|
||||||
Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
|
Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
|
||||||
for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
|
for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
|
||||||
NodeId nodeId = entry.getKey();
|
NodeId nodeId = entry.getKey();
|
||||||
Set<String> labels = entry.getValue();
|
Set<String> labels = entry.getValue();
|
||||||
|
|
||||||
// update nodeCollections
|
createHostIfNonExisted(nodeId.getHost());
|
||||||
createNodeIfNonExisted(entry.getKey());
|
|
||||||
if (nodeId.getPort() == WILDCARD_PORT) {
|
if (nodeId.getPort() == WILDCARD_PORT) {
|
||||||
Host host = nodeCollections.get(nodeId.getHost());
|
Host host = nodeCollections.get(nodeId.getHost());
|
||||||
host.labels.clear();
|
host.labels.clear();
|
||||||
host.labels.addAll(labels);
|
host.labels.addAll(labels);
|
||||||
newNMToLabels.put(nodeId, host.labels);
|
newNMToLabels.put(nodeId, host.labels);
|
||||||
} else {
|
} else {
|
||||||
|
createNodeIfNonExisted(nodeId);
|
||||||
Node nm = getNMInNodeSet(nodeId);
|
Node nm = getNMInNodeSet(nodeId);
|
||||||
if (nm.labels == null) {
|
if (nm.labels == null) {
|
||||||
nm.labels = new HashSet<String>();
|
nm.labels = new HashSet<String>();
|
||||||
@ -672,10 +672,6 @@ protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map) {
|
|||||||
|
|
||||||
protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map,
|
protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map,
|
||||||
boolean checkRunning) {
|
boolean checkRunning) {
|
||||||
if (WILDCARD_PORT == nodeId.getPort()) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
Host host = map.get(nodeId.getHost());
|
Host host = map.get(nodeId.getHost());
|
||||||
if (null == host) {
|
if (null == host) {
|
||||||
return null;
|
return null;
|
||||||
@ -707,17 +703,22 @@ protected Set<String> getLabelsByNode(NodeId nodeId, Map<String, Host> map) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void createNodeIfNonExisted(NodeId nodeId) {
|
protected void createNodeIfNonExisted(NodeId nodeId) throws IOException {
|
||||||
Host host = nodeCollections.get(nodeId.getHost());
|
Host host = nodeCollections.get(nodeId.getHost());
|
||||||
if (null == host) {
|
if (null == host) {
|
||||||
host = new Host();
|
throw new IOException("Should create host before creating node.");
|
||||||
nodeCollections.put(nodeId.getHost(), host);
|
|
||||||
}
|
}
|
||||||
if (nodeId.getPort() != WILDCARD_PORT) {
|
Node nm = host.nms.get(nodeId);
|
||||||
Node nm = host.nms.get(nodeId);
|
if (null == nm) {
|
||||||
if (null == nm) {
|
host.nms.put(nodeId, new Node());
|
||||||
host.nms.put(nodeId, new Node());
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void createHostIfNonExisted(String hostName) {
|
||||||
|
Host host = nodeCollections.get(hostName);
|
||||||
|
if (null == host) {
|
||||||
|
host = new Host();
|
||||||
|
nodeCollections.put(hostName, host);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -181,7 +181,15 @@ public void activateNode(NodeId nodeId, Resource resource) {
|
|||||||
// save if we have a node before
|
// save if we have a node before
|
||||||
Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
|
Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
|
||||||
|
|
||||||
createNodeIfNonExisted(nodeId);
|
createHostIfNonExisted(nodeId.getHost());
|
||||||
|
try {
|
||||||
|
createNodeIfNonExisted(nodeId);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("This shouldn't happen, cannot get host in nodeCollection"
|
||||||
|
+ " associated to the node being activated");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
Node nm = getNMInNodeSet(nodeId);
|
Node nm = getNMInNodeSet(nodeId);
|
||||||
nm.resource = resource;
|
nm.resource = resource;
|
||||||
nm.running = true;
|
nm.running = true;
|
||||||
@ -220,7 +228,7 @@ public void deactivateNode(NodeId nodeId) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateNodeResource(NodeId node, Resource newResource) {
|
public void updateNodeResource(NodeId node, Resource newResource) throws IOException {
|
||||||
deactivateNode(node);
|
deactivateNode(node);
|
||||||
activateNode(node, newResource);
|
activateNode(node, newResource);
|
||||||
}
|
}
|
||||||
|
@ -96,6 +96,14 @@ public void testNodeActiveDeactiveUpdate() throws Exception {
|
|||||||
Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
|
Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
|
||||||
Resources.add(SMALL_RESOURCE, LARGE_NODE));
|
Resources.add(SMALL_RESOURCE, LARGE_NODE));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5000)
|
||||||
|
public void testActivateNodeManagerWithZeroPort() throws Exception {
|
||||||
|
// active two NM, one is zero port , another is non-zero port. no exception
|
||||||
|
// should be raised
|
||||||
|
mgr.activateNode(NodeId.newInstance("n1", 0), SMALL_RESOURCE);
|
||||||
|
mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||||
@Test(timeout = 5000)
|
@Test(timeout = 5000)
|
||||||
|
Loading…
Reference in New Issue
Block a user