YARN-2494. Added NodeLabels Manager internal API and implementation. Contributed by Wangda Tan.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-10-10 11:44:21 -07:00
parent cb81bac002
commit db7f165319
17 changed files with 2785 additions and 0 deletions

View File

@ -153,6 +153,9 @@ Release 2.6.0 - UNRELEASED
YARN-2544. Added admin-API objects for using node-labels. (Wangda Tan via
vinodkv)
YARN-2494. Added NodeLabels Manager internal API and implementation. (Wangda
Tan via vinodkv)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -1447,6 +1447,17 @@ public class YarnConfiguration extends Configuration {
public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy";
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
.name();
public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels.";
/** URI for NodeLabelManager */
public static final String FS_NODE_LABELS_STORE_URI = NODE_LABELS_PREFIX
+ "fs-store.uri";
public static final String DEFAULT_FS_NODE_LABELS_STORE_URI = "file:///tmp/";
public static final String FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
NODE_LABELS_PREFIX + "fs-store.retry-policy-spec";
public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
"2000, 500";
public YarnConfiguration() {
super();

View File

@ -0,0 +1,722 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.event.StoreNewClusterNodeLabels;
import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEvent;
import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEventType;
import org.apache.hadoop.yarn.nodelabels.event.RemoveClusterNodeLabels;
import org.apache.hadoop.yarn.nodelabels.event.UpdateNodeToLabelsMappingsEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.ImmutableSet;
public class CommonNodeLabelsManager extends AbstractService {
protected static final Log LOG = LogFactory.getLog(CommonNodeLabelsManager.class);
private static final int MAX_LABEL_LENGTH = 255;
public static final Set<String> EMPTY_STRING_SET = Collections
.unmodifiableSet(new HashSet<String>(0));
public static final String ANY = "*";
public static final Set<String> ACCESS_ANY_LABEL_SET = ImmutableSet.of(ANY);
private static final Pattern LABEL_PATTERN = Pattern
.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
public static final int WILDCARD_PORT = 0;
/**
* If a user doesn't specify label of a queue or node, it belongs
* DEFAULT_LABEL
*/
public static final String NO_LABEL = "";
protected Dispatcher dispatcher;
protected ConcurrentMap<String, Label> labelCollections =
new ConcurrentHashMap<String, Label>();
protected ConcurrentMap<String, Host> nodeCollections =
new ConcurrentHashMap<String, Host>();
protected final ReadLock readLock;
protected final WriteLock writeLock;
protected NodeLabelsStore store;
protected static class Label {
public Resource resource;
protected Label() {
this.resource = Resource.newInstance(0, 0);
}
}
/**
* A <code>Host</code> can have multiple <code>Node</code>s
*/
protected static class Host {
public Set<String> labels;
public Map<NodeId, Node> nms;
protected Host() {
labels =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
nms = new ConcurrentHashMap<NodeId, Node>();
}
public Host copy() {
Host c = new Host();
c.labels = new HashSet<String>(labels);
for (Entry<NodeId, Node> entry : nms.entrySet()) {
c.nms.put(entry.getKey(), entry.getValue().copy());
}
return c;
}
}
protected static class Node {
public Set<String> labels;
public Resource resource;
public boolean running;
protected Node() {
labels = null;
resource = Resource.newInstance(0, 0);
running = false;
}
public Node copy() {
Node c = new Node();
if (labels != null) {
c.labels =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
} else {
c.labels = null;
}
c.resource = Resources.clone(resource);
c.running = running;
return c;
}
}
private final class ForwardingEventHandler implements
EventHandler<NodeLabelsStoreEvent> {
@Override
public void handle(NodeLabelsStoreEvent event) {
if (isInState(STATE.STARTED)) {
handleStoreEvent(event);
}
}
}
// Dispatcher related code
protected void handleStoreEvent(NodeLabelsStoreEvent event) {
try {
switch (event.getType()) {
case ADD_LABELS:
StoreNewClusterNodeLabels storeNewClusterNodeLabelsEvent =
(StoreNewClusterNodeLabels) event;
store.storeNewClusterNodeLabels(storeNewClusterNodeLabelsEvent
.getLabels());
break;
case REMOVE_LABELS:
RemoveClusterNodeLabels removeClusterNodeLabelsEvent =
(RemoveClusterNodeLabels) event;
store.removeClusterNodeLabels(removeClusterNodeLabelsEvent.getLabels());
break;
case STORE_NODE_TO_LABELS:
UpdateNodeToLabelsMappingsEvent updateNodeToLabelsMappingsEvent =
(UpdateNodeToLabelsMappingsEvent) event;
store.updateNodeToLabelsMappings(updateNodeToLabelsMappingsEvent
.getNodeToLabels());
break;
}
} catch (IOException e) {
LOG.error("Failed to store label modification to storage");
throw new YarnRuntimeException(e);
}
}
public CommonNodeLabelsManager() {
super(CommonNodeLabelsManager.class.getName());
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
// for UT purpose
protected void initDispatcher(Configuration conf) {
// create async handler
dispatcher = new AsyncDispatcher();
AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
asyncDispatcher.init(conf);
asyncDispatcher.setDrainEventsOnStop();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
initNodeLabelStore(conf);
labelCollections.put(NO_LABEL, new Label());
}
protected void initNodeLabelStore(Configuration conf) throws Exception {
this.store = new FileSystemNodeLabelsStore(this);
this.store.init(conf);
this.store.recover();
}
// for UT purpose
protected void startDispatcher() {
// start dispatcher
AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
asyncDispatcher.start();
}
@Override
protected void serviceStart() throws Exception {
// init dispatcher only when service start, because recover will happen in
// service init, we don't want to trigger any event handling at that time.
initDispatcher(getConfig());
dispatcher.register(NodeLabelsStoreEventType.class,
new ForwardingEventHandler());
startDispatcher();
}
// for UT purpose
protected void stopDispatcher() {
AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
asyncDispatcher.stop();
}
@Override
protected void serviceStop() throws Exception {
// finalize store
stopDispatcher();
store.close();
}
/**
* Add multiple node labels to repository
*
* @param labels
* new node labels added
*/
@SuppressWarnings("unchecked")
public void addToCluserNodeLabels(Set<String> labels) throws IOException {
if (null == labels || labels.isEmpty()) {
return;
}
labels = normalizeLabels(labels);
// do a check before actual adding them, will throw exception if any of them
// doesn't meet label name requirement
for (String label : labels) {
checkAndThrowLabelName(label);
}
for (String label : labels) {
this.labelCollections.put(label, new Label());
}
if (null != dispatcher) {
dispatcher.getEventHandler().handle(
new StoreNewClusterNodeLabels(labels));
}
LOG.info("Add labels: [" + StringUtils.join(labels.iterator(), ",") + "]");
}
protected void checkAddLabelsToNode(
Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
if (null == addedLabelsToNode || addedLabelsToNode.isEmpty()) {
return;
}
// check all labels being added existed
Set<String> knownLabels = labelCollections.keySet();
for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
if (!knownLabels.containsAll(entry.getValue())) {
String msg =
"Not all labels being added contained by known "
+ "label collections, please check" + ", added labels=["
+ StringUtils.join(entry.getValue(), ",") + "]";
LOG.error(msg);
throw new IOException(msg);
}
}
}
@SuppressWarnings("unchecked")
protected void internalAddLabelsToNode(
Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
// do add labels to nodes
Map<NodeId, Set<String>> newNMToLabels =
new HashMap<NodeId, Set<String>>();
for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
createNodeIfNonExisted(entry.getKey());
if (nodeId.getPort() == WILDCARD_PORT) {
Host host = nodeCollections.get(nodeId.getHost());
host.labels.addAll(labels);
newNMToLabels.put(nodeId, host.labels);
} else {
Node nm = getNMInNodeSet(nodeId);
if (nm.labels == null) {
nm.labels = new HashSet<String>();
}
nm.labels.addAll(labels);
newNMToLabels.put(nodeId, nm.labels);
}
}
if (null != dispatcher) {
dispatcher.getEventHandler().handle(
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
}
// shows node->labels we added
LOG.info("addLabelsToNode:");
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
LOG.info(" NM=" + entry.getKey() + ", labels=["
+ StringUtils.join(entry.getValue().iterator(), ",") + "]");
}
}
/**
* add more labels to nodes
*
* @param addedLabelsToNode node -> labels map
*/
public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
throws IOException {
checkAddLabelsToNode(addedLabelsToNode);
internalAddLabelsToNode(addedLabelsToNode);
}
protected void checkRemoveFromClusterNodeLabels(
Collection<String> labelsToRemove) throws IOException {
if (null == labelsToRemove || labelsToRemove.isEmpty()) {
return;
}
// Check if label to remove doesn't existed or null/empty, will throw
// exception if any of labels to remove doesn't meet requirement
for (String label : labelsToRemove) {
label = normalizeLabel(label);
if (label == null || label.isEmpty()) {
throw new IOException("Label to be removed is null or empty");
}
if (!labelCollections.containsKey(label)) {
throw new IOException("Node label=" + label
+ " to be removed doesn't existed in cluster "
+ "node labels collection.");
}
}
}
@SuppressWarnings("unchecked")
protected void internalRemoveFromClusterNodeLabels(Collection<String> labelsToRemove) {
// remove labels from nodes
for (String nodeName : nodeCollections.keySet()) {
Host host = nodeCollections.get(nodeName);
if (null != host) {
host.labels.removeAll(labelsToRemove);
for (Node nm : host.nms.values()) {
if (nm.labels != null) {
nm.labels.removeAll(labelsToRemove);
}
}
}
}
// remove labels from node labels collection
for (String label : labelsToRemove) {
labelCollections.remove(label);
}
// create event to remove labels
if (null != dispatcher) {
dispatcher.getEventHandler().handle(
new RemoveClusterNodeLabels(labelsToRemove));
}
LOG.info("Remove labels: ["
+ StringUtils.join(labelsToRemove.iterator(), ",") + "]");
}
/**
* Remove multiple node labels from repository
*
* @param labelsToRemove
* node labels to remove
* @throws IOException
*/
public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
throws IOException {
checkRemoveFromClusterNodeLabels(labelsToRemove);
internalRemoveFromClusterNodeLabels(labelsToRemove);
}
protected void checkRemoveLabelsFromNode(
Map<NodeId, Set<String>> removeLabelsFromNode) throws IOException {
// check all labels being added existed
Set<String> knownLabels = labelCollections.keySet();
for (Entry<NodeId, Set<String>> entry : removeLabelsFromNode.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
if (!knownLabels.containsAll(labels)) {
String msg =
"Not all labels being removed contained by known "
+ "label collections, please check" + ", removed labels=["
+ StringUtils.join(labels, ",") + "]";
LOG.error(msg);
throw new IOException(msg);
}
Set<String> originalLabels = null;
boolean nodeExisted = false;
if (WILDCARD_PORT != nodeId.getPort()) {
Node nm = getNMInNodeSet(nodeId);
if (nm != null) {
originalLabels = nm.labels;
nodeExisted = true;
}
} else {
Host host = nodeCollections.get(nodeId.getHost());
if (null != host) {
originalLabels = host.labels;
nodeExisted = true;
}
}
if (!nodeExisted) {
String msg =
"Try to remove labels from NM=" + nodeId
+ ", but the NM doesn't existed";
LOG.error(msg);
throw new IOException(msg);
}
if (labels == null || labels.isEmpty()) {
continue;
}
if (!originalLabels.containsAll(labels)) {
String msg =
"Try to remove labels = [" + StringUtils.join(labels, ",")
+ "], but not all labels contained by NM=" + nodeId;
LOG.error(msg);
throw new IOException(msg);
}
}
}
@SuppressWarnings("unchecked")
protected void internalRemoveLabelsFromNode(
Map<NodeId, Set<String>> removeLabelsFromNode) {
// do remove labels from nodes
Map<NodeId, Set<String>> newNMToLabels =
new HashMap<NodeId, Set<String>>();
for (Entry<NodeId, Set<String>> entry : removeLabelsFromNode.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
if (nodeId.getPort() == WILDCARD_PORT) {
Host host = nodeCollections.get(nodeId.getHost());
host.labels.removeAll(labels);
newNMToLabels.put(nodeId, host.labels);
} else {
Node nm = getNMInNodeSet(nodeId);
if (nm.labels != null) {
nm.labels.removeAll(labels);
newNMToLabels.put(nodeId, nm.labels);
}
}
}
if (null != dispatcher) {
dispatcher.getEventHandler().handle(
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
}
// shows node->labels we added
LOG.info("removeLabelsFromNode:");
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
LOG.info(" NM=" + entry.getKey() + ", labels=["
+ StringUtils.join(entry.getValue().iterator(), ",") + "]");
}
}
/**
* remove labels from nodes, labels being removed most be contained by these
* nodes
*
* @param removeLabelsFromNode node -> labels map
*/
public void
removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
throws IOException {
checkRemoveLabelsFromNode(removeLabelsFromNode);
internalRemoveLabelsFromNode(removeLabelsFromNode);
}
protected void checkReplaceLabelsOnNode(
Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
if (null == replaceLabelsToNode || replaceLabelsToNode.isEmpty()) {
return;
}
// check all labels being added existed
Set<String> knownLabels = labelCollections.keySet();
for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
if (!knownLabels.containsAll(entry.getValue())) {
String msg =
"Not all labels being replaced contained by known "
+ "label collections, please check" + ", new labels=["
+ StringUtils.join(entry.getValue(), ",") + "]";
LOG.error(msg);
throw new IOException(msg);
}
}
}
@SuppressWarnings("unchecked")
protected void internalReplaceLabelsOnNode(
Map<NodeId, Set<String>> replaceLabelsToNode) {
// do replace labels to nodes
Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();
// update nodeCollections
createNodeIfNonExisted(entry.getKey());
if (nodeId.getPort() == WILDCARD_PORT) {
Host host = nodeCollections.get(nodeId.getHost());
host.labels.clear();
host.labels.addAll(labels);
newNMToLabels.put(nodeId, host.labels);
} else {
Node nm = getNMInNodeSet(nodeId);
if (nm.labels == null) {
nm.labels = new HashSet<String>();
}
nm.labels.clear();
nm.labels.addAll(labels);
newNMToLabels.put(nodeId, nm.labels);
}
}
if (null != dispatcher) {
dispatcher.getEventHandler().handle(
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
}
// shows node->labels we added
LOG.info("setLabelsToNode:");
for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
LOG.info(" NM=" + entry.getKey() + ", labels=["
+ StringUtils.join(entry.getValue().iterator(), ",") + "]");
}
}
/**
* replace labels to nodes
*
* @param replaceLabelsToNode node -> labels map
*/
public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
throws IOException {
checkReplaceLabelsOnNode(replaceLabelsToNode);
internalReplaceLabelsOnNode(replaceLabelsToNode);
}
/**
* Get mapping of nodes to labels
*
* @return nodes to labels map
*/
public Map<NodeId, Set<String>> getNodeLabels() {
try {
readLock.lock();
Map<NodeId, Set<String>> nodeToLabels =
new HashMap<NodeId, Set<String>>();
for (Entry<String, Host> entry : nodeCollections.entrySet()) {
String hostName = entry.getKey();
Host host = entry.getValue();
for (NodeId nodeId : host.nms.keySet()) {
Set<String> nodeLabels = getLabelsByNode(nodeId);
if (nodeLabels == null || nodeLabels.isEmpty()) {
continue;
}
nodeToLabels.put(nodeId, nodeLabels);
}
if (!host.labels.isEmpty()) {
nodeToLabels
.put(NodeId.newInstance(hostName, WILDCARD_PORT), host.labels);
}
}
return Collections.unmodifiableMap(nodeToLabels);
} finally {
readLock.unlock();
}
}
/**
* Get existing valid labels in repository
*
* @return existing valid labels in repository
*/
public Set<String> getClusterNodeLabels() {
try {
readLock.lock();
Set<String> labels = new HashSet<String>(labelCollections.keySet());
labels.remove(NO_LABEL);
return Collections.unmodifiableSet(labels);
} finally {
readLock.unlock();
}
}
private void checkAndThrowLabelName(String label) throws IOException {
if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
throw new IOException("label added is empty or exceeds "
+ MAX_LABEL_LENGTH + " character(s)");
}
label = label.trim();
boolean match = LABEL_PATTERN.matcher(label).matches();
if (!match) {
throw new IOException("label name should only contains "
+ "{0-9, a-z, A-Z, -, _} and should not started with {-,_}"
+ ", now it is=" + label);
}
}
protected String normalizeLabel(String label) {
if (label != null) {
return label.trim();
}
return NO_LABEL;
}
private Set<String> normalizeLabels(Set<String> labels) {
Set<String> newLabels = new HashSet<String>();
for (String label : labels) {
newLabels.add(normalizeLabel(label));
}
return newLabels;
}
protected Node getNMInNodeSet(NodeId nodeId) {
return getNMInNodeSet(nodeId, nodeCollections);
}
protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map) {
return getNMInNodeSet(nodeId, map, false);
}
protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map,
boolean checkRunning) {
if (WILDCARD_PORT == nodeId.getPort()) {
return null;
}
Host host = map.get(nodeId.getHost());
if (null == host) {
return null;
}
Node nm = host.nms.get(nodeId);
if (null == nm) {
return null;
}
if (checkRunning) {
return nm.running ? nm : null;
}
return nm;
}
protected Set<String> getLabelsByNode(NodeId nodeId) {
return getLabelsByNode(nodeId, nodeCollections);
}
protected Set<String> getLabelsByNode(NodeId nodeId, Map<String, Host> map) {
Host host = map.get(nodeId.getHost());
if (null == host) {
return EMPTY_STRING_SET;
}
Node nm = host.nms.get(nodeId);
if (null != nm && null != nm.labels) {
return nm.labels;
} else {
return host.labels;
}
}
protected void createNodeIfNonExisted(NodeId nodeId) {
Host host = nodeCollections.get(nodeId.getHost());
if (null == host) {
host = new Host();
nodeCollections.put(nodeId.getHost(), host);
}
if (nodeId.getPort() != WILDCARD_PORT) {
Node nm = host.nms.get(nodeId);
if (null == nm) {
host.nms.put(nodeId, new Node());
}
}
}
}

