YARN-2800. Remove MemoryNodeLabelsStore and add a way to enable/disable node labels feature. Contributed by Wangda Tan.

This commit is contained in:
Tsuyoshi Ozawa 2015-01-23 20:37:05 +09:00
parent 3aab354e66
commit 24aa462673
16 changed files with 152 additions and 37 deletions

View File

@ -197,6 +197,9 @@ Release 2.7.0 - UNRELEASED
YARN-2984. Metrics for container's actual memory usage. (kasha)
YARN-2800. Remove MemoryNodeLabelsStore and add a way to enable/disable
node labels feature. (Wangda Tan via ozawa)
OPTIMIZATIONS
BUG FIXES

View File

@ -1648,14 +1648,10 @@ private static void addDeprecatedKeys() {
public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
.name();
public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels.";
/**
* Class for RMNodeLabelsManager Please note this value should be consistent
* in client nodes and RM node(s)
* Node-labels configurations
*/
public static final String RM_NODE_LABELS_MANAGER_CLASS = NODE_LABELS_PREFIX
+ "manager-class";
public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels.";
/** URI for NodeLabelManager */
public static final String FS_NODE_LABELS_STORE_ROOT_DIR = NODE_LABELS_PREFIX
@ -1665,6 +1661,14 @@ private static void addDeprecatedKeys() {
public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
"2000, 500";
/**
* Flag to indicate if the node labels feature enabled, by default it's
* disabled
*/
public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX
+ "enabled";
public static final boolean DEFAULT_NODE_LABELS_ENABLED = false;
public YarnConfiguration() {
super();
}

View File

@ -80,6 +80,7 @@ protected void setupInternal(int numNodeManager) throws Exception {
conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getName());
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
if (yarnCluster == null) {
yarnCluster =

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@ -52,6 +53,7 @@
import org.apache.hadoop.yarn.nodelabels.event.UpdateNodeToLabelsMappingsEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
public class CommonNodeLabelsManager extends AbstractService {
@ -65,6 +67,14 @@ public class CommonNodeLabelsManager extends AbstractService {
.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
public static final int WILDCARD_PORT = 0;
/**
* Error messages
*/
@VisibleForTesting
public static final String NODE_LABELS_NOT_ENABLED_ERR =
"Node-label-based scheduling is disabled. Please check "
+ YarnConfiguration.NODE_LABELS_ENABLED;
/**
* If a user doesn't specify label of a queue or node, it belongs
* DEFAULT_LABEL
@ -82,6 +92,7 @@ public class CommonNodeLabelsManager extends AbstractService {
protected final WriteLock writeLock;
protected NodeLabelsStore store;
private boolean nodeLabelsEnabled = false;
/**
* A <code>Host</code> can have multiple <code>Node</code>s
@ -193,7 +204,13 @@ protected void initDispatcher(Configuration conf) {
@Override
protected void serviceInit(Configuration conf) throws Exception {
// set if node labels enabled
nodeLabelsEnabled =
conf.getBoolean(YarnConfiguration.NODE_LABELS_ENABLED,
YarnConfiguration.DEFAULT_NODE_LABELS_ENABLED);
if (nodeLabelsEnabled) {
initNodeLabelStore(conf);
}
labelCollections.put(NO_LABEL, new NodeLabel(NO_LABEL));
}
@ -250,6 +267,10 @@ protected void serviceStop() throws Exception {
*/
@SuppressWarnings("unchecked")
public void addToCluserNodeLabels(Set<String> labels) throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
if (null == labels || labels.isEmpty()) {
return;
}
@ -304,6 +325,10 @@ protected void checkAddLabelsToNode(
*/
public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
addedLabelsToNode = normalizeNodeIdToLabels(addedLabelsToNode);
checkAddLabelsToNode(addedLabelsToNode);
internalUpdateLabelsOnNodes(addedLabelsToNode, NodeLabelUpdateOperation.ADD);
@ -370,6 +395,11 @@ protected void internalRemoveFromClusterNodeLabels(Collection<String> labelsToRe
*/
public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
labelsToRemove = normalizeLabels(labelsToRemove);
checkRemoveFromClusterNodeLabels(labelsToRemove);
@ -532,6 +562,11 @@ protected void internalUpdateLabelsOnNodes(
public void
removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
removeLabelsFromNode = normalizeNodeIdToLabels(removeLabelsFromNode);
checkRemoveLabelsFromNode(removeLabelsFromNode);
@ -567,6 +602,11 @@ protected void checkReplaceLabelsOnNode(
*/
public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
throws IOException {
if (!nodeLabelsEnabled) {
LOG.error(NODE_LABELS_NOT_ENABLED_ERR);
throw new IOException(NODE_LABELS_NOT_ENABLED_ERR);
}
replaceLabelsToNode = normalizeNodeIdToLabels(replaceLabelsToNode);
checkReplaceLabelsOnNode(replaceLabelsToNode);

View File

@ -26,6 +26,8 @@
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -41,7 +43,9 @@ public class TestCommonNodeLabelsManager extends NodeLabelTestBase {
@Before
public void before() {
mgr = new DummyCommonNodeLabelsManager();
mgr.init(new Configuration());
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
mgr.init(conf);
mgr.start();
}
@ -319,4 +323,58 @@ public void testReplaceLabelsOnHostsShouldUpdateNodesBelongTo()
toSet("p1"), toNodeId("n1:1"), toSet("p2"), toNodeId("n1:2"),
toSet("p1")));
}
private void assertNodeLabelsDisabledErrorMessage(IOException e) {
Assert.assertEquals(CommonNodeLabelsManager.NODE_LABELS_NOT_ENABLED_ERR,
e.getMessage());
}
@Test(timeout = 5000)
public void testNodeLabelsDisabled() throws IOException {
DummyCommonNodeLabelsManager mgr = new DummyCommonNodeLabelsManager();
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
mgr.init(conf);
mgr.start();
// add labels
try {
mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
} catch (IOException e) {
assertNodeLabelsDisabledErrorMessage(e);
}
// remove labels
try {
mgr.removeFromClusterNodeLabels(ImmutableSet.of("x"));
} catch (IOException e) {
assertNodeLabelsDisabledErrorMessage(e);
}
// add labels to node
try {
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("host", 0),
CommonNodeLabelsManager.EMPTY_STRING_SET));
} catch (IOException e) {
assertNodeLabelsDisabledErrorMessage(e);
}
// remove labels from node
try {
mgr.removeLabelsFromNode(ImmutableMap.of(NodeId.newInstance("host", 0),
CommonNodeLabelsManager.EMPTY_STRING_SET));
} catch (IOException e) {
assertNodeLabelsDisabledErrorMessage(e);
}
// replace labels on node
try {
mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("host", 0),
CommonNodeLabelsManager.EMPTY_STRING_SET));
} catch (IOException e) {
assertNodeLabelsDisabledErrorMessage(e);
}
mgr.close();
}
}

View File

@ -63,6 +63,7 @@ private FileSystemNodeLabelsStore getStore() {
public void before() throws IOException {
mgr = new MockNodeLabelManager();
conf = new Configuration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
File tempDir = File.createTempFile("nlb", ".tmp");
tempDir.delete();
tempDir.mkdirs();

View File

@ -59,7 +59,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
@ -334,10 +333,7 @@ protected AMLivelinessMonitor createAMLivelinessMonitor() {
protected RMNodeLabelsManager createNodeLabelManager()
throws InstantiationException, IllegalAccessException {
Class<? extends RMNodeLabelsManager> nlmCls =
conf.getClass(YarnConfiguration.RM_NODE_LABELS_MANAGER_CLASS,
MemoryRMNodeLabelsManager.class, RMNodeLabelsManager.class);
return nlmCls.newInstance();
return new RMNodeLabelsManager();
}
protected DelegationTokenRenewer createDelegationTokenRenewer() {

View File

@ -59,7 +59,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -115,7 +115,7 @@ public MockRM(Configuration conf, RMStateStore store) {
@Override
protected RMNodeLabelsManager createNodeLabelManager() {
RMNodeLabelsManager mgr = new MemoryRMNodeLabelsManager();
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(getConfig());
return mgr;
}

View File

@ -2073,6 +2073,7 @@ public void testRMRestartRecoveringNodeLabelManager() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
MockRM rm1 = new MockRM(conf, memStore) {
@Override
protected RMNodeLabelsManager createNodeLabelManager() {

View File

@ -25,9 +25,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore;
public class MemoryRMNodeLabelsManager extends RMNodeLabelsManager {
public class NullRMNodeLabelsManager extends RMNodeLabelsManager {
Map<NodeId, Set<String>> lastNodeToLabels = null;
Collection<String> lastAddedlabels = null;
Collection<String> lastRemovedlabels = null;
@ -79,4 +80,11 @@ protected void startDispatcher() {
protected void stopDispatcher() {
// do nothing
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
// always enable node labels while using MemoryRMNodeLabelsManager
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
super.serviceInit(conf);
}
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabel;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
@ -44,12 +45,14 @@ public class TestRMNodeLabelsManager extends NodeLabelTestBase {
private final Resource SMALL_RESOURCE = Resource.newInstance(100, 0);
private final Resource LARGE_NODE = Resource.newInstance(1000, 0);
MemoryRMNodeLabelsManager mgr = null;
NullRMNodeLabelsManager mgr = null;
@Before
public void before() {
mgr = new MemoryRMNodeLabelsManager();
mgr.init(new Configuration());
mgr = new NullRMNodeLabelsManager();
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
mgr.init(conf);
mgr.start();
}

View File

@ -84,7 +84,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@ -154,7 +154,7 @@ public void setUp() throws Exception {
resourceManager = new ResourceManager() {
@Override
protected RMNodeLabelsManager createNodeLabelManager() {
RMNodeLabelsManager mgr = new MemoryRMNodeLabelsManager();
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(getConfig());
return mgr;
}
@ -1485,7 +1485,7 @@ public void testMoveAppViolateQueueState() throws Exception {
resourceManager = new ResourceManager() {
@Override
protected RMNodeLabelsManager createNodeLabelManager() {
RMNodeLabelsManager mgr = new MemoryRMNodeLabelsManager();
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(getConfig());
return mgr;
}

View File

@ -29,7 +29,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@ -54,7 +54,7 @@ public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
mgr = new MemoryRMNodeLabelsManager();
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
}

View File

@ -45,7 +45,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -81,7 +81,7 @@ public void setUp() throws Exception {
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
mgr = new MemoryRMNodeLabelsManager();
mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
}
@ -451,7 +451,7 @@ private Configuration getComplexConfigurationWithQueueLabels(
@Test(timeout = 300000)
public void testContainerAllocationWithSingleUserLimits() throws Exception {
final RMNodeLabelsManager mgr = new MemoryRMNodeLabelsManager();
final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
mgr.init(conf);
// set node -> label

View File

@ -26,7 +26,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@ -47,7 +47,7 @@ public class TestQueueParsing {
@Before
public void setup() {
nodeLabelManager = new MemoryRMNodeLabelsManager();
nodeLabelManager = new NullRMNodeLabelsManager();
nodeLabelManager.init(new YarnConfiguration());
nodeLabelManager.start();
}
@ -566,7 +566,7 @@ public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager()
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
nodeLabelsManager.init(conf);
nodeLabelsManager.start();
@ -594,7 +594,7 @@ public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager()
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
nodeLabelsManager.init(conf);
nodeLabelsManager.start();
@ -622,7 +622,7 @@ public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager()
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
nodeLabelsManager.init(conf);
nodeLabelsManager.start();
@ -649,7 +649,7 @@ public void testQueueParsingWhenLabelsNotExist() throws IOException {
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
nodeLabelsManager.init(conf);
nodeLabelsManager.start();

View File

@ -39,7 +39,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.MockAsm;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -179,7 +179,7 @@ public ConcurrentMap<NodeId, RMNode> getRMNodes() {
return nodesMap;
}
};
rmContext.setNodeLabelManager(new MemoryRMNodeLabelsManager());
rmContext.setNodeLabelManager(new NullRMNodeLabelsManager());
return rmContext;
}
@ -211,7 +211,7 @@ public static CapacityScheduler mockCapacityScheduler() throws IOException {
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(new MemoryRMNodeLabelsManager());
rmContext.setNodeLabelManager(new NullRMNodeLabelsManager());
cs.setRMContext(rmContext);
cs.init(conf);
return cs;