YARN-8606. Opportunistic scheduling does not work post RM failover. Contributed by Bibin A Chundatt.
This commit is contained in:
parent
5cc8e99147
commit
a48a0cc7fd
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -417,7 +418,8 @@ public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
|
|||||||
return nodeMonitor.getThresholdCalculator();
|
return nodeMonitor.getThresholdCalculator();
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized List<RemoteNode> getLeastLoadedNodes() {
|
@VisibleForTesting
|
||||||
|
synchronized List<RemoteNode> getLeastLoadedNodes() {
|
||||||
long currTime = System.currentTimeMillis();
|
long currTime = System.currentTimeMillis();
|
||||||
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
|
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
|
||||||
|| (cachedNodes == null)) {
|
|| (cachedNodes == null)) {
|
||||||
|
@ -757,9 +757,11 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
masterService = createApplicationMasterService();
|
masterService = createApplicationMasterService();
|
||||||
|
createAndRegisterOpportunisticDispatcher(masterService);
|
||||||
addService(masterService) ;
|
addService(masterService) ;
|
||||||
rmContext.setApplicationMasterService(masterService);
|
rmContext.setApplicationMasterService(masterService);
|
||||||
|
|
||||||
|
|
||||||
applicationACLsManager = new ApplicationACLsManager(conf);
|
applicationACLsManager = new ApplicationACLsManager(conf);
|
||||||
|
|
||||||
queueACLsManager = createQueueACLsManager(scheduler, conf);
|
queueACLsManager = createQueueACLsManager(scheduler, conf);
|
||||||
@ -807,6 +809,23 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
|||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createAndRegisterOpportunisticDispatcher(
|
||||||
|
ApplicationMasterService service) {
|
||||||
|
if (!isOpportunisticSchedulingEnabled(conf)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
EventDispatcher oppContainerAllocEventDispatcher = new EventDispatcher(
|
||||||
|
(OpportunisticContainerAllocatorAMService) service,
|
||||||
|
OpportunisticContainerAllocatorAMService.class.getName());
|
||||||
|
// Add an event dispatcher for the
|
||||||
|
// OpportunisticContainerAllocatorAMService to handle node
|
||||||
|
// additions, updates and removals. Since the SchedulerEvent is currently
|
||||||
|
// a super set of theses, we register interest for it.
|
||||||
|
addService(oppContainerAllocEventDispatcher);
|
||||||
|
rmDispatcher
|
||||||
|
.register(SchedulerEventType.class, oppContainerAllocEventDispatcher);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
RMStateStore rmStore = rmContext.getStateStore();
|
RMStateStore rmStore = rmContext.getStateStore();
|
||||||
@ -1335,8 +1354,7 @@ protected ClientRMService createClientRMService() {
|
|||||||
|
|
||||||
protected ApplicationMasterService createApplicationMasterService() {
|
protected ApplicationMasterService createApplicationMasterService() {
|
||||||
Configuration config = this.rmContext.getYarnConfiguration();
|
Configuration config = this.rmContext.getYarnConfiguration();
|
||||||
if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
|
if (isOpportunisticSchedulingEnabled(conf)) {
|
||||||
|| YarnConfiguration.isDistSchedulingEnabled(config)) {
|
|
||||||
if (YarnConfiguration.isDistSchedulingEnabled(config) &&
|
if (YarnConfiguration.isDistSchedulingEnabled(config) &&
|
||||||
!YarnConfiguration
|
!YarnConfiguration
|
||||||
.isOpportunisticContainerAllocationEnabled(config)) {
|
.isOpportunisticContainerAllocationEnabled(config)) {
|
||||||
@ -1348,16 +1366,6 @@ protected ApplicationMasterService createApplicationMasterService() {
|
|||||||
oppContainerAllocatingAMService =
|
oppContainerAllocatingAMService =
|
||||||
new OpportunisticContainerAllocatorAMService(this.rmContext,
|
new OpportunisticContainerAllocatorAMService(this.rmContext,
|
||||||
scheduler);
|
scheduler);
|
||||||
EventDispatcher oppContainerAllocEventDispatcher =
|
|
||||||
new EventDispatcher(oppContainerAllocatingAMService,
|
|
||||||
OpportunisticContainerAllocatorAMService.class.getName());
|
|
||||||
// Add an event dispatcher for the
|
|
||||||
// OpportunisticContainerAllocatorAMService to handle node
|
|
||||||
// additions, updates and removals. Since the SchedulerEvent is currently
|
|
||||||
// a super set of theses, we register interest for it.
|
|
||||||
addService(oppContainerAllocEventDispatcher);
|
|
||||||
rmDispatcher.register(SchedulerEventType.class,
|
|
||||||
oppContainerAllocEventDispatcher);
|
|
||||||
this.rmContext.setContainerQueueLimitCalculator(
|
this.rmContext.setContainerQueueLimitCalculator(
|
||||||
oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator());
|
oppContainerAllocatingAMService.getNodeManagerQueueLimitCalculator());
|
||||||
return oppContainerAllocatingAMService;
|
return oppContainerAllocatingAMService;
|
||||||
@ -1373,6 +1381,11 @@ protected RMSecretManagerService createRMSecretManagerService() {
|
|||||||
return new RMSecretManagerService(conf, rmContext);
|
return new RMSecretManagerService(conf, rmContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean isOpportunisticSchedulingEnabled(Configuration conf) {
|
||||||
|
return YarnConfiguration.isOpportunisticContainerAllocationEnabled(conf)
|
||||||
|
|| YarnConfiguration.isDistSchedulingEnabled(conf);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create RMDelegatedNodeLabelsUpdater based on configuration.
|
* Create RMDelegatedNodeLabelsUpdater based on configuration.
|
||||||
*/
|
*/
|
||||||
|
@ -18,6 +18,10 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -658,6 +662,46 @@ protected Dispatcher createDispatcher() {
|
|||||||
assertEquals(HAServiceState.STANDBY, rm.getRMContext().getHAServiceState());
|
assertEquals(HAServiceState.STANDBY, rm.getRMContext().getHAServiceState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOpportunisticAllocatorAfterFailover() throws Exception {
|
||||||
|
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||||
|
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
|
Configuration conf = new YarnConfiguration(configuration);
|
||||||
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
conf.setBoolean(
|
||||||
|
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
|
||||||
|
// 1. start RM
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.init(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||||
|
// 2. Transition to active
|
||||||
|
rm.adminService.transitionToActive(requestInfo);
|
||||||
|
// 3. Transition to standby
|
||||||
|
rm.adminService.transitionToStandby(requestInfo);
|
||||||
|
// 4. Transition to active
|
||||||
|
rm.adminService.transitionToActive(requestInfo);
|
||||||
|
|
||||||
|
MockNM nm1 = rm.registerNode("h1:1234", 8 * 1024);
|
||||||
|
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||||
|
rmNode1.getRMContext().getDispatcher().getEventHandler()
|
||||||
|
.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||||
|
OpportunisticContainerAllocatorAMService appMaster =
|
||||||
|
(OpportunisticContainerAllocatorAMService) rm.getRMContext()
|
||||||
|
.getApplicationMasterService();
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
return appMaster.getLeastLoadedNodes().size() == 1;
|
||||||
|
}
|
||||||
|
}, 100, 3000);
|
||||||
|
rm.stop();
|
||||||
|
Assert.assertEquals(1, appMaster.getLeastLoadedNodes().size());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testResourceProfilesManagerAfterRMWentStandbyThenBackToActive()
|
public void testResourceProfilesManagerAfterRMWentStandbyThenBackToActive()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
Loading…
Reference in New Issue
Block a user