View File

@ -0,0 +1,255 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import java.io.EOFException;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
import com.google.common.collect.Sets;
public class FileSystemNodeLabelsStore extends NodeLabelsStore {
public FileSystemNodeLabelsStore(CommonNodeLabelsManager mgr) {
super(mgr);
}
protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
protected static final String ROOT_DIR_NAME = "FSNodeLabelManagerRoot";
protected static final String MIRROR_FILENAME = "nodelabel.mirror";
protected static final String EDITLOG_FILENAME = "nodelabel.editlog";
protected enum SerializedLogType {
ADD_LABELS, NODE_TO_LABELS, REMOVE_LABELS
}
Path fsWorkingPath;
Path rootDirPath;
FileSystem fs;
FSDataOutputStream editlogOs;
Path editLogPath;
@Override
public void init(Configuration conf) throws Exception {
fsWorkingPath =
new Path(conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_URI,
YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_URI));
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
setFileSystem(conf);
// mkdir of root dir path
fs.mkdirs(rootDirPath);
}
@Override
public void close() throws IOException {
try {
fs.close();
editlogOs.close();
} catch (IOException e) {
LOG.warn("Exception happened whiling shutting down,", e);
}
}
private void setFileSystem(Configuration conf) throws IOException {
Configuration confCopy = new Configuration(conf);
confCopy.setBoolean("dfs.client.retry.policy.enabled", true);
String retryPolicy =
confCopy.get(YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC,
YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC);
confCopy.set("dfs.client.retry.policy.spec", retryPolicy);
fs = fsWorkingPath.getFileSystem(confCopy);
// if it's local file system, use RawLocalFileSystem instead of
// LocalFileSystem, the latter one doesn't support append.
if (fs.getScheme().equals("file")) {
fs = ((LocalFileSystem)fs).getRaw();
}
}
private void ensureAppendEditlogFile() throws IOException {
editlogOs = fs.append(editLogPath);
}
private void ensureCloseEditlogFile() throws IOException {
editlogOs.close();
}
@Override
public void updateNodeToLabelsMappings(
Map<NodeId, Set<String>> nodeToLabels) throws IOException {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal());
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs);
ensureCloseEditlogFile();
}
@Override
public void storeNewClusterNodeLabels(Set<String> labels)
throws IOException {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal());
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest.newInstance(labels)).getProto()
.writeDelimitedTo(editlogOs);
ensureCloseEditlogFile();
}
@Override
public void removeClusterNodeLabels(Collection<String> labels)
throws IOException {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal());
((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest.newInstance(Sets
.newHashSet(labels.iterator()))).getProto().writeDelimitedTo(editlogOs);
ensureCloseEditlogFile();
}
@Override
public void recover() throws IOException {
/*
* Steps of recover
* 1) Read from last mirror (from mirror or mirror.old)
* 2) Read from last edit log, and apply such edit log
* 3) Write new mirror to mirror.writing
* 4) Rename mirror to mirror.old
* 5) Move mirror.writing to mirror
* 6) Remove mirror.old
* 7) Remove edit log and create a new empty edit log
*/
// Open mirror from serialized file
Path mirrorPath = new Path(rootDirPath, MIRROR_FILENAME);
Path oldMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".old");
FSDataInputStream is = null;
if (fs.exists(mirrorPath)) {
is = fs.open(mirrorPath);
} else if (fs.exists(oldMirrorPath)) {
is = fs.open(oldMirrorPath);
}
if (null != is) {
Set<String> labels =
new AddToClusterNodeLabelsRequestPBImpl(
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)).getNodeLabels();
Map<NodeId, Set<String>> nodeToLabels =
new ReplaceLabelsOnNodeRequestPBImpl(
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
.getNodeToLabels();
mgr.addToCluserNodeLabels(labels);
mgr.replaceLabelsOnNode(nodeToLabels);
is.close();
}
// Open and process editlog
editLogPath = new Path(rootDirPath, EDITLOG_FILENAME);
if (fs.exists(editLogPath)) {
is = fs.open(editLogPath);
while (true) {
try {
// read edit log one by one
SerializedLogType type = SerializedLogType.values()[is.readInt()];
switch (type) {
case ADD_LABELS: {
Collection<String> labels =
AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)
.getNodeLabelsList();
mgr.addToCluserNodeLabels(Sets.newHashSet(labels.iterator()));
break;
}
case REMOVE_LABELS: {
Collection<String> labels =
RemoveFromClusterNodeLabelsRequestProto.parseDelimitedFrom(is)
.getNodeLabelsList();
mgr.removeFromClusterNodeLabels(labels);
break;
}
case NODE_TO_LABELS: {
Map<NodeId, Set<String>> map =
new ReplaceLabelsOnNodeRequestPBImpl(
ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
.getNodeToLabels();
mgr.replaceLabelsOnNode(map);
break;
}
}
} catch (EOFException e) {
// EOF hit, break
break;
}
}
}
// Serialize current mirror to mirror.writing
Path writingMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".writing");
FSDataOutputStream os = fs.create(writingMirrorPath, true);
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl
.newInstance(mgr.getClusterNodeLabels())).getProto().writeDelimitedTo(os);
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(mgr.getNodeLabels())).getProto().writeDelimitedTo(os);
os.close();
// Move mirror to mirror.old
if (fs.exists(mirrorPath)) {
fs.delete(oldMirrorPath, false);
fs.rename(mirrorPath, oldMirrorPath);
}
// move mirror.writing to mirror
fs.rename(writingMirrorPath, mirrorPath);
fs.delete(writingMirrorPath, false);
// remove mirror.old
fs.delete(oldMirrorPath, false);
// create a new editlog file
editlogOs = fs.create(editLogPath, true);
editlogOs.close();
LOG.info("Finished write mirror at:" + mirrorPath.toString());
LOG.info("Finished create editlog file at:" + editLogPath.toString());
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
public abstract class NodeLabelsStore implements Closeable {
protected final CommonNodeLabelsManager mgr;
protected Configuration conf;
public NodeLabelsStore(CommonNodeLabelsManager mgr) {
this.mgr = mgr;
}
/**
* Store node -> label
*/
public abstract void updateNodeToLabelsMappings(
Map<NodeId, Set<String>> nodeToLabels) throws IOException;
/**
* Store new labels
*/
public abstract void storeNewClusterNodeLabels(Set<String> label)
throws IOException;
/**
* Remove labels
*/
public abstract void removeClusterNodeLabels(Collection<String> labels)
throws IOException;
/**
* Recover labels and node to labels mappings from store
* @param conf
*/
public abstract void recover() throws IOException;
public void init(Configuration conf) throws Exception {
this.conf = conf;
}
public CommonNodeLabelsManager getNodeLabelsManager() {
return mgr;
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
public class NodeLabelsStoreEvent extends
AbstractEvent<NodeLabelsStoreEventType> {
public NodeLabelsStoreEvent(NodeLabelsStoreEventType type) {
super(type);
}
}

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.event;
public enum NodeLabelsStoreEventType {
REMOVE_LABELS,
ADD_LABELS,
STORE_NODE_TO_LABELS
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.event;
import java.util.Collection;
public class RemoveClusterNodeLabels extends NodeLabelsStoreEvent {
private Collection<String> labels;
public RemoveClusterNodeLabels(Collection<String> labels) {
super(NodeLabelsStoreEventType.REMOVE_LABELS);
this.labels = labels;
}
public Collection<String> getLabels() {
return labels;
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.event;
import java.util.Set;
public class StoreNewClusterNodeLabels extends NodeLabelsStoreEvent {
private Set<String> labels;
public StoreNewClusterNodeLabels(Set<String> labels) {
super(NodeLabelsStoreEventType.ADD_LABELS);
this.labels = labels;
}
public Set<String> getLabels() {
return labels;
}
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels.event;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeId;
public class UpdateNodeToLabelsMappingsEvent extends NodeLabelsStoreEvent {
private Map<NodeId, Set<String>> nodeToLabels;
public UpdateNodeToLabelsMappingsEvent(Map<NodeId, Set<String>> nodeToLabels) {
super(NodeLabelsStoreEventType.STORE_NODE_TO_LABELS);
this.nodeToLabels = nodeToLabels;
}
public Map<NodeId, Set<String>> getNodeToLabels() {
return nodeToLabels;
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.InlineDispatcher;
public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager {
Map<NodeId, Set<String>> lastNodeToLabels = null;
Collection<String> lastAddedlabels = null;
Collection<String> lastRemovedlabels = null;
@Override
public void initNodeLabelStore(Configuration conf) {
this.store = new NodeLabelsStore(this) {
@Override
public void recover() throws IOException {
}
@Override
public void removeClusterNodeLabels(Collection<String> labels)
throws IOException {
lastRemovedlabels = labels;
}
@Override
public void updateNodeToLabelsMappings(
Map<NodeId, Set<String>> nodeToLabels) throws IOException {
lastNodeToLabels = nodeToLabels;
}
@Override
public void storeNewClusterNodeLabels(Set<String> label) throws IOException {
lastAddedlabels = label;
}
@Override
public void close() throws IOException {
// do nothing
}
};
}
@Override
protected void initDispatcher(Configuration conf) {
super.dispatcher = new InlineDispatcher();
}
@Override
protected void startDispatcher() {
// do nothing
}
@Override
protected void stopDispatcher() {
// do nothing
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.junit.Assert;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
public class NodeLabelTestBase {
public static void assertMapEquals(Map<NodeId, Set<String>> m1,
ImmutableMap<NodeId, Set<String>> m2) {
Assert.assertEquals(m1.size(), m2.size());
for (NodeId k : m1.keySet()) {
Assert.assertTrue(m2.containsKey(k));
assertCollectionEquals(m1.get(k), m2.get(k));
}
}
public static void assertMapContains(Map<NodeId, Set<String>> m1,
ImmutableMap<NodeId, Set<String>> m2) {
for (NodeId k : m2.keySet()) {
Assert.assertTrue(m1.containsKey(k));
assertCollectionEquals(m1.get(k), m2.get(k));
}
}
public static void assertCollectionEquals(Collection<String> c1,
Collection<String> c2) {
Assert.assertEquals(c1.size(), c2.size());
Iterator<String> i1 = c1.iterator();
Iterator<String> i2 = c2.iterator();
while (i1.hasNext()) {
Assert.assertEquals(i1.next(), i2.next());
}
}
public static <E> Set<E> toSet(E... elements) {
Set<E> set = Sets.newHashSet(elements);
return set;
}
public NodeId toNodeId(String str) {
if (str.contains(":")) {
int idx = str.indexOf(':');
NodeId id =
NodeId.newInstance(str.substring(0, idx),
Integer.valueOf(str.substring(idx + 1)));
return id;
} else {
return NodeId.newInstance(str, CommonNodeLabelsManager.WILDCARD_PORT);
}
}
}

View File

@ -0,0 +1,261 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
public class TestCommonNodeLabelsManager extends NodeLabelTestBase {
DummyCommonNodeLabelsManager mgr = null;
@Before
public void before() {
mgr = new DummyCommonNodeLabelsManager();
mgr.init(new Configuration());
mgr.start();
}
@After
public void after() {
mgr.stop();
}
@Test(timeout = 5000)
public void testAddRemovelabel() throws Exception {
// Add some label
mgr.addToCluserNodeLabels(ImmutableSet.of("hello"));
assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("hello"));
mgr.addToCluserNodeLabels(ImmutableSet.of("world"));
mgr.addToCluserNodeLabels(toSet("hello1", "world1"));
assertCollectionEquals(mgr.lastAddedlabels,
Sets.newHashSet("hello1", "world1"));
Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
Sets.newHashSet("hello", "world", "hello1", "world1")));
// try to remove null, empty and non-existed label, should fail
for (String p : Arrays.asList(null, CommonNodeLabelsManager.NO_LABEL, "xx")) {
boolean caught = false;
try {
mgr.removeFromClusterNodeLabels(Arrays.asList(p));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("remove label should fail "
+ "when label is null/empty/non-existed", caught);
}
// Remove some label
mgr.removeFromClusterNodeLabels(Arrays.asList("hello"));
assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("hello"));
Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
Arrays.asList("world", "hello1", "world1")));
mgr.removeFromClusterNodeLabels(Arrays
.asList("hello1", "world1", "world"));
Assert.assertTrue(mgr.lastRemovedlabels.containsAll(Sets.newHashSet(
"hello1", "world1", "world")));
Assert.assertTrue(mgr.getClusterNodeLabels().isEmpty());
}
@Test(timeout = 5000)
public void testAddlabelWithCase() throws Exception {
// Add some label, case will not ignore here
mgr.addToCluserNodeLabels(ImmutableSet.of("HeLlO"));
assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("HeLlO"));
Assert.assertFalse(mgr.getClusterNodeLabels().containsAll(Arrays.asList("hello")));
}
@Test(timeout = 5000)
public void testAddInvalidlabel() throws IOException {
boolean caught = false;
try {
Set<String> set = new HashSet<String>();
set.add(null);
mgr.addToCluserNodeLabels(set);
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("null label should not add to repo", caught);
caught = false;
try {
mgr.addToCluserNodeLabels(ImmutableSet.of(CommonNodeLabelsManager.NO_LABEL));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("empty label should not add to repo", caught);
caught = false;
try {
mgr.addToCluserNodeLabels(ImmutableSet.of("-?"));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("invalid label charactor should not add to repo", caught);
caught = false;
try {
mgr.addToCluserNodeLabels(ImmutableSet.of(StringUtils.repeat("c", 257)));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("too long label should not add to repo", caught);
caught = false;
try {
mgr.addToCluserNodeLabels(ImmutableSet.of("-aaabbb"));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("label cannot start with \"-\"", caught);
caught = false;
try {
mgr.addToCluserNodeLabels(ImmutableSet.of("_aaabbb"));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("label cannot start with \"_\"", caught);
caught = false;
try {
mgr.addToCluserNodeLabels(ImmutableSet.of("a^aabbb"));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("label cannot contains other chars like ^[] ...", caught);
caught = false;
try {
mgr.addToCluserNodeLabels(ImmutableSet.of("aa[a]bbb"));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("label cannot contains other chars like ^[] ...", caught);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(timeout = 5000)
public void testAddReplaceRemoveLabelsOnNodes() throws Exception {
// set a label on a node, but label doesn't exist
boolean caught = false;
try {
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("node"), toSet("label")));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("trying to set a label to a node but "
+ "label doesn't exist in repository should fail", caught);
// set a label on a node, but node is null or empty
try {
mgr.replaceLabelsOnNode(ImmutableMap.of(
toNodeId(CommonNodeLabelsManager.NO_LABEL), toSet("label")));
} catch (IOException e) {
caught = true;
}
Assert.assertTrue("trying to add a empty node but succeeded", caught);
// set node->label one by one
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2")));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2"), toSet("p3")));
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
toSet("p2"), toNodeId("n2"), toSet("p3")));
assertMapEquals(mgr.lastNodeToLabels,
ImmutableMap.of(toNodeId("n2"), toSet("p3")));
// set bunch of node->label
mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
toNodeId("n1"), toSet("p1")));
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
toSet("p1"), toNodeId("n2"), toSet("p3"), toNodeId("n3"), toSet("p3")));
assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of(toNodeId("n3"),
toSet("p3"), toNodeId("n1"), toSet("p1")));
/*
* n1: p1
* n2: p3
* n3: p3
*/
// remove label on node
mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
toSet("p3"), toNodeId("n3"), toSet("p3")));
assertMapEquals(mgr.lastNodeToLabels,
ImmutableMap.of(toNodeId("n1"), CommonNodeLabelsManager.EMPTY_STRING_SET));
// add label on node
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
toNodeId("n2"), toSet("p2")));
assertMapEquals(
mgr.getNodeLabels(),
ImmutableMap.of(toNodeId("n1"), toSet("p1"), toNodeId("n2"),
toSet("p2", "p3"), toNodeId("n3"), toSet("p3")));
assertMapEquals(mgr.lastNodeToLabels,
ImmutableMap.of(toNodeId("n1"), toSet("p1"), toNodeId("n2"),
toSet("p2", "p3")));
// remove labels on node
mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
toNodeId("n2"), toSet("p2", "p3"), toNodeId("n3"), toSet("p3")));
Assert.assertEquals(0, mgr.getNodeLabels().size());
assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of(toNodeId("n1"),
CommonNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n2"),
CommonNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n3"),
CommonNodeLabelsManager.EMPTY_STRING_SET));
}
@Test(timeout = 5000)
public void testRemovelabelWithNodes() throws Exception {
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2"), toSet("p2")));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n3"), toSet("p3")));
mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1"));
assertMapEquals(mgr.getNodeLabels(),
ImmutableMap.of(toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p1"));
mgr.removeFromClusterNodeLabels(ImmutableSet.of("p2", "p3"));
Assert.assertTrue(mgr.getNodeLabels().isEmpty());
Assert.assertTrue(mgr.getClusterNodeLabels().isEmpty());
assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p2", "p3"));
}
}

