YARN-998. Keep NM resource updated through dynamic resource config for RM/NM restart. Contributed by Junping Du

This commit is contained in:
Jian He 2016-03-28 11:12:33 -07:00
parent 8cac1bb09f
commit 524bc3c33a
4 changed files with 165 additions and 44 deletions

View File

@ -639,34 +639,32 @@ public RefreshNodesResourcesResponse refreshNodesResources(
try { try {
Configuration conf = getConfig(); Configuration conf = getConfig();
Configuration configuration = new Configuration(conf); Configuration configuration = new Configuration(conf);
DynamicResourceConfiguration newconf; DynamicResourceConfiguration newConf;
InputStream DRInputStream = InputStream drInputStream =
this.rmContext.getConfigurationProvider() this.rmContext.getConfigurationProvider().getConfigurationInputStream(
.getConfigurationInputStream(configuration, configuration, YarnConfiguration.DR_CONFIGURATION_FILE);
YarnConfiguration.DR_CONFIGURATION_FILE);
if (DRInputStream != null) { if (drInputStream != null) {
configuration.addResource(DRInputStream); newConf = new DynamicResourceConfiguration(configuration,
newconf = new DynamicResourceConfiguration(configuration, false); drInputStream);
} else { } else {
newconf = new DynamicResourceConfiguration(configuration, true); newConf = new DynamicResourceConfiguration(configuration);
} }
if (newconf.getNodes() == null || newconf.getNodes().length == 0) { if (newConf.getNodes() != null && newConf.getNodes().length != 0) {
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return response;
} else {
Map<NodeId, ResourceOption> nodeResourceMap = Map<NodeId, ResourceOption> nodeResourceMap =
newconf.getNodeResourceMap(); newConf.getNodeResourceMap();
UpdateNodeResourceRequest updateRequest = UpdateNodeResourceRequest updateRequest =
UpdateNodeResourceRequest.newInstance(nodeResourceMap); UpdateNodeResourceRequest.newInstance(nodeResourceMap);
updateNodeResource(updateRequest); updateNodeResource(updateRequest);
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return response;
} }
// refresh dynamic resource in ResourceTrackerService
this.rmContext.getResourceTrackerService().
updateDynamicResourceConfiguration(newConf);
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return response;
} catch (IOException ioe) { } catch (IOException ioe) {
throw logAndWrapException(ioe, user.getShortUserName(), argName, msg); throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
} }

View File

@ -21,6 +21,7 @@
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -62,6 +63,7 @@
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@ -105,6 +107,7 @@ public class ResourceTrackerService extends AbstractService implements
private boolean isDistributedNodeLabelsConf; private boolean isDistributedNodeLabelsConf;
private boolean isDelegatedCentralizedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf;
private volatile DynamicResourceConfiguration drConf;
public ResourceTrackerService(RMContext rmContext, public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager, NodesListManager nodesListManager,
@ -139,11 +142,11 @@ protected void serviceInit(Configuration conf) throws Exception {
} }
minAllocMb = conf.getInt( minAllocMb = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
minAllocVcores = conf.getInt( minAllocVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
minimumNodeManagerVersion = conf.get( minimumNodeManagerVersion = conf.get(
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION, YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
@ -156,9 +159,42 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf); YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
} }
loadDynamicResourceConfiguration(conf);
super.serviceInit(conf); super.serviceInit(conf);
} }
/**
* Load DynamicResourceConfiguration from dynamic-resources.xml.
* @param conf
* @throws IOException
*/
public void loadDynamicResourceConfiguration(Configuration conf)
throws IOException {
try {
// load dynamic-resources.xml
InputStream drInputStream = this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(conf,
YarnConfiguration.DR_CONFIGURATION_FILE);
if (drInputStream != null) {
this.drConf = new DynamicResourceConfiguration(conf, drInputStream);
} else {
this.drConf = new DynamicResourceConfiguration(conf);
}
} catch (Exception e) {
throw new IOException(e);
}
}
/**
* Update DynamicResourceConfiguration with new configuration.
* @param conf
*/
public void updateDynamicResourceConfiguration(
DynamicResourceConfiguration conf) {
this.drConf = conf;
}
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
super.serviceStart(); super.serviceStart();
@ -166,15 +202,14 @@ protected void serviceStart() throws Exception {
// security is enabled, so no secretManager. // security is enabled, so no secretManager.
Configuration conf = getConfig(); Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf); YarnRPC rpc = YarnRPC.create(conf);
this.server = this.server = rpc.getServer(
rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress, ResourceTracker.class, this, resourceTrackerAddress, conf, null,
conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
// Enable service authorization? // Enable service authorization?
if (conf.getBoolean( if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
false)) { false)) {
InputStream inputStream = InputStream inputStream =
this.rmContext.getConfigurationProvider() this.rmContext.getConfigurationProvider()
@ -185,12 +220,12 @@ protected void serviceStart() throws Exception {
} }
refreshServiceAcls(conf, RMPolicyProvider.getInstance()); refreshServiceAcls(conf, RMPolicyProvider.getInstance());
} }
this.server.start(); this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
server.getListenerAddress()); server.getListenerAddress());
} }
@Override @Override
@ -295,6 +330,19 @@ public RegisterNodeManagerResponse registerNodeManager(
return response; return response;
} }
// check if node's capacity is load from dynamic-resources.xml
String[] nodes = this.drConf.getNodes();
String nid = nodeId.toString();
if (nodes != null && Arrays.asList(nodes).contains(nid)) {
capability.setMemory(this.drConf.getMemoryPerNode(nid));
capability.setVirtualCores(this.drConf.getVcoresPerNode(nid));
if (LOG.isDebugEnabled()) {
LOG.debug("Resource for node: " + nid + " is adjusted to " +
capability + " due to settings in dynamic-resources.xml.");
}
}
// Check if this node has minimum allocations // Check if this node has minimum allocations
if (capability.getMemory() < minAllocMb if (capability.getMemory() < minAllocMb
|| capability.getVirtualCores() < minAllocVcores) { || capability.getVirtualCores() < minAllocVcores) {
@ -311,7 +359,7 @@ public RegisterNodeManagerResponse registerNodeManager(
response.setContainerTokenMasterKey(containerTokenSecretManager response.setContainerTokenMasterKey(containerTokenSecretManager
.getCurrentKey()); .getCurrentKey());
response.setNMTokenMasterKey(nmTokenSecretManager response.setNMTokenMasterKey(nmTokenSecretManager
.getCurrentKey()); .getCurrentKey());
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), capability, nodeManagerVersion); resolve(host), capability, nodeManagerVersion);

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.resource; package org.apache.hadoop.yarn.server.resourcemanager.resource;
import java.io.InputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -38,8 +40,6 @@ public class DynamicResourceConfiguration extends Configuration {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(DynamicResourceConfiguration.class); LogFactory.getLog(DynamicResourceConfiguration.class);
private static final String DR_CONFIGURATION_FILE = "dynamic-resources.xml";
@Private @Private
public static final String PREFIX = "yarn.resource.dynamic."; public static final String PREFIX = "yarn.resource.dynamic.";
@ -63,15 +63,14 @@ public DynamicResourceConfiguration() {
} }
public DynamicResourceConfiguration(Configuration configuration) { public DynamicResourceConfiguration(Configuration configuration) {
this(configuration, true); super(configuration);
addResource(YarnConfiguration.DR_CONFIGURATION_FILE);
} }
public DynamicResourceConfiguration(Configuration configuration, public DynamicResourceConfiguration(Configuration configuration,
boolean useLocalConfigurationProvider) { InputStream drInputStream) {
super(configuration); super(configuration);
if (useLocalConfigurationProvider) { addResource(drInputStream);
addResource(DR_CONFIGURATION_FILE);
}
} }
private String getNodePrefix(String node) { private String getNodePrefix(String node) {

View File

@ -27,7 +27,9 @@
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -202,7 +204,7 @@ public void testAdminRefreshNodesWithoutConfiguration()
} }
@Test @Test
public void testAdminRefreshNodesResourcesWithFileSystemBasedConfigurationProvider() public void testRefreshNodesResourceWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException { throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
@ -239,6 +241,75 @@ public void testAdminRefreshNodesResourcesWithFileSystemBasedConfigurationProvid
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString()); Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
} }
@Test
public void testResourcePersistentForNMRegistrationWithNewResource()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
//upload default configurations
uploadDefaultConfiguration();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
rm.registerNode("h1:1234", 5120);
} catch(Exception ex) {
fail("Should not get any exceptions");
}
NodeId nid = ConverterUtils.toNodeId("h1:1234");
RMNode ni = rm.getRMContext().getRMNodes().get(nid);
Resource resource = ni.getTotalCapability();
Assert.assertEquals("<memory:5120, vCores:5>", resource.toString());
DynamicResourceConfiguration drConf =
new DynamicResourceConfiguration();
drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
uploadConfiguration(drConf, "dynamic-resources.xml");
rm.adminService.refreshNodesResources(
RefreshNodesResourcesRequest.newInstance());
try {
// register the same node again with a different resource.
// validate this won't work as resource cached in RM side.
rm.registerNode("h1:1234", 8192, 8);
} catch (Exception ex) {
fail("Should not get any exceptions");
}
RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
Resource resourceAfter = niAfter.getTotalCapability();
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
// Replace original dr file with an empty dr file, and validate node
// registration with new resources will take effective now.
deleteOnRemoteFileSystem("dynamic-resources.xml");
DynamicResourceConfiguration emptyDRConf =
new DynamicResourceConfiguration();
uploadConfiguration(emptyDRConf, "dynamic-resources.xml");
rm.adminService.refreshNodesResources(
RefreshNodesResourcesRequest.newInstance());
try {
// register the same node third time, this time the register resource
// should work.
rm.registerNode("h1:1234", 8192, 8);
} catch (Exception ex) {
fail("Should not get any exceptions");
}
niAfter = rm.getRMContext().getRMNodes().get(nid);
resourceAfter = niAfter.getTotalCapability();
// new resource in registration should take effective as we empty
// dynamic resource file already.
Assert.assertEquals("<memory:8192, vCores:8>", resourceAfter.toString());
}
@Test @Test
public void testAdminAclsWithLocalConfigurationProvider() { public void testAdminAclsWithLocalConfigurationProvider() {
rm = new MockRM(configuration); rm = new MockRM(configuration);
@ -1006,6 +1077,11 @@ private void uploadConfiguration(Configuration conf, String confFileName)
uploadToRemoteFileSystem(new Path(csConfFile)); uploadToRemoteFileSystem(new Path(csConfFile));
} }
private void deleteOnRemoteFileSystem(String fileName)
throws IOException {
fs.delete(new Path(workingPath, fileName));
}
private void uploadDefaultConfiguration() throws IOException { private void uploadDefaultConfiguration() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
uploadConfiguration(conf, "core-site.xml"); uploadConfiguration(conf, "core-site.xml");