YARN-9051. Integrate multiple CustomResourceTypesConfigurationProvider implementations into one. (Contributed by Szilard Nemeth)
This commit is contained in:
parent
60af851e59
commit
881230da21
@ -28,9 +28,7 @@
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
@ -39,7 +37,9 @@
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
@ -91,7 +91,6 @@
|
||||
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
@ -102,7 +101,6 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
@ -128,30 +126,6 @@ public FileStatus getFileStatus(Path f) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
private static class CustomResourceTypesConfigurationProvider
|
||||
extends LocalConfigurationProvider {
|
||||
|
||||
@Override
|
||||
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||
String name) throws YarnException, IOException {
|
||||
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
|
||||
return new ByteArrayInputStream(
|
||||
("<configuration>\n" +
|
||||
" <property>\n" +
|
||||
" <name>yarn.resource-types</name>\n" +
|
||||
" <value>a-custom-resource</value>\n" +
|
||||
" </property>\n" +
|
||||
" <property>\n" +
|
||||
" <name>yarn.resource-types.a-custom-resource.units</name>\n" +
|
||||
" <value>G</value>\n" +
|
||||
" </property>\n" +
|
||||
"</configuration>\n").getBytes());
|
||||
} else {
|
||||
return super.getConfigurationInputStream(bootstrapConf, name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestAppender extends AppenderSkeleton {
|
||||
|
||||
private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
|
||||
@ -1776,10 +1750,10 @@ public void testReducerCustomResourceTypeWithInvalidUnit() {
|
||||
}
|
||||
|
||||
private void initResourceTypes() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
CustomResourceTypesConfigurationProvider.class.getName());
|
||||
ResourceUtils.resetResourceTypes(conf);
|
||||
CustomResourceTypesConfigurationProvider.initResourceTypes(
|
||||
ImmutableMap.<String, String>builder()
|
||||
.put(CUSTOM_RESOURCE_NAME, "G")
|
||||
.build());
|
||||
}
|
||||
|
||||
private void setupTaskAttemptFinishingMonitor(
|
||||
|
@ -33,12 +33,10 @@
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
@ -48,6 +46,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
@ -70,7 +69,6 @@
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
||||
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
||||
@ -104,12 +102,12 @@
|
||||
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
|
||||
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.log4j.Appender;
|
||||
import org.apache.log4j.AppenderSkeleton;
|
||||
@ -143,30 +141,7 @@ public class TestYARNRunner {
|
||||
private static final String PROFILE_PARAMS =
|
||||
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0,
|
||||
MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%"));
|
||||
|
||||
private static class CustomResourceTypesConfigurationProvider
|
||||
extends LocalConfigurationProvider {
|
||||
|
||||
@Override
|
||||
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||
String name) throws YarnException, IOException {
|
||||
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
|
||||
return new ByteArrayInputStream(
|
||||
("<configuration>\n" +
|
||||
" <property>\n" +
|
||||
" <name>yarn.resource-types</name>\n" +
|
||||
" <value>a-custom-resource</value>\n" +
|
||||
" </property>\n" +
|
||||
" <property>\n" +
|
||||
" <name>yarn.resource-types.a-custom-resource.units</name>\n" +
|
||||
" <value>G</value>\n" +
|
||||
" </property>\n" +
|
||||
"</configuration>\n").getBytes());
|
||||
} else {
|
||||
return super.getConfigurationInputStream(bootstrapConf, name);
|
||||
}
|
||||
}
|
||||
}
|
||||
private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource";
|
||||
|
||||
private static class TestAppender extends AppenderSkeleton {
|
||||
|
||||
@ -967,12 +942,11 @@ public void testSendJobConf() throws IOException {
|
||||
@Test
|
||||
public void testCustomAMRMResourceType() throws Exception {
|
||||
initResourceTypes();
|
||||
String customResourceName = "a-custom-resource";
|
||||
|
||||
JobConf jobConf = new JobConf();
|
||||
|
||||
jobConf.setInt(MRJobConfig.MR_AM_RESOURCE_PREFIX +
|
||||
customResourceName, 5);
|
||||
CUSTOM_RESOURCE_NAME, 5);
|
||||
jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3);
|
||||
|
||||
yarnRunner = new YARNRunner(jobConf);
|
||||
@ -986,7 +960,7 @@ public void testCustomAMRMResourceType() throws Exception {
|
||||
ResourceRequest resourceRequest = resourceRequests.get(0);
|
||||
|
||||
ResourceInformation resourceInformation = resourceRequest.getCapability()
|
||||
.getResourceInformation(customResourceName);
|
||||
.getResourceInformation(CUSTOM_RESOURCE_NAME);
|
||||
Assert.assertEquals("Expecting the default unit (G)",
|
||||
"G", resourceInformation.getUnits());
|
||||
Assert.assertEquals(5L, resourceInformation.getValue());
|
||||
@ -1054,9 +1028,9 @@ public void testAMRMemoryRequestOverriding() throws Exception {
|
||||
}
|
||||
|
||||
private void initResourceTypes() {
|
||||
Configuration configuration = new Configuration();
|
||||
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
CustomResourceTypesConfigurationProvider.class.getName());
|
||||
ResourceUtils.resetResourceTypes(configuration);
|
||||
CustomResourceTypesConfigurationProvider.initResourceTypes(
|
||||
ImmutableMap.<String, String>builder()
|
||||
.put(CUSTOM_RESOURCE_NAME, "G")
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,186 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.util.resource;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
/**
|
||||
* This class can generate an XML configuration file of custom resource types.
|
||||
* See createInitial ResourceTypes for the default values. All custom resource
|
||||
* type is prefixed with CUSTOM_RESOURCE_PREFIX. Please use the
|
||||
* getConfigurationInputStream method to get an InputStream of the XML.
|
||||
*
|
||||
*/
|
||||
public class CustomResourceTypesConfigurationProvider
|
||||
extends LocalConfigurationProvider {
|
||||
|
||||
@Override
|
||||
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||
String name) throws YarnException, IOException {
|
||||
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
|
||||
return new ByteArrayInputStream(
|
||||
customResourceTypes.getXml().getBytes());
|
||||
} else {
|
||||
return super.getConfigurationInputStream(bootstrapConf, name);
|
||||
}
|
||||
}
|
||||
|
||||
private static final String CUSTOM_RESOURCE_PREFIX = "custom-resource-";
|
||||
private static final String UNIT_KILO = "k";
|
||||
private static CustomResourceTypes customResourceTypes =
|
||||
createCustomResourceTypes(2, UNIT_KILO);
|
||||
|
||||
public static void initResourceTypes(Map<String, String> resourcesWithUnits) {
|
||||
CustomResourceTypesConfigurationProvider.setResourceTypes(
|
||||
resourcesWithUnits);
|
||||
initResourceTypesInternal();
|
||||
}
|
||||
|
||||
public static void initResourceTypes(int count, String units) {
|
||||
CustomResourceTypesConfigurationProvider.setResourceTypes(count, units);
|
||||
initResourceTypesInternal();
|
||||
}
|
||||
|
||||
public static void initResourceTypes(String... resourceTypes) {
|
||||
// Initialize resource map
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
|
||||
riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
|
||||
|
||||
for (String newResource : resourceTypes) {
|
||||
riMap.put(newResource, ResourceInformation
|
||||
.newInstance(newResource, "", 0, ResourceTypes.COUNTABLE, 0,
|
||||
Integer.MAX_VALUE));
|
||||
}
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
}
|
||||
|
||||
private static void initResourceTypesInternal() {
|
||||
Configuration configuration = new Configuration();
|
||||
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
CustomResourceTypesConfigurationProvider.class.getName());
|
||||
ResourceUtils.resetResourceTypes(configuration);
|
||||
}
|
||||
|
||||
private static CustomResourceTypes createCustomResourceTypes(
|
||||
int count, String units) {
|
||||
List<String> resourceNames = generateResourceTypeNames(count);
|
||||
Map<String, String> resourcesWithUnits = resourceNames.stream().collect(
|
||||
Collectors.toMap(e -> e, e -> units));
|
||||
return createCustomResourceTypes(resourcesWithUnits);
|
||||
}
|
||||
|
||||
private static CustomResourceTypes createCustomResourceTypes(
|
||||
Map<String, String> resourcesWithUnits) {
|
||||
int count = resourcesWithUnits.size();
|
||||
List<String> resourceNames = Lists.newArrayList(
|
||||
resourcesWithUnits.keySet());
|
||||
|
||||
List<String> resourceUnitXmlElements = IntStream.range(0, count)
|
||||
.boxed()
|
||||
.map(i -> getResourceUnitsXml(resourceNames.get(i),
|
||||
resourcesWithUnits.get(resourceNames.get(i))))
|
||||
.collect(toList());
|
||||
|
||||
StringBuilder sb = new StringBuilder("<configuration>\n");
|
||||
sb.append(getResourceTypesXml(resourceNames));
|
||||
|
||||
for (String resourceUnitXml : resourceUnitXmlElements) {
|
||||
sb.append(resourceUnitXml);
|
||||
|
||||
}
|
||||
sb.append("</configuration>");
|
||||
|
||||
return new CustomResourceTypes(sb.toString(), count);
|
||||
}
|
||||
|
||||
private static List<String> generateResourceTypeNames(int count) {
|
||||
return IntStream.range(0, count)
|
||||
.boxed()
|
||||
.map(i -> CUSTOM_RESOURCE_PREFIX + (i + 1))
|
||||
.collect(toList());
|
||||
}
|
||||
|
||||
private static String getResourceUnitsXml(String resource, String units) {
|
||||
return "<property>\n" +
|
||||
"<name>yarn.resource-types." + resource+ ".units</name>\n" +
|
||||
"<value>" + units + "</value>\n" +
|
||||
"</property>\n";
|
||||
}
|
||||
|
||||
private static String getResourceTypesXml(List<String> resources) {
|
||||
final String resourceTypes = String.join(",", resources);
|
||||
|
||||
return "<property>\n" +
|
||||
"<name>yarn.resource-types</name>\n" +
|
||||
"<value>" + resourceTypes + "</value>\n" + "</property>\n";
|
||||
}
|
||||
|
||||
public static void reset() {
|
||||
customResourceTypes = createCustomResourceTypes(2, UNIT_KILO);
|
||||
}
|
||||
|
||||
public static void setResourceTypes(int count, String units) {
|
||||
customResourceTypes = createCustomResourceTypes(count, units);
|
||||
}
|
||||
|
||||
public static void setResourceTypes(Map<String, String> resourcesWithUnits) {
|
||||
customResourceTypes = createCustomResourceTypes(resourcesWithUnits);
|
||||
}
|
||||
|
||||
public static List<String> getCustomResourceTypes() {
|
||||
return generateResourceTypeNames(customResourceTypes.getCount());
|
||||
}
|
||||
|
||||
private static class CustomResourceTypes {
|
||||
private int count;
|
||||
private String xml;
|
||||
|
||||
CustomResourceTypes(String xml, int count) {
|
||||
this.xml = xml;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public int getCount() {
|
||||
return count;
|
||||
}
|
||||
public String getXml() {
|
||||
return xml;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -54,23 +54,6 @@ public ResourceFileInformation(String name, int count) {
|
||||
}
|
||||
}
|
||||
|
||||
public static void addNewTypesToResources(String... resourceTypes) {
|
||||
// Initialize resource map
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
|
||||
riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
|
||||
|
||||
for (String newResource : resourceTypes) {
|
||||
riMap.put(newResource, ResourceInformation
|
||||
.newInstance(newResource, "", 0, ResourceTypes.COUNTABLE, 0,
|
||||
Integer.MAX_VALUE));
|
||||
}
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
ResourceUtils.resetResourceTypes();
|
||||
|
@ -34,7 +34,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaDiscoverer;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -61,7 +61,8 @@ public class TestFpgaResourceHandler {
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
TestResourceUtils.addNewTypesToResources(ResourceInformation.FPGA_URI);
|
||||
CustomResourceTypesConfigurationProvider.
|
||||
initResourceTypes(ResourceInformation.FPGA_URI);
|
||||
configuration = new YarnConfiguration();
|
||||
|
||||
mockCGroupsHandler = mock(CGroupsHandler.class);
|
||||
|
@ -40,7 +40,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -74,7 +74,8 @@ public class TestGpuResourceHandler {
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
TestResourceUtils.addNewTypesToResources(ResourceInformation.GPU_URI);
|
||||
CustomResourceTypesConfigurationProvider.
|
||||
initResourceTypes(ResourceInformation.GPU_URI);
|
||||
|
||||
mockCGroupsHandler = mock(CGroupsHandler.class);
|
||||
mockPrivilegedExecutor = mock(PrivilegedOperationExecutor.class);
|
||||
|
@ -18,11 +18,12 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
|
||||
import static org.apache.hadoop.yarn.exceptions
|
||||
.InvalidResourceRequestException.InvalidResourceType
|
||||
.GREATER_THEN_MAX_ALLOCATION;
|
||||
.InvalidResourceRequestException.InvalidResourceType
|
||||
.GREATER_THEN_MAX_ALLOCATION;
|
||||
import static org.apache.hadoop.yarn.exceptions
|
||||
.InvalidResourceRequestException.InvalidResourceType.LESS_THAN_ZERO;
|
||||
.InvalidResourceRequestException.InvalidResourceType.LESS_THAN_ZERO;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
@ -31,9 +32,7 @@
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Arrays;
|
||||
@ -50,7 +49,6 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
@ -73,8 +71,7 @@
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException
|
||||
.InvalidResourceType;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
.InvalidResourceType;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
|
||||
@ -93,10 +90,10 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -108,41 +105,12 @@
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
|
||||
public class TestSchedulerUtils {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class);
|
||||
private static Resource configuredMaxAllocation;
|
||||
|
||||
private static class CustomResourceTypesConfigurationProvider
|
||||
extends LocalConfigurationProvider {
|
||||
|
||||
@Override
|
||||
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||
String name) throws YarnException, IOException {
|
||||
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
|
||||
return new ByteArrayInputStream(
|
||||
("<configuration>\n" +
|
||||
" <property>\n" +
|
||||
" <name>yarn.resource-types</name>\n" +
|
||||
" <value>custom-resource-1," +
|
||||
"custom-resource-2,custom-resource-3</value>\n" +
|
||||
" </property>\n" +
|
||||
" <property>\n" +
|
||||
" <name>yarn.resource-types" +
|
||||
".custom-resource-1.units</name>\n" +
|
||||
" <value>G</value>\n" +
|
||||
" </property>\n" +
|
||||
" <property>\n" +
|
||||
" <name>yarn.resource-types" +
|
||||
".custom-resource-2.units</name>\n" +
|
||||
" <value>G</value>\n" +
|
||||
" </property>\n" +
|
||||
"</configuration>\n").getBytes());
|
||||
} else {
|
||||
return super.getConfigurationInputStream(bootstrapConf, name);
|
||||
}
|
||||
}
|
||||
}
|
||||
private RMContext rmContext = getMockRMContext();
|
||||
|
||||
private static YarnConfiguration conf = new YarnConfiguration();
|
||||
@ -151,10 +119,7 @@ public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
private void initResourceTypes() {
|
||||
Configuration yarnConf = new Configuration();
|
||||
yarnConf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
CustomResourceTypesConfigurationProvider.class.getName());
|
||||
ResourceUtils.resetResourceTypes(yarnConf);
|
||||
CustomResourceTypesConfigurationProvider.initResourceTypes(3, "G");
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -61,10 +61,10 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -1010,7 +1010,8 @@ public void testUnreserveWhenClusterResourceHasEmptyResourceType()
|
||||
* After nm2 do next node heartbeat, scheduler should unreserve the reserved
|
||||
* container on nm1 then allocate a container on nm2.
|
||||
*/
|
||||
TestResourceUtils.addNewTypesToResources("resource1");
|
||||
CustomResourceTypesConfigurationProvider.
|
||||
initResourceTypes("resource1");
|
||||
CapacitySchedulerConfiguration newConf =
|
||||
(CapacitySchedulerConfiguration) TestUtils
|
||||
.getConfigurationWithMultipleQueues(conf);
|
||||
|
@ -31,9 +31,8 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
|
||||
import org.junit.Test;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
@ -44,7 +43,6 @@
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -210,7 +208,8 @@ public void testReload() throws Exception {
|
||||
@Test
|
||||
public void testAllocationFileParsing() throws Exception {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
TestResourceUtils.addNewTypesToResources(A_CUSTOM_RESOURCE);
|
||||
CustomResourceTypesConfigurationProvider.
|
||||
initResourceTypes(A_CUSTOM_RESOURCE);
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||
|
||||
|
@ -17,28 +17,17 @@
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.parseResourceConfigValue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
@ -50,6 +39,14 @@
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.parseResourceConfigValue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Tests fair scheduler configuration.
|
||||
*/
|
||||
@ -57,31 +54,6 @@ public class TestFairSchedulerConfiguration {
|
||||
|
||||
private static final String A_CUSTOM_RESOURCE = "a-custom-resource";
|
||||
|
||||
private static class CustomResourceTypesConfigurationProvider
|
||||
extends LocalConfigurationProvider {
|
||||
|
||||
@Override
|
||||
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||
String name) throws YarnException, IOException {
|
||||
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
|
||||
return new ByteArrayInputStream((
|
||||
"<configuration>\n" +
|
||||
" <property>\n" +
|
||||
" <name>yarn.resource-types</name>\n" +
|
||||
" <value>" + A_CUSTOM_RESOURCE + "</value>\n" +
|
||||
" </property>\n" +
|
||||
" <property>\n" +
|
||||
" <name>yarn.resource-types.a-custom-resource.units</name>\n" +
|
||||
" <value>k</value>\n" +
|
||||
" </property>\n" +
|
||||
"</configuration>\n")
|
||||
.getBytes());
|
||||
} else {
|
||||
return super.getConfigurationInputStream(bootstrapConf, name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestAppender extends AppenderSkeleton {
|
||||
|
||||
private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
|
||||
@ -637,7 +609,7 @@ public void testAllocationIncrementVCoreWithUnit() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllocationIncrementCustomResource() throws Exception {
|
||||
public void testAllocationIncrementCustomResource() {
|
||||
try {
|
||||
initResourceTypes();
|
||||
Configuration conf = new Configuration();
|
||||
@ -686,10 +658,10 @@ private ResourceInformation customResourceInformation(long value,
|
||||
}
|
||||
|
||||
private void initResourceTypes() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
CustomResourceTypesConfigurationProvider.class.getName());
|
||||
ResourceUtils.resetResourceTypes(conf);
|
||||
CustomResourceTypesConfigurationProvider.initResourceTypes(
|
||||
ImmutableMap.<String, String>builder()
|
||||
.put(A_CUSTOM_RESOURCE, "k")
|
||||
.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -31,7 +31,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.fairscheduler.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.BufferedClientResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.JsonCustomResourceTypeTestcase;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.XmlCustomResourceTypeTestCase;
|
||||
|
@ -34,7 +34,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.fairscheduler.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.BufferedClientResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.JsonCustomResourceTypeTestcase;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.XmlCustomResourceTypeTestCase;
|
||||
|
@ -22,8 +22,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.fairscheduler
|
||||
.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.AppInfoJsonVerifications;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.AppInfoXmlVerifications;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.ResourceRequestsJsonVerifications;
|
||||
|
@ -1,140 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp.fairscheduler;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
/**
|
||||
* This class can generate an XML configuration file of custom resource types.
|
||||
* See createInitialResourceTypes for the default values. All custom resource
|
||||
* type is prefixed with CUSTOM_RESOURCE_PREFIX. Please use the
|
||||
* getConfigurationInputStream method to get an InputStream of the XML. If you
|
||||
* want to have different number of resources in your tests, please see usages
|
||||
* of this class in this test class:
|
||||
* {@link TestRMWebServicesFairSchedulerCustomResourceTypes}
|
||||
*
|
||||
*/
|
||||
public class CustomResourceTypesConfigurationProvider
|
||||
extends LocalConfigurationProvider {
|
||||
|
||||
private static class CustomResourceTypes {
|
||||
private int count;
|
||||
private String xml;
|
||||
|
||||
CustomResourceTypes(String xml, int count) {
|
||||
this.xml = xml;
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
public int getCount() {
|
||||
return count;
|
||||
}
|
||||
|
||||
public String getXml() {
|
||||
return xml;
|
||||
}
|
||||
}
|
||||
|
||||
private static final String CUSTOM_RESOURCE_PREFIX = "customResource-";
|
||||
|
||||
private static CustomResourceTypes customResourceTypes =
|
||||
createInitialResourceTypes();
|
||||
|
||||
private static CustomResourceTypes createInitialResourceTypes() {
|
||||
return createCustomResourceTypes(2);
|
||||
}
|
||||
|
||||
private static CustomResourceTypes createCustomResourceTypes(int count) {
|
||||
List<String> resourceTypeNames = generateResourceTypeNames(count);
|
||||
|
||||
List<String> resourceUnitXmlElements = IntStream.range(0, count)
|
||||
.boxed()
|
||||
.map(i -> getResourceUnitsXml(resourceTypeNames.get(i)))
|
||||
.collect(toList());
|
||||
|
||||
StringBuilder sb = new StringBuilder("<configuration>\n");
|
||||
sb.append(getResourceTypesXml(resourceTypeNames));
|
||||
|
||||
for (String resourceUnitXml : resourceUnitXmlElements) {
|
||||
sb.append(resourceUnitXml);
|
||||
|
||||
}
|
||||
sb.append("</configuration>");
|
||||
|
||||
return new CustomResourceTypes(sb.toString(), count);
|
||||
}
|
||||
|
||||
private static List<String> generateResourceTypeNames(int count) {
|
||||
return IntStream.range(0, count)
|
||||
.boxed()
|
||||
.map(i -> CUSTOM_RESOURCE_PREFIX + i)
|
||||
.collect(toList());
|
||||
}
|
||||
|
||||
private static String getResourceUnitsXml(String resource) {
|
||||
return "<property>\n" + "<name>yarn.resource-types." + resource
|
||||
+ ".units</name>\n" + "<value>k</value>\n" + "</property>\n";
|
||||
}
|
||||
|
||||
private static String getResourceTypesXml(List<String> resources) {
|
||||
final String resourceTypes = makeCommaSeparatedString(resources);
|
||||
|
||||
return "<property>\n" + "<name>yarn.resource-types</name>\n" + "<value>"
|
||||
+ resourceTypes + "</value>\n" + "</property>\n";
|
||||
}
|
||||
|
||||
private static String makeCommaSeparatedString(List<String> resources) {
|
||||
return resources.stream().collect(Collectors.joining(","));
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
||||
String name) throws YarnException, IOException {
|
||||
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
|
||||
return new ByteArrayInputStream(
|
||||
customResourceTypes.getXml().getBytes());
|
||||
} else {
|
||||
return super.getConfigurationInputStream(bootstrapConf, name);
|
||||
}
|
||||
}
|
||||
|
||||
public static void reset() {
|
||||
customResourceTypes = createInitialResourceTypes();
|
||||
}
|
||||
|
||||
public static void setNumberOfResourceTypes(int count) {
|
||||
customResourceTypes = createCustomResourceTypes(count);
|
||||
}
|
||||
|
||||
public static List<String> getCustomResourceTypes() {
|
||||
return generateResourceTypeNames(customResourceTypes.getCount());
|
||||
}
|
||||
}
|
@ -36,6 +36,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.helper.*;
|
||||
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||
@ -162,7 +163,7 @@ public void testClusterSchedulerWithCustomResourceTypesXml() {
|
||||
|
||||
@Test
|
||||
public void testClusterSchedulerWithElevenCustomResourceTypesXml() {
|
||||
CustomResourceTypesConfigurationProvider.setNumberOfResourceTypes(11);
|
||||
CustomResourceTypesConfigurationProvider.setResourceTypes(11, "k");
|
||||
createInjectorForWebServletModule();
|
||||
|
||||
FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
|
||||
@ -186,7 +187,7 @@ public void testClusterSchedulerWithElevenCustomResourceTypesXml() {
|
||||
|
||||
@Test
|
||||
public void testClusterSchedulerElevenWithCustomResourceTypesJson() {
|
||||
CustomResourceTypesConfigurationProvider.setNumberOfResourceTypes(11);
|
||||
CustomResourceTypesConfigurationProvider.setResourceTypes(11, "k");
|
||||
createInjectorForWebServletModule();
|
||||
|
||||
FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
|
||||
|
Loading…
Reference in New Issue
Block a user