YARN-5951. Changes to allow CapacityScheduler to use configuration store

This commit is contained in:
Jonathan Hung 2017-01-30 19:03:48 -08:00
parent 09ad848b64
commit ef59cbe08a
6 changed files with 170 additions and 22 deletions

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
@ -103,6 +102,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.CSConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.FileBasedCSConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
@ -167,6 +168,8 @@ public class CapacityScheduler extends
private int maxAssignPerHeartbeat; private int maxAssignPerHeartbeat;
private CSConfigurationProvider csConfProvider;
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
yarnConf = conf; yarnConf = conf;
@ -289,7 +292,18 @@ void initScheduler(Configuration configuration) throws
IOException { IOException {
try { try {
writeLock.lock(); writeLock.lock();
this.conf = loadCapacitySchedulerConfiguration(configuration); String confProviderStr = configuration.get(
CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
CapacitySchedulerConfiguration.DEFAULT_CS_CONF_PROVIDER);
if (confProviderStr.equals(
CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER)) {
this.csConfProvider = new FileBasedCSConfigurationProvider(rmContext);
} else {
throw new IOException("Invalid CS configuration provider: " +
confProviderStr);
}
this.csConfProvider.init(configuration);
this.conf = this.csConfProvider.loadConfiguration(configuration);
validateConf(this.conf); validateConf(this.conf);
this.minimumAllocation = super.getMinimumAllocation(); this.minimumAllocation = super.getMinimumAllocation();
initMaximumResourceCapability(super.getMaximumAllocation()); initMaximumResourceCapability(super.getMaximumAllocation());
@ -399,7 +413,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext)
writeLock.lock(); writeLock.lock();
Configuration configuration = new Configuration(newConf); Configuration configuration = new Configuration(newConf);
CapacitySchedulerConfiguration oldConf = this.conf; CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = loadCapacitySchedulerConfiguration(configuration); this.conf = csConfProvider.loadConfiguration(configuration);
validateConf(this.conf); validateConf(this.conf);
try { try {
LOG.info("Re-initializing queues..."); LOG.info("Re-initializing queues...");
@ -1831,23 +1845,6 @@ public boolean isSystemAppsLimitReached() {
return true; return true;
} }
private CapacitySchedulerConfiguration loadCapacitySchedulerConfiguration(
Configuration configuration) throws IOException {
try {
InputStream CSInputStream =
this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(configuration,
YarnConfiguration.CS_CONFIGURATION_FILE);
if (CSInputStream != null) {
configuration.addResource(CSInputStream);
return new CapacitySchedulerConfiguration(configuration, false);
}
return new CapacitySchedulerConfiguration(configuration, true);
} catch (Exception e) {
throw new IOException(e);
}
}
private String getDefaultReservationQueueName(String planQueueName) { private String getDefaultReservationQueueName(String planQueueName) {
return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX; return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
} }

View File

@ -316,6 +316,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private @Private
public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1; public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
public static final String CS_CONF_PROVIDER = PREFIX
+ "configuration.provider";
@Private
public static final String FILE_CS_CONF_PROVIDER = "file";
@Private
public static final String DEFAULT_CS_CONF_PROVIDER = FILE_CS_CONF_PROVIDER;
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser(); AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
public CapacitySchedulerConfiguration() { public CapacitySchedulerConfiguration() {

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import java.io.IOException;
/**
* Configuration provider for {@link CapacityScheduler}.
*/
public interface CSConfigurationProvider {
/**
* Initialize the configuration provider with given conf.
* @param conf configuration to initialize with
*/
void init(Configuration conf);
/**
* Loads capacity scheduler configuration object.
* @param conf initial bootstrap configuration
* @return CS configuration
* @throws IOException if fail to retrieve configuration
*/
CapacitySchedulerConfiguration loadConfiguration(Configuration conf)
throws IOException;
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import java.io.IOException;
import java.io.InputStream;
/**
* {@link CapacityScheduler} configuration provider based on local
* {@code capacity-scheduler.xml} file.
*/
public class FileBasedCSConfigurationProvider implements
CSConfigurationProvider {
private RMContext rmContext;
/**
* Construct file based CS configuration provider with given context.
* @param rmContext the RM context
*/
public FileBasedCSConfigurationProvider(RMContext rmContext) {
this.rmContext = rmContext;
}
@Override
public void init(Configuration conf) {}
@Override
public CapacitySchedulerConfiguration loadConfiguration(Configuration conf)
throws IOException {
try {
InputStream csInputStream =
this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(conf,
YarnConfiguration.CS_CONFIGURATION_FILE);
if (csInputStream != null) {
conf.addResource(csInputStream);
return new CapacitySchedulerConfiguration(conf, false);
}
return new CapacitySchedulerConfiguration(conf, true);
} catch (Exception e) {
throw new IOException(e);
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Package
* org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf
* contains classes related to capacity scheduler configuration management.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -241,13 +241,13 @@ private NodeManager registerNode(ResourceManager rm, String hostName,
@Test (timeout = 30000) @Test (timeout = 30000)
public void testConfValidation() throws Exception { public void testConfValidation() throws Exception {
ResourceScheduler scheduler = new CapacityScheduler(); CapacityScheduler scheduler = new CapacityScheduler();
scheduler.setRMContext(resourceManager.getRMContext()); scheduler.setRMContext(resourceManager.getRMContext());
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
try { try {
scheduler.reinitialize(conf, mockContext); scheduler.init(conf);
fail("Exception is expected because the min memory allocation is" + fail("Exception is expected because the min memory allocation is" +
" larger than the max memory allocation."); " larger than the max memory allocation.");
} catch (YarnRuntimeException e) { } catch (YarnRuntimeException e) {