YARN-3014. Replaces labels on a host should update all NM's labels on that host. Contributed by Wangda Tan

This commit is contained in:
Jian He 2015-01-09 17:49:53 -08:00
parent 27a489cba6
commit a260406268
4 changed files with 113 additions and 92 deletions

View File

@ -344,6 +344,9 @@ Release 2.7.0 - UNRELEASED
YARN-2997. Fixed NodeStatusUpdater to not send alreay-sent completed YARN-2997. Fixed NodeStatusUpdater to not send alreay-sent completed
container statuses on heartbeat. (Chengbing Liu via jianhe) container statuses on heartbeat. (Chengbing Liu via jianhe)
YARN-3014. Replaces labels on a host should update all NM's labels on that
host. (Wangda Tan via jianhe)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -21,6 +21,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -131,6 +132,12 @@ public Node copy() {
} }
} }
private enum NodeLabelUpdateOperation {
ADD,
REMOVE,
REPLACE
}
private final class ForwardingEventHandler implements private final class ForwardingEventHandler implements
EventHandler<NodeLabelsStoreEvent> { EventHandler<NodeLabelsStoreEvent> {
@ -290,45 +297,6 @@ protected void checkAddLabelsToNode(
} }
} }
@SuppressWarnings("unchecked")
protected void internalAddLabelsToNode(
Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
// do add labels to nodes
Map<NodeId, Set<String>> newNMToLabels =
new HashMap<NodeId, Set<String>>();
for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
createHostIfNonExisted(nodeId.getHost());
if (nodeId.getPort() == WILDCARD_PORT) {
Host host = nodeCollections.get(nodeId.getHost());
host.labels.addAll(labels);
newNMToLabels.put(nodeId, host.labels);
} else {
createNodeIfNonExisted(nodeId);
Node nm = getNMInNodeSet(nodeId);
if (nm.labels == null) {
nm.labels = new HashSet<String>();
}
nm.labels.addAll(labels);
newNMToLabels.put(nodeId, nm.labels);
}
}
if (null != dispatcher) {
dispatcher.getEventHandler().handle(
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
}
// shows node->labels we added
LOG.info("addLabelsToNode:");
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
LOG.info(" NM=" + entry.getKey() + ", labels=["
+ StringUtils.join(entry.getValue().iterator(), ",") + "]");
}
}
/** /**
* add more labels to nodes * add more labels to nodes
* *
@ -338,7 +306,7 @@ public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
throws IOException { throws IOException {
addedLabelsToNode = normalizeNodeIdToLabels(addedLabelsToNode); addedLabelsToNode = normalizeNodeIdToLabels(addedLabelsToNode);
checkAddLabelsToNode(addedLabelsToNode); checkAddLabelsToNode(addedLabelsToNode);
internalAddLabelsToNode(addedLabelsToNode); internalUpdateLabelsOnNodes(addedLabelsToNode, NodeLabelUpdateOperation.ADD);
} }
protected void checkRemoveFromClusterNodeLabels( protected void checkRemoveFromClusterNodeLabels(
@ -469,20 +437,70 @@ protected void checkRemoveLabelsFromNode(
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void internalRemoveLabelsFromNode( protected void internalUpdateLabelsOnNodes(
Map<NodeId, Set<String>> removeLabelsFromNode) { Map<NodeId, Set<String>> nodeToLabels, NodeLabelUpdateOperation op)
// do remove labels from nodes throws IOException {
// do update labels from nodes
Map<NodeId, Set<String>> newNMToLabels = Map<NodeId, Set<String>> newNMToLabels =
new HashMap<NodeId, Set<String>>(); new HashMap<NodeId, Set<String>>();
for (Entry<NodeId, Set<String>> entry : removeLabelsFromNode.entrySet()) { for (Entry<NodeId, Set<String>> entry : nodeToLabels.entrySet()) {
NodeId nodeId = entry.getKey(); NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue(); Set<String> labels = entry.getValue();
createHostIfNonExisted(nodeId.getHost());
if (nodeId.getPort() == WILDCARD_PORT) { if (nodeId.getPort() == WILDCARD_PORT) {
Host host = nodeCollections.get(nodeId.getHost()); Host host = nodeCollections.get(nodeId.getHost());
switch (op) {
case REMOVE:
host.labels.removeAll(labels); host.labels.removeAll(labels);
for (Node node : host.nms.values()) {
if (node.labels != null) {
node.labels.removeAll(labels);
}
}
break;
case ADD:
host.labels.addAll(labels);
for (Node node : host.nms.values()) {
if (node.labels != null) {
node.labels.addAll(labels);
}
}
break;
case REPLACE:
host.labels.clear();
host.labels.addAll(labels);
for (Node node : host.nms.values()) {
node.labels = null;
}
break;
default:
break;
}
newNMToLabels.put(nodeId, host.labels); newNMToLabels.put(nodeId, host.labels);
} else { } else {
if (EnumSet.of(NodeLabelUpdateOperation.ADD,
NodeLabelUpdateOperation.REPLACE).contains(op)) {
// Add and replace
createNodeIfNonExisted(nodeId);
Node nm = getNMInNodeSet(nodeId);
if (nm.labels == null) {
nm.labels = new HashSet<String>();
}
switch (op) {
case ADD:
nm.labels.addAll(labels);
break;
case REPLACE:
nm.labels.clear();
nm.labels.addAll(labels);
break;
default:
break;
}
newNMToLabels.put(nodeId, nm.labels);
} else {
// remove
Node nm = getNMInNodeSet(nodeId); Node nm = getNMInNodeSet(nodeId);
if (nm.labels != null) { if (nm.labels != null) {
nm.labels.removeAll(labels); nm.labels.removeAll(labels);
@ -490,6 +508,7 @@ protected void internalRemoveLabelsFromNode(
} }
} }
} }
}
if (null != dispatcher) { if (null != dispatcher) {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
@ -497,7 +516,7 @@ protected void internalRemoveLabelsFromNode(
} }
// shows node->labels we added // shows node->labels we added
LOG.info("removeLabelsFromNode:"); LOG.info(op.name() + " labels on nodes:");
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) { for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
LOG.info(" NM=" + entry.getKey() + ", labels=[" LOG.info(" NM=" + entry.getKey() + ", labels=["
+ StringUtils.join(entry.getValue().iterator(), ",") + "]"); + StringUtils.join(entry.getValue().iterator(), ",") + "]");
@ -517,7 +536,8 @@ protected void internalRemoveLabelsFromNode(
checkRemoveLabelsFromNode(removeLabelsFromNode); checkRemoveLabelsFromNode(removeLabelsFromNode);
internalRemoveLabelsFromNode(removeLabelsFromNode); internalUpdateLabelsOnNodes(removeLabelsFromNode,
NodeLabelUpdateOperation.REMOVE);
} }
protected void checkReplaceLabelsOnNode( protected void checkReplaceLabelsOnNode(
@ -540,46 +560,6 @@ protected void checkReplaceLabelsOnNode(
} }
} }
@SuppressWarnings("unchecked")
protected void internalReplaceLabelsOnNode(
Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
// do replace labels to nodes
Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
createHostIfNonExisted(nodeId.getHost());
if (nodeId.getPort() == WILDCARD_PORT) {
Host host = nodeCollections.get(nodeId.getHost());
host.labels.clear();
host.labels.addAll(labels);
newNMToLabels.put(nodeId, host.labels);
} else {
createNodeIfNonExisted(nodeId);
Node nm = getNMInNodeSet(nodeId);
if (nm.labels == null) {
nm.labels = new HashSet<String>();
}
nm.labels.clear();
nm.labels.addAll(labels);
newNMToLabels.put(nodeId, nm.labels);
}
}
if (null != dispatcher) {
dispatcher.getEventHandler().handle(
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
}
// shows node->labels we added
LOG.info("setLabelsToNode:");
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
LOG.info(" NM=" + entry.getKey() + ", labels=["
+ StringUtils.join(entry.getValue().iterator(), ",") + "]");
}
}
/** /**
* replace labels to nodes * replace labels to nodes
* *
@ -591,7 +571,8 @@ public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
checkReplaceLabelsOnNode(replaceLabelsToNode); checkReplaceLabelsOnNode(replaceLabelsToNode);
internalReplaceLabelsOnNode(replaceLabelsToNode); internalUpdateLabelsOnNodes(replaceLabelsToNode,
NodeLabelUpdateOperation.REPLACE);
} }
/** /**

View File

@ -281,4 +281,42 @@ public void testTrimLabelsWhenModifyLabelsOnNodes() throws IOException {
mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet(" p2 "))); mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet(" p2 ")));
Assert.assertTrue(mgr.getNodeLabels().isEmpty()); Assert.assertTrue(mgr.getNodeLabels().isEmpty());
} }
@Test(timeout = 5000)
public void testReplaceLabelsOnHostsShouldUpdateNodesBelongTo()
throws IOException {
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p1", "p2")));
assertMapEquals(
mgr.getNodeLabels(),
ImmutableMap.of(toNodeId("n1"), toSet("p1", "p2")));
// Replace labels on n1:1 to P2
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2"),
toNodeId("n1:2"), toSet("p2")));
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
toSet("p1", "p2"), toNodeId("n1:1"), toSet("p2"), toNodeId("n1:2"),
toSet("p2")));
// Replace labels on n1 to P1, both n1:1/n1 will be P1 now
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
toSet("p1"), toNodeId("n1:1"), toSet("p1"), toNodeId("n1:2"),
toSet("p1")));
// Set labels on n1:1 to P2 again to verify if add/remove works
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1:1"), toSet("p2")));
// Add p3 to n1, should makes n1:1 to be p2/p3, and n1:2 to be p1/p3
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
toSet("p1", "p3"), toNodeId("n1:1"), toSet("p2", "p3"),
toNodeId("n1:2"), toSet("p1", "p3")));
// Remove P3 from n1, should makes n1:1 to be p2, and n1:2 to be p1
mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p3")));
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
toSet("p1"), toNodeId("n1:1"), toSet("p2"), toNodeId("n1:2"),
toSet("p1")));
}
} }

View File

@ -37,7 +37,6 @@
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabel; import org.apache.hadoop.yarn.nodelabels.NodeLabel;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;