YARN-3971. Skip RMNodeLabelsManager#checkRemoveFromClusterNodeLabelsOfQueue on nodelabel recovery. (addendum patch). Contributed by Bibin A chundatt
This commit is contained in:
parent
4b0f55b6ea
commit
b4078bd17b
@ -75,6 +75,8 @@ public class CommonNodeLabelsManager extends AbstractService {
|
||||
private static final Pattern LABEL_PATTERN = Pattern
|
||||
.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
|
||||
public static final int WILDCARD_PORT = 0;
|
||||
// Flag to identify startup for removelabel
|
||||
private boolean initNodeLabelStoreInProgress = false;
|
||||
|
||||
/**
|
||||
* Error messages
|
||||
@ -226,6 +228,13 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
labelCollections.put(NO_LABEL, new RMNodeLabel(NO_LABEL));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the isStartup
|
||||
*/
|
||||
protected boolean isInitNodeLabelStoreInProgress() {
|
||||
return initNodeLabelStoreInProgress;
|
||||
}
|
||||
|
||||
boolean isCentralizedConfiguration() {
|
||||
return isCentralizedNodeLabelConfiguration;
|
||||
}
|
||||
@ -252,7 +261,9 @@ protected void startDispatcher() {
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
if (nodeLabelsEnabled) {
|
||||
setInitNodeLabelStoreInProgress(true);
|
||||
initNodeLabelStore(getConfig());
|
||||
setInitNodeLabelStoreInProgress(false);
|
||||
}
|
||||
|
||||
// init dispatcher only when service start, because recover will happen in
|
||||
@ -1083,4 +1094,9 @@ protected Map<NodeId, Set<String>> normalizeNodeIdToLabels(
|
||||
}
|
||||
return newMap;
|
||||
}
|
||||
|
||||
public void setInitNodeLabelStoreInProgress(
|
||||
boolean initNodeLabelStoreInProgress) {
|
||||
this.initNodeLabelStoreInProgress = initNodeLabelStoreInProgress;
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,6 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
@ -114,13 +113,13 @@ public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
|
||||
throws IOException {
|
||||
try {
|
||||
writeLock.lock();
|
||||
if (getServiceState() == Service.STATE.STARTED) {
|
||||
if (!isInitNodeLabelStoreInProgress()) {
|
||||
// We cannot remove node labels from collection when some queue(s) are
|
||||
// using any of them.
|
||||
// We will only do this check when service starting finished. Before
|
||||
// We will not do remove when recovery is in prpgress. During
|
||||
// service starting, we will replay edit logs and recover state. It is
|
||||
// possible that a history operation removed some labels which were being
|
||||
// used by some queues in the past but not used by current queues.
|
||||
// possible that a history operation removed some labels which were not
|
||||
// used by some queues in the past but are used by current queues.
|
||||
checkRemoveFromClusterNodeLabelsOfQueue(labelsToRemove);
|
||||
}
|
||||
// copy before NMs
|
||||
|
@ -33,7 +33,9 @@
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
@ -44,6 +46,7 @@
|
||||
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||
@ -606,16 +609,7 @@ public void testPullRMNodeLabelsInfo() throws IOException {
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testcheckRemoveFromClusterNodeLabelsOfQueue() throws Exception {
|
||||
class TestRMLabelManger extends RMNodeLabelsManager {
|
||||
@Override
|
||||
protected void checkRemoveFromClusterNodeLabelsOfQueue(
|
||||
Collection<String> labelsToRemove) throws IOException {
|
||||
checkQueueCall = true;
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
}
|
||||
lmgr = new TestRMLabelManger();
|
||||
lmgr = new RMNodeLabelsManager();
|
||||
Configuration conf = new Configuration();
|
||||
File tempDir = File.createTempFile("nlb", ".tmp");
|
||||
tempDir.delete();
|
||||
@ -624,23 +618,60 @@ protected void checkRemoveFromClusterNodeLabelsOfQueue(
|
||||
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
||||
tempDir.getAbsolutePath());
|
||||
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_SCHEDULER,
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
|
||||
Configuration withQueueLabels = getConfigurationWithQueueLabels(conf);
|
||||
MockRM rm = initRM(conf);
|
||||
lmgr.addToCluserNodeLabels(toSet(NodeLabel.newInstance("x", false)));
|
||||
lmgr.removeFromClusterNodeLabels(Arrays.asList(new String[] { "x" }));
|
||||
lmgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("x"));
|
||||
rm.stop();
|
||||
class TestRMLabelManger extends RMNodeLabelsManager {
|
||||
@Override
|
||||
protected void checkRemoveFromClusterNodeLabelsOfQueue(
|
||||
Collection<String> labelsToRemove) throws IOException {
|
||||
checkQueueCall = true;
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
lmgr = new TestRMLabelManger();
|
||||
MockRM rm2 = initRM(withQueueLabels);
|
||||
Assert.assertFalse(
|
||||
"checkRemoveFromClusterNodeLabelsOfQueue should not be called"
|
||||
+ "on recovery",
|
||||
checkQueueCall);
|
||||
lmgr.removeFromClusterNodeLabels(Arrays.asList(new String[] { "x" }));
|
||||
Assert
|
||||
.assertTrue("checkRemoveFromClusterNodeLabelsOfQueue should be called "
|
||||
+ "since its not recovery", checkQueueCall);
|
||||
rm2.stop();
|
||||
}
|
||||
|
||||
private MockRM initRM(Configuration conf) {
|
||||
MockRM rm = new MockRM(conf) {
|
||||
@Override
|
||||
public RMNodeLabelsManager createNodeLabelManager() {
|
||||
return lmgr;
|
||||
}
|
||||
};
|
||||
lmgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("a"));
|
||||
lmgr.removeFromClusterNodeLabels(Arrays.asList(new String[] { "a" }));
|
||||
rm.getRMContext().setNodeLabelManager(lmgr);
|
||||
rm.start();
|
||||
lmgr.addToCluserNodeLabelsWithDefaultExclusivity(toSet("a"));
|
||||
Assert.assertEquals(false, checkQueueCall);
|
||||
lmgr.removeFromClusterNodeLabels(Arrays.asList(new String[] { "a" }));
|
||||
Assert.assertEquals(true, checkQueueCall);
|
||||
lmgr.stop();
|
||||
lmgr.close();
|
||||
rm.stop();
|
||||
Assert.assertEquals(Service.STATE.STARTED, rm.getServiceState());
|
||||
return rm;
|
||||
}
|
||||
|
||||
private Configuration getConfigurationWithQueueLabels(Configuration config) {
|
||||
CapacitySchedulerConfiguration conf =
|
||||
new CapacitySchedulerConfiguration(config);
|
||||
// Define top-level queues
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a" });
|
||||
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
|
||||
|
||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
conf.setCapacity(A, 100);
|
||||
conf.setAccessibleNodeLabels(A, ImmutableSet.of("x"));
|
||||
conf.setCapacityByLabel(A, "x", 100);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
|
Loading…
Reference in New Issue
Block a user