YARN-3964. Support NodeLabelsProvider at Resource Manager side.

Contributed by Dian Fu.
This commit is contained in:
Devaraj K 2015-10-11 11:21:29 +05:30
parent 7e2c971fed
commit db93047881
18 changed files with 648 additions and 64 deletions

View File

@ -229,6 +229,9 @@ Release 2.8.0 - UNRELEASED
YARN-261. Ability to fail AM attempts (Andrey Klochkov and
Rohith Sharma K S via jlowe)
YARN-3964. Support NodeLabelsProvider at Resource Manager side.
(Dian Fu via devaraj)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -1999,14 +1999,17 @@ private static void addDeprecatedKeys() {
public static final String NODELABEL_CONFIGURATION_TYPE =
NODE_LABELS_PREFIX + "configuration-type";
public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE =
"centralized";
public static final String DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
"delegated-centralized";
public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE =
"distributed";
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
CENTALIZED_NODELABEL_CONFIGURATION_TYPE;
CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
YARN_PREFIX + "cluster.max-application-priority";
@ -2019,6 +2022,20 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
}
@Private
public static boolean isCentralizedNodeLabelConfiguration(
Configuration conf) {
return CENTRALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(
NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
}
@Private
public static boolean isDelegatedCentralizedNodeLabelConfiguration(
Configuration conf) {
return DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(
NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
}
private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX
+ "node-labels.";
@ -2055,6 +2072,23 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX
+ "node-labels.";
public static final String RM_NODE_LABELS_PROVIDER_CONFIG =
RM_NODE_LABELS_PREFIX + "provider";
private static final String RM_NODE_LABELS_PROVIDER_PREFIX =
RM_NODE_LABELS_PREFIX + "provider.";
//If -1 is configured then no timer task should be created
public static final String RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
RM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";
//once in 30 mins
public static final long DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
30 * 60 * 1000;
public static final String AM_BLACKLISTING_ENABLED =
YARN_PREFIX + "am.blacklisting.enabled";
public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;

View File

@ -101,7 +101,7 @@ public class CommonNodeLabelsManager extends AbstractService {
protected NodeLabelsStore store;
private boolean nodeLabelsEnabled = false;
private boolean isDistributedNodeLabelConfiguration = false;
private boolean isCentralizedNodeLabelConfiguration = true;
/**
* A <code>Host</code> can have multiple <code>Node</code>s
@ -220,8 +220,8 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED);
isDistributedNodeLabelConfiguration =
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
isCentralizedNodeLabelConfiguration =
YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL));
}
@ -229,7 +229,7 @@ protected void serviceInit(Configuration conf) throws Exception {
protected void initNodeLabelStore(Configuration conf) throws Exception {
this.store = new FileSystemNodeLabelsStore(this);
this.store.init(conf);
this.store.recover(isDistributedNodeLabelConfiguration);
this.store.recover(!isCentralizedNodeLabelConfiguration);
}
// for UT purpose
@ -624,10 +624,14 @@ protected void internalUpdateLabelsOnNodes(
}
}
if (null != dispatcher && !isDistributedNodeLabelConfiguration) {
// In case of DistributedNodeLabelConfiguration, no need to save the the
if (null != dispatcher && isCentralizedNodeLabelConfiguration) {
// In case of DistributedNodeLabelConfiguration or
// DelegatedCentralizedNodeLabelConfiguration, no need to save the the
// NodeLabels Mapping to the back-end store, as on RM restart/failover
// NodeLabels are collected from NM through Register/Heartbeat again
// in case of DistributedNodeLabelConfiguration and collected from
// RMNodeLabelsMappingProvider in case of
// DelegatedCentralizedNodeLabelConfiguration
dispatcher.getEventHandler().handle(
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
}

View File

@ -2114,7 +2114,7 @@
<property>
<description>
Set configuration type for node labels. Administrators can specify
"centralized" or "distributed".
"centralized", "delegated-centralized" or "distributed".
</description>
<name>yarn.node-labels.configuration-type</name>
<value>centralized</value>
@ -2176,6 +2176,32 @@
<name>yarn.nodemanager.node-labels.provider.fetch-timeout-ms</name>
<value>1200000</value>
</property>
<!-- Delegated-centralized Node Labels Configuration -->
<property>
<description>
When node labels "yarn.node-labels.configuration-type" is
of type "delegated-centralized", administrators should configure
the class for fetching node labels by ResourceManager. Configured
class needs to extend
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.
RMNodeLabelsMappingProvider.
</description>
<name>yarn.resourcemanager.node-labels.provider</name>
<value></value>
</property>
<property>
<description>
When node labels "yarn.node-labels.configuration-type" is of type
"delegated-centralized" then periodically node labels are retrieved
from the node labels provider. This configuration is to define the
interval. If -1 is configured then node labels are retrieved from
provider only once for each node after it registers. Defaults to 30 mins.
</description>
<name>yarn.resourcemanager.node-labels.provider.fetch-interval-ms</name>
<value>1800000</value>
</property>
<!-- Other Configuration -->
<property>

View File

@ -86,6 +86,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -120,7 +121,7 @@ public class AdminService extends CompositeService implements
private UserGroupInformation daemonUser;
@VisibleForTesting
boolean isDistributedNodeLabelConfiguration = false;
boolean isCentralizedNodeLabelConfiguration = true;
public AdminService(ResourceManager rm, RMContext rmContext) {
super(AdminService.class.getName());
@ -151,8 +152,8 @@ public void serviceInit(Configuration conf) throws Exception {
.getCurrentUser());
rmId = conf.get(YarnConfiguration.RM_HA_ID);
isDistributedNodeLabelConfiguration =
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
isCentralizedNodeLabelConfiguration =
YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
super.serviceInit(conf);
}
@ -745,7 +746,13 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
String operation = "replaceLabelsOnNode";
final String msg = "set node to labels.";
checkAndThrowIfDistributedNodeLabelConfEnabled(operation);
try {
NodeLabelsUtils.verifyCentralizedNodeLabelConfEnabled(operation,
isCentralizedNodeLabelConfiguration);
} catch (IOException ioe) {
throw RPCUtil.getRemoteException(ioe);
}
UserGroupInformation user = checkAcls(operation);
checkRMStatus(user.getShortUserName(), operation, msg);
@ -780,17 +787,6 @@ private YarnException logAndWrapException(Exception exception, String user,
return RPCUtil.getRemoteException(exception);
}
private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
throws YarnException {
if (isDistributedNodeLabelConfiguration) {
String msg =
String.format("Error when invoke method=%s because of "
+ "distributed node label configuration enabled.", operation);
LOG.error(msg);
throw RPCUtil.getRemoteException(new IOException(msg));
}
}
@Override
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -94,6 +95,7 @@ public class RMActiveServiceContext {
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMNodeLabelsManager nodeLabelManager;
private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
private long epoch;
private Clock systemClock = new SystemClock();
private long schedulerRecoveryStartTime = 0;
@ -390,6 +392,19 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) {
nodeLabelManager = mgr;
}
@Private
@Unstable
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return rmDelegatedNodeLabelsUpdater;
}
@Private
@Unstable
public void setRMDelegatedNodeLabelsUpdater(
RMDelegatedNodeLabelsUpdater nodeLablesUpdater) {
rmDelegatedNodeLabelsUpdater = nodeLablesUpdater;
}
@Private
@Unstable
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@ -118,6 +119,11 @@ void setRMApplicationHistoryWriter(
public void setNodeLabelManager(RMNodeLabelsManager mgr);
RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater();
void setRMDelegatedNodeLabelsUpdater(
RMDelegatedNodeLabelsUpdater nodeLabelsUpdater);
long getEpoch();
ReservationSystem getReservationSystem();

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@ -400,6 +401,18 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) {
activeServiceContext.setNodeLabelManager(mgr);
}
@Override
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return activeServiceContext.getRMDelegatedNodeLabelsUpdater();
}
@Override
public void setRMDelegatedNodeLabelsUpdater(
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater) {
activeServiceContext.setRMDelegatedNodeLabelsUpdater(
delegatedNodeLabelsUpdater);
}
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
}

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@ -440,6 +441,13 @@ protected void serviceInit(Configuration configuration) throws Exception {
addService(nlm);
rmContext.setNodeLabelManager(nlm);
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
createRMDelegatedNodeLabelsUpdater();
if (delegatedNodeLabelsUpdater != null) {
addService(delegatedNodeLabelsUpdater);
rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
}
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
@ -1113,6 +1121,20 @@ protected RMSecretManagerService createRMSecretManagerService() {
return new RMSecretManagerService(conf, rmContext);
}
/**
* Create RMDelegatedNodeLabelsUpdater based on configuration.
*/
protected RMDelegatedNodeLabelsUpdater createRMDelegatedNodeLabelsUpdater() {
if (conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED)
&& YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(
conf)) {
return new RMDelegatedNodeLabelsUpdater(rmContext);
} else {
return null;
}
}
@Private
public ClientRMService getClientRMService() {
return this.clientRM;

View File

@ -22,7 +22,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@ -43,7 +42,6 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -63,6 +61,7 @@
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@ -105,6 +104,7 @@ public class ResourceTrackerService extends AbstractService implements
private int minAllocVcores;
private boolean isDistributedNodeLabelsConf;
private boolean isDelegatedCentralizedNodeLabelsConf;
public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
@ -151,6 +151,8 @@ protected void serviceInit(Configuration conf) throws Exception {
isDistributedNodeLabelsConf =
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
isDelegatedCentralizedNodeLabelsConf = YarnConfiguration
.isDelegatedCentralizedNodeLabelConfiguration(conf);
super.serviceInit(conf);
}
@ -241,17 +243,6 @@ void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) {
}
}
static Set<String> convertToStringSet(Set<NodeLabel> nodeLabels) {
if (null == nodeLabels) {
return null;
}
Set<String> labels = new HashSet<String>();
for (NodeLabel label : nodeLabels) {
labels.add(label.getName());
}
return labels;
}
@SuppressWarnings("unchecked")
@Override
public RegisterNodeManagerResponse registerNodeManager(
@ -353,7 +344,8 @@ public RegisterNodeManagerResponse registerNodeManager(
}
// Update node's labels to RM's NodeLabelManager.
Set<String> nodeLabels = convertToStringSet(request.getNodeLabels());
Set<String> nodeLabels = NodeLabelsUtils.convertToStringSet(
request.getNodeLabels());
if (isDistributedNodeLabelsConf && nodeLabels != null) {
try {
updateNodeLabelsFromNMReport(nodeLabels, nodeId);
@ -363,6 +355,8 @@ public RegisterNodeManagerResponse registerNodeManager(
response.setDiagnosticsMessage(ex.getMessage());
response.setAreNodeLabelsAcceptedByRM(false);
}
} else if (isDelegatedCentralizedNodeLabelsConf) {
this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId);
}
StringBuilder message = new StringBuilder();
@ -480,7 +474,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) {
try {
updateNodeLabelsFromNMReport(
convertToStringSet(request.getNodeLabels()), nodeId);
NodeLabelsUtils.convertToStringSet(request.getNodeLabels()),
nodeId);
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
} catch (IOException ex) {
//ensure the error message is captured and sent across in response

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.NodeLabel;
/**
* Node labels utilities.
*/
public final class NodeLabelsUtils {
private static final Log LOG = LogFactory.getLog(NodeLabelsUtils.class);
private NodeLabelsUtils() { /* Hidden constructor */ }
public static Set<String> convertToStringSet(Set<NodeLabel> nodeLabels) {
if (null == nodeLabels) {
return null;
}
Set<String> labels = new HashSet<String>();
for (NodeLabel label : nodeLabels) {
labels.add(label.getName());
}
return labels;
}
public static void verifyCentralizedNodeLabelConfEnabled(String operation,
boolean isCentralizedNodeLabelConfiguration) throws IOException {
if (!isCentralizedNodeLabelConfiguration) {
String msg =
String.format("Error when invoke method=%s because "
+ "centralized node label configuration is not enabled.",
operation);
LOG.error(msg);
throw new IOException(msg);
}
}
}

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import com.google.common.annotations.VisibleForTesting;
/**
* Update nodes labels map for Resource Manager periodically. It collects
* nodes labels from {@link RMNodeLabelsMappingProvider} and updates the
* nodes -> labels map via {@link RMNodeLabelsManager}. This service is
* enabled when configuration "yarn.node-labels.configuration-type" is
* set to "delegated-centralized".
*/
public class RMDelegatedNodeLabelsUpdater extends CompositeService {
private static final Log LOG = LogFactory
.getLog(RMDelegatedNodeLabelsUpdater.class);
public static final long DISABLE_DELEGATED_NODE_LABELS_UPDATE = -1;
// Timer used to schedule node labels fetching
private Timer nodeLabelsScheduler;
// 30 seconds
@VisibleForTesting
public long nodeLabelsUpdateInterval = 30 * 1000;
private Set<NodeId> newlyRegisteredNodes = new HashSet<NodeId>();
// Lock to protect newlyRegisteredNodes
private Object lock = new Object();
private long lastAllNodesLabelUpdateMills = 0L;
private long allNodesLabelUpdateInterval;
private RMNodeLabelsMappingProvider rmNodeLabelsMappingProvider;
private RMContext rmContext;
public RMDelegatedNodeLabelsUpdater(RMContext rmContext) {
super("RMDelegatedNodeLabelsUpdater");
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
allNodesLabelUpdateInterval = conf.getLong(
YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
rmNodeLabelsMappingProvider = createRMNodeLabelsMappingProvider(conf);
addService(rmNodeLabelsMappingProvider);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
nodeLabelsScheduler = new Timer(
"RMDelegatedNodeLabelsUpdater-Timer", true);
TimerTask delegatedNodeLabelsUpdaterTimerTask =
new RMDelegatedNodeLabelsUpdaterTimerTask();
nodeLabelsScheduler.scheduleAtFixedRate(
delegatedNodeLabelsUpdaterTimerTask,
nodeLabelsUpdateInterval,
nodeLabelsUpdateInterval);
super.serviceStart();
}
/**
* Terminate the timer.
* @throws Exception
*/
@Override
protected void serviceStop() throws Exception {
if (nodeLabelsScheduler != null) {
nodeLabelsScheduler.cancel();
}
super.serviceStop();
}
private class RMDelegatedNodeLabelsUpdaterTimerTask extends TimerTask {
@Override
public void run() {
Set<NodeId> nodesToUpdateLabels = null;
boolean isUpdatingAllNodes = false;
if (allNodesLabelUpdateInterval != DISABLE_DELEGATED_NODE_LABELS_UPDATE) {
long elapsedTimeSinceLastUpdate =
System.currentTimeMillis() - lastAllNodesLabelUpdateMills;
if (elapsedTimeSinceLastUpdate > allNodesLabelUpdateInterval) {
nodesToUpdateLabels =
Collections.unmodifiableSet(rmContext.getRMNodes().keySet());
isUpdatingAllNodes = true;
}
}
if (nodesToUpdateLabels == null && !newlyRegisteredNodes.isEmpty()) {
synchronized (lock) {
if (!newlyRegisteredNodes.isEmpty()) {
nodesToUpdateLabels = new HashSet<NodeId>(newlyRegisteredNodes);
}
}
}
try {
if (nodesToUpdateLabels != null && !nodesToUpdateLabels.isEmpty()) {
updateNodeLabelsInternal(nodesToUpdateLabels);
if (isUpdatingAllNodes) {
lastAllNodesLabelUpdateMills = System.currentTimeMillis();
}
synchronized (lock) {
newlyRegisteredNodes.removeAll(nodesToUpdateLabels);
}
}
} catch (IOException e) {
LOG.error("Failed to update node Labels", e);
}
}
}
private void updateNodeLabelsInternal(Set<NodeId> nodes)
throws IOException {
Map<NodeId, Set<NodeLabel>> labelsUpdated =
rmNodeLabelsMappingProvider.getNodeLabels(nodes);
if (labelsUpdated != null && labelsUpdated.size() != 0) {
Map<NodeId, Set<String>> nodeToLabels =
new HashMap<NodeId, Set<String>>(labelsUpdated.size());
for (Map.Entry<NodeId, Set<NodeLabel>> entry
: labelsUpdated.entrySet()) {
nodeToLabels.put(entry.getKey(),
NodeLabelsUtils.convertToStringSet(entry.getValue()));
}
rmContext.getNodeLabelManager().replaceLabelsOnNode(nodeToLabels);
}
}
/**
* Get the RMNodeLabelsMappingProvider which is used to provide node labels.
*/
private RMNodeLabelsMappingProvider createRMNodeLabelsMappingProvider(
Configuration conf) throws IOException {
RMNodeLabelsMappingProvider nodeLabelsMappingProvider = null;
try {
Class<? extends RMNodeLabelsMappingProvider> labelsProviderClass =
conf.getClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG,
null, RMNodeLabelsMappingProvider.class);
if (labelsProviderClass != null) {
nodeLabelsMappingProvider = labelsProviderClass.newInstance();
}
} catch (InstantiationException | IllegalAccessException
| RuntimeException e) {
LOG.error("Failed to create RMNodeLabelsMappingProvider based on"
+ " Configuration", e);
throw new IOException("Failed to create RMNodeLabelsMappingProvider : "
+ e.getMessage(), e);
}
if (nodeLabelsMappingProvider == null) {
String msg = "RMNodeLabelsMappingProvider should be configured when "
+ "delegated-centralized node label configuration is enabled";
LOG.error(msg);
throw new IOException(msg);
} else if (LOG.isDebugEnabled()) {
LOG.debug("RM Node labels mapping provider class is : "
+ nodeLabelsMappingProvider.getClass().toString());
}
return nodeLabelsMappingProvider;
}
/**
* Update node labels for a specified node.
* @param node the node to update node labels
*/
public void updateNodeLabels(NodeId node) {
synchronized (lock) {
newlyRegisteredNodes.add(node);
}
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
/**
* Interface which is responsible for providing the node -> labels map.
*/
public abstract class RMNodeLabelsMappingProvider extends AbstractService {
public RMNodeLabelsMappingProvider(String name) {
super(name);
}
/**
* Provides the labels. It is expected to give same Labels
* continuously until there is a change in labels.
*
* @param nodes to fetch labels
* @return Set of node label strings applicable for a node
*/
public abstract Map<NodeId, Set<NodeLabel>> getNodeLabels(Set<NodeId> nodes);
}

View File

@ -111,6 +111,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -177,7 +178,7 @@ public class RMWebServices {
private @Context HttpServletResponse response;
@VisibleForTesting
boolean isDistributedNodeLabelConfiguration = false;
boolean isCentralizedNodeLabelConfiguration = true;
public final static String DELEGATION_TOKEN_HEADER =
"Hadoop-YARN-RM-Delegation-Token";
@ -186,19 +187,8 @@ public class RMWebServices {
public RMWebServices(final ResourceManager rm, Configuration conf) {
this.rm = rm;
this.conf = conf;
isDistributedNodeLabelConfiguration =
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
}
private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
throws IOException {
if (isDistributedNodeLabelConfiguration) {
String msg =
String.format("Error when invoke method=%s because of "
+ "distributed node label configuration enabled.", operation);
LOG.error(msg);
throw new IOException(msg);
}
isCentralizedNodeLabelConfiguration =
YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
}
RMWebServices(ResourceManager rm, Configuration conf,
@ -892,7 +882,8 @@ private Response replaceLabelsOnNode(
String operation) throws IOException {
init();
checkAndThrowIfDistributedNodeLabelConfEnabled("replaceLabelsOnNode");
NodeLabelsUtils.verifyCentralizedNodeLabelConfEnabled(
"replaceLabelsOnNode", isCentralizedNodeLabelConfiguration);
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {

View File

@ -872,12 +872,12 @@ public void testModifyLabelsOnNodesWithDistributedConfigurationDisabled()
}
@Test(expected = YarnException.class)
public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled()
public void testModifyLabelsOnNodesWithCentralizedConfigurationDisabled()
throws IOException, YarnException {
// create RM and set it's ACTIVE, and set distributed node label
// configuration to true
MockRM rm = new MockRM();
rm.adminService.isDistributedNodeLabelConfiguration = true;
rm.adminService.isCentralizedNodeLabelConfiguration = false;
((RMContextImpl) rm.getRMContext())
.setHAServiceState(HAServiceState.ACTIVE);
@ -893,14 +893,14 @@ public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled()
}
@Test
public void testRemoveClusterNodeLabelsWithDistributedConfigurationEnabled()
public void testRemoveClusterNodeLabelsWithCentralizedConfigurationDisabled()
throws IOException, YarnException {
// create RM and set it's ACTIVE
MockRM rm = new MockRM();
((RMContextImpl) rm.getRMContext())
.setHAServiceState(HAServiceState.ACTIVE);
RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager();
rm.adminService.isDistributedNodeLabelConfiguration = true;
rm.adminService.isCentralizedNodeLabelConfiguration = false;
// by default, distributed configuration for node label is disabled, this
// should pass

View File

@ -61,6 +61,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -362,7 +363,7 @@ protected RMNodeLabelsManager createNodeLabelManager() {
Assert.assertEquals("Action should be normal on valid Node Labels",
NodeAction.NORMAL, response.getNodeAction());
assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
ResourceTrackerService.convertToStringSet(registerReq.getNodeLabels()));
NodeLabelsUtils.convertToStringSet(registerReq.getNodeLabels()));
Assert.assertTrue("Valid Node Labels were not accepted by RM",
response.getAreNodeLabelsAcceptedByRM());
rm.stop();
@ -590,7 +591,7 @@ protected RMNodeLabelsManager createNodeLabelManager() {
Assert.assertEquals("InValid Node Labels were not accepted by RM",
NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction());
assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
ResourceTrackerService.convertToStringSet(heartbeatReq.getNodeLabels()));
NodeLabelsUtils.convertToStringSet(heartbeatReq.getNodeLabels()));
Assert.assertTrue("Valid Node Labels were not accepted by RM",
nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
public class TestRMDelegatedNodeLabelsUpdater extends NodeLabelTestBase {
private YarnConfiguration conf;
private static Map<NodeId, Set<NodeLabel>> nodeLabelsMap = Maps.newHashMap();
@Before
public void setup() {
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
YarnConfiguration.DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE);
conf.setClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG,
DummyRMNodeLabelsMappingProvider.class,
RMNodeLabelsMappingProvider.class);
}
@Test
public void testRMNodeLabelsMappingProviderConfiguration() {
conf.unset(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG);
try {
MockRM rm = new MockRM(conf);
rm.init(conf);
rm.start();
Assert.fail("Expected an exception");
} catch (Exception e) {
// expected an exception
Assert.assertTrue(e.getMessage().contains(
"RMNodeLabelsMappingProvider should be configured"));
}
}
@Test
public void testWithNodeLabelUpdateEnabled() throws Exception {
conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
1000);
MockRM rm = new MockRM(conf);
rm.init(conf);
rm.getRMContext().getRMDelegatedNodeLabelsUpdater().nodeLabelsUpdateInterval
= 3 * 1000;
rm.start();
RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager();
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
NodeId nodeId = toNodeId("h1:1234");
assertEquals(0, mgr.getLabelsOnNode(nodeId).size());
updateNodeLabels(nodeId, "x");
registerNode(rm, nodeId);
Thread.sleep(4000);
assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId));
// Ensure that node labels are updated if NodeLabelsProvider
// gives different labels
updateNodeLabels(nodeId, "y");
Thread.sleep(4000);
assertCollectionEquals(ImmutableSet.of("y"), mgr.getLabelsOnNode(nodeId));
rm.stop();
}
@Test
public void testWithNodeLabelUpdateDisabled() throws Exception {
conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
RMDelegatedNodeLabelsUpdater.DISABLE_DELEGATED_NODE_LABELS_UPDATE);
MockRM rm = new MockRM(conf);
rm.init(conf);
rm.getRMContext().getRMDelegatedNodeLabelsUpdater().nodeLabelsUpdateInterval
= 3 * 1000;
rm.start();
RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager();
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
NodeId nodeId = toNodeId("h1:1234");
updateNodeLabels(nodeId, "x");
registerNode(rm, nodeId);
Thread.sleep(4000);
// Ensure that even though timer is not run, node labels are fetched
// when node is registered
assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId));
rm.stop();
}
private void registerNode(ResourceManager rm, NodeId nodeId)
throws YarnException, IOException {
ResourceTrackerService resourceTrackerService =
rm.getResourceTrackerService();
RegisterNodeManagerRequest req =
Records.newRecord(RegisterNodeManagerRequest.class);
Resource capability = BuilderUtils.newResource(1024, 1);
req.setResource(capability);
req.setNodeId(nodeId);
req.setHttpPort(1234);
req.setNMVersion(YarnVersionInfo.getVersion());
resourceTrackerService.registerNodeManager(req);
}
private void updateNodeLabels(NodeId nodeId, String... nodeLabelsStr) {
nodeLabelsMap.put(nodeId, toNodeLabelSet(nodeLabelsStr));
}
public static class DummyRMNodeLabelsMappingProvider extends
RMNodeLabelsMappingProvider {
public DummyRMNodeLabelsMappingProvider() {
super("DummyRMNodeLabelsMappingProvider");
}
@Override
public Map<NodeId, Set<NodeLabel>> getNodeLabels(Set<NodeId> nodes) {
Map<NodeId, Set<NodeLabel>> nodeLabels = Maps.newHashMap();
for(NodeId node : nodes) {
nodeLabels.put(node, nodeLabelsMap.get(node));
}
return nodeLabels;
}
}
}

View File

@ -445,8 +445,8 @@ public void testNodeLabels() throws JSONException, Exception {
.post(ClientResponse.class);
LOG.info("posted node nodelabel");
//setting rmWebService for Distributed NodeLabel Configuration
rmWebService.isDistributedNodeLabelConfiguration = true;
//setting rmWebService for non Centralized NodeLabel Configuration
rmWebService.isCentralizedNodeLabelConfiguration = false;
// Case1 : Replace labels using node-to-labels
ntli = new NodeToLabelsEntryList();