From db9304788187c700647c4d84caeb3b5ad6d868d8 Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Sun, 11 Oct 2015 11:21:29 +0530 Subject: [PATCH] YARN-3964. Support NodeLabelsProvider at Resource Manager side. Contributed by Dian Fu. --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 40 +++- .../nodelabels/CommonNodeLabelsManager.java | 18 +- .../src/main/resources/yarn-default.xml | 28 ++- .../server/resourcemanager/AdminService.java | 26 +-- .../RMActiveServiceContext.java | 15 ++ .../server/resourcemanager/RMContext.java | 6 + .../server/resourcemanager/RMContextImpl.java | 13 ++ .../resourcemanager/ResourceManager.java | 22 ++ .../ResourceTrackerService.java | 25 +-- .../nodelabels/NodeLabelsUtils.java | 59 +++++ .../RMDelegatedNodeLabelsUpdater.java | 211 ++++++++++++++++++ .../RMNodeLabelsMappingProvider.java | 45 ++++ .../resourcemanager/webapp/RMWebServices.java | 21 +- .../resourcemanager/TestRMAdminService.java | 8 +- .../TestResourceTrackerService.java | 5 +- .../TestRMDelegatedNodeLabelsUpdater.java | 163 ++++++++++++++ .../webapp/TestRMWebServicesNodeLabels.java | 4 +- 18 files changed, 648 insertions(+), 64 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index de8f678007..3762e45fa7 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -229,6 +229,9 @@ Release 2.8.0 - UNRELEASED YARN-261. Ability to fail AM attempts (Andrey Klochkov and Rohith Sharma K S via jlowe) + YARN-3964. Support NodeLabelsProvider at Resource Manager side. + (Dian Fu via devaraj) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1e102e5370..8d34f4ec69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1999,14 +1999,17 @@ private static void addDeprecatedKeys() { public static final String NODELABEL_CONFIGURATION_TYPE = NODE_LABELS_PREFIX + "configuration-type"; - public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE = + public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE = "centralized"; - + + public static final String DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE = + "delegated-centralized"; + public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE = "distributed"; public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE = - CENTALIZED_NODELABEL_CONFIGURATION_TYPE; + CENTRALIZED_NODELABEL_CONFIGURATION_TYPE; public static final String MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY = YARN_PREFIX + "cluster.max-application-priority"; @@ -2019,6 +2022,20 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); } + @Private + public static boolean isCentralizedNodeLabelConfiguration( + Configuration conf) { + return CENTRALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get( + NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); + } + + @Private + public static boolean isDelegatedCentralizedNodeLabelConfiguration( + Configuration conf) { + return DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE.equals(conf.get( + NODELABEL_CONFIGURATION_TYPE, DEFAULT_NODELABEL_CONFIGURATION_TYPE)); + } + private static final String NM_NODE_LABELS_PREFIX = NM_PREFIX + "node-labels."; @@ -2055,6 +2072,23 @@ public static boolean isDistributedNodeLabelConfiguration(Configuration conf) { public static final String NM_PROVIDER_CONFIGURED_NODE_LABELS = NM_NODE_LABELS_PROVIDER_PREFIX + "configured-node-labels"; + private static final String RM_NODE_LABELS_PREFIX = RM_PREFIX + + "node-labels."; + + public static final String RM_NODE_LABELS_PROVIDER_CONFIG = + RM_NODE_LABELS_PREFIX + "provider"; + + private static final String RM_NODE_LABELS_PROVIDER_PREFIX = + RM_NODE_LABELS_PREFIX + "provider."; + + //If -1 is configured then no timer task should be created + public static final String RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = + RM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms"; + + //once in 30 mins + public static final long DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS = + 30 * 60 * 1000; + public static final String AM_BLACKLISTING_ENABLED = YARN_PREFIX + "am.blacklisting.enabled"; public static final boolean DEFAULT_AM_BLACKLISTING_ENABLED = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java index deec6ab0b8..ef5462f430 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java @@ -101,7 +101,7 @@ public class CommonNodeLabelsManager extends AbstractService { protected NodeLabelsStore store; private boolean nodeLabelsEnabled = false; - private boolean isDistributedNodeLabelConfiguration = false; + private boolean isCentralizedNodeLabelConfiguration = true; /** * A Host can have multiple Nodes @@ -220,16 +220,16 @@ protected void serviceInit(Configuration conf) throws Exception { conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED, YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED); - isDistributedNodeLabelConfiguration = - YarnConfiguration.isDistributedNodeLabelConfiguration(conf); - + isCentralizedNodeLabelConfiguration = + YarnConfiguration.isCentralizedNodeLabelConfiguration(conf); + labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL)); } protected void initNodeLabelStore(Configuration conf) throws Exception { this.store = new FileSystemNodeLabelsStore(this); this.store.init(conf); - this.store.recover(isDistributedNodeLabelConfiguration); + this.store.recover(!isCentralizedNodeLabelConfiguration); } // for UT purpose @@ -624,10 +624,14 @@ protected void internalUpdateLabelsOnNodes( } } - if (null != dispatcher && !isDistributedNodeLabelConfiguration) { - // In case of DistributedNodeLabelConfiguration, no need to save the the + if (null != dispatcher && isCentralizedNodeLabelConfiguration) { + // In case of DistributedNodeLabelConfiguration or + // DelegatedCentralizedNodeLabelConfiguration, no need to save the the // NodeLabels Mapping to the back-end store, as on RM restart/failover // NodeLabels are collected from NM through Register/Heartbeat again + // in case of DistributedNodeLabelConfiguration and collected from + // RMNodeLabelsMappingProvider in case of + // DelegatedCentralizedNodeLabelConfiguration dispatcher.getEventHandler().handle( new UpdateNodeToLabelsMappingsEvent(newNMToLabels)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 53793d2c47..e3d2c699d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2114,7 +2114,7 @@ Set configuration type for node labels. Administrators can specify - "centralized" or "distributed". + "centralized", "delegated-centralized" or "distributed". yarn.node-labels.configuration-type centralized @@ -2176,6 +2176,32 @@ yarn.nodemanager.node-labels.provider.fetch-timeout-ms 1200000 + + + + + When node labels "yarn.node-labels.configuration-type" is + of type "delegated-centralized", administrators should configure + the class for fetching node labels by ResourceManager. Configured + class needs to extend + org.apache.hadoop.yarn.server.resourcemanager.nodelabels. + RMNodeLabelsMappingProvider. + + yarn.resourcemanager.node-labels.provider + + + + + + When node labels "yarn.node-labels.configuration-type" is of type + "delegated-centralized" then periodically node labels are retrieved + from the node labels provider. This configuration is to define the + interval. If -1 is configured then node labels are retrieved from + provider only once for each node after it registers. Defaults to 30 mins. + + yarn.resourcemanager.node-labels.provider.fetch-interval-ms + 1800000 + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index ab46419388..353e72d626 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -120,7 +121,7 @@ public class AdminService extends CompositeService implements private UserGroupInformation daemonUser; @VisibleForTesting - boolean isDistributedNodeLabelConfiguration = false; + boolean isCentralizedNodeLabelConfiguration = true; public AdminService(ResourceManager rm, RMContext rmContext) { super(AdminService.class.getName()); @@ -151,8 +152,8 @@ public void serviceInit(Configuration conf) throws Exception { .getCurrentUser()); rmId = conf.get(YarnConfiguration.RM_HA_ID); - isDistributedNodeLabelConfiguration = - YarnConfiguration.isDistributedNodeLabelConfiguration(conf); + isCentralizedNodeLabelConfiguration = + YarnConfiguration.isCentralizedNodeLabelConfiguration(conf); super.serviceInit(conf); } @@ -745,7 +746,13 @@ public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( String operation = "replaceLabelsOnNode"; final String msg = "set node to labels."; - checkAndThrowIfDistributedNodeLabelConfEnabled(operation); + try { + NodeLabelsUtils.verifyCentralizedNodeLabelConfEnabled(operation, + isCentralizedNodeLabelConfiguration); + } catch (IOException ioe) { + throw RPCUtil.getRemoteException(ioe); + } + UserGroupInformation user = checkAcls(operation); checkRMStatus(user.getShortUserName(), operation, msg); @@ -780,17 +787,6 @@ private YarnException logAndWrapException(Exception exception, String user, return RPCUtil.getRemoteException(exception); } - private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation) - throws YarnException { - if (isDistributedNodeLabelConfiguration) { - String msg = - String.format("Error when invoke method=%s because of " - + "distributed node label configuration enabled.", operation); - LOG.error(msg); - throw RPCUtil.getRemoteException(new IOException(msg)); - } - } - @Override public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index c71323fcff..dd2153ac05 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -94,6 +95,7 @@ public class RMActiveServiceContext { private ResourceTrackerService resourceTrackerService; private ApplicationMasterService applicationMasterService; private RMNodeLabelsManager nodeLabelManager; + private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater; private long epoch; private Clock systemClock = new SystemClock(); private long schedulerRecoveryStartTime = 0; @@ -390,6 +392,19 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) { nodeLabelManager = mgr; } + @Private + @Unstable + public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() { + return rmDelegatedNodeLabelsUpdater; + } + + @Private + @Unstable + public void setRMDelegatedNodeLabelsUpdater( + RMDelegatedNodeLabelsUpdater nodeLablesUpdater) { + rmDelegatedNodeLabelsUpdater = nodeLablesUpdater; + } + @Private @Unstable public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index b64c83494e..9802a3796b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -118,6 +119,11 @@ void setRMApplicationHistoryWriter( public void setNodeLabelManager(RMNodeLabelsManager mgr); + RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater(); + + void setRMDelegatedNodeLabelsUpdater( + RMDelegatedNodeLabelsUpdater nodeLabelsUpdater); + long getEpoch(); ReservationSystem getReservationSystem(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 840cea7266..ed9942bce2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; @@ -400,6 +401,18 @@ public void setNodeLabelManager(RMNodeLabelsManager mgr) { activeServiceContext.setNodeLabelManager(mgr); } + @Override + public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() { + return activeServiceContext.getRMDelegatedNodeLabelsUpdater(); + } + + @Override + public void setRMDelegatedNodeLabelsUpdater( + RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater) { + activeServiceContext.setRMDelegatedNodeLabelsUpdater( + delegatedNodeLabelsUpdater); + } + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index d1f339a1a8..b38f188838 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -440,6 +441,13 @@ protected void serviceInit(Configuration configuration) throws Exception { addService(nlm); rmContext.setNodeLabelManager(nlm); + RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater = + createRMDelegatedNodeLabelsUpdater(); + if (delegatedNodeLabelsUpdater != null) { + addService(delegatedNodeLabelsUpdater); + rmContext.setRMDelegatedNodeLabelsUpdater(delegatedNodeLabelsUpdater); + } + boolean isRecoveryEnabled = conf.getBoolean( YarnConfiguration.RECOVERY_ENABLED, YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); @@ -1113,6 +1121,20 @@ protected RMSecretManagerService createRMSecretManagerService() { return new RMSecretManagerService(conf, rmContext); } + /** + * Create RMDelegatedNodeLabelsUpdater based on configuration. + */ + protected RMDelegatedNodeLabelsUpdater createRMDelegatedNodeLabelsUpdater() { + if (conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED, + YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED) + && YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration( + conf)) { + return new RMDelegatedNodeLabelsUpdater(rmContext); + } else { + return null; + } + } + @Private public ClientRMService getClientRMService() { return this.clientRM; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 248cdc60c9..d6c78837a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -22,7 +22,6 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -43,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -63,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -105,6 +104,7 @@ public class ResourceTrackerService extends AbstractService implements private int minAllocVcores; private boolean isDistributedNodeLabelsConf; + private boolean isDelegatedCentralizedNodeLabelsConf; public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, @@ -151,6 +151,8 @@ protected void serviceInit(Configuration conf) throws Exception { isDistributedNodeLabelsConf = YarnConfiguration.isDistributedNodeLabelConfiguration(conf); + isDelegatedCentralizedNodeLabelsConf = YarnConfiguration + .isDelegatedCentralizedNodeLabelConfiguration(conf); super.serviceInit(conf); } @@ -241,17 +243,6 @@ void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) { } } - static Set convertToStringSet(Set nodeLabels) { - if (null == nodeLabels) { - return null; - } - Set labels = new HashSet(); - for (NodeLabel label : nodeLabels) { - labels.add(label.getName()); - } - return labels; - } - @SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( @@ -353,7 +344,8 @@ public RegisterNodeManagerResponse registerNodeManager( } // Update node's labels to RM's NodeLabelManager. - Set nodeLabels = convertToStringSet(request.getNodeLabels()); + Set nodeLabels = NodeLabelsUtils.convertToStringSet( + request.getNodeLabels()); if (isDistributedNodeLabelsConf && nodeLabels != null) { try { updateNodeLabelsFromNMReport(nodeLabels, nodeId); @@ -363,6 +355,8 @@ public RegisterNodeManagerResponse registerNodeManager( response.setDiagnosticsMessage(ex.getMessage()); response.setAreNodeLabelsAcceptedByRM(false); } + } else if (isDelegatedCentralizedNodeLabelsConf) { + this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId); } StringBuilder message = new StringBuilder(); @@ -480,7 +474,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) if (isDistributedNodeLabelsConf && request.getNodeLabels() != null) { try { updateNodeLabelsFromNMReport( - convertToStringSet(request.getNodeLabels()), nodeId); + NodeLabelsUtils.convertToStringSet(request.getNodeLabels()), + nodeId); nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true); } catch (IOException ex) { //ensure the error message is captured and sent across in response diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java new file mode 100644 index 0000000000..1645d13836 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeLabelsUtils.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.NodeLabel; + +/** + * Node labels utilities. + */ +public final class NodeLabelsUtils { + private static final Log LOG = LogFactory.getLog(NodeLabelsUtils.class); + + private NodeLabelsUtils() { /* Hidden constructor */ } + + public static Set convertToStringSet(Set nodeLabels) { + if (null == nodeLabels) { + return null; + } + Set labels = new HashSet(); + for (NodeLabel label : nodeLabels) { + labels.add(label.getName()); + } + return labels; + } + + public static void verifyCentralizedNodeLabelConfEnabled(String operation, + boolean isCentralizedNodeLabelConfiguration) throws IOException { + if (!isCentralizedNodeLabelConfiguration) { + String msg = + String.format("Error when invoke method=%s because " + + "centralized node label configuration is not enabled.", + operation); + LOG.error(msg); + throw new IOException(msg); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java new file mode 100644 index 0000000000..0c062117b7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMDelegatedNodeLabelsUpdater.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Update nodes labels map for Resource Manager periodically. It collects + * nodes labels from {@link RMNodeLabelsMappingProvider} and updates the + * nodes -> labels map via {@link RMNodeLabelsManager}. This service is + * enabled when configuration "yarn.node-labels.configuration-type" is + * set to "delegated-centralized". + */ +public class RMDelegatedNodeLabelsUpdater extends CompositeService { + + private static final Log LOG = LogFactory + .getLog(RMDelegatedNodeLabelsUpdater.class); + + public static final long DISABLE_DELEGATED_NODE_LABELS_UPDATE = -1; + + // Timer used to schedule node labels fetching + private Timer nodeLabelsScheduler; + // 30 seconds + @VisibleForTesting + public long nodeLabelsUpdateInterval = 30 * 1000; + + private Set newlyRegisteredNodes = new HashSet(); + // Lock to protect newlyRegisteredNodes + private Object lock = new Object(); + private long lastAllNodesLabelUpdateMills = 0L; + private long allNodesLabelUpdateInterval; + + private RMNodeLabelsMappingProvider rmNodeLabelsMappingProvider; + + private RMContext rmContext; + + public RMDelegatedNodeLabelsUpdater(RMContext rmContext) { + super("RMDelegatedNodeLabelsUpdater"); + this.rmContext = rmContext; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + allNodesLabelUpdateInterval = conf.getLong( + YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS); + rmNodeLabelsMappingProvider = createRMNodeLabelsMappingProvider(conf); + addService(rmNodeLabelsMappingProvider); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + nodeLabelsScheduler = new Timer( + "RMDelegatedNodeLabelsUpdater-Timer", true); + TimerTask delegatedNodeLabelsUpdaterTimerTask = + new RMDelegatedNodeLabelsUpdaterTimerTask(); + nodeLabelsScheduler.scheduleAtFixedRate( + delegatedNodeLabelsUpdaterTimerTask, + nodeLabelsUpdateInterval, + nodeLabelsUpdateInterval); + + super.serviceStart(); + } + + /** + * Terminate the timer. + * @throws Exception + */ + @Override + protected void serviceStop() throws Exception { + if (nodeLabelsScheduler != null) { + nodeLabelsScheduler.cancel(); + } + super.serviceStop(); + } + + private class RMDelegatedNodeLabelsUpdaterTimerTask extends TimerTask { + @Override + public void run() { + Set nodesToUpdateLabels = null; + boolean isUpdatingAllNodes = false; + + if (allNodesLabelUpdateInterval != DISABLE_DELEGATED_NODE_LABELS_UPDATE) { + long elapsedTimeSinceLastUpdate = + System.currentTimeMillis() - lastAllNodesLabelUpdateMills; + if (elapsedTimeSinceLastUpdate > allNodesLabelUpdateInterval) { + nodesToUpdateLabels = + Collections.unmodifiableSet(rmContext.getRMNodes().keySet()); + isUpdatingAllNodes = true; + } + } + + if (nodesToUpdateLabels == null && !newlyRegisteredNodes.isEmpty()) { + synchronized (lock) { + if (!newlyRegisteredNodes.isEmpty()) { + nodesToUpdateLabels = new HashSet(newlyRegisteredNodes); + } + } + } + + try { + if (nodesToUpdateLabels != null && !nodesToUpdateLabels.isEmpty()) { + updateNodeLabelsInternal(nodesToUpdateLabels); + if (isUpdatingAllNodes) { + lastAllNodesLabelUpdateMills = System.currentTimeMillis(); + } + synchronized (lock) { + newlyRegisteredNodes.removeAll(nodesToUpdateLabels); + } + } + } catch (IOException e) { + LOG.error("Failed to update node Labels", e); + } + } + } + + private void updateNodeLabelsInternal(Set nodes) + throws IOException { + Map> labelsUpdated = + rmNodeLabelsMappingProvider.getNodeLabels(nodes); + if (labelsUpdated != null && labelsUpdated.size() != 0) { + Map> nodeToLabels = + new HashMap>(labelsUpdated.size()); + for (Map.Entry> entry + : labelsUpdated.entrySet()) { + nodeToLabels.put(entry.getKey(), + NodeLabelsUtils.convertToStringSet(entry.getValue())); + } + rmContext.getNodeLabelManager().replaceLabelsOnNode(nodeToLabels); + } + } + + /** + * Get the RMNodeLabelsMappingProvider which is used to provide node labels. + */ + private RMNodeLabelsMappingProvider createRMNodeLabelsMappingProvider( + Configuration conf) throws IOException { + RMNodeLabelsMappingProvider nodeLabelsMappingProvider = null; + try { + Class labelsProviderClass = + conf.getClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, + null, RMNodeLabelsMappingProvider.class); + if (labelsProviderClass != null) { + nodeLabelsMappingProvider = labelsProviderClass.newInstance(); + } + } catch (InstantiationException | IllegalAccessException + | RuntimeException e) { + LOG.error("Failed to create RMNodeLabelsMappingProvider based on" + + " Configuration", e); + throw new IOException("Failed to create RMNodeLabelsMappingProvider : " + + e.getMessage(), e); + } + + if (nodeLabelsMappingProvider == null) { + String msg = "RMNodeLabelsMappingProvider should be configured when " + + "delegated-centralized node label configuration is enabled"; + LOG.error(msg); + throw new IOException(msg); + } else if (LOG.isDebugEnabled()) { + LOG.debug("RM Node labels mapping provider class is : " + + nodeLabelsMappingProvider.getClass().toString()); + } + + return nodeLabelsMappingProvider; + } + + /** + * Update node labels for a specified node. + * @param node the node to update node labels + */ + public void updateNodeLabels(NodeId node) { + synchronized (lock) { + newlyRegisteredNodes.add(node); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java new file mode 100644 index 0000000000..28bc54bad5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsMappingProvider.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; + +/** + * Interface which is responsible for providing the node -> labels map. + */ +public abstract class RMNodeLabelsMappingProvider extends AbstractService { + + public RMNodeLabelsMappingProvider(String name) { + super(name); + } + + /** + * Provides the labels. It is expected to give same Labels + * continuously until there is a change in labels. + * + * @param nodes to fetch labels + * @return Set of node label strings applicable for a node + */ + public abstract Map> getNodeLabels(Set nodes); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 24100533fa..a0a6bda652 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -177,7 +178,7 @@ public class RMWebServices { private @Context HttpServletResponse response; @VisibleForTesting - boolean isDistributedNodeLabelConfiguration = false; + boolean isCentralizedNodeLabelConfiguration = true; public final static String DELEGATION_TOKEN_HEADER = "Hadoop-YARN-RM-Delegation-Token"; @@ -186,19 +187,8 @@ public class RMWebServices { public RMWebServices(final ResourceManager rm, Configuration conf) { this.rm = rm; this.conf = conf; - isDistributedNodeLabelConfiguration = - YarnConfiguration.isDistributedNodeLabelConfiguration(conf); - } - - private void checkAndThrowIfDistributedNodeLabelConfEnabled(String operation) - throws IOException { - if (isDistributedNodeLabelConfiguration) { - String msg = - String.format("Error when invoke method=%s because of " - + "distributed node label configuration enabled.", operation); - LOG.error(msg); - throw new IOException(msg); - } + isCentralizedNodeLabelConfiguration = + YarnConfiguration.isCentralizedNodeLabelConfiguration(conf); } RMWebServices(ResourceManager rm, Configuration conf, @@ -892,7 +882,8 @@ private Response replaceLabelsOnNode( String operation) throws IOException { init(); - checkAndThrowIfDistributedNodeLabelConfEnabled("replaceLabelsOnNode"); + NodeLabelsUtils.verifyCentralizedNodeLabelConfEnabled( + "replaceLabelsOnNode", isCentralizedNodeLabelConfiguration); UserGroupInformation callerUGI = getCallerUserGroupInformation(hsr, true); if (callerUGI == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index 7e75cfa0f2..e61d9fedd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -872,12 +872,12 @@ public void testModifyLabelsOnNodesWithDistributedConfigurationDisabled() } @Test(expected = YarnException.class) - public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled() + public void testModifyLabelsOnNodesWithCentralizedConfigurationDisabled() throws IOException, YarnException { // create RM and set it's ACTIVE, and set distributed node label // configuration to true MockRM rm = new MockRM(); - rm.adminService.isDistributedNodeLabelConfiguration = true; + rm.adminService.isCentralizedNodeLabelConfiguration = false; ((RMContextImpl) rm.getRMContext()) .setHAServiceState(HAServiceState.ACTIVE); @@ -893,14 +893,14 @@ public void testModifyLabelsOnNodesWithDistributedConfigurationEnabled() } @Test - public void testRemoveClusterNodeLabelsWithDistributedConfigurationEnabled() + public void testRemoveClusterNodeLabelsWithCentralizedConfigurationDisabled() throws IOException, YarnException { // create RM and set it's ACTIVE MockRM rm = new MockRM(); ((RMContextImpl) rm.getRMContext()) .setHAServiceState(HAServiceState.ACTIVE); RMNodeLabelsManager labelMgr = rm.rmContext.getNodeLabelManager(); - rm.adminService.isDistributedNodeLabelConfiguration = true; + rm.adminService.isCentralizedNodeLabelConfiguration = false; // by default, distributed configuration for node label is disabled, this // should pass diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 94a0e4cc8c..e42ed91a8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -362,7 +363,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals("Action should be normal on valid Node Labels", NodeAction.NORMAL, response.getNodeAction()); assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), - ResourceTrackerService.convertToStringSet(registerReq.getNodeLabels())); + NodeLabelsUtils.convertToStringSet(registerReq.getNodeLabels())); Assert.assertTrue("Valid Node Labels were not accepted by RM", response.getAreNodeLabelsAcceptedByRM()); rm.stop(); @@ -590,7 +591,7 @@ protected RMNodeLabelsManager createNodeLabelManager() { Assert.assertEquals("InValid Node Labels were not accepted by RM", NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction()); assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId), - ResourceTrackerService.convertToStringSet(heartbeatReq.getNodeLabels())); + NodeLabelsUtils.convertToStringSet(heartbeatReq.getNodeLabels())); Assert.assertTrue("Valid Node Labels were not accepted by RM", nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java new file mode 100644 index 0000000000..928124d356 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMDelegatedNodeLabelsUpdater.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.nodelabels; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.YarnVersionInfo; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +public class TestRMDelegatedNodeLabelsUpdater extends NodeLabelTestBase { + private YarnConfiguration conf; + private static Map> nodeLabelsMap = Maps.newHashMap(); + + @Before + public void setup() { + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true); + conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE, + YarnConfiguration.DELEGATED_CENTALIZED_NODELABEL_CONFIGURATION_TYPE); + conf.setClass(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG, + DummyRMNodeLabelsMappingProvider.class, + RMNodeLabelsMappingProvider.class); + } + + @Test + public void testRMNodeLabelsMappingProviderConfiguration() { + conf.unset(YarnConfiguration.RM_NODE_LABELS_PROVIDER_CONFIG); + try { + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.start(); + Assert.fail("Expected an exception"); + } catch (Exception e) { + // expected an exception + Assert.assertTrue(e.getMessage().contains( + "RMNodeLabelsMappingProvider should be configured")); + } + } + + @Test + public void testWithNodeLabelUpdateEnabled() throws Exception { + conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, + 1000); + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.getRMContext().getRMDelegatedNodeLabelsUpdater().nodeLabelsUpdateInterval + = 3 * 1000; + rm.start(); + + RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager(); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); + + NodeId nodeId = toNodeId("h1:1234"); + assertEquals(0, mgr.getLabelsOnNode(nodeId).size()); + updateNodeLabels(nodeId, "x"); + registerNode(rm, nodeId); + Thread.sleep(4000); + assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId)); + + // Ensure that node labels are updated if NodeLabelsProvider + // gives different labels + updateNodeLabels(nodeId, "y"); + Thread.sleep(4000); + assertCollectionEquals(ImmutableSet.of("y"), mgr.getLabelsOnNode(nodeId)); + + rm.stop(); + } + + @Test + public void testWithNodeLabelUpdateDisabled() throws Exception { + conf.setLong(YarnConfiguration.RM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS, + RMDelegatedNodeLabelsUpdater.DISABLE_DELEGATED_NODE_LABELS_UPDATE); + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.getRMContext().getRMDelegatedNodeLabelsUpdater().nodeLabelsUpdateInterval + = 3 * 1000; + rm.start(); + + RMNodeLabelsManager mgr = rm.getRMContext().getNodeLabelManager(); + mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x")); + + NodeId nodeId = toNodeId("h1:1234"); + updateNodeLabels(nodeId, "x"); + registerNode(rm, nodeId); + Thread.sleep(4000); + // Ensure that even though timer is not run, node labels are fetched + // when node is registered + assertCollectionEquals(ImmutableSet.of("x"), mgr.getLabelsOnNode(nodeId)); + + rm.stop(); + } + + private void registerNode(ResourceManager rm, NodeId nodeId) + throws YarnException, IOException { + ResourceTrackerService resourceTrackerService = + rm.getResourceTrackerService(); + RegisterNodeManagerRequest req = + Records.newRecord(RegisterNodeManagerRequest.class); + Resource capability = BuilderUtils.newResource(1024, 1); + req.setResource(capability); + req.setNodeId(nodeId); + req.setHttpPort(1234); + req.setNMVersion(YarnVersionInfo.getVersion()); + resourceTrackerService.registerNodeManager(req); + } + + private void updateNodeLabels(NodeId nodeId, String... nodeLabelsStr) { + nodeLabelsMap.put(nodeId, toNodeLabelSet(nodeLabelsStr)); + } + + public static class DummyRMNodeLabelsMappingProvider extends + RMNodeLabelsMappingProvider { + public DummyRMNodeLabelsMappingProvider() { + super("DummyRMNodeLabelsMappingProvider"); + } + + @Override + public Map> getNodeLabels(Set nodes) { + Map> nodeLabels = Maps.newHashMap(); + for(NodeId node : nodes) { + nodeLabels.put(node, nodeLabelsMap.get(node)); + } + return nodeLabels; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java index 53a99029cd..472dab6ac1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodeLabels.java @@ -445,8 +445,8 @@ public void testNodeLabels() throws JSONException, Exception { .post(ClientResponse.class); LOG.info("posted node nodelabel"); - //setting rmWebService for Distributed NodeLabel Configuration - rmWebService.isDistributedNodeLabelConfiguration = true; + //setting rmWebService for non Centralized NodeLabel Configuration + rmWebService.isCentralizedNodeLabelConfiguration = false; // Case1 : Replace labels using node-to-labels ntli = new NodeToLabelsEntryList();