From 524bc3c33aff301c1a8d60ed8e6a3b240e305045 Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 28 Mar 2016 11:12:33 -0700 Subject: [PATCH] YARN-998. Keep NM resource updated through dynamic resource config for RM/NM restart. Contributed by Junping Du --- .../server/resourcemanager/AdminService.java | 38 +++++---- .../ResourceTrackerService.java | 80 +++++++++++++++---- .../DynamicResourceConfiguration.java | 13 ++- .../resourcemanager/TestRMAdminService.java | 78 +++++++++++++++++- 4 files changed, 165 insertions(+), 44 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index fbc2d6f068..fc530e3809 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -639,34 +639,32 @@ public RefreshNodesResourcesResponse refreshNodesResources( try { Configuration conf = getConfig(); Configuration configuration = new Configuration(conf); - DynamicResourceConfiguration newconf; + DynamicResourceConfiguration newConf; - InputStream DRInputStream = - this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(configuration, - YarnConfiguration.DR_CONFIGURATION_FILE); - if (DRInputStream != null) { - configuration.addResource(DRInputStream); - newconf = new DynamicResourceConfiguration(configuration, false); + InputStream drInputStream = + this.rmContext.getConfigurationProvider().getConfigurationInputStream( + configuration, YarnConfiguration.DR_CONFIGURATION_FILE); + + if (drInputStream != null) { + newConf = new DynamicResourceConfiguration(configuration, + drInputStream); } else { - newconf = new DynamicResourceConfiguration(configuration, true); + newConf = new DynamicResourceConfiguration(configuration); } - if (newconf.getNodes() == null || newconf.getNodes().length == 0) { - RMAuditLogger.logSuccess(user.getShortUserName(), argName, - "AdminService"); - return response; - } else { + if (newConf.getNodes() != null && newConf.getNodes().length != 0) { Map nodeResourceMap = - newconf.getNodeResourceMap(); - + newConf.getNodeResourceMap(); UpdateNodeResourceRequest updateRequest = - UpdateNodeResourceRequest.newInstance(nodeResourceMap); + UpdateNodeResourceRequest.newInstance(nodeResourceMap); updateNodeResource(updateRequest); - RMAuditLogger.logSuccess(user.getShortUserName(), argName, - "AdminService"); - return response; } + // refresh dynamic resource in ResourceTrackerService + this.rmContext.getResourceTrackerService(). + updateDynamicResourceConfiguration(newConf); + RMAuditLogger.logSuccess(user.getShortUserName(), argName, + "AdminService"); + return response; } catch (IOException ioe) { throw logAndWrapException(ioe, user.getShortUserName(), argName, msg); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 902244b3be..b0bc565e6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; +import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; @@ -105,6 +107,7 @@ public class ResourceTrackerService extends AbstractService implements private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; + private volatile DynamicResourceConfiguration drConf; public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, @@ -139,11 +142,11 @@ protected void serviceInit(Configuration conf) throws Exception { } minAllocMb = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); minAllocVcores = conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); minimumNodeManagerVersion = conf.get( YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, @@ -156,9 +159,42 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf); } + loadDynamicResourceConfiguration(conf); + super.serviceInit(conf); } + /** + * Load DynamicResourceConfiguration from dynamic-resources.xml. + * @param conf + * @throws IOException + */ + public void loadDynamicResourceConfiguration(Configuration conf) + throws IOException { + try { + // load dynamic-resources.xml + InputStream drInputStream = this.rmContext.getConfigurationProvider() + .getConfigurationInputStream(conf, + YarnConfiguration.DR_CONFIGURATION_FILE); + if (drInputStream != null) { + this.drConf = new DynamicResourceConfiguration(conf, drInputStream); + } else { + this.drConf = new DynamicResourceConfiguration(conf); + } + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * Update DynamicResourceConfiguration with new configuration. + * @param conf + */ + public void updateDynamicResourceConfiguration( + DynamicResourceConfiguration conf) { + this.drConf = conf; + } + @Override protected void serviceStart() throws Exception { super.serviceStart(); @@ -166,15 +202,14 @@ protected void serviceStart() throws Exception { // security is enabled, so no secretManager. Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); - this.server = - rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress, - conf, null, - conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT)); - + this.server = rpc.getServer( + ResourceTracker.class, this, resourceTrackerAddress, conf, null, + conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT)); + // Enable service authorization? if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { InputStream inputStream = this.rmContext.getConfigurationProvider() @@ -185,12 +220,12 @@ protected void serviceStart() throws Exception { } refreshServiceAcls(conf, RMPolicyProvider.getInstance()); } - + this.server.start(); conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, - YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, - YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, - server.getListenerAddress()); + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, + server.getListenerAddress()); } @Override @@ -295,6 +330,19 @@ public RegisterNodeManagerResponse registerNodeManager( return response; } + // check if node's capacity is load from dynamic-resources.xml + String[] nodes = this.drConf.getNodes(); + String nid = nodeId.toString(); + + if (nodes != null && Arrays.asList(nodes).contains(nid)) { + capability.setMemory(this.drConf.getMemoryPerNode(nid)); + capability.setVirtualCores(this.drConf.getVcoresPerNode(nid)); + if (LOG.isDebugEnabled()) { + LOG.debug("Resource for node: " + nid + " is adjusted to " + + capability + " due to settings in dynamic-resources.xml."); + } + } + // Check if this node has minimum allocations if (capability.getMemory() < minAllocMb || capability.getVirtualCores() < minAllocVcores) { @@ -311,7 +359,7 @@ public RegisterNodeManagerResponse registerNodeManager( response.setContainerTokenMasterKey(containerTokenSecretManager .getCurrentKey()); response.setNMTokenMasterKey(nmTokenSecretManager - .getCurrentKey()); + .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, resolve(host), capability, nodeManagerVersion); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java index dd37801a24..045c7bdc95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/DynamicResourceConfiguration.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.resource; +import java.io.InputStream; + import java.util.HashMap; import java.util.Map; @@ -38,8 +40,6 @@ public class DynamicResourceConfiguration extends Configuration { private static final Log LOG = LogFactory.getLog(DynamicResourceConfiguration.class); - private static final String DR_CONFIGURATION_FILE = "dynamic-resources.xml"; - @Private public static final String PREFIX = "yarn.resource.dynamic."; @@ -63,15 +63,14 @@ public DynamicResourceConfiguration() { } public DynamicResourceConfiguration(Configuration configuration) { - this(configuration, true); + super(configuration); + addResource(YarnConfiguration.DR_CONFIGURATION_FILE); } public DynamicResourceConfiguration(Configuration configuration, - boolean useLocalConfigurationProvider) { + InputStream drInputStream) { super(configuration); - if (useLocalConfigurationProvider) { - addResource(DR_CONFIGURATION_FILE); - } + addResource(drInputStream); } private String getNodePrefix(String node) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index 639b95586a..4513cbb337 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -27,7 +27,9 @@ import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -202,7 +204,7 @@ public void testAdminRefreshNodesWithoutConfiguration() } @Test - public void testAdminRefreshNodesResourcesWithFileSystemBasedConfigurationProvider() + public void testRefreshNodesResourceWithFileSystemBasedConfigurationProvider() throws IOException, YarnException { configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); @@ -239,6 +241,75 @@ public void testAdminRefreshNodesResourcesWithFileSystemBasedConfigurationProvid Assert.assertEquals("", resourceAfter.toString()); } + @Test + public void testResourcePersistentForNMRegistrationWithNewResource() + throws IOException, YarnException { + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); + + //upload default configurations + uploadDefaultConfiguration(); + + try { + rm = new MockRM(configuration); + rm.init(configuration); + rm.start(); + rm.registerNode("h1:1234", 5120); + } catch(Exception ex) { + fail("Should not get any exceptions"); + } + + NodeId nid = ConverterUtils.toNodeId("h1:1234"); + RMNode ni = rm.getRMContext().getRMNodes().get(nid); + Resource resource = ni.getTotalCapability(); + Assert.assertEquals("", resource.toString()); + + DynamicResourceConfiguration drConf = + new DynamicResourceConfiguration(); + drConf.set("yarn.resource.dynamic.nodes", "h1:1234"); + drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4"); + drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096"); + uploadConfiguration(drConf, "dynamic-resources.xml"); + + rm.adminService.refreshNodesResources( + RefreshNodesResourcesRequest.newInstance()); + + try { + // register the same node again with a different resource. + // validate this won't work as resource cached in RM side. + rm.registerNode("h1:1234", 8192, 8); + } catch (Exception ex) { + fail("Should not get any exceptions"); + } + + RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); + Resource resourceAfter = niAfter.getTotalCapability(); + Assert.assertEquals("", resourceAfter.toString()); + + // Replace original dr file with an empty dr file, and validate node + // registration with new resources will take effective now. + deleteOnRemoteFileSystem("dynamic-resources.xml"); + DynamicResourceConfiguration emptyDRConf = + new DynamicResourceConfiguration(); + + uploadConfiguration(emptyDRConf, "dynamic-resources.xml"); + rm.adminService.refreshNodesResources( + RefreshNodesResourcesRequest.newInstance()); + try { + // register the same node third time, this time the register resource + // should work. + rm.registerNode("h1:1234", 8192, 8); + } catch (Exception ex) { + fail("Should not get any exceptions"); + } + + niAfter = rm.getRMContext().getRMNodes().get(nid); + resourceAfter = niAfter.getTotalCapability(); + // new resource in registration should take effective as we empty + // dynamic resource file already. + Assert.assertEquals("", resourceAfter.toString()); + } + @Test public void testAdminAclsWithLocalConfigurationProvider() { rm = new MockRM(configuration); @@ -1006,6 +1077,11 @@ private void uploadConfiguration(Configuration conf, String confFileName) uploadToRemoteFileSystem(new Path(csConfFile)); } + private void deleteOnRemoteFileSystem(String fileName) + throws IOException { + fs.delete(new Path(workingPath, fileName)); + } + private void uploadDefaultConfiguration() throws IOException { Configuration conf = new Configuration(); uploadConfiguration(conf, "core-site.xml");