View File

@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.nodelabels;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
MockNodeLabelManager mgr = null;
Configuration conf = null;
private static class MockNodeLabelManager extends
CommonNodeLabelsManager {
@Override
protected void initDispatcher(Configuration conf) {
super.dispatcher = new InlineDispatcher();
}
@Override
protected void startDispatcher() {
// do nothing
}
@Override
protected void stopDispatcher() {
// do nothing
}
}
private FileSystemNodeLabelsStore getStore() {
return (FileSystemNodeLabelsStore) mgr.store;
}
@Before
public void before() throws IOException {
mgr = new MockNodeLabelManager();
conf = new Configuration();
File tempDir = File.createTempFile("nlb", ".tmp");
tempDir.delete();
tempDir.mkdirs();
tempDir.deleteOnExit();
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_URI,
tempDir.getAbsolutePath());
mgr.init(conf);
mgr.start();
}
@After
public void after() throws IOException {
getStore().fs.delete(getStore().rootDirPath, true);
mgr.stop();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(timeout = 10000)
public void testRecoverWithMirror() throws Exception {
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.addToCluserNodeLabels(toSet("p4"));
mgr.addToCluserNodeLabels(toSet("p5", "p6"));
mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n1"), toSet("p1"),
toNodeId("n2"), toSet("p2")));
mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
/*
* node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7
*/
mgr.removeFromClusterNodeLabels(toSet("p1"));
mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
/*
* After removed p2: n2 p4: n4 p6: n6, n7
*/
// shutdown mgr and start a new mgr
mgr.stop();
mgr = new MockNodeLabelManager();
mgr.init(conf);
// check variables
Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
Arrays.asList("p2", "p4", "p6")));
assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
toNodeId("n7"), toSet("p6")));
// stutdown mgr and start a new mgr
mgr.stop();
mgr = new MockNodeLabelManager();
mgr.init(conf);
// check variables
Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
Arrays.asList("p2", "p4", "p6")));
assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
toNodeId("n7"), toSet("p6")));
mgr.stop();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(timeout = 10000)
public void testEditlogRecover() throws Exception {
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.addToCluserNodeLabels(toSet("p4"));
mgr.addToCluserNodeLabels(toSet("p5", "p6"));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
toNodeId("n2"), toSet("p2")));
mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
/*
* node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7
*/
mgr.removeFromClusterNodeLabels(toSet("p1"));
mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
/*
* After removed p2: n2 p4: n4 p6: n6, n7
*/
// shutdown mgr and start a new mgr
mgr.stop();
mgr = new MockNodeLabelManager();
mgr.init(conf);
// check variables
Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
Arrays.asList("p2", "p4", "p6")));
assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
toNodeId("n7"), toSet("p6")));
mgr.stop();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test//(timeout = 10000)
public void testSerilizationAfterRecovery() throws Exception {
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.addToCluserNodeLabels(toSet("p4"));
mgr.addToCluserNodeLabels(toSet("p5", "p6"));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
toNodeId("n2"), toSet("p2")));
mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
/*
* node -> labels
* p1: n1
* p2: n2
* p3: n3
* p4: n4
* p5: n5
* p6: n6, n7
*/
mgr.removeFromClusterNodeLabels(toSet("p1"));
mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
/*
* After removed
* p2: n2
* p4: n4
* p6: n6, n7
*/
// shutdown mgr and start a new mgr
mgr.stop();
mgr = new MockNodeLabelManager();
mgr.init(conf);
mgr.start();
// check variables
Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
Arrays.asList("p2", "p4", "p6")));
assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
toNodeId("n7"), toSet("p6")));
/*
* Add label p7,p8 then shutdown
*/
mgr = new MockNodeLabelManager();
mgr.init(conf);
mgr.start();
mgr.addToCluserNodeLabels(toSet("p7", "p8"));
mgr.stop();
/*
* Restart, add label p9 and shutdown
*/
mgr = new MockNodeLabelManager();
mgr.init(conf);
mgr.start();
mgr.addToCluserNodeLabels(toSet("p9"));
mgr.stop();
/*
* Recovery, and see if p9 added
*/
mgr = new MockNodeLabelManager();
mgr.init(conf);
mgr.start();
// check variables
Assert.assertEquals(6, mgr.getClusterNodeLabels().size());
Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
Arrays.asList("p2", "p4", "p6", "p7", "p8", "p9")));
mgr.stop();
}
}

View File

@ -0,0 +1,447 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.ImmutableSet;
public class RMNodeLabelsManager extends CommonNodeLabelsManager {
protected static class Queue {
protected Set<String> acccessibleNodeLabels;
protected Resource resource;
protected Queue() {
acccessibleNodeLabels =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
resource = Resource.newInstance(0, 0);
}
}
ConcurrentMap<String, Queue> queueCollections =
new ConcurrentHashMap<String, Queue>();
protected AccessControlList adminAcl;
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
adminAcl =
new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
}
@Override
public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
throws IOException {
try {
writeLock.lock();
// get nodesCollection before edition
Map<String, Host> before = cloneNodeMap(addedLabelsToNode.keySet());
super.addLabelsToNode(addedLabelsToNode);
// get nodesCollection after edition
Map<String, Host> after = cloneNodeMap(addedLabelsToNode.keySet());
// update running nodes resources
updateResourceMappings(before, after);
} finally {
writeLock.unlock();
}
}
protected void checkRemoveFromClusterNodeLabelsOfQueue(
Collection<String> labelsToRemove) throws IOException {
// Check if label to remove doesn't existed or null/empty, will throw
// exception if any of labels to remove doesn't meet requirement
for (String label : labelsToRemove) {
label = normalizeLabel(label);
// check if any queue contains this label
for (Entry<String, Queue> entry : queueCollections.entrySet()) {
String queueName = entry.getKey();
Set<String> queueLabels = entry.getValue().acccessibleNodeLabels;
if (queueLabels.contains(label)) {
throw new IOException("Cannot remove label=" + label
+ ", because queue=" + queueName + " is using this label. "
+ "Please remove label on queue before remove the label");
}
}
}
}
@Override
public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
throws IOException {
try {
writeLock.lock();
checkRemoveFromClusterNodeLabelsOfQueue(labelsToRemove);
// copy before NMs
Map<String, Host> before = cloneNodeMap();
super.removeFromClusterNodeLabels(labelsToRemove);
updateResourceMappings(before, nodeCollections);
} finally {
writeLock.unlock();
}
}
@Override
public void
removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
throws IOException {
try {
writeLock.lock();
// get nodesCollection before edition
Map<String, Host> before =
cloneNodeMap(removeLabelsFromNode.keySet());
super.removeLabelsFromNode(removeLabelsFromNode);
// get nodesCollection before edition
Map<String, Host> after = cloneNodeMap(removeLabelsFromNode.keySet());
// update running nodes resources
updateResourceMappings(before, after);
} finally {
writeLock.unlock();
}
}
@Override
public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
throws IOException {
try {
writeLock.lock();
// get nodesCollection before edition
Map<String, Host> before = cloneNodeMap(replaceLabelsToNode.keySet());
super.replaceLabelsOnNode(replaceLabelsToNode);
// get nodesCollection after edition
Map<String, Host> after = cloneNodeMap(replaceLabelsToNode.keySet());
// update running nodes resources
updateResourceMappings(before, after);
} finally {
writeLock.unlock();
}
}
/*
* Following methods are used for setting if a node is up and running, and it
* will update running nodes resource
*/
public void activateNode(NodeId nodeId, Resource resource) {
try {
writeLock.lock();
// save if we have a node before
Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
createNodeIfNonExisted(nodeId);
Node nm = getNMInNodeSet(nodeId);
nm.resource = resource;
nm.running = true;
// get the node after edition
Map<String, Host> after = cloneNodeMap(ImmutableSet.of(nodeId));
updateResourceMappings(before, after);
} finally {
writeLock.unlock();
}
}
/*
* Following methods are used for setting if a node unregistered to RM
*/
public void deactivateNode(NodeId nodeId) {
try {
writeLock.lock();
// save if we have a node before
Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
Node nm = getNMInNodeSet(nodeId);
if (null != nm) {
// set nm is not running, and its resource = 0
nm.running = false;
nm.resource = Resource.newInstance(0, 0);
}
// get the node after edition
Map<String, Host> after = cloneNodeMap(ImmutableSet.of(nodeId));
updateResourceMappings(before, after);
} finally {
writeLock.unlock();
}
}
public void updateNodeResource(NodeId node, Resource newResource) {
deactivateNode(node);
activateNode(node, newResource);
}
public void reinitializeQueueLabels(Map<String, Set<String>> queueToLabels) {
try {
writeLock.lock();
// clear before set
this.queueCollections.clear();
for (Entry<String, Set<String>> entry : queueToLabels.entrySet()) {
String queue = entry.getKey();
Queue q = new Queue();
this.queueCollections.put(queue, q);
Set<String> labels = entry.getValue();
if (labels.contains(ANY)) {
continue;
}
q.acccessibleNodeLabels.addAll(labels);
for (Host host : nodeCollections.values()) {
for (Entry<NodeId, Node> nentry : host.nms.entrySet()) {
NodeId nodeId = nentry.getKey();
Node nm = nentry.getValue();
if (nm.running && isNodeUsableByQueue(getLabelsByNode(nodeId), q)) {
Resources.addTo(q.resource, nm.resource);
}
}
}
}
} finally {
writeLock.unlock();
}
}
public Resource getQueueResource(String queueName, Set<String> queueLabels,
Resource clusterResource) {
try {
readLock.lock();
if (queueLabels.contains(ANY)) {
return clusterResource;
}
Queue q = queueCollections.get(queueName);
if (null == q) {
return Resources.none();
}
return q.resource;
} finally {
readLock.unlock();
}
}
public Set<String> getLabelsOnNode(NodeId nodeId) {
try {
readLock.lock();
Set<String> nodeLabels = getLabelsByNode(nodeId);
return Collections.unmodifiableSet(nodeLabels);
} finally {
readLock.unlock();
}
}
public boolean containsNodeLabel(String label) {
try {
readLock.lock();
return label != null
&& (label.isEmpty() || labelCollections.containsKey(label));
} finally {
readLock.unlock();
}
}
private Map<String, Host> cloneNodeMap(Set<NodeId> nodesToCopy) {
Map<String, Host> map = new HashMap<String, Host>();
for (NodeId nodeId : nodesToCopy) {
if (!map.containsKey(nodeId.getHost())) {
Host originalN = nodeCollections.get(nodeId.getHost());
if (null == originalN) {
continue;
}
Host n = originalN.copy();
n.nms.clear();
map.put(nodeId.getHost(), n);
}
Host n = map.get(nodeId.getHost());
if (WILDCARD_PORT == nodeId.getPort()) {
for (Entry<NodeId, Node> entry : nodeCollections
.get(nodeId.getHost()).nms.entrySet()) {
n.nms.put(entry.getKey(), entry.getValue().copy());
}
} else {
Node nm = getNMInNodeSet(nodeId);
if (null != nm) {
n.nms.put(nodeId, nm.copy());
}
}
}
return map;
}
private void updateResourceMappings(Map<String, Host> before,
Map<String, Host> after) {
// Get NMs in before only
Set<NodeId> allNMs = new HashSet<NodeId>();
for (Entry<String, Host> entry : before.entrySet()) {
allNMs.addAll(entry.getValue().nms.keySet());
}
for (Entry<String, Host> entry : after.entrySet()) {
allNMs.addAll(entry.getValue().nms.keySet());
}
// traverse all nms
for (NodeId nodeId : allNMs) {
Node oldNM;
if ((oldNM = getNMInNodeSet(nodeId, before, true)) != null) {
Set<String> oldLabels = getLabelsByNode(nodeId, before);
// no label in the past
if (oldLabels.isEmpty()) {
// update labels
Label label = labelCollections.get(NO_LABEL);
Resources.subtractFrom(label.resource, oldNM.resource);
// update queues, all queue can access this node
for (Queue q : queueCollections.values()) {
Resources.subtractFrom(q.resource, oldNM.resource);
}
} else {
// update labels
for (String labelName : oldLabels) {
Label label = labelCollections.get(labelName);
if (null == label) {
continue;
}
Resources.subtractFrom(label.resource, oldNM.resource);
}
// update queues, only queue can access this node will be subtract
for (Queue q : queueCollections.values()) {
if (isNodeUsableByQueue(oldLabels, q)) {
Resources.subtractFrom(q.resource, oldNM.resource);
}
}
}
}
Node newNM;
if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) {
Set<String> newLabels = getLabelsByNode(nodeId, after);
// no label in the past
if (newLabels.isEmpty()) {
// update labels
Label label = labelCollections.get(NO_LABEL);
Resources.addTo(label.resource, newNM.resource);
// update queues, all queue can access this node
for (Queue q : queueCollections.values()) {
Resources.addTo(q.resource, newNM.resource);
}
} else {
// update labels
for (String labelName : newLabels) {
Label label = labelCollections.get(labelName);
Resources.addTo(label.resource, newNM.resource);
}
// update queues, only queue can access this node will be subtract
for (Queue q : queueCollections.values()) {
if (isNodeUsableByQueue(newLabels, q)) {
Resources.addTo(q.resource, newNM.resource);
}
}
}
}
}
}
public Resource getResourceByLabel(String label, Resource clusterResource) {
label = normalizeLabel(label);
try {
readLock.lock();
if (null == labelCollections.get(label)) {
return Resources.none();
}
return labelCollections.get(label).resource;
} finally {
readLock.unlock();
}
}
private boolean isNodeUsableByQueue(Set<String> nodeLabels, Queue q) {
// node without any labels can be accessed by any queue
if (nodeLabels == null || nodeLabels.isEmpty()
|| (nodeLabels.size() == 1 && nodeLabels.contains(NO_LABEL))) {
return true;
}
for (String label : nodeLabels) {
if (q.acccessibleNodeLabels.contains(label)) {
return true;
}
}
return false;
}
private Map<String, Host> cloneNodeMap() {
Set<NodeId> nodesToCopy = new HashSet<NodeId>();
for (String nodeName : nodeCollections.keySet()) {
nodesToCopy.add(NodeId.newInstance(nodeName, WILDCARD_PORT));
}
return cloneNodeMap(nodesToCopy);
}
public boolean checkAccess(UserGroupInformation user) {
// make sure only admin can invoke
// this method
if (adminAcl.isUserAllowed(user)) {
return true;
}
return false;
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore;
public class DummyRMNodeLabelsManager extends RMNodeLabelsManager {
Map<NodeId, Set<String>> lastNodeToLabels = null;
Collection<String> lastAddedlabels = null;
Collection<String> lastRemovedlabels = null;
@Override
public void initNodeLabelStore(Configuration conf) {
this.store = new NodeLabelsStore(this) {
@Override
public void recover() throws IOException {
// do nothing
}
@Override
public void removeClusterNodeLabels(Collection<String> labels)
throws IOException {
// do nothing
}
@Override
public void updateNodeToLabelsMappings(
Map<NodeId, Set<String>> nodeToLabels) throws IOException {
// do nothing
}
@Override
public void storeNewClusterNodeLabels(Set<String> label) throws IOException {
// do nothing
}
@Override
public void close() throws IOException {
// do nothing
}
};
}
@Override
protected void initDispatcher(Configuration conf) {
super.dispatcher = new InlineDispatcher();
}
@Override
protected void startDispatcher() {
// do nothing
}
@Override
protected void stopDispatcher() {
// do nothing
}
}

View File

@ -0,0 +1,367 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class TestRMNodeLabelsManager extends NodeLabelTestBase {
private final Resource EMPTY_RESOURCE = Resource.newInstance(0, 0);
private final Resource SMALL_RESOURCE = Resource.newInstance(100, 0);
private final Resource LARGE_NODE = Resource.newInstance(1000, 0);
DummyRMNodeLabelsManager mgr = null;
@Before
public void before() {
mgr = new DummyRMNodeLabelsManager();
mgr.init(new Configuration());
mgr.start();
}
@After
public void after() {
mgr.stop();
}
@Test(timeout = 5000)
public void testNodeActiveDeactiveUpdate() throws Exception {
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE);
Assert.assertEquals(mgr.getResourceByLabel("p2", null), EMPTY_RESOURCE);
Assert.assertEquals(mgr.getResourceByLabel("p3", null), EMPTY_RESOURCE);
Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
EMPTY_RESOURCE);
// active two NM to n1, one large and one small
mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
Assert.assertEquals(mgr.getResourceByLabel("p1", null),
Resources.add(SMALL_RESOURCE, LARGE_NODE));
// change the large NM to small, check if resource updated
mgr.updateNodeResource(NodeId.newInstance("n1", 2), SMALL_RESOURCE);
Assert.assertEquals(mgr.getResourceByLabel("p1", null),
Resources.multiply(SMALL_RESOURCE, 2));
// deactive one NM, and check if resource updated
mgr.deactivateNode(NodeId.newInstance("n1", 1));
Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE);
// continus deactive, check if resource updated
mgr.deactivateNode(NodeId.newInstance("n1", 2));
Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE);
// Add two NM to n1 back
mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
// And remove p1, now the two NM should come to default label,
mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1"));
Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
Resources.add(SMALL_RESOURCE, LARGE_NODE));
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(timeout = 5000)
public void testUpdateNodeLabelWithActiveNode() throws Exception {
mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
// active two NM to n1, one large and one small
mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n2", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n3", 1), SMALL_RESOURCE);
// change label of n1 to p2
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2")));
Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE);
Assert.assertEquals(mgr.getResourceByLabel("p2", null),
Resources.multiply(SMALL_RESOURCE, 2));
Assert.assertEquals(mgr.getResourceByLabel("p3", null), SMALL_RESOURCE);
// add more labels
mgr.addToCluserNodeLabels(toSet("p4", "p5", "p6"));
mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p1"),
toNodeId("n5"), toSet("p2"), toNodeId("n6"), toSet("p3"),
toNodeId("n7"), toSet("p4"), toNodeId("n8"), toSet("p5")));
// now node -> label is,
// p1 : n4
// p2 : n1, n2, n5
// p3 : n3, n6
// p4 : n7
// p5 : n8
// no-label : n9
// active these nodes
mgr.activateNode(NodeId.newInstance("n4", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n5", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n6", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n7", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n8", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("n9", 1), SMALL_RESOURCE);
// check varibles
Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE);
Assert.assertEquals(mgr.getResourceByLabel("p2", null),
Resources.multiply(SMALL_RESOURCE, 3));
Assert.assertEquals(mgr.getResourceByLabel("p3", null),
Resources.multiply(SMALL_RESOURCE, 2));
Assert.assertEquals(mgr.getResourceByLabel("p4", null),
Resources.multiply(SMALL_RESOURCE, 1));
Assert.assertEquals(mgr.getResourceByLabel("p5", null),
Resources.multiply(SMALL_RESOURCE, 1));
Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
Resources.multiply(SMALL_RESOURCE, 1));
// change a bunch of nodes -> labels
// n4 -> p2
// n7 -> empty
// n5 -> p1
// n8 -> empty
// n9 -> p1
//
// now become:
// p1 : n5, n9
// p2 : n1, n2, n4
// p3 : n3, n6
// p4 : [ ]
// p5 : [ ]
// no label: n8, n7
mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p2"),
toNodeId("n7"), RMNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n5"),
toSet("p1"), toNodeId("n8"), RMNodeLabelsManager.EMPTY_STRING_SET,
toNodeId("n9"), toSet("p1")));
// check varibles
Assert.assertEquals(mgr.getResourceByLabel("p1", null),
Resources.multiply(SMALL_RESOURCE, 2));
Assert.assertEquals(mgr.getResourceByLabel("p2", null),
Resources.multiply(SMALL_RESOURCE, 3));
Assert.assertEquals(mgr.getResourceByLabel("p3", null),
Resources.multiply(SMALL_RESOURCE, 2));
Assert.assertEquals(mgr.getResourceByLabel("p4", null),
Resources.multiply(SMALL_RESOURCE, 0));
Assert.assertEquals(mgr.getResourceByLabel("p5", null),
Resources.multiply(SMALL_RESOURCE, 0));
Assert.assertEquals(mgr.getResourceByLabel("", null),
Resources.multiply(SMALL_RESOURCE, 2));
}
@Test(timeout=5000)
public void testGetQueueResource() throws Exception {
Resource clusterResource = Resource.newInstance(9999, 1);
/*
* Node->Labels:
* host1 : red, blue
* host2 : blue, yellow
* host3 : yellow
* host4 :
*/
mgr.addToCluserNodeLabels(toSet("red", "blue", "yellow"));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host1"),
toSet("red", "blue")));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host2"),
toSet("blue", "yellow")));
mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host3"), toSet("yellow")));
// active two NM to n1, one large and one small
mgr.activateNode(NodeId.newInstance("host1", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("host2", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("host4", 1), SMALL_RESOURCE);
// reinitialize queue
Set<String> q1Label = toSet("red", "blue");
Set<String> q2Label = toSet("blue", "yellow");
Set<String> q3Label = toSet("yellow");
Set<String> q4Label = RMNodeLabelsManager.EMPTY_STRING_SET;
Set<String> q5Label = toSet(RMNodeLabelsManager.ANY);
Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
queueToLabels.put("Q1", q1Label);
queueToLabels.put("Q2", q2Label);
queueToLabels.put("Q3", q3Label);
queueToLabels.put("Q4", q4Label);
queueToLabels.put("Q5", q5Label);
mgr.reinitializeQueueLabels(queueToLabels);
// check resource
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q1", q1Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4),
mgr.getQueueResource("Q2", q2Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q3", q3Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
mgr.getQueueResource("Q4", q4Label, clusterResource));
Assert.assertEquals(clusterResource,
mgr.getQueueResource("Q5", q5Label, clusterResource));
mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("host1"), toSet("red"),
toNodeId("host2"), toSet("blue", "yellow")));
mgr.addLabelsToNode(ImmutableMap.of(toNodeId("host3"), toSet("red")));
/*
* Check resource after changes some labels
* Node->Labels:
* host1 : blue (was: red, blue)
* host2 : (was: blue, yellow)
* host3 : red, yellow (was: yellow)
* host4 :
*/
// check resource
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4),
mgr.getQueueResource("Q1", q1Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4),
mgr.getQueueResource("Q2", q2Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q3", q3Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
mgr.getQueueResource("Q4", q4Label, clusterResource));
Assert.assertEquals(clusterResource,
mgr.getQueueResource("Q5", q5Label, clusterResource));
/*
* Check resource after deactive/active some nodes
* Node->Labels:
* (deactived) host1 : blue
* host2 :
* (deactived and then actived) host3 : red, yellow
* host4 :
*/
mgr.deactivateNode(NodeId.newInstance("host1", 1));
mgr.deactivateNode(NodeId.newInstance("host3", 1));
mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE);
// check resource
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q1", q1Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q2", q2Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q3", q3Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
mgr.getQueueResource("Q4", q4Label, clusterResource));
Assert.assertEquals(clusterResource,
mgr.getQueueResource("Q5", q5Label, clusterResource));
/*
* Check resource after refresh queue:
* Q1: blue
* Q2: red, blue
* Q3: red
* Q4:
* Q5: ANY
*/
q1Label = toSet("blue");
q2Label = toSet("blue", "red");
q3Label = toSet("red");
q4Label = RMNodeLabelsManager.EMPTY_STRING_SET;
q5Label = toSet(RMNodeLabelsManager.ANY);
queueToLabels.clear();
queueToLabels.put("Q1", q1Label);
queueToLabels.put("Q2", q2Label);
queueToLabels.put("Q3", q3Label);
queueToLabels.put("Q4", q4Label);
queueToLabels.put("Q5", q5Label);
mgr.reinitializeQueueLabels(queueToLabels);
// check resource
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
mgr.getQueueResource("Q1", q1Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q2", q2Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q3", q3Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
mgr.getQueueResource("Q4", q4Label, clusterResource));
Assert.assertEquals(clusterResource,
mgr.getQueueResource("Q5", q5Label, clusterResource));
/*
* Active NMs in nodes already have NM
* Node->Labels:
* host2 :
* host3 : red, yellow (3 NMs)
* host4 : (2 NMs)
*/
mgr.activateNode(NodeId.newInstance("host3", 2), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("host3", 3), SMALL_RESOURCE);
mgr.activateNode(NodeId.newInstance("host4", 2), SMALL_RESOURCE);
// check resource
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q1", q1Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6),
mgr.getQueueResource("Q2", q2Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6),
mgr.getQueueResource("Q3", q3Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q4", q4Label, clusterResource));
Assert.assertEquals(clusterResource,
mgr.getQueueResource("Q5", q5Label, clusterResource));
/*
* Deactive NMs in nodes already have NMs
* Node->Labels:
* host2 :
* host3 : red, yellow (2 NMs)
* host4 : (0 NMs)
*/
mgr.deactivateNode(NodeId.newInstance("host3", 3));
mgr.deactivateNode(NodeId.newInstance("host4", 2));
mgr.deactivateNode(NodeId.newInstance("host4", 1));
// check resource
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
mgr.getQueueResource("Q1", q1Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q2", q2Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
mgr.getQueueResource("Q3", q3Label, clusterResource));
Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
mgr.getQueueResource("Q4", q4Label, clusterResource));
Assert.assertEquals(clusterResource,
mgr.getQueueResource("Q5", q5Label, clusterResource));
}
}