diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index de8f678007..3762e45fa7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -229,6 +229,9 @@ Release 2.8.0 - UNRELEASED
YARN-261. Ability to fail AM attempts (Andrey Klochkov and
Rohith Sharma K S via jlowe)
+ YARN-3964. Support NodeLabelsProvider at Resource Manager side.
+ (Dian Fu via devaraj)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1e102e5370..8d34f4ec69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1999,14 +1999,17 @@ private static void addDeprecatedKeys() {
public static final String NODELABEL_CONFIGURATION_TYPE =
NODE_LABELS_PREFIX + "configuration-type";
- public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
+ public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE =
"centralized";
-
+
+ public static final String DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
+ "delegated-centralized";
+
public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE =
"distributed";
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
- CENTALIZED_NODELABEL_CONFIGURATION_TYPE;
+ CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY =
YARN_PREFIX + "cluster.max-application-priority";
@@ -2019,6 +2022,20 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
}
+ @Private
+ public static boolean isCentralizedNodeLabelConfiguration(
+ Configuration conf) {
+ return CENTRALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(
+ NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
+ }
+
+ @Private
+ public static boolean isDelegatedCentralizedNodeLabelConfiguration(
+ Configuration conf) {
+ return DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get(
+ NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE));
+ }
+
private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX
+ "node-labels.";
@@ -2055,6 +2072,23 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) {
public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS =
NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels";
+ private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX
+ + "node-labels.";
+
+ public static final String RM_NODE_LABELS_PROVIDER_CONFIG =
+ RM_NODE_LABELS_PREFIX + "provider";
+
+ private static final String RM_NODE_LABELS_PROVIDER_PREFIX =
+ RM_NODE_LABELS_PREFIX + "provider.";
+
+ //If -1 is configured then no timer task should be created
+ public static final String RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
+ RM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";
+
+ //once in 30 mins
+ public static final long DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
+ 30 * 60 * 1000;
+
public static final String AM_BLACKLISTING_ENABLED =
YARN_PREFIX + "am.blacklisting.enabled";
public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
index deec6ab0b8..ef5462f430 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
@@ -101,7 +101,7 @@ public class CommonNodeLabelsManager extends AbstractService {
protected NodeLabelsStore store;
private boolean nodeLabelsEnabled = false;
- private boolean isDistributedNodeLabelConfiguration = false;
+ private boolean isCentralizedNodeLabelConfiguration = true;
/**
* A Host
can have multiple Node
s
@@ -220,16 +220,16 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED);
- isDistributedNodeLabelConfiguration =
- YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
-
+ isCentralizedNodeLabelConfiguration =
+ YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
+
labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL));
}
protected void initNodeLabelStore(Configuration conf) throws Exception {
this.store = new FileSystemNodeLabelsStore(this);
this.store.init(conf);
- this.store.recover(isDistributedNodeLabelConfiguration);
+ this.store.recover(!isCentralizedNodeLabelConfiguration);
}
// for UT purpose
@@ -624,10 +624,14 @@ protected void internalUpdateLabelsOnNodes(
}
}
- if (null != dispatcher && !isDistributedNodeLabelConfiguration) {
- // In case of DistributedNodeLabelConfiguration, no need to save the the
+ if (null != dispatcher && isCentralizedNodeLabelConfiguration) {
+ // In case of DistributedNodeLabelConfiguration or
+ // DelegatedCentralizedNodeLabelConfiguration, no need to save the the
// NodeLabels Mapping to the back-end store, as on RM restart/failover
// NodeLabels are collected from NM through Register/Heartbeat again
+ // in case of DistributedNodeLabelConfiguration and collected from
+ // RMNodeLabelsMappingProvider in case of
+ // DelegatedCentralizedNodeLabelConfiguration
dispatcher.getEventHandler().handle(
new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 53793d2c47..e3d2c699d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2114,7 +2114,7 @@
Set configuration type for node labels. Administrators can specify
- "centralized" or "distributed".
+ "centralized", "delegated-centralized" or "distributed".
yarn.node-labels.configuration-type
centralized
@@ -2176,6 +2176,32 @@
yarn.nodemanager.node-labels.provider.fetch-timeout-ms
1200000
+
+
+
+
+ When node labels "yarn.node-labels.configuration-type" is
+ of type "delegated-centralized", administrators should configure
+ the class for fetching node labels by ResourceManager. Configured
+ class needs to extend
+ org.apache.hadoop.yarn.server.resourcemanager.nodelabels.
+ RMNodeLabelsMappingProvider.
+
+ yarn.resourcemanager.node-labels.provider
+
+
+
+
+
+ When node labels "yarn.node-labels.configuration-type" is of type
+ "delegated-centralized" then periodically node labels are retrieved
+ from the node labels provider. This configuration is to define the
+ interval. If -1 is configured then node labels are retrieved from
+ provider only once for each node after it registers. Defaults to 30 mins.
+
+ yarn.resourcemanager.node-labels.provider.fetch-interval-ms
+ 1800000
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index ab46419388..353e72d626 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -86,6 +86,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -120,7 +121,7 @@ public class AdminService extends CompositeService implements
private UserGroupInformation daemonUser;
@VisibleForTesting
- boolean isDistributedNodeLabelConfiguration = false;
+ boolean isCentralizedNodeLabelConfiguration = true;
public AdminService(ResourceManager rm, RMContext rmContext) {
super(AdminService.class.getName());
@@ -151,8 +152,8 @@ public void serviceInit(Configuration conf) throws Exception {
.getCurrentUser());
rmId = conf.get(YarnConfiguration.RM_HA_ID);
- isDistributedNodeLabelConfiguration =
- YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
+ isCentralizedNodeLabelConfiguration =
+ YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
super.serviceInit(conf);
}
@@ -745,7 +746,13 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
String operation = "replaceLabelsOnNode";
final String msg = "set node to labels.";
- checkAndThrowIfDistributedNodeLabelConfEnabled(operation);
+ try {
+ NodeLabelsUtils.verifyCentralizedNodeLabelConfEnabled(operation,
+ isCentralizedNodeLabelConfiguration);
+ } catch (IOException ioe) {
+ throw RPCUtil.getRemoteException(ioe);
+ }
+
UserGroupInformation user = checkAcls(operation);
checkRMStatus(user.getShortUserName(), operation, msg);
@@ -780,17 +787,6 @@ private YarnException logAndWrapException(Exception exception, String user,
return RPCUtil.getRemoteException(exception);
}
- private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
- throws YarnException {
- if (isDistributedNodeLabelConfiguration) {
- String msg =
- String.format("Error when invoke method=%s because of "
- + "distributed node label configuration enabled.", operation);
- LOG.error(msg);
- throw RPCUtil.getRemoteException(new IOException(msg));
- }
- }
-
@Override
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index c71323fcff..dd2153ac05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@@ -94,6 +95,7 @@ public class RMActiveServiceContext {
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
private RMNodeLabelsManager nodeLabelManager;
+ private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
private long epoch;
private Clock systemClock = new SystemClock();
private long schedulerRecoveryStartTime = 0;
@@ -390,6 +392,19 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) {
nodeLabelManager = mgr;
}
+ @Private
+ @Unstable
+ public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
+ return rmDelegatedNodeLabelsUpdater;
+ }
+
+ @Private
+ @Unstable
+ public void setRMDelegatedNodeLabelsUpdater(
+ RMDelegatedNodeLabelsUpdater nodeLablesUpdater) {
+ rmDelegatedNodeLabelsUpdater = nodeLablesUpdater;
+ }
+
@Private
@Unstable
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index b64c83494e..9802a3796b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -118,6 +119,11 @@ void setRMApplicationHistoryWriter(
public void setNodeLabelManager(RMNodeLabelsManager mgr);
+ RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater();
+
+ void setRMDelegatedNodeLabelsUpdater(
+ RMDelegatedNodeLabelsUpdater nodeLabelsUpdater);
+
long getEpoch();
ReservationSystem getReservationSystem();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 840cea7266..ed9942bce2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -400,6 +401,18 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) {
activeServiceContext.setNodeLabelManager(mgr);
}
+ @Override
+ public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
+ return activeServiceContext.getRMDelegatedNodeLabelsUpdater();
+ }
+
+ @Override
+ public void setRMDelegatedNodeLabelsUpdater(
+ RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater) {
+ activeServiceContext.setRMDelegatedNodeLabelsUpdater(
+ delegatedNodeLabelsUpdater);
+ }
+
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d1f339a1a8..b38f188838 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -440,6 +441,13 @@ protected void serviceInit(Configuration configuration) throws Exception {
addService(nlm);
rmContext.setNodeLabelManager(nlm);
+ RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
+ createRMDelegatedNodeLabelsUpdater();
+ if (delegatedNodeLabelsUpdater != null) {
+ addService(delegatedNodeLabelsUpdater);
+ rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater);
+ }
+
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
@@ -1113,6 +1121,20 @@ protected RMSecretManagerService createRMSecretManagerService() {
return new RMSecretManagerService(conf, rmContext);
}
+ /**
+ * Create RMDelegatedNodeLabelsUpdater based on configuration.
+ */
+ protected RMDelegatedNodeLabelsUpdater createRMDelegatedNodeLabelsUpdater() {
+ if (conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
+ YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED)
+ && YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(
+ conf)) {
+ return new RMDelegatedNodeLabelsUpdater(rmContext);
+ } else {
+ return null;
+ }
+ }
+
@Private
public ClientRMService getClientRMService() {
return this.clientRM;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 248cdc60c9..d6c78837a4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -22,7 +22,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@@ -43,7 +42,6 @@
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -63,6 +61,7 @@
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@@ -105,6 +104,7 @@ public class ResourceTrackerService extends AbstractService implements
private int minAllocVcores;
private boolean isDistributedNodeLabelsConf;
+ private boolean isDelegatedCentralizedNodeLabelsConf;
public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
@@ -151,6 +151,8 @@ protected void serviceInit(Configuration conf) throws Exception {
isDistributedNodeLabelsConf =
YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
+ isDelegatedCentralizedNodeLabelsConf = YarnConfiguration
+ .isDelegatedCentralizedNodeLabelConfiguration(conf);
super.serviceInit(conf);
}
@@ -241,17 +243,6 @@ void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) {
}
}
- static Set convertToStringSet(Set nodeLabels) {
- if (null == nodeLabels) {
- return null;
- }
- Set labels = new HashSet();
- for (NodeLabel label : nodeLabels) {
- labels.add(label.getName());
- }
- return labels;
- }
-
@SuppressWarnings("unchecked")
@Override
public RegisterNodeManagerResponse registerNodeManager(
@@ -353,7 +344,8 @@ public RegisterNodeManagerResponse registerNodeManager(
}
// Update node's labels to RM's NodeLabelManager.
- Set nodeLabels = convertToStringSet(request.getNodeLabels());
+ Set nodeLabels = NodeLabelsUtils.convertToStringSet(
+ request.getNodeLabels());
if (isDistributedNodeLabelsConf && nodeLabels != null) {
try {
updateNodeLabelsFromNMReport(nodeLabels, nodeId);
@@ -363,6 +355,8 @@ public RegisterNodeManagerResponse registerNodeManager(
response.setDiagnosticsMessage(ex.getMessage());
response.setAreNodeLabelsAcceptedByRM(false);
}
+ } else if (isDelegatedCentralizedNodeLabelsConf) {
+ this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId);
}
StringBuilder message = new StringBuilder();
@@ -480,7 +474,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) {
try {
updateNodeLabelsFromNMReport(
- convertToStringSet(request.getNodeLabels()), nodeId);
+ NodeLabelsUtils.convertToStringSet(request.getNodeLabels()),
+ nodeId);
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
} catch (IOException ex) {
//ensure the error message is captured and sent across in response
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java
new file mode 100644
index 0000000000..1645d13836
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+
+/**
+ * Node labels utilities.
+ */
+public final class NodeLabelsUtils {
+ private static final Log LOG = LogFactory.getLog(NodeLabelsUtils.class);
+
+ private NodeLabelsUtils() { /* Hidden constructor */ }
+
+ public static Set convertToStringSet(Set nodeLabels) {
+ if (null == nodeLabels) {
+ return null;
+ }
+ Set labels = new HashSet();
+ for (NodeLabel label : nodeLabels) {
+ labels.add(label.getName());
+ }
+ return labels;
+ }
+
+ public static void verifyCentralizedNodeLabelConfEnabled(String operation,
+ boolean isCentralizedNodeLabelConfiguration) throws IOException {
+ if (!isCentralizedNodeLabelConfiguration) {
+ String msg =
+ String.format("Error when invoke method=%s because "
+ + "centralized node label configuration is not enabled.",
+ operation);
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java
new file mode 100644
index 0000000000..0c062117b7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Update nodes labels map for Resource Manager periodically. It collects
+ * nodes labels from {@link RMNodeLabelsMappingProvider} and updates the
+ * nodes -> labels map via {@link RMNodeLabelsManager}. This service is
+ * enabled when configuration "yarn.node-labels.configuration-type" is
+ * set to "delegated-centralized".
+ */
+public class RMDelegatedNodeLabelsUpdater extends CompositeService {
+
+ private static final Log LOG = LogFactory
+ .getLog(RMDelegatedNodeLabelsUpdater.class);
+
+ public static final long DISABLE_DELEGATED_NODE_LABELS_UPDATE = -1;
+
+ // Timer used to schedule node labels fetching
+ private Timer nodeLabelsScheduler;
+ // 30 seconds
+ @VisibleForTesting
+ public long nodeLabelsUpdateInterval = 30 * 1000;
+
+ private Set newlyRegisteredNodes = new HashSet();
+ // Lock to protect newlyRegisteredNodes
+ private Object lock = new Object();
+ private long lastAllNodesLabelUpdateMills = 0L;
+ private long allNodesLabelUpdateInterval;
+
+ private RMNodeLabelsMappingProvider rmNodeLabelsMappingProvider;
+
+ private RMContext rmContext;
+
+ public RMDelegatedNodeLabelsUpdater(RMContext rmContext) {
+ super("RMDelegatedNodeLabelsUpdater");
+ this.rmContext = rmContext;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ allNodesLabelUpdateInterval = conf.getLong(
+ YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS);
+ rmNodeLabelsMappingProvider = createRMNodeLabelsMappingProvider(conf);
+ addService(rmNodeLabelsMappingProvider);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ nodeLabelsScheduler = new Timer(
+ "RMDelegatedNodeLabelsUpdater-Timer", true);
+ TimerTask delegatedNodeLabelsUpdaterTimerTask =
+ new RMDelegatedNodeLabelsUpdaterTimerTask();
+ nodeLabelsScheduler.scheduleAtFixedRate(
+ delegatedNodeLabelsUpdaterTimerTask,
+ nodeLabelsUpdateInterval,
+ nodeLabelsUpdateInterval);
+
+ super.serviceStart();
+ }
+
+ /**
+ * Terminate the timer.
+ * @throws Exception
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ if (nodeLabelsScheduler != null) {
+ nodeLabelsScheduler.cancel();
+ }
+ super.serviceStop();
+ }
+
+ private class RMDelegatedNodeLabelsUpdaterTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ Set nodesToUpdateLabels = null;
+ boolean isUpdatingAllNodes = false;
+
+ if (allNodesLabelUpdateInterval != DISABLE_DELEGATED_NODE_LABELS_UPDATE) {
+ long elapsedTimeSinceLastUpdate =
+ System.currentTimeMillis() - lastAllNodesLabelUpdateMills;
+ if (elapsedTimeSinceLastUpdate > allNodesLabelUpdateInterval) {
+ nodesToUpdateLabels =
+ Collections.unmodifiableSet(rmContext.getRMNodes().keySet());
+ isUpdatingAllNodes = true;
+ }
+ }
+
+ if (nodesToUpdateLabels == null && !newlyRegisteredNodes.isEmpty()) {
+ synchronized (lock) {
+ if (!newlyRegisteredNodes.isEmpty()) {
+ nodesToUpdateLabels = new HashSet(newlyRegisteredNodes);
+ }
+ }
+ }
+
+ try {
+ if (nodesToUpdateLabels != null && !nodesToUpdateLabels.isEmpty()) {
+ updateNodeLabelsInternal(nodesToUpdateLabels);
+ if (isUpdatingAllNodes) {
+ lastAllNodesLabelUpdateMills = System.currentTimeMillis();
+ }
+ synchronized (lock) {
+ newlyRegisteredNodes.removeAll(nodesToUpdateLabels);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to update node Labels", e);
+ }
+ }
+ }
+
+ private void updateNodeLabelsInternal(Set nodes)
+ throws IOException {
+ Map> labelsUpdated =
+ rmNodeLabelsMappingProvider.getNodeLabels(nodes);
+ if (labelsUpdated != null && labelsUpdated.size() != 0) {
+ Map> nodeToLabels =
+ new HashMap>(labelsUpdated.size());
+ for (Map.Entry> entry
+ : labelsUpdated.entrySet()) {
+ nodeToLabels.put(entry.getKey(),
+ NodeLabelsUtils.convertToStringSet(entry.getValue()));
+ }
+ rmContext.getNodeLabelManager().replaceLabelsOnNode(nodeToLabels);
+ }
+ }
+
+ /**
+ * Get the RMNodeLabelsMappingProvider which is used to provide node labels.
+ */
+ private RMNodeLabelsMappingProvider createRMNodeLabelsMappingProvider(
+ Configuration conf) throws IOException {
+ RMNodeLabelsMappingProvider nodeLabelsMappingProvider = null;
+ try {
+ Class extends RMNodeLabelsMappingProvider> labelsProviderClass =
+ conf.getClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG,
+ null, RMNodeLabelsMappingProvider.class);
+ if (labelsProviderClass != null) {
+ nodeLabelsMappingProvider = labelsProviderClass.newInstance();
+ }
+ } catch (InstantiationException | IllegalAccessException
+ | RuntimeException e) {
+ LOG.error("Failed to create RMNodeLabelsMappingProvider based on"
+ + " Configuration", e);
+ throw new IOException("Failed to create RMNodeLabelsMappingProvider : "
+ + e.getMessage(), e);
+ }
+
+ if (nodeLabelsMappingProvider == null) {
+ String msg = "RMNodeLabelsMappingProvider should be configured when "
+ + "delegated-centralized node label configuration is enabled";
+ LOG.error(msg);
+ throw new IOException(msg);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("RM Node labels mapping provider class is : "
+ + nodeLabelsMappingProvider.getClass().toString());
+ }
+
+ return nodeLabelsMappingProvider;
+ }
+
+ /**
+ * Update node labels for a specified node.
+ * @param node the node to update node labels
+ */
+ public void updateNodeLabels(NodeId node) {
+ synchronized (lock) {
+ newlyRegisteredNodes.add(node);
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java
new file mode 100644
index 0000000000..28bc54bad5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+
+/**
+ * Interface which is responsible for providing the node -> labels map.
+ */
+public abstract class RMNodeLabelsMappingProvider extends AbstractService {
+
+ public RMNodeLabelsMappingProvider(String name) {
+ super(name);
+ }
+
+ /**
+ * Provides the labels. It is expected to give same Labels
+ * continuously until there is a change in labels.
+ *
+ * @param nodes to fetch labels
+ * @return Set of node label strings applicable for a node
+ */
+ public abstract Map> getNodeLabels(Set nodes);
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 24100533fa..a0a6bda652 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -111,6 +111,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -177,7 +178,7 @@ public class RMWebServices {
private @Context HttpServletResponse response;
@VisibleForTesting
- boolean isDistributedNodeLabelConfiguration = false;
+ boolean isCentralizedNodeLabelConfiguration = true;
public final static String DELEGATION_TOKEN_HEADER =
"Hadoop-YARN-RM-Delegation-Token";
@@ -186,19 +187,8 @@ public class RMWebServices {
public RMWebServices(final ResourceManager rm, Configuration conf) {
this.rm = rm;
this.conf = conf;
- isDistributedNodeLabelConfiguration =
- YarnConfiguration.isDistributedNodeLabelConfiguration(conf);
- }
-
- private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation)
- throws IOException {
- if (isDistributedNodeLabelConfiguration) {
- String msg =
- String.format("Error when invoke method=%s because of "
- + "distributed node label configuration enabled.", operation);
- LOG.error(msg);
- throw new IOException(msg);
- }
+ isCentralizedNodeLabelConfiguration =
+ YarnConfiguration.isCentralizedNodeLabelConfiguration(conf);
}
RMWebServices(ResourceManager rm, Configuration conf,
@@ -892,7 +882,8 @@ private Response replaceLabelsOnNode(
String operation) throws IOException {
init();
- checkAndThrowIfDistributedNodeLabelConfEnabled("replaceLabelsOnNode");
+ NodeLabelsUtils.verifyCentralizedNodeLabelConfEnabled(
+ "replaceLabelsOnNode", isCentralizedNodeLabelConfiguration);
UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true);
if (callerUGI == null) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index 7e75cfa0f2..e61d9fedd0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -872,12 +872,12 @@ public void testModifyLabelsOnNodesWithDistributedConfigurationDisabled()
}
@Test(expected = YarnException.class)
- public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled()
+ public void testModifyLabelsOnNodesWithCentralizedConfigurationDisabled()
throws IOException, YarnException {
// create RM and set it's ACTIVE, and set distributed node label
// configuration to true
MockRM rm = new MockRM();
- rm.adminService.isDistributedNodeLabelConfiguration = true;
+ rm.adminService.isCentralizedNodeLabelConfiguration = false;
((RMContextImpl) rm.getRMContext())
.setHAServiceState(HAServiceState.ACTIVE);
@@ -893,14 +893,14 @@ public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled()
}
@Test
- public void testRemoveClusterNodeLabelsWithDistributedConfigurationEnabled()
+ public void testRemoveClusterNodeLabelsWithCentralizedConfigurationDisabled()
throws IOException, YarnException {
// create RM and set it's ACTIVE
MockRM rm = new MockRM();
((RMContextImpl) rm.getRMContext())
.setHAServiceState(HAServiceState.ACTIVE);
RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager();
- rm.adminService.isDistributedNodeLabelConfiguration = true;
+ rm.adminService.isCentralizedNodeLabelConfiguration = false;
// by default, distributed configuration for node label is disabled, this
// should pass
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 94a0e4cc8c..e42ed91a8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -61,6 +61,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -362,7 +363,7 @@ protected RMNodeLabelsManager createNodeLabelManager() {
Assert.assertEquals("Action should be normal on valid Node Labels",
NodeAction.NORMAL, response.getNodeAction());
assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
- ResourceTrackerService.convertToStringSet(registerReq.getNodeLabels()));
+ NodeLabelsUtils.convertToStringSet(registerReq.getNodeLabels()));
Assert.assertTrue("Valid Node Labels were not accepted by RM",
response.getAreNodeLabelsAcceptedByRM());
rm.stop();
@@ -590,7 +591,7 @@ protected RMNodeLabelsManager createNodeLabelManager() {
Assert.assertEquals("InValid Node Labels were not accepted by RM",
NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction());
assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
- ResourceTrackerService.convertToStringSet(heartbeatReq.getNodeLabels()));
+ NodeLabelsUtils.convertToStringSet(heartbeatReq.getNodeLabels()));
Assert.assertTrue("Valid Node Labels were not accepted by RM",
nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java
new file mode 100644
index 0000000000..928124d356
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.YarnVersionInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+public class TestRMDelegatedNodeLabelsUpdater extends NodeLabelTestBase {
+ private YarnConfiguration conf;
+ private static Map> nodeLabelsMap = Maps.newHashMap();
+
+ @Before
+ public void setup() {
+ conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+ conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
+ YarnConfiguration.DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE);
+ conf.setClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG,
+ DummyRMNodeLabelsMappingProvider.class,
+ RMNodeLabelsMappingProvider.class);
+ }
+
+ @Test
+ public void testRMNodeLabelsMappingProviderConfiguration() {
+ conf.unset(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG);
+ try {
+ MockRM rm = new MockRM(conf);
+ rm.init(conf);
+ rm.start();
+ Assert.fail("Expected an exception");
+ } catch (Exception e) {
+ // expected an exception
+ Assert.assertTrue(e.getMessage().contains(
+ "RMNodeLabelsMappingProvider should be configured"));
+ }
+ }
+
+ @Test
+ public void testWithNodeLabelUpdateEnabled() throws Exception {
+ conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
+ 1000);
+ MockRM rm = new MockRM(conf);
+ rm.init(conf);
+ rm.getRMContext().getRMDelegatedNodeLabelsUpdater().nodeLabelsUpdateInterval
+ = 3 * 1000;
+ rm.start();
+
+ RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager();
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
+
+ NodeId nodeId = toNodeId("h1:1234");
+ assertEquals(0, mgr.getLabelsOnNode(nodeId).size());
+ updateNodeLabels(nodeId, "x");
+ registerNode(rm, nodeId);
+ Thread.sleep(4000);
+ assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId));
+
+ // Ensure that node labels are updated if NodeLabelsProvider
+ // gives different labels
+ updateNodeLabels(nodeId, "y");
+ Thread.sleep(4000);
+ assertCollectionEquals(ImmutableSet.of("y"), mgr.getLabelsOnNode(nodeId));
+
+ rm.stop();
+ }
+
+ @Test
+ public void testWithNodeLabelUpdateDisabled() throws Exception {
+ conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS,
+ RMDelegatedNodeLabelsUpdater.DISABLE_DELEGATED_NODE_LABELS_UPDATE);
+ MockRM rm = new MockRM(conf);
+ rm.init(conf);
+ rm.getRMContext().getRMDelegatedNodeLabelsUpdater().nodeLabelsUpdateInterval
+ = 3 * 1000;
+ rm.start();
+
+ RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager();
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+
+ NodeId nodeId = toNodeId("h1:1234");
+ updateNodeLabels(nodeId, "x");
+ registerNode(rm, nodeId);
+ Thread.sleep(4000);
+ // Ensure that even though timer is not run, node labels are fetched
+ // when node is registered
+ assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId));
+
+ rm.stop();
+ }
+
+ private void registerNode(ResourceManager rm, NodeId nodeId)
+ throws YarnException, IOException {
+ ResourceTrackerService resourceTrackerService =
+ rm.getResourceTrackerService();
+ RegisterNodeManagerRequest req =
+ Records.newRecord(RegisterNodeManagerRequest.class);
+ Resource capability = BuilderUtils.newResource(1024, 1);
+ req.setResource(capability);
+ req.setNodeId(nodeId);
+ req.setHttpPort(1234);
+ req.setNMVersion(YarnVersionInfo.getVersion());
+ resourceTrackerService.registerNodeManager(req);
+ }
+
+ private void updateNodeLabels(NodeId nodeId, String... nodeLabelsStr) {
+ nodeLabelsMap.put(nodeId, toNodeLabelSet(nodeLabelsStr));
+ }
+
+ public static class DummyRMNodeLabelsMappingProvider extends
+ RMNodeLabelsMappingProvider {
+ public DummyRMNodeLabelsMappingProvider() {
+ super("DummyRMNodeLabelsMappingProvider");
+ }
+
+ @Override
+ public Map> getNodeLabels(Set nodes) {
+ Map> nodeLabels = Maps.newHashMap();
+ for(NodeId node : nodes) {
+ nodeLabels.put(node, nodeLabelsMap.get(node));
+ }
+ return nodeLabels;
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java
index 53a99029cd..472dab6ac1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java
@@ -445,8 +445,8 @@ public void testNodeLabels() throws JSONException, Exception {
.post(ClientResponse.class);
LOG.info("posted node nodelabel");
- //setting rmWebService for Distributed NodeLabel Configuration
- rmWebService.isDistributedNodeLabelConfiguration = true;
+ //setting rmWebService for non Centralized NodeLabel Configuration
+ rmWebService.isCentralizedNodeLabelConfiguration = false;
// Case1 : Replace labels using node-to-labels
ntli = new NodeToLabelsEntryList();