YARN-5587. Add support for resource profiles. (vvasudev via asuresh)

This commit is contained in:
Arun Suresh 2016-11-15 01:01:07 -08:00 committed by Wangda Tan
parent c2032e251e
commit 6708ac3301
36 changed files with 1103 additions and 176 deletions

View File

@ -153,6 +153,10 @@
<Class name="org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceRetentionSet$LRUComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl$ProfileCapabilityComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl" />
<Field name="builder" />

View File

@ -204,4 +204,12 @@ public abstract void setContainersFromPreviousAttempts(
@Unstable
public abstract void setSchedulerResourceTypes(
EnumSet<SchedulerResourceTypes> types);
@Public
@Unstable
public abstract Map<String, Resource> getResourceProfiles();
@Private
@Unstable
public abstract void setResourceProfiles(Map<String, Resource> profiles);
}

View File

@ -18,41 +18,93 @@
package org.apache.hadoop.yarn.api.records;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.util.Records;
import java.util.Map;
/**
* Class to capture capability requirements when using resource profiles. The
* ProfileCapability is meant to be used as part of the ResourceRequest. A
* profile capability has two pieces - the resource profile name and the
* overrides. The resource profile specifies the name of the resource profile
* to be used and the capability override is the overrides desired on specific
* resource types. For example, you could use the "minimum" profile and set the
* memory in the capability override to 4096M. This implies that you wish for
* the resources specified in the "minimum" profile but with 4096M memory. The
* conversion from the ProfileCapability to the Resource class with the actual
* resource requirements will be done by the ResourceManager, which has the
* actual profile to Resource mapping.
* resource types.
*
* For example, if you have a resource profile "small" that maps to
* {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
* {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
* the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
*
* Note that the conversion from the ProfileCapability to the Resource class
* with the actual resource requirements will be done by the ResourceManager,
* which has the actual profile to Resource mapping.
*
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class ProfileCapability {
public static final String DEFAULT_PROFILE = "default";
public static ProfileCapability newInstance(Resource override) {
return newInstance(DEFAULT_PROFILE, override);
}
public static ProfileCapability newInstance(String profile) {
Preconditions
.checkArgument(profile != null, "The profile name cannot be null");
ProfileCapability obj = Records.newRecord(ProfileCapability.class);
obj.setProfileName(profile);
obj.setProfileCapabilityOverride(Resource.newInstance(0, 0));
return obj;
}
public static ProfileCapability newInstance(String profile,
Resource override) {
Preconditions
.checkArgument(profile != null, "The profile name cannot be null");
ProfileCapability obj = Records.newRecord(ProfileCapability.class);
obj.setProfileName(profile);
obj.setProfileCapabilityOverride(override);
return obj;
}
/**
* Get the profile name.
* @return the profile name
*/
public abstract String getProfileName();
/**
* Get the profile capability override.
* @return Resource object containing the override.
*/
public abstract Resource getProfileCapabilityOverride();
/**
* Set the resource profile name.
* @param profileName the resource profile name
*/
public abstract void setProfileName(String profileName);
/**
* Set the capability override to override specific resource types on the
* resource profile.
*
* For example, if you have a resource profile "small" that maps to
* {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
* {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
* the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
*
* Note that the conversion from the ProfileCapability to the Resource class
* with the actual resource requirements will be done by the ResourceManager,
* which has the actual profile to Resource mapping.
*
* @param r Resource object containing the capability override
*/
public abstract void setProfileCapabilityOverride(Resource r);
@Override
@ -85,4 +137,34 @@ public String toString() {
return "{ profile: " + this.getProfileName() + ", capabilityOverride: "
+ this.getProfileCapabilityOverride() + " }";
}
/**
* Get a representation of the capability as a Resource object.
* @param capability the capability we wish to convert
* @param resourceProfilesMap map of profile name to Resource object
* @return Resource object representing the capability
*/
public static Resource toResource(ProfileCapability capability,
Map<String, Resource> resourceProfilesMap) {
Preconditions
.checkArgument(capability != null, "Capability cannot be null");
Preconditions.checkArgument(resourceProfilesMap != null,
"Resource profiles map cannot be null");
Resource resource = Resource.newInstance(0, 0);
if (resourceProfilesMap.containsKey(capability.getProfileName())) {
resource = Resource
.newInstance(resourceProfilesMap.get(capability.getProfileName()));
}
if(capability.getProfileCapabilityOverride()!= null) {
for (Map.Entry<String, ResourceInformation> entry : capability
.getProfileCapabilityOverride().getResources().entrySet()) {
if (entry.getValue() != null && entry.getValue().getValue() != 0) {
resource.setResourceInformation(entry.getKey(), entry.getValue());
}
}
}
return resource;
}
}

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.yarn.api.records;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@ -101,6 +103,18 @@ public static Resource newInstance(long memory, int vCores) {
return new SimpleResource(memory, vCores);
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static Resource newInstance(Resource resource) {
Resource ret = Resource.newInstance(0, 0);
for (Map.Entry<String, ResourceInformation> entry : resource.getResources()
.entrySet()) {
ret.setResourceInformation(entry.getKey(),
ResourceInformation.newInstance(entry.getValue()));
}
return ret;
}
/**
* This method is DEPRECATED:
* Use {@link Resource#getMemorySize()} instead

View File

@ -31,6 +31,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
private String units;
private ResourceTypes resourceType;
private Long value;
private Long minimumAllocation;
private Long maximumAllocation;
private static final String MEMORY_URI = "memory-mb";
private static final String VCORES_URI = "vcores";
@ -117,6 +119,42 @@ public void setValue(Long rValue) {
this.value = rValue;
}
/**
* Get the minimum allocation for the resource.
*
* @return the minimum allocation for the resource
*/
public Long getMinimumAllocation() {
return minimumAllocation;
}
/**
* Set the minimum allocation for the resource.
*
* @param minimumAllocation the minimum allocation for the resource
*/
public void setMinimumAllocation(Long minimumAllocation) {
this.minimumAllocation = minimumAllocation;
}
/**
* Get the maximum allocation for the resource.
*
* @return the maximum allocation for the resource
*/
public Long getMaximumAllocation() {
return maximumAllocation;
}
/**
* Set the maximum allocation for the resource.
*
* @param maximumAllocation the maximum allocation for the resource
*/
public void setMaximumAllocation(Long maximumAllocation) {
this.maximumAllocation = maximumAllocation;
}
/**
* Create a new instance of ResourceInformation from another object.
*
@ -129,33 +167,41 @@ public static ResourceInformation newInstance(ResourceInformation other) {
ret.setResourceType(other.getResourceType());
ret.setUnits(other.getUnits());
ret.setValue(other.getValue());
ret.setMinimumAllocation(other.getMinimumAllocation());
ret.setMaximumAllocation(other.getMaximumAllocation());
return ret;
}
public static ResourceInformation newInstance(String name, String units,
Long value, ResourceTypes type) {
Long value, ResourceTypes type, Long minimumAllocation,
Long maximumAllocation) {
ResourceInformation ret = new ResourceInformation();
ret.setName(name);
ret.setResourceType(type);
ret.setUnits(units);
ret.setValue(value);
ret.setMinimumAllocation(minimumAllocation);
ret.setMaximumAllocation(maximumAllocation);
return ret;
}
public static ResourceInformation newInstance(String name, String units,
Long value) {
return ResourceInformation
.newInstance(name, units, value, ResourceTypes.COUNTABLE);
.newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L,
Long.MAX_VALUE);
}
public static ResourceInformation newInstance(String name, String units) {
return ResourceInformation
.newInstance(name, units, 0L, ResourceTypes.COUNTABLE);
.newInstance(name, units, 0L, ResourceTypes.COUNTABLE, 0L,
Long.MAX_VALUE);
}
public static ResourceInformation newInstance(String name, Long value) {
return ResourceInformation
.newInstance(name, "", value, ResourceTypes.COUNTABLE);
.newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L,
Long.MAX_VALUE);
}
public static ResourceInformation newInstance(String name) {
@ -165,7 +211,8 @@ public static ResourceInformation newInstance(String name) {
@Override
public String toString() {
return "name: " + this.name + ", units: " + this.units + ", type: "
+ resourceType + ", value: " + value;
+ resourceType + ", value: " + value + ", minimum allocation: "
+ minimumAllocation + ", maximum allocation: " + maximumAllocation;
}
public String getShorthandRepresentation() {

View File

@ -98,7 +98,22 @@ public static ResourceRequest newInstance(Priority priority, String hostName,
.resourceName(hostName).capability(capability)
.numContainers(numContainers).relaxLocality(relaxLocality)
.nodeLabelExpression(labelExpression)
.executionTypeRequest(executionTypeRequest).build();
.executionTypeRequest(executionTypeRequest).profileCapability(null)
.build();
}
@Public
@Evolving
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality,
String labelExpression, ExecutionTypeRequest executionTypeRequest,
ProfileCapability profile) {
return ResourceRequest.newBuilder().priority(priority)
.resourceName(hostName).capability(capability)
.numContainers(numContainers).relaxLocality(relaxLocality)
.nodeLabelExpression(labelExpression)
.executionTypeRequest(executionTypeRequest).profileCapability(profile)
.build();
}
@Public
@ -124,6 +139,7 @@ private ResourceRequestBuilder() {
resourceRequest.setRelaxLocality(true);
resourceRequest.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance());
resourceRequest.setProfileCapability(null);
}
/**
@ -237,6 +253,21 @@ public ResourceRequestBuilder allocationRequestId(
return this;
}
/**
* Set the <code>resourceProfile</code> of the request.
* @see ResourceRequest#setProfileCapability(ProfileCapability)
* @param profileCapability
* <code>profileCapability</code> of the request
* @return {@link ResourceRequestBuilder}
*/
@Public
@Evolving
public ResourceRequestBuilder profileCapability(
ProfileCapability profileCapability) {
resourceRequest.setProfileCapability(profileCapability);
return this;
}
/**
* Return generated {@link ResourceRequest} object.
* @return {@link ResourceRequest}
@ -454,6 +485,14 @@ public ExecutionTypeRequest getExecutionTypeRequest() {
@Evolving
public abstract void setNodeLabelExpression(String nodelabelExpression);
@Public
@Evolving
public abstract ProfileCapability getProfileCapability();
@Public
@Evolving
public abstract void setProfileCapability(ProfileCapability p);
/**
* Get the optional <em>ID</em> corresponding to this allocation request. This
* ID is an identifier for different {@code ResourceRequest}s from the <b>same
@ -529,12 +568,14 @@ public int hashCode() {
Resource capability = getCapability();
String hostName = getResourceName();
Priority priority = getPriority();
ProfileCapability profile = getProfileCapability();
result =
prime * result + ((capability == null) ? 0 : capability.hashCode());
result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
result = prime * result + getNumContainers();
result = prime * result + ((priority == null) ? 0 : priority.hashCode());
result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode();
result = prime * result + ((profile == null) ? 0 : profile.hashCode());
return result;
}

View File

@ -144,6 +144,7 @@
<exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
<exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
<exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
<exclude>src/test/resources/resource-profiles.json</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
@ -117,6 +118,7 @@ public static class ContainerRequest {
private String nodeLabelsExpression;
private ExecutionTypeRequest executionTypeRequest =
ExecutionTypeRequest.newInstance();
private String resourceProfile;
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
@ -163,6 +165,26 @@ public ContainerRequest(Resource capability, String[] nodes,
this(capability, nodes, racks, priority, allocationRequestId, true, null,
ExecutionTypeRequest.newInstance());
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints and
* locality relaxation enabled.
*
* @param capability
* The {@link ProfileCapability} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The
* racks corresponding to any hosts requested will be automatically
* added to this list.
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
*/
public ContainerRequest(ProfileCapability capability, String[] nodes,
String[] racks, Priority priority) {
this(capability, nodes, racks, priority, 0, true, null);
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
@ -187,6 +209,29 @@ public ContainerRequest(Resource capability, String[] nodes,
this(capability, nodes, racks, priority, relaxLocality, null);
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
* @param capability
* The {@link ProfileCapability} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The
* racks corresponding to any hosts requested will be automatically
* added to this list.
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
* @param relaxLocality
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
*/
public ContainerRequest(ProfileCapability capability, String[] nodes,
String[] racks, Priority priority, boolean relaxLocality) {
this(capability, nodes, racks, priority, 0, relaxLocality, null);
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
@ -278,6 +323,14 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
ExecutionTypeRequest.newInstance());
}
public ContainerRequest(ProfileCapability capability, String[] nodes,
String[] racks, Priority priority, long allocationRequestId,
boolean relaxLocality, String nodeLabelsExpression) {
this(capability, nodes, racks, priority, allocationRequestId,
relaxLocality, nodeLabelsExpression,
ExecutionTypeRequest.newInstance());
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
@ -309,6 +362,53 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, long allocationRequestId, boolean relaxLocality,
String nodeLabelsExpression,
ExecutionTypeRequest executionTypeRequest) {
this(capability, nodes, racks, priority, allocationRequestId,
relaxLocality, nodeLabelsExpression, executionTypeRequest,
ProfileCapability.DEFAULT_PROFILE);
}
public ContainerRequest(ProfileCapability capability, String[] nodes,
String[] racks, Priority priority, long allocationRequestId,
boolean relaxLocality, String nodeLabelsExpression,
ExecutionTypeRequest executionTypeRequest) {
this(capability.getProfileCapabilityOverride(), nodes, racks, priority,
allocationRequestId, relaxLocality, nodeLabelsExpression,
executionTypeRequest, capability.getProfileName());
}
/**
* Instantiates a {@link ContainerRequest} with the given constraints.
*
* @param capability
* The {@link Resource} to be requested for each container.
* @param nodes
* Any hosts to request that the containers are placed on.
* @param racks
* Any racks to request that the containers are placed on. The
* racks corresponding to any hosts requested will be automatically
* added to this list.
* @param priority
* The priority at which to request the containers. Higher
* priorities have lower numerical values.
* @param allocationRequestId
* The allocationRequestId of the request. To be used as a tracking
* id to match Containers allocated against this request. Will
* default to 0 if not specified.
* @param relaxLocality
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
* @param nodeLabelsExpression
* Set node labels to allocate resource, now we only support
* asking for only a single node label
* @param executionTypeRequest
* Set the execution type of the container request.
* @param profile
* Set the resource profile for the container request
*/
public ContainerRequest(Resource capability, String[] nodes, String[] racks,
Priority priority, long allocationRequestId, boolean relaxLocality,
String nodeLabelsExpression,
ExecutionTypeRequest executionTypeRequest, String profile) {
this.allocationRequestId = allocationRequestId;
this.capability = capability;
this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
@ -317,6 +417,7 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks,
this.relaxLocality = relaxLocality;
this.nodeLabelsExpression = nodeLabelsExpression;
this.executionTypeRequest = executionTypeRequest;
this.resourceProfile = profile;
sanityCheck();
}
@ -368,6 +469,10 @@ public ExecutionTypeRequest getExecutionTypeRequest() {
return executionTypeRequest;
}
public String getResourceProfile() {
return resourceProfile;
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Capability[").append(capability).append("]");
@ -375,6 +480,7 @@ public String toString() {
sb.append("AllocationRequestId[").append(allocationRequestId).append("]");
sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
.append("]");
sb.append("Resource Profile[").append(resourceProfile).append("]");
return sb.toString();
}
@ -627,6 +733,15 @@ public List<? extends Collection<T>> getMatchingRequests(
" AMRMClient is expected to implement this !!");
}
@InterfaceStability.Evolving
public List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
ProfileCapability capability) {
throw new UnsupportedOperationException("The sub-class extending" +
" AMRMClient is expected to implement this !!");
}
/**
* Get outstanding <code>ContainerRequest</code>s matching the given
* allocationRequestId. These ContainerRequests should have been added via

View File

@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -106,55 +107,55 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
protected Map<String, Resource> resourceProfilesMap;
static class ResourceRequestInfo<T> {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
ResourceRequestInfo(Long allocationRequestId, Priority priority,
String resourceName, Resource capability, boolean relaxLocality) {
String resourceName, Resource capability, boolean relaxLocality,
String resourceProfile) {
ProfileCapability profileCapability = ProfileCapability
.newInstance(resourceProfile, capability);
remoteRequest = ResourceRequest.newBuilder().priority(priority)
.resourceName(resourceName).capability(capability).numContainers(0)
.allocationRequestId(allocationRequestId)
.relaxLocality(relaxLocality).build();
.allocationRequestId(allocationRequestId).relaxLocality(relaxLocality)
.profileCapability(profileCapability).build();
containerRequests = new LinkedHashSet<T>();
}
}
/**
* Class compares Resource by memory then cpu in reverse order
* Class compares Resource by memory, then cpu and then the remaining resource
* types in reverse order.
*/
static class ResourceReverseMemoryThenCpuComparator implements
Comparator<Resource>, Serializable {
static final long serialVersionUID = 12345L;
@Override
public int compare(Resource arg0, Resource arg1) {
long mem0 = arg0.getMemorySize();
long mem1 = arg1.getMemorySize();
long cpu0 = arg0.getVirtualCores();
long cpu1 = arg1.getVirtualCores();
if(mem0 == mem1) {
if(cpu0 == cpu1) {
return 0;
static class ProfileCapabilityComparator<T extends ProfileCapability>
implements Comparator<T> {
HashMap<String, Resource> resourceProfilesMap;
public ProfileCapabilityComparator(
HashMap<String, Resource> resourceProfileMap) {
this.resourceProfilesMap = resourceProfileMap;
}
if(cpu0 < cpu1) {
return 1;
}
return -1;
}
if(mem0 < mem1) {
return 1;
}
return -1;
public int compare(T arg0, T arg1) {
Resource resource0 =
ProfileCapability.toResource(arg0, resourceProfilesMap);
Resource resource1 =
ProfileCapability.toResource(arg1, resourceProfilesMap);
return resource1.compareTo(resource0);
}
}
static boolean canFit(Resource arg0, Resource arg1) {
long mem0 = arg0.getMemorySize();
long mem1 = arg1.getMemorySize();
long cpu0 = arg0.getVirtualCores();
long cpu1 = arg1.getVirtualCores();
boolean canFit(ProfileCapability arg0, ProfileCapability arg1) {
Resource resource0 =
ProfileCapability.toResource(arg0, resourceProfilesMap);
Resource resource1 =
ProfileCapability.toResource(arg1, resourceProfilesMap);
return Resources.fitsIn(resource0, resource1);
return (mem0 <= mem1 && cpu0 <= cpu1);
}
private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
@ -233,6 +234,7 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
return registerApplicationMaster();
}
@SuppressWarnings("unchecked")
private RegisterApplicationMasterResponse registerApplicationMaster()
throws YarnException, IOException {
RegisterApplicationMasterRequest request =
@ -245,6 +247,7 @@ private RegisterApplicationMasterResponse registerApplicationMaster()
if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
this.resourceProfilesMap = response.getResourceProfiles();
}
return response;
}
@ -416,13 +419,15 @@ private List<ResourceRequest> cloneAsks() {
for(ResourceRequest r : ask) {
// create a copy of ResourceRequest as we might change it while the
// RPC layer is using it to send info across
ResourceRequest rr = ResourceRequest.newBuilder()
.priority(r.getPriority()).resourceName(r.getResourceName())
.capability(r.getCapability()).numContainers(r.getNumContainers())
ResourceRequest rr =
ResourceRequest.newBuilder().priority(r.getPriority())
.resourceName(r.getResourceName()).capability(r.getCapability())
.numContainers(r.getNumContainers())
.relaxLocality(r.getRelaxLocality())
.nodeLabelExpression(r.getNodeLabelExpression())
.executionTypeRequest(r.getExecutionTypeRequest())
.allocationRequestId(r.getAllocationRequestId()).build();
.allocationRequestId(r.getAllocationRequestId())
.profileCapability(r.getProfileCapability()).build();
askList.add(rr);
}
return askList;
@ -504,6 +509,8 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
public synchronized void addContainerRequest(T req) {
Preconditions.checkArgument(req != null,
"Resource request can not be null.");
ProfileCapability profileCapability = ProfileCapability
.newInstance(req.getResourceProfile(), req.getCapability());
Set<String> dedupedRacks = new HashSet<String>();
if (req.getRacks() != null) {
dedupedRacks.addAll(req.getRacks());
@ -516,6 +523,8 @@ public synchronized void addContainerRequest(T req) {
Set<String> inferredRacks = resolveRacks(req.getNodes());
inferredRacks.removeAll(dedupedRacks);
checkResourceProfile(req.getResourceProfile());
// check that specific and non-specific requests cannot be mixed within a
// priority
checkLocalityRelaxationConflict(req.getAllocationRequestId(),
@ -540,26 +549,26 @@ public synchronized void addContainerRequest(T req) {
}
for (String node : dedupedNodes) {
addResourceRequest(req.getPriority(), node,
req.getExecutionTypeRequest(), req.getCapability(), req, true,
req.getExecutionTypeRequest(), profileCapability, req, true,
req.getNodeLabelExpression());
}
}
for (String rack : dedupedRacks) {
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
req.getCapability(), req, true, req.getNodeLabelExpression());
profileCapability, req, true, req.getNodeLabelExpression());
}
// Ensure node requests are accompanied by requests for
// corresponding rack
for (String rack : inferredRacks) {
addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
req.getCapability(), req, req.getRelaxLocality(),
profileCapability, req, req.getRelaxLocality(),
req.getNodeLabelExpression());
}
// Off-switch
addResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getExecutionTypeRequest(), req.getCapability(), req,
req.getExecutionTypeRequest(), profileCapability, req,
req.getRelaxLocality(), req.getNodeLabelExpression());
}
@ -567,6 +576,8 @@ public synchronized void addContainerRequest(T req) {
public synchronized void removeContainerRequest(T req) {
Preconditions.checkArgument(req != null,
"Resource request can not be null.");
ProfileCapability profileCapability = ProfileCapability
.newInstance(req.getResourceProfile(), req.getCapability());
Set<String> allRacks = new HashSet<String>();
if (req.getRacks() != null) {
allRacks.addAll(req.getRacks());
@ -577,17 +588,17 @@ public synchronized void removeContainerRequest(T req) {
if (req.getNodes() != null) {
for (String node : new HashSet<String>(req.getNodes())) {
decResourceRequest(req.getPriority(), node,
req.getExecutionTypeRequest(), req.getCapability(), req);
req.getExecutionTypeRequest(), profileCapability, req);
}
}
for (String rack : allRacks) {
decResourceRequest(req.getPriority(), rack,
req.getExecutionTypeRequest(), req.getCapability(), req);
req.getExecutionTypeRequest(), profileCapability, req);
}
decResourceRequest(req.getPriority(), ResourceRequest.ANY,
req.getExecutionTypeRequest(), req.getCapability(), req);
req.getExecutionTypeRequest(), profileCapability, req);
}
@Override
@ -686,6 +697,17 @@ public synchronized List<? extends Collection<T>> getMatchingRequests(
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
ProfileCapability profileCapability =
ProfileCapability.newInstance(capability);
return getMatchingRequests(priority, resourceName, executionType,
profileCapability);
}
@Override
@SuppressWarnings("unchecked")
public synchronized List<? extends Collection<T>> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
ProfileCapability capability) {
Preconditions.checkArgument(capability != null,
"The Resource to be requested should not be null ");
Preconditions.checkArgument(priority != null,
@ -695,15 +717,15 @@ public synchronized List<? extends Collection<T>> getMatchingRequests(
RemoteRequestsTable remoteRequestsTable = getTable(0);
if (null != remoteRequestsTable) {
List<ResourceRequestInfo<T>> matchingRequests =
remoteRequestsTable.getMatchingRequests(priority, resourceName,
executionType, capability);
List<ResourceRequestInfo<T>> matchingRequests = remoteRequestsTable
.getMatchingRequests(priority, resourceName, executionType,
capability);
if (null != matchingRequests) {
// If no exact match. Container may be larger than what was requested.
// get all resources <= capability. map is reverse sorted.
for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
!resReqInfo.containerRequests.isEmpty()) {
if (canFit(resReqInfo.remoteRequest.getProfileCapability(),
capability) && !resReqInfo.containerRequests.isEmpty()) {
list.add(resReqInfo.containerRequests);
}
}
@ -759,6 +781,15 @@ private void checkLocalityRelaxationConflict(Long allocationReqId,
}
}
private void checkResourceProfile(String profile) {
if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty()
&& !resourceProfilesMap.containsKey(profile)) {
throw new InvalidContainerRequestException(
"Invalid profile name, valid profile names are " + resourceProfilesMap
.keySet());
}
}
/**
* Valid if a node label expression specified on container request is valid or
* not
@ -845,12 +876,16 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
}
private void addResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req,
ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req,
boolean relaxLocality, String labelExpression) {
RemoteRequestsTable<T> remoteRequestsTable =
getTable(req.getAllocationRequestId());
if (remoteRequestsTable == null) {
remoteRequestsTable = new RemoteRequestsTable<T>();
if (this.resourceProfilesMap instanceof HashMap) {
remoteRequestsTable.setResourceComparator(
new ProfileCapabilityComparator((HashMap) resourceProfilesMap));
}
putTable(req.getAllocationRequestId(), remoteRequestsTable);
}
@SuppressWarnings("unchecked")
@ -863,6 +898,7 @@ private void addResourceRequest(Priority priority, String resourceName,
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
LOG.debug("addResourceRequest:" + " applicationId="
+ " priority=" + priority.getPriority()
+ " resourceName=" + resourceName + " numContainers="
@ -872,7 +908,7 @@ private void addResourceRequest(Priority priority, String resourceName,
}
private void decResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req) {
ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
RemoteRequestsTable<T> remoteRequestsTable =
getTable(req.getAllocationRequestId());
if (remoteRequestsTable != null) {
@ -882,7 +918,7 @@ private void decResourceRequest(Priority priority, String resourceName,
execTypeReq, capability, req);
// send the ResourceRequest to RM even if is 0 because it needs to
// override a previously sent value. If ResourceRequest was not sent
// previously then sending 0 ought to be a no-op on RM
// previously then sending 0 aught to be a no-op on RM
if (resourceRequestInfo != null) {
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);

View File

@ -23,7 +23,7 @@
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import java.util.Collection;
import java.util.HashMap;
@ -35,43 +35,42 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ProfileCapabilityComparator;
class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class);
static ResourceReverseMemoryThenCpuComparator resourceComparator =
new ResourceReverseMemoryThenCpuComparator();
private ProfileCapabilityComparator resourceComparator;
/**
* Nested Iterator that iterates over just the ResourceRequestInfo
* object.
*/
class RequestInfoIterator implements Iterator<ResourceRequestInfo> {
private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource,
private Iterator<Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>>> iLocMap;
private Iterator<Map<ExecutionType, TreeMap<Resource,
private Iterator<Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> iExecTypeMap;
private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap;
private Iterator<TreeMap<ProfileCapability, ResourceRequestInfo>> iCapMap;
private Iterator<ResourceRequestInfo> iResReqInfo;
public RequestInfoIterator(Iterator<Map<String,
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>>
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>>>
iLocationMap) {
this.iLocMap = iLocationMap;
if (iLocMap.hasNext()) {
iExecTypeMap = iLocMap.next().values().iterator();
} else {
iExecTypeMap =
new LinkedList<Map<ExecutionType, TreeMap<Resource,
new LinkedList<Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>>().iterator();
}
if (iExecTypeMap.hasNext()) {
iCapMap = iExecTypeMap.next().values().iterator();
} else {
iCapMap =
new LinkedList<TreeMap<Resource, ResourceRequestInfo>>()
new LinkedList<TreeMap<ProfileCapability, ResourceRequestInfo>>()
.iterator();
}
if (iCapMap.hasNext()) {
@ -113,7 +112,7 @@ public void remove() {
// Nest map with Primary key :
// Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource)
// and value : ResourceRequestInfo
private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource,
private Map<Priority, Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>();
@Override
@ -122,8 +121,8 @@ public Iterator<ResourceRequestInfo> iterator() {
}
ResourceRequestInfo get(Priority priority, String location,
ExecutionType execType, Resource capability) {
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
ExecutionType execType, ProfileCapability capability) {
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
getCapabilityMap(priority, location, execType);
if (capabilityMap == null) {
return null;
@ -131,9 +130,10 @@ ResourceRequestInfo get(Priority priority, String location,
return capabilityMap.get(capability);
}
@SuppressWarnings("unchecked")
void put(Priority priority, String resourceName, ExecutionType execType,
Resource capability, ResourceRequestInfo resReqInfo) {
Map<String, Map<ExecutionType, TreeMap<Resource,
ProfileCapability capability, ResourceRequestInfo resReqInfo) {
Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> locationMap =
remoteRequestsTable.get(priority);
if (locationMap == null) {
@ -143,8 +143,8 @@ void put(Priority priority, String resourceName, ExecutionType execType,
LOG.debug("Added priority=" + priority);
}
}
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> execTypeMap =
locationMap.get(resourceName);
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
execTypeMap = locationMap.get(resourceName);
if (execTypeMap == null) {
execTypeMap = new HashMap<>();
locationMap.put(resourceName, execTypeMap);
@ -152,9 +152,14 @@ void put(Priority priority, String resourceName, ExecutionType execType,
LOG.debug("Added resourceName=" + resourceName);
}
}
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
execTypeMap.get(execType);
if (capabilityMap == null) {
// this can happen if the user doesn't register with the RM before
// calling addResourceRequest
if (resourceComparator == null) {
resourceComparator = new ProfileCapabilityComparator(new HashMap<>());
}
capabilityMap = new TreeMap<>(resourceComparator);
execTypeMap.put(execType, capabilityMap);
if (LOG.isDebugEnabled()) {
@ -165,9 +170,9 @@ void put(Priority priority, String resourceName, ExecutionType execType,
}
ResourceRequestInfo remove(Priority priority, String resourceName,
ExecutionType execType, Resource capability) {
ExecutionType execType, ProfileCapability capability) {
ResourceRequestInfo retVal = null;
Map<String, Map<ExecutionType, TreeMap<Resource,
Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority);
if (locationMap == null) {
if (LOG.isDebugEnabled()) {
@ -175,7 +180,7 @@ ResourceRequestInfo remove(Priority priority, String resourceName,
}
return null;
}
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
execTypeMap = locationMap.get(resourceName);
if (execTypeMap == null) {
if (LOG.isDebugEnabled()) {
@ -183,7 +188,7 @@ ResourceRequestInfo remove(Priority priority, String resourceName,
}
return null;
}
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
execTypeMap.get(execType);
if (capabilityMap == null) {
if (LOG.isDebugEnabled()) {
@ -204,14 +209,14 @@ ResourceRequestInfo remove(Priority priority, String resourceName,
return retVal;
}
Map<String, Map<ExecutionType, TreeMap<Resource,
Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> getLocationMap(Priority priority) {
return remoteRequestsTable.get(priority);
}
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
getExecutionTypeMap(Priority priority, String location) {
Map<String, Map<ExecutionType, TreeMap<Resource,
Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> locationMap = getLocationMap(priority);
if (locationMap == null) {
return null;
@ -219,10 +224,10 @@ ResourceRequestInfo>>> getLocationMap(Priority priority) {
return locationMap.get(location);
}
TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(Priority
TreeMap<ProfileCapability, ResourceRequestInfo> getCapabilityMap(Priority
priority, String location,
ExecutionType execType) {
Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
executionTypeMap = getExecutionTypeMap(priority, location);
if (executionTypeMap == null) {
return null;
@ -236,7 +241,7 @@ List<ResourceRequestInfo> getAllResourceRequestInfos(Priority priority,
List retList = new LinkedList<>();
for (String location : locations) {
for (ExecutionType eType : ExecutionType.values()) {
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
getCapabilityMap(priority, location, eType);
if (capabilityMap != null) {
retList.addAll(capabilityMap.values());
@ -248,9 +253,9 @@ List<ResourceRequestInfo> getAllResourceRequestInfos(Priority priority,
List<ResourceRequestInfo> getMatchingRequests(
Priority priority, String resourceName, ExecutionType executionType,
Resource capability) {
ProfileCapability capability) {
List<ResourceRequestInfo> list = new LinkedList<>();
TreeMap<Resource, ResourceRequestInfo> capabilityMap =
TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
getCapabilityMap(priority, resourceName, executionType);
if (capabilityMap != null) {
ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability);
@ -266,14 +271,15 @@ List<ResourceRequestInfo> getMatchingRequests(
@SuppressWarnings("unchecked")
ResourceRequestInfo addResourceRequest(Long allocationRequestId,
Priority priority, String resourceName, ExecutionTypeRequest execTypeReq,
Resource capability, T req, boolean relaxLocality,
ProfileCapability capability, T req, boolean relaxLocality,
String labelExpression) {
ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
execTypeReq.getExecutionType(), capability);
ResourceRequestInfo resourceRequestInfo =
get(priority, resourceName, execTypeReq.getExecutionType(), capability);
if (resourceRequestInfo == null) {
resourceRequestInfo =
new ResourceRequestInfo(allocationRequestId, priority, resourceName,
capability, relaxLocality);
capability.getProfileCapabilityOverride(), relaxLocality,
capability.getProfileName());
put(priority, resourceName, execTypeReq.getExecutionType(), capability,
resourceRequestInfo);
}
@ -288,11 +294,14 @@ ResourceRequestInfo addResourceRequest(Long allocationRequestId,
if (ResourceRequest.ANY.equals(resourceName)) {
resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
}
return resourceRequestInfo;
}
ResourceRequestInfo decResourceRequest(Priority priority, String resourceName,
ExecutionTypeRequest execTypeReq, Resource capability, T req) {
ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
execTypeReq.getExecutionType(), capability);
@ -330,4 +339,34 @@ boolean isEmpty() {
return remoteRequestsTable.isEmpty();
}
@SuppressWarnings("unchecked")
public void setResourceComparator(ProfileCapabilityComparator comparator) {
ProfileCapabilityComparator old = this.resourceComparator;
this.resourceComparator = comparator;
if (old != null) {
// we've already set a resource comparator - re-create the maps with the
// new one. this is needed in case someone adds container requests before
// registering with the RM. In such a case, the comparator won't have
// the resource profiles map. After registration, the map is available
// so re-create the capabilities maps
for (Map.Entry<Priority, Map<String, Map<ExecutionType,
TreeMap<ProfileCapability, ResourceRequestInfo>>>>
priEntry : remoteRequestsTable.entrySet()) {
for (Map.Entry<String, Map<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>>> nameEntry : priEntry.getValue().entrySet()) {
for (Map.Entry<ExecutionType, TreeMap<ProfileCapability,
ResourceRequestInfo>> execEntry : nameEntry
.getValue().entrySet()) {
Map<ProfileCapability, ResourceRequestInfo> capabilityMap =
execEntry.getValue();
TreeMap<ProfileCapability, ResourceRequestInfo> newCapabiltyMap =
new TreeMap<>(resourceComparator);
newCapabiltyMap.putAll(capabilityMap);
execEntry.setValue(newCapabiltyMap);
}
}
}
}
}
}

View File

@ -130,11 +130,13 @@ public static Collection<Object[]> data() {
@Before
public void setup() throws Exception {
conf = new YarnConfiguration();
createClusterAndStartApplication();
createClusterAndStartApplication(conf);
}
private void createClusterAndStartApplication() throws Exception {
private void createClusterAndStartApplication(Configuration conf)
throws Exception {
// start minicluster
this.conf = conf;
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerName);
conf.setLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
@ -536,7 +538,8 @@ private void verifyMatches(
}
@Test (timeout=60000)
public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
public void testAMRMClientMatchingFitInferredRack()
throws YarnException, IOException {
AMRMClientImpl<ContainerRequest> amClient = null;
try {
// start am rm client
@ -570,8 +573,9 @@ public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOExce
matches = amClient.getMatchingRequests(priority, rack, capability);
assertTrue(matches.isEmpty());
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
null, null);
amClient
.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
null);
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
@ -605,15 +609,18 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException {
amClient.addContainerRequest(storedContainer2);
amClient.addContainerRequest(storedContainer3);
ProfileCapability profileCapability =
ProfileCapability.newInstance(capability);
// test addition and storage
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(0);
int containersRequestedAny = remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
assertEquals(2, containersRequestedAny);
containersRequestedAny = remoteRequestsTable.get(priority1,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
assertEquals(1, containersRequestedAny);
List<? extends Collection<ContainerRequest>> matches =
@ -884,7 +891,7 @@ public void testAMRMClientWithSaslEncryption() throws Exception {
teardown();
conf = new YarnConfiguration();
conf.set(CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION, "privacy");
createClusterAndStartApplication();
createClusterAndStartApplication(conf);
initAMRMClientAndTest(false);
}
@ -1702,14 +1709,16 @@ private void assertNumContainers(AMRMClientImpl<ContainerRequest> amClient,
int expAsks, int expRelease) {
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(allocationReqId);
ProfileCapability profileCapability =
ProfileCapability.newInstance(capability);
int containersRequestedNode = remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedRack = remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedAny = remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
assertEquals(expNode, containersRequestedNode);
@ -1907,4 +1916,106 @@ public ApplicationMasterProtocol run() {
}
return result;
}
@Test(timeout = 60000)
public void testGetMatchingFitWithProfiles() throws Exception {
teardown();
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
createClusterAndStartApplication(conf);
AMRMClient<ContainerRequest> amClient = null;
try {
// start am rm client
amClient = AMRMClient.<ContainerRequest>createAMRMClient();
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
ProfileCapability capability1 = ProfileCapability.newInstance("minimum");
ProfileCapability capability2 = ProfileCapability.newInstance("default");
ProfileCapability capability3 = ProfileCapability.newInstance("maximum");
ProfileCapability capability4 = ProfileCapability
.newInstance("minimum", Resource.newInstance(2048, 1));
ProfileCapability capability5 = ProfileCapability.newInstance("default");
ProfileCapability capability6 = ProfileCapability
.newInstance("default", Resource.newInstance(2048, 1));
// http has the same capabilities as default
ProfileCapability capability7 = ProfileCapability.newInstance("http");
ContainerRequest storedContainer1 =
new ContainerRequest(capability1, nodes, racks, priority);
ContainerRequest storedContainer2 =
new ContainerRequest(capability2, nodes, racks, priority);
ContainerRequest storedContainer3 =
new ContainerRequest(capability3, nodes, racks, priority);
ContainerRequest storedContainer4 =
new ContainerRequest(capability4, nodes, racks, priority);
ContainerRequest storedContainer5 =
new ContainerRequest(capability5, nodes, racks, priority2);
ContainerRequest storedContainer6 =
new ContainerRequest(capability6, nodes, racks, priority);
ContainerRequest storedContainer7 =
new ContainerRequest(capability7, nodes, racks, priority);
amClient.addContainerRequest(storedContainer1);
amClient.addContainerRequest(storedContainer2);
amClient.addContainerRequest(storedContainer3);
amClient.addContainerRequest(storedContainer4);
amClient.addContainerRequest(storedContainer5);
amClient.addContainerRequest(storedContainer6);
amClient.addContainerRequest(storedContainer7);
// test matching of containers
List<? extends Collection<ContainerRequest>> matches;
ContainerRequest storedRequest;
// exact match
ProfileCapability testCapability1 =
ProfileCapability.newInstance("minimum");
matches = amClient
.getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
testCapability1);
verifyMatches(matches, 1);
storedRequest = matches.get(0).iterator().next();
assertEquals(storedContainer1, storedRequest);
amClient.removeContainerRequest(storedContainer1);
// exact matching with order maintained
// we should get back 3 matches - default + http because they have the
// same capability
ProfileCapability testCapability2 =
ProfileCapability.newInstance("default");
matches = amClient
.getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
testCapability2);
verifyMatches(matches, 2);
// must be returned in the order they were made
int i = 0;
for (ContainerRequest storedRequest1 : matches.get(0)) {
switch(i) {
case 0:
assertEquals(storedContainer2, storedRequest1);
break;
case 1:
assertEquals(storedContainer7, storedRequest1);
break;
}
i++;
}
amClient.removeContainerRequest(storedContainer5);
// matching with larger container. all requests returned
Resource testCapability3 = Resource.newInstance(8192, 8);
matches = amClient
.getMatchingRequests(priority, node, testCapability3);
assertEquals(3, matches.size());
Resource testCapability4 = Resource.newInstance(2048, 1);
matches = amClient.getMatchingRequests(priority, node, testCapability4);
assertEquals(1, matches.size());
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
}
}
}
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@ -276,9 +277,10 @@ private void verifyResourceRequest(
AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
String location, boolean expectedRelaxLocality,
ExecutionType executionType) {
ResourceRequest ask = client.getTable(0)
.get(request.getPriority(), location, executionType,
request.getCapability()).remoteRequest;
ProfileCapability profileCapability = ProfileCapability
.newInstance(request.getResourceProfile(), request.getCapability());
ResourceRequest ask = client.getTable(0).get(request.getPriority(),
location, executionType, profileCapability).remoteRequest;
assertEquals(location, ask.getResourceName());
assertEquals(1, ask.getNumContainers());
assertEquals(expectedRelaxLocality, ask.getRelaxLocality());

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
@ -387,18 +388,21 @@ public void testAMRMClient() throws Exception {
RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
amClient.getTable(0);
ProfileCapability profileCapability =
ProfileCapability.newInstance(capability);
int containersRequestedNode = remoteRequestsTable.get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedRack = remoteRequestsTable.get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedAny = remoteRequestsTable.get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
remoteRequestsTable.get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
.getNumContainers();
assertEquals(2, containersRequestedNode);

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -255,9 +256,11 @@ private Set<Container> allocateContainers(
racks, priority));
}
ProfileCapability profileCapability =
ProfileCapability.newInstance(capability);
int containersRequestedAny = rmClient.getTable(0)
.get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
capability).remoteRequest.getNumContainers();
profileCapability).remoteRequest.getNumContainers();
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
@ -99,6 +100,7 @@ public class TestOpportunisticContainerAllocation {
private static final long AM_EXPIRE_MS = 4000;
private static Resource capability;
private static ProfileCapability profileCapability;
private static Priority priority;
private static Priority priority2;
private static Priority priority3;
@ -151,6 +153,7 @@ public static void setup() throws Exception {
priority3 = Priority.newInstance(3);
priority4 = Priority.newInstance(4);
capability = Resource.newInstance(512, 1);
profileCapability = ProfileCapability.newInstance(capability);
node = nodeReports.get(0).getNodeId().getHost();
rack = nodeReports.get(0).getRackName();
@ -273,7 +276,7 @@ public void testPromotionFromAcquired() throws YarnException, IOException {
int oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
.getNumContainers();
assertEquals(1, oppContainersRequestedAny);
@ -394,7 +397,7 @@ public void testDemotionFromAcquired() throws YarnException, IOException {
new AMRMClient.ContainerRequest(capability, null, null, priority3));
int guarContainersRequestedAny = amClient.getTable(0).get(priority3,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
assertEquals(1, guarContainersRequestedAny);
@ -512,6 +515,7 @@ public void testMixedAllocationAndRelease() throws YarnException,
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
@ -532,17 +536,17 @@ public void testMixedAllocationAndRelease() throws YarnException,
ExecutionType.OPPORTUNISTIC, true)));
int containersRequestedNode = amClient.getTable(0).get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedRack = amClient.getTable(0).get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
int containersRequestedAny = amClient.getTable(0).get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
.getNumContainers();
assertEquals(4, containersRequestedNode);
@ -564,17 +568,17 @@ public void testMixedAllocationAndRelease() throws YarnException,
ExecutionType.OPPORTUNISTIC, true)));
containersRequestedNode = amClient.getTable(0).get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
containersRequestedRack = amClient.getTable(0).get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
.getNumContainers();
containersRequestedAny = amClient.getTable(0).get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
.remoteRequest.getNumContainers();
oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
.getNumContainers();
assertEquals(2, containersRequestedNode);
@ -691,10 +695,9 @@ public void testOpportunisticAllocation() throws YarnException, IOException {
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int oppContainersRequestedAny =
amClient.getTable(0).get(priority3, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
int oppContainersRequestedAny = amClient.getTable(0)
.get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC,
profileCapability).remoteRequest.getNumContainers();
assertEquals(2, oppContainersRequestedAny);

View File

@ -0,0 +1,18 @@
{
"minimum": {
"memory-mb" : 1024,
"vcores" : 1
},
"default" : {
"memory-mb" : 2048,
"vcores" : 2
},
"maximum" : {
"memory-mb": 4096,
"vcores" : 4
},
"http" : {
"memory-mb" : 2048,
"vcores" : 2
}
}

View File

@ -33,6 +33,8 @@
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfilesProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfileEntry;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@ -59,6 +61,7 @@ public class RegisterApplicationMasterResponsePBImpl extends
private List<Container> containersFromPreviousAttempts = null;
private List<NMToken> nmTokens = null;
private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
private Map<String, Resource> profiles = null;
public RegisterApplicationMasterResponsePBImpl() {
builder = RegisterApplicationMasterResponseProto.newBuilder();
@ -123,6 +126,9 @@ private void mergeLocalToBuilder() {
if(schedulerResourceTypes != null) {
addSchedulerResourceTypes();
}
if (profiles != null) {
addResourceProfiles();
}
}
@ -433,6 +439,58 @@ public void setSchedulerResourceTypes(EnumSet<SchedulerResourceTypes> types) {
this.schedulerResourceTypes.addAll(types);
}
private void addResourceProfiles() {
maybeInitBuilder();
builder.clearResourceProfiles();
if (profiles == null) {
return;
}
ResourceProfilesProto.Builder profilesBuilder =
ResourceProfilesProto.newBuilder();
for (Map.Entry<String, Resource> entry : profiles.entrySet()) {
ResourceProfileEntry.Builder entryBuilder =
ResourceProfileEntry.newBuilder();
entryBuilder.setName(entry.getKey());
entryBuilder.setResources(convertToProtoFormat(entry.getValue()));
profilesBuilder.addResourceProfilesMap(entryBuilder.build());
}
builder.setResourceProfiles(profilesBuilder.build());
}
private void initResourceProfiles() {
if (this.profiles != null) {
return;
}
this.profiles = new HashMap<>();
RegisterApplicationMasterResponseProtoOrBuilder p =
viaProto ? proto : builder;
if (p.hasResourceProfiles()) {
ResourceProfilesProto profilesProto = p.getResourceProfiles();
for (ResourceProfileEntry entry : profilesProto
.getResourceProfilesMapList()) {
this.profiles
.put(entry.getName(), convertFromProtoFormat(entry.getResources()));
}
}
}
@Override
public Map<String, Resource> getResourceProfiles() {
initResourceProfiles();
return this.profiles;
}
@Override
public void setResourceProfiles(Map<String, Resource> profilesMap) {
if (profilesMap == null) {
return;
}
initResourceProfiles();
this.profiles.clear();
this.profiles.putAll(profilesMap);
}
private Resource convertFromProtoFormat(ResourceProto resource) {
return new ResourcePBImpl(resource);
}

View File

@ -144,8 +144,8 @@ private void initResources() {
ResourceTypes.COUNTABLE;
String units = entry.hasUnits() ? entry.getUnits() : "";
Long value = entry.hasValue() ? entry.getValue() : 0L;
ResourceInformation ri =
ResourceInformation.newInstance(entry.getKey(), units, value, type);
ResourceInformation ri = ResourceInformation
.newInstance(entry.getKey(), units, value, type, 0L, Long.MAX_VALUE);
if (resources.containsKey(ri.getName())) {
resources.get(ri.getName()).setResourceType(ri.getResourceType());
resources.get(ri.getName()).setUnits(ri.getUnits());

View File

@ -23,8 +23,10 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.proto.YarnProtos.ProfileCapabilityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
@ -40,6 +42,7 @@ public class ResourceRequestPBImpl extends ResourceRequest {
private Priority priority = null;
private Resource capability = null;
private ExecutionTypeRequest executionTypeRequest = null;
private ProfileCapability profile = null;
public ResourceRequestPBImpl() {
@ -69,6 +72,9 @@ private void mergeLocalToBuilder() {
builder.setExecutionTypeRequest(
ProtoUtils.convertToProtoFormat(this.executionTypeRequest));
}
if (this.profile != null) {
builder.setProfile(converToProtoFormat(this.profile));
}
}
private void mergeLocalToProto() {
@ -229,7 +235,8 @@ public String toString() {
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality()
+ ", Execution Type Request: " + getExecutionTypeRequest()
+ ", Node Label Expression: " + getNodeLabelExpression() + "}";
+ ", Node Label Expression: " + getNodeLabelExpression()
+ ", Resource Profile: " + getProfileCapability() + "}";
}
@Override
@ -250,4 +257,34 @@ public void setNodeLabelExpression(String nodeLabelExpression) {
}
builder.setNodeLabelExpression(nodeLabelExpression);
}
@Override
public void setProfileCapability(ProfileCapability profileCapability) {
maybeInitBuilder();
if (profile == null) {
builder.clearProfile();
}
this.profile = profileCapability;
}
@Override
public ProfileCapability getProfileCapability() {
if (profile != null) {
return profile;
}
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasProfile()) {
return null;
}
return new ProfileCapabilityPBImpl(p.getProfile());
}
private ProfileCapabilityProto converToProtoFormat(
ProfileCapability profileCapability) {
ProfileCapabilityPBImpl tmp = new ProfileCapabilityPBImpl();
tmp.setProfileName(profileCapability.getProfileName());
tmp.setProfileCapabilityOverride(
profileCapability.getProfileCapabilityOverride());
return tmp.getProto();
}
}

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
@ -51,6 +52,8 @@ public class ResourceUtils {
public static final String UNITS = ".units";
public static final String TYPE = ".type";
public static final String MINIMUM_ALLOCATION = ".minimum-allocation";
public static final String MAXIMUM_ALLOCATION = ".maximum-allocation";
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
private static final String VCORES = ResourceInformation.VCORES.getName();
@ -122,6 +125,86 @@ private static void addManadtoryResources(
}
}
private static void setMinimumAllocationForMandatoryResources(
Map<String, ResourceInformation> res, Configuration conf) {
String[][] resourceTypesKeys =
{
{ ResourceInformation.MEMORY_MB.getName(),
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
String.valueOf(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
ResourceInformation.MEMORY_MB.getName()
},
{ ResourceInformation.VCORES.getName(),
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
String.valueOf(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES),
ResourceInformation.VCORES.getName()
}
};
for (String[] arr : resourceTypesKeys) {
String resourceTypesKey =
YarnConfiguration.RESOURCE_TYPES + "." + arr[0] + MINIMUM_ALLOCATION;
long minimumResourceTypes = conf.getLong(resourceTypesKey, -1);
long minimumConf = conf.getLong(arr[1], -1);
long minimum;
if (minimumResourceTypes != -1) {
minimum = minimumResourceTypes;
if (minimumConf != -1) {
LOG.warn("Using minimum allocation for memory specified in "
+ "resource-types config file with key "
+ minimumResourceTypes + ", ignoring minimum specified using "
+ arr[1]);
}
} else {
minimum = conf.getLong(arr[1], Long.parseLong(arr[2]));
}
ResourceInformation ri = res.get(arr[3]);
ri.setMinimumAllocation(minimum);
}
}
private static void setMaximumAllocationForMandatoryResources(
Map<String, ResourceInformation> res, Configuration conf) {
String[][] resourceTypesKeys =
{
{
ResourceInformation.MEMORY_MB.getName(),
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
String.valueOf(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
ResourceInformation.MEMORY_MB.getName()
},
{
ResourceInformation.VCORES.getName(),
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
String.valueOf(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
ResourceInformation.VCORES.getName()
}
};
for (String[] arr : resourceTypesKeys) {
String resourceTypesKey =
YarnConfiguration.RESOURCE_TYPES + "." + arr[0] + MAXIMUM_ALLOCATION;
long maximumResourceTypes = conf.getLong(resourceTypesKey, -1);
long maximumConf = conf.getLong(arr[1], -1);
long maximum;
if (maximumResourceTypes != -1) {
maximum = maximumResourceTypes;
if (maximumConf != -1) {
LOG.warn("Using maximum allocation for memory specified in "
+ "resource-types config file with key "
+ maximumResourceTypes + ", ignoring maximum specified using "
+ arr[1]);
}
} else {
maximum = conf.getLong(arr[1], Long.parseLong(arr[2]));
}
ResourceInformation ri = res.get(arr[3]);
ri.setMaximumAllocation(maximum);
}
}
@VisibleForTesting
static void initializeResourcesMap(Configuration conf,
Map<String, ResourceInformation> resourceInformationMap) {
@ -135,6 +218,12 @@ static void initializeResourcesMap(Configuration conf,
String resourceTypeName = conf.get(
YarnConfiguration.RESOURCE_TYPES + "." + resourceName + TYPE,
ResourceTypes.COUNTABLE.toString());
Long minimumAllocation = conf.getLong(
YarnConfiguration.RESOURCE_TYPES + "." + resourceName
+ MINIMUM_ALLOCATION, 0L);
Long maximumAllocation = conf.getLong(
YarnConfiguration.RESOURCE_TYPES + "." + resourceName
+ MAXIMUM_ALLOCATION, Long.MAX_VALUE);
if (resourceName == null || resourceName.isEmpty()
|| resourceUnits == null || resourceTypeName == null) {
throw new YarnRuntimeException(
@ -154,11 +243,14 @@ static void initializeResourcesMap(Configuration conf,
"Error in config, key '" + resourceName + "' specified twice");
}
resourceInformationMap.put(resourceName, ResourceInformation
.newInstance(resourceName, resourceUnits, 0L, resourceType));
.newInstance(resourceName, resourceUnits, 0L, resourceType,
minimumAllocation, maximumAllocation));
}
}
checkMandatatoryResources(resourceInformationMap);
addManadtoryResources(resourceInformationMap);
setMinimumAllocationForMandatoryResources(resourceInformationMap, conf);
setMaximumAllocationForMandatoryResources(resourceInformationMap, conf);
readOnlyResources = Collections.unmodifiableMap(resourceInformationMap);
}
@ -172,6 +264,12 @@ public static Map<String, ResourceInformation> getResourceTypes() {
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
}
private static Map<String, ResourceInformation> getResourceTypes(
Configuration conf) {
return getResourceTypes(conf,
YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE);
}
private static Map<String, ResourceInformation> getResourceTypes(
Configuration conf, String resourceFile) {
if (lock == null) {
@ -205,6 +303,12 @@ private static InputStream getConfInputStream(String resourceFile,
ConfigurationProvider provider =
ConfigurationProviderFactory.getConfigurationProvider(conf);
try {
provider.init(conf);
} catch (Exception e) {
throw new IOException(e);
}
InputStream ris = provider.getConfigurationInputStream(conf, resourceFile);
if (ris == null) {
if (conf.getResource(resourceFile) == null) {
@ -241,6 +345,12 @@ static void resetResourceTypes() {
lock = null;
}
@VisibleForTesting
public static void resetResourceTypes(Configuration conf) {
lock = null;
getResourceTypes(conf);
}
public static String getUnits(String resourceValue) {
String units;
for (int i = 0; i < resourceValue.length(); i++) {
@ -326,4 +436,53 @@ synchronized public static void resetNodeResources() {
nodeLock = null;
}
public static Resource getResourceTypesMinimumAllocation() {
Map<String, ResourceInformation> resourceTypes = getResourceTypes();
Resource ret = Resource.newInstance(0, 0);
for (Map.Entry<String, ResourceInformation> entry : resourceTypes
.entrySet()) {
String name = entry.getKey();
if (name.equals(ResourceInformation.MEMORY_MB.getName())) {
ret.setMemorySize(entry.getValue().getMinimumAllocation());
continue;
}
if (name.equals(ResourceInformation.VCORES.getName())) {
Long tmp = entry.getValue().getMinimumAllocation();
if (tmp > Integer.MAX_VALUE) {
tmp = (long) Integer.MAX_VALUE;
}
ret.setVirtualCores(tmp.intValue());
continue;
}
ret.setResourceValue(name, entry.getValue().getMinimumAllocation());
}
return ret;
}
/**
* Get a Resource object with for the maximum allocation possible.
* @return a Resource object with the maximum allocation for the scheduler
*/
public static Resource getResourceTypesMaximumAllocation() {
Map<String, ResourceInformation> resourceTypes = getResourceTypes();
Resource ret = Resource.newInstance(0, 0);
for (Map.Entry<String, ResourceInformation> entry : resourceTypes
.entrySet()) {
String name = entry.getKey();
if (name.equals(ResourceInformation.MEMORY_MB.getName())) {
ret.setMemorySize(entry.getValue().getMaximumAllocation());
continue;
}
if (name.equals(ResourceInformation.VCORES.getName())) {
Long tmp = entry.getValue().getMaximumAllocation();
if (tmp > Integer.MAX_VALUE) {
tmp = (long) Integer.MAX_VALUE;
}
ret.setVirtualCores(tmp.intValue());
continue;
}
ret.setResourceValue(name, entry.getValue().getMaximumAllocation());
}
return ret;
}
}

View File

@ -75,12 +75,12 @@ public long getMemorySize() {
}
@Override
@SuppressWarnings("deprecation")
public void setMemory(int memory) {
throw new RuntimeException(name + " cannot be modified!");
}
@Override
@SuppressWarnings("deprecation")
public void setMemorySize(long memory) {
throw new RuntimeException(name + " cannot be modified!");
}
@ -193,13 +193,7 @@ public static Resource unbounded() {
}
public static Resource clone(Resource res) {
Resource ret = Resource.newInstance(0, 0);
for (Map.Entry<String, ResourceInformation> entry : res.getResources()
.entrySet()) {
ret.setResourceInformation(entry.getKey(),
ResourceInformation.newInstance(entry.getValue()));
}
return ret;
return Resource.newInstance(res);
}
public static Resource addTo(Resource lhs, Resource rhs) {

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
@ -100,11 +101,13 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
RecordFactoryProvider.getRecordFactory(null);
private RMContext rmContext;
private ResourceProfilesManager resourceProfilesManager;
@Override
public void init(ApplicationMasterServiceContext amsContext,
ApplicationMasterServiceProcessor nextProcessor) {
this.rmContext = (RMContext)amsContext;
this.resourceProfilesManager = rmContext.getResourceProfilesManager();
}
@Override
@ -171,6 +174,11 @@ public void registerApplicationMaster(
response.setSchedulerResourceTypes(getScheduler()
.getSchedulingResourceTypes());
if (getRmContext().getYarnConfiguration().getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED)) {
response
.setResourceProfiles(resourceProfilesManager.getResourceProfiles());
}
}
@Override

View File

@ -32,6 +32,7 @@
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@ -46,6 +47,7 @@
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
@ -65,6 +67,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
.RMAppAttemptState;
@ -88,6 +91,8 @@
*/
public class RMServerUtils {
private static final Log LOG_HANDLE = LogFactory.getLog(RMServerUtils.class);
public static final String UPDATE_OUTSTANDING_ERROR =
"UPDATE_OUTSTANDING_ERROR";
private static final String INCORRECT_CONTAINER_VERSION_ERROR =
@ -622,4 +627,49 @@ private static Set<NodeId> getNodeIdsForLabel(RMContext rmContext,
return labelsToNodes.get(label);
}
}
public static void convertProfileToResourceCapability(
List<ResourceRequest> asks, Configuration conf,
ResourceProfilesManager resourceProfilesManager) throws YarnException {
boolean profilesEnabled =
conf.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
if (!profilesEnabled) {
return;
}
for (ResourceRequest req : asks) {
convertProfileToResourceCapability(req, conf, resourceProfilesManager);
}
}
public static void convertProfileToResourceCapability(ResourceRequest ask,
Configuration conf, ResourceProfilesManager resourceProfilesManager)
throws YarnException {
if (LOG_HANDLE.isDebugEnabled()) {
LOG_HANDLE
.debug("Converting profile to resource capability for ask " + ask);
}
boolean profilesEnabled =
conf.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
if (!profilesEnabled) {
if (ask.getProfileCapability() != null && !ask.getProfileCapability()
.getProfileCapabilityOverride().equals(Resources.none())) {
ask.setCapability(
ask.getProfileCapability().getProfileCapabilityOverride());
}
} else {
if (ask.getProfileCapability() != null) {
ask.setCapability(ProfileCapability
.toResource(ask.getProfileCapability(),
resourceProfilesManager.getResourceProfiles()));
}
}
if (LOG_HANDLE.isDebugEnabled()) {
LOG_HANDLE
.debug("Converted profile to resource capability for ask " + ask);
}
}
}

View File

@ -88,6 +88,10 @@ private void loadProfiles() throws IOException {
while (iterator.hasNext()) {
Map.Entry entry = (Map.Entry) iterator.next();
String key = entry.getKey().toString();
if (key.isEmpty()) {
throw new IOException(
"Name of resource profile cannot be an empty string");
}
if (entry.getValue() instanceof Map) {
Map value = (Map) entry.getValue();
// ensure memory and vcores are specified

View File

@ -95,6 +95,7 @@
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@ -1281,8 +1282,51 @@ public List<NodeId> getNodeIds(String resourceName) {
* @param container Container.
*/
public void asyncContainerRelease(RMContainer container) {
this.rmContext.getDispatcher().getEventHandler()
.handle(new ReleaseContainerEvent(container));
this.rmContext.getDispatcher().getEventHandler().handle(
new ReleaseContainerEvent(container));
}
/*
* Get a Resource object with for the minimum allocation possible. If resource
* profiles are enabled then the 'minimum' resource profile will be used. If
* they are not enabled, use the minimums specified in the config files.
*
* @return a Resource object with the minimum allocation for the scheduler
*/
public Resource getMinimumAllocation() {
boolean profilesEnabled = getConfig()
.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
Resource ret;
if (!profilesEnabled) {
ret = ResourceUtils.getResourceTypesMinimumAllocation();
} else {
ret = rmContext.getResourceProfilesManager().getMinimumProfile();
}
LOG.info("Minimum allocation = " + ret);
return ret;
}
/**
* Get a Resource object with for the maximum allocation possible. If resource
* profiles are enabled then the 'maximum' resource profile will be used. If
* they are not enabled, use the maximums specified in the config files.
*
* @return a Resource object with the maximum allocation for the scheduler
*/
public Resource getMaximumAllocation() {
boolean profilesEnabled = getConfig()
.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
Resource ret;
if (!profilesEnabled) {
ret = ResourceUtils.getResourceTypesMaximumAllocation();
} else {
ret = rmContext.getResourceProfilesManager().getMaximumProfile();
}
LOG.info("Maximum allocation = " + ret);
return ret;
}
@Override

View File

@ -221,8 +221,7 @@ public Resource getMaxAllowedAllocation() {
return Resources.createResource(
Math.min(configuredMaxAllocation.getMemorySize(), maxNodeMemory),
Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores)
);
Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores));
} finally {
readLock.unlock();
}

View File

@ -39,10 +39,12 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
@ -253,6 +255,14 @@ public static void normalizeAndvalidateRequest(ResourceRequest resReq,
private static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
throws InvalidResourceRequestException {
try {
RMServerUtils.convertProfileToResourceCapability(resReq,
rmContext.getYarnConfiguration(),
rmContext.getResourceProfilesManager());
} catch (YarnException ye) {
throw new InvalidResourceRequestException(ye);
}
if (resReq.getCapability().getMemorySize() < 0 ||
resReq.getCapability().getMemorySize() > maximumResource.getMemorySize()) {
throw new InvalidResourceRequestException("Invalid resource request"

View File

@ -291,8 +291,8 @@ void initScheduler(Configuration configuration) throws
writeLock.lock();
this.conf = loadCapacitySchedulerConfiguration(configuration);
validateConf(this.conf);
this.minimumAllocation = this.conf.getMinimumAllocation();
initMaximumResourceCapability(this.conf.getMaximumAllocation());
this.minimumAllocation = super.getMinimumAllocation();
initMaximumResourceCapability(super.getMaximumAllocation());
this.calculator = this.conf.getResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications = new ConcurrentHashMap<>();

View File

@ -1298,8 +1298,8 @@ private void initScheduler(Configuration conf) throws IOException {
this.conf = new FairSchedulerConfiguration(conf);
validateConf(this.conf);
authorizer = YarnAuthorizationProvider.getInstance(conf);
minimumAllocation = this.conf.getMinimumAllocation();
initMaximumResourceCapability(this.conf.getMaximumAllocation());
minimumAllocation = super.getMinimumAllocation();
initMaximumResourceCapability(super.getMaximumAllocation());
incrAllocation = this.conf.getIncrementAllocation();
updateReservationThreshold();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();

View File

@ -241,17 +241,8 @@ private synchronized void initScheduler(Configuration conf) {
//Use ConcurrentSkipListMap because applications need to be ordered
this.applications =
new ConcurrentSkipListMap<>();
this.minimumAllocation =
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
initMaximumResourceCapability(
Resources.createResource(conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
conf.getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES)));
this.minimumAllocation = super.getMinimumAllocation();
initMaximumResourceCapability(super.getMaximumAllocation());
this.usePortForNodeName = conf.getBoolean(
YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);

View File

@ -104,6 +104,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@ -150,6 +151,7 @@ public MockRM(Configuration conf, RMStateStore store,
public MockRM(Configuration conf, RMStateStore store,
boolean useNullRMNodeLabelsManager, boolean useRealElector) {
super();
ResourceUtils.resetResourceTypes(conf);
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
this.useRealElector = useRealElector;
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));

View File

@ -247,6 +247,7 @@ public void setUp() {
ResourceScheduler scheduler = mockResourceScheduler();
((RMContextImpl)rmContext).setScheduler(scheduler);
Configuration conf = new Configuration();
((RMContextImpl) rmContext).setYarnConfiguration(conf);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
appMonitor = new TestRMAppManager(rmContext,

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -672,4 +673,38 @@ private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
Assert.fail("Cannot find RMContainer");
}
}
@Test(timeout = 3000000)
public void testResourceProfiles() throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
RMApp app1 = rm.submitApp(2048);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
RegisterApplicationMasterResponse resp = am1.registerAppAttempt();
Assert.assertEquals(0, resp.getResourceProfiles().size());
rm.stop();
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE,
"profiles/sample-profiles-1.json");
rm = new MockRM(conf);
rm.start();
nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
app1 = rm.submitApp(2048);
nm1.nodeHeartbeat(true);
attempt1 = app1.getCurrentAppAttempt();
am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
resp = am1.registerAppAttempt();
Assert.assertEquals(3, resp.getResourceProfiles().size());
Assert.assertEquals(Resource.newInstance(1024, 1),
resp.getResourceProfiles().get("minimum"));
Assert.assertEquals(Resource.newInstance(2048, 2),
resp.getResourceProfiles().get("default"));
Assert.assertEquals(Resource.newInstance(4096, 4),
resp.getResourceProfiles().get("maximum"));
rm.stop();
}
}

View File

@ -113,6 +113,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
@ -212,6 +213,7 @@ public void testLoadConfigurationOnInitialize() throws IOException {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
128);
ResourceUtils.resetResourceTypes(conf);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
@ -240,6 +242,7 @@ public void testNonMinZeroResourcesSettings() throws IOException {
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
ResourceUtils.resetResourceTypes(conf);
scheduler.init(conf);
scheduler.reinitialize(conf, null);
Assert.assertEquals(256, scheduler.getMinimumResourceCapability().getMemorySize());
@ -257,6 +260,7 @@ public void testMinZeroResourcesSettings() throws IOException {
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
ResourceUtils.resetResourceTypes(conf);
scheduler.init(conf);
scheduler.reinitialize(conf, null);
Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getMemorySize());

View File

@ -95,6 +95,7 @@
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@ -250,6 +251,7 @@ public void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_USE_RPC);
failoverTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
ResourceUtils.resetResourceTypes(conf);
if (useRpc && !useFixedPorts) {
throw new YarnRuntimeException("Invalid configuration!" +