YARN-11702: Fix Yarn over allocating containers (#6990) Contributed by Syed Shameerur Rahman.

Reviewed-by: Akira Ajisaka <aajisaka@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
Syed Shameerur Rahman 2024-09-25 09:40:15 +05:30 committed by GitHub
parent e602c601dd
commit 21ec686be3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 597 additions and 1 deletions

View File

@ -1537,6 +1537,17 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY = public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
10; 10;
/**
* The configuration key for enabling or disabling the auto-correction of container allocation.
*/
public static final String RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION = RM_PREFIX
+ "scheduler.autocorrect.container.allocation";
/**
* Default value: {@value}.
*/
public static final boolean DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION = false;
/** Whether to enable log aggregation */ /** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable"; + "log-aggregation-enable";

View File

@ -182,6 +182,21 @@
<name>yarn.resourcemanager.principal</name> <name>yarn.resourcemanager.principal</name>
</property> </property>
<property>
<description>
This configuration key enables or disables the auto-correction of container allocation in
YARN. Due to the asynchronous nature of container request and allocation, YARN may sometimes
over-allocate more containers than requested. The auto-correction feature addresses this by
automatically adjusting the number of requested containers based on those already allocated,
preventing extra containers from being allocated.
While the extra allocated containers will be released by the client within a few seconds,
this may not be a concern in normal circumstances. However, if the user is worried about
resource contention due to over-allocation, enabling this feature can help avoid such cases.
</description>
<name>yarn.resourcemanager.scheduler.autocorrect.container.allocation</name>
<value>false</value>
</property>
<property> <property>
<description>The address of the scheduler interface.</description> <description>The address of the scheduler interface.</description>
<name>yarn.resourcemanager.scheduler.address</name> <name>yarn.resourcemanager.scheduler.address</name>

View File

@ -22,8 +22,10 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -34,6 +36,10 @@
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken; import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -151,6 +157,7 @@ public abstract class AbstractYarnScheduler
Thread updateThread; Thread updateThread;
private final Object updateThreadMonitor = new Object(); private final Object updateThreadMonitor = new Object();
private Timer releaseCache; private Timer releaseCache;
private boolean autoCorrectContainerAllocation;
/* /*
* All schedulers which are inheriting AbstractYarnScheduler should use * All schedulers which are inheriting AbstractYarnScheduler should use
@ -212,6 +219,9 @@ public void serviceInit(Configuration conf) throws Exception {
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf); skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf);
autoCorrectContainerAllocation =
conf.getBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION,
YarnConfiguration.DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION);
long configuredMaximumAllocationWaitTime = long configuredMaximumAllocationWaitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS); YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
@ -624,6 +634,106 @@ public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
} }
} }
/**
* Autocorrect container resourceRequests by decrementing the number of newly allocated containers
* from the current container request. This also updates the newlyAllocatedContainers to be within
* the limits of the current container resourceRequests.
* ResourceRequests locality/resourceName is not considered while autocorrecting the container
* request, hence when there are two types of resourceRequest which is same except for the
* locality/resourceName, it is counted as same {@link ContainerObjectType} and the container
* ask and number of newly allocated container is decremented accordingly.
* For example when a client requests for 4 containers with locality/resourceName
* as "node1", AMRMClientaugments the resourceRequest into two
* where R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1),
* if Yarn allocated 6 containers previously, it will release 2 containers as well as
* update the container ask to 0.
*
* If there is a client which directly calls Yarn (without AMRMClient) with
* two where R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1)
* the autocorrection may not work as expected. The use case of such client is very rare.
*
* <p>
* This method is called from {@link AbstractYarnScheduler#allocate} method. It is package private
* to be used within the scheduler package only.
* @param resourceRequests List of resources to be allocated
* @param application ApplicationAttempt
*/
@VisibleForTesting
protected void autoCorrectContainerAllocation(List<ResourceRequest> resourceRequests,
SchedulerApplicationAttempt application) {
// if there is no resourceRequests for containers or no newly allocated container from
// the previous request there is nothing to do.
if (!autoCorrectContainerAllocation || resourceRequests.isEmpty() ||
application.newlyAllocatedContainers.isEmpty()) {
return;
}
// iterate newlyAllocatedContainers and form a mapping of container type
// and number of its occurrence.
Map<ContainerObjectType, List<RMContainer>> allocatedContainerMap = new HashMap<>();
for (RMContainer rmContainer : application.newlyAllocatedContainers) {
Container container = rmContainer.getContainer();
ContainerObjectType containerObjectType = new ContainerObjectType(
container.getAllocationRequestId(), container.getPriority(),
container.getExecutionType(), container.getResource());
allocatedContainerMap.computeIfAbsent(containerObjectType,
k -> new ArrayList<>()).add(rmContainer);
}
Map<ContainerObjectType, Integer> extraContainerAllocatedMap = new HashMap<>();
// iterate through resourceRequests and update the request by
// decrementing the already allocated containers.
for (ResourceRequest request : resourceRequests) {
ContainerObjectType containerObjectType =
new ContainerObjectType(request.getAllocationRequestId(),
request.getPriority(), request.getExecutionTypeRequest().getExecutionType(),
request.getCapability());
int numContainerAllocated = allocatedContainerMap.getOrDefault(containerObjectType,
Collections.emptyList()).size();
if (numContainerAllocated > 0) {
int numContainerAsk = request.getNumContainers();
int updatedContainerRequest = numContainerAsk - numContainerAllocated;
if (updatedContainerRequest < 0) {
// add an entry to extra allocated map
extraContainerAllocatedMap.put(containerObjectType, Math.abs(updatedContainerRequest));
LOG.debug("{} container of the resource type: {} will be released",
Math.abs(updatedContainerRequest), request);
// if newlyAllocatedContainer count is more than the current container
// resourceRequests, reset it to 0.
updatedContainerRequest = 0;
}
// update the request
LOG.debug("Updating container resourceRequests from {} to {} for the resource type: {}",
numContainerAsk, updatedContainerRequest, request);
request.setNumContainers(updatedContainerRequest);
}
}
// Iterate over the entries in extraContainerAllocatedMap
for (Map.Entry<ContainerObjectType, Integer> entry : extraContainerAllocatedMap.entrySet()) {
ContainerObjectType containerObjectType = entry.getKey();
int extraContainers = entry.getValue();
// Get the list of allocated containers for the current ContainerObjectType
List<RMContainer> allocatedContainers = allocatedContainerMap.get(containerObjectType);
if (allocatedContainers != null) {
for (RMContainer rmContainer : allocatedContainers) {
if (extraContainers > 0) {
// Change the state of the container from ALLOCATED to EXPIRED since it is not required.
LOG.debug("Removing extra container:{}", rmContainer.getContainer());
completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(), SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
application.newlyAllocatedContainers.remove(rmContainer);
extraContainers--;
}
}
}
}
}
private RMContainer recoverAndCreateContainer(NMContainerStatus status, private RMContainer recoverAndCreateContainer(NMContainerStatus status,
RMNode node, String queueName) { RMNode node, String queueName) {
Container container = Container container =
@ -658,6 +768,14 @@ private void recoverResourceRequestForContainer(RMContainer rmContainer) {
return; return;
} }
// when auto correct container allocation is enabled, there can be a case when extra containers
// go to expired state from allocated state. When such scenario happens do not re-attempt the
// container request since this is expected.
if (autoCorrectContainerAllocation &&
RMContainerState.EXPIRED.equals(rmContainer.getState())) {
return;
}
// Add resource request back to Scheduler ApplicationAttempt. // Add resource request back to Scheduler ApplicationAttempt.
// We lookup the application-attempt here again using // We lookup the application-attempt here again using
@ -1678,4 +1796,77 @@ private List<ApplicationAttemptId> getAppsFromQueue(String queueName)
} }
return apps; return apps;
} }
/**
* ContainerObjectType is a container object with the following properties.
* Namely allocationId, priority, executionType and resourceType.
*/
protected class ContainerObjectType extends Object {
private final long allocationId;
private final Priority priority;
private final ExecutionType executionType;
private final Resource resource;
public ContainerObjectType(long allocationId, Priority priority,
ExecutionType executionType, Resource resource) {
this.allocationId = allocationId;
this.priority = priority;
this.executionType = executionType;
this.resource = resource;
}
public long getAllocationId() {
return allocationId;
}
public Priority getPriority() {
return priority;
}
public ExecutionType getExecutionType() {
return executionType;
}
public Resource getResource() {
return resource;
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(allocationId)
.append(priority)
.append(executionType)
.append(resource)
.toHashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != this.getClass()) {
return false;
}
ContainerObjectType other = (ContainerObjectType) obj;
return new EqualsBuilder()
.append(allocationId, other.getAllocationId())
.append(priority, other.getPriority())
.append(executionType, other.getExecutionType())
.append(resource, other.getResource())
.isEquals();
}
@Override
public String toString() {
return "{ContainerObjectType: "
+ ", Priority: " + getPriority()
+ ", Allocation Id: " + getAllocationId()
+ ", Execution Type: " + getExecutionType()
+ ", Resource: " + getResource()
+ "}";
}
}
} }

View File

@ -862,7 +862,8 @@ protected synchronized void addToUpdateContainerErrors(
updateContainerErrors.add(error); updateContainerErrors.add(error);
} }
protected synchronized void addToNewlyAllocatedContainers( @VisibleForTesting
public synchronized void addToNewlyAllocatedContainers(
SchedulerNode node, RMContainer rmContainer) { SchedulerNode node, RMContainer rmContainer) {
ContainerId matchedContainerId = ContainerId matchedContainerId =
getUpdateContext().matchContainerToOutstandingIncreaseReq( getUpdateContext().matchContainerToOutstandingIncreaseReq(

View File

@ -1363,6 +1363,10 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
application.showRequests(); application.showRequests();
} }
// update the current container ask by considering the already allocated
// containers from previous allocation request and return updatedNewlyAllocatedContainers.
autoCorrectContainerAllocation(ask, application);
// Update application requests // Update application requests
if (application.updateResourceRequests(ask) || application if (application.updateResourceRequests(ask) || application
.updateSchedulingRequests(schedulingRequests)) { .updateSchedulingRequests(schedulingRequests)) {

View File

@ -943,6 +943,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
} }
application.showRequests(); application.showRequests();
// update the current container ask by considering the already allocated containers
// from previous allocation request as well as populate the updatedNewlyAllocatedContainers
// list according the to the current ask.
autoCorrectContainerAllocation(ask, application);
// Update application requests // Update application requests
application.updateResourceRequests(ask); application.updateResourceRequests(ask);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.createResourceRequest;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -65,17 +66,24 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService; import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@ -83,6 +91,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
@ -268,6 +277,352 @@ private void testMaximumAllocationVCoresHelper(
Assert.assertEquals(0, scheduler.getNumClusterNodes()); Assert.assertEquals(0, scheduler.getNumClusterNodes());
} }
/**
* Test for testing autocorrect container allocation feature.
*/
@Test
public void testAutoCorrectContainerAllocation() {
Configuration conf = new Configuration(getConf());
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION, true);
conf.setBoolean("yarn.scheduler.capacity.root.auto-create-child-queue.enabled",
true);
MockRM rm = new MockRM(conf);
rm.start();
AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm.getResourceScheduler();
String host = "127.0.0.1";
RMNode node =
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * 1024), 1, host);
scheduler.handle(new NodeAddedSchedulerEvent(node));
//add app begin
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
appId1, 1);
RMAppAttemptMetrics attemptMetric1 =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app1 = mock(RMAppImpl.class);
when(app1.getApplicationId()).thenReturn(appId1);
RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
Container container = mock(Container.class);
when(attempt1.getMasterContainer()).thenReturn(container);
ApplicationSubmissionContext submissionContext = mock(
ApplicationSubmissionContext.class);
when(attempt1.getSubmissionContext()).thenReturn(submissionContext);
when(attempt1.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
rm.getRMContext().getRMApps().put(appId1, app1);
ApplicationPlacementContext apc = new ApplicationPlacementContext("user",
"root");
SchedulerEvent addAppEvent1 =
new AppAddedSchedulerEvent(appId1, "user", "user", apc);
scheduler.handle(addAppEvent1);
SchedulerEvent addAttemptEvent1 =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
scheduler.handle(addAttemptEvent1);
SchedulerApplicationAttempt application = scheduler.getApplicationAttempt(appAttemptId);
SchedulerNode schedulerNode = scheduler.getSchedulerNode(node.getNodeID());
Priority priority = Priority.newInstance(0);
NodeId nodeId = NodeId.newInstance("foo.bar.org", 1234);
// test different container ask and newly allocated container.
testContainerAskAndNewlyAllocatedContainerZero(scheduler, application, priority);
testContainerAskAndNewlyAllocatedContainerOne(scheduler, application, schedulerNode,
nodeId, priority, app1.getCurrentAppAttempt().getAppAttemptId());
testContainerAskZeroAndNewlyAllocatedContainerOne(scheduler, application, schedulerNode,
nodeId, priority, app1.getCurrentAppAttempt().getAppAttemptId());
testContainerAskFourAndNewlyAllocatedContainerEight(scheduler, application, schedulerNode,
nodeId, priority, app1.getCurrentAppAttempt().getAppAttemptId());
testContainerAskFourAndNewlyAllocatedContainerSix(scheduler, application, schedulerNode,
nodeId, priority, app1.getCurrentAppAttempt().getAppAttemptId());
}
/**
* Creates a mock instance of {@link RMContainer} with the specified parameters.
*
* @param containerId The ID of the container
* @param nodeId The NodeId of the node where the container is allocated
* @param appAttemptId The ApplicationAttemptId of the application attempt
* @param allocationId The allocation ID of the container
* @param memory The amount of memory (in MB) requested for the container
* @param priority The priority of the container request
* @param executionType The execution type of the container request
* @return A mock instance of RMContainer with the specified parameters
*/
private RMContainer createMockRMContainer(int containerId, NodeId nodeId,
ApplicationAttemptId appAttemptId, long allocationId, int memory,
Priority priority, ExecutionType executionType) {
// Create a mock instance of Container
Container container = mock(Container.class);
// Mock the Container instance with the specified parameters
when(container.getResource()).thenReturn(Resource.newInstance(memory, 1));
when(container.getPriority()).thenReturn(priority);
when(container.getId()).thenReturn(ContainerId.newContainerId(appAttemptId, containerId));
when(container.getNodeId()).thenReturn(nodeId);
when(container.getAllocationRequestId()).thenReturn(allocationId);
when(container.getExecutionType()).thenReturn(executionType);
when(container.getContainerToken()).thenReturn(Token.newInstance(new byte[0], "kind",
new byte[0], "service"));
// Create a mock instance of RMContainerImpl
RMContainer rmContainer = mock(RMContainerImpl.class);
// Set up the behavior of the mock RMContainer
when(rmContainer.getContainer()).thenReturn(container);
when(rmContainer.getContainerId()).thenReturn(
ContainerId.newContainerId(appAttemptId, containerId));
return rmContainer;
}
/**
* Tests the behavior when the container ask is 1 and there are no newly allocated containers.
*
* @param scheduler The AbstractYarnScheduler instance to test.
* @param application The SchedulerApplicationAttempt instance representing the application.
* @param priority The priority of the resource request.
*/
private void testContainerAskAndNewlyAllocatedContainerZero(AbstractYarnScheduler scheduler,
SchedulerApplicationAttempt application, Priority priority) {
// Create a resource request with 1 container, 1024 MB memory, and GUARANTEED execution type
ResourceRequest resourceRequest = createResourceRequest(1024, 1, 1,
priority, 0,
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
// Create a list with the resource request
List<ResourceRequest> containerAsk = new ArrayList<>();
containerAsk.add(resourceRequest);
// Call the autoCorrectContainerAllocation method
scheduler.autoCorrectContainerAllocation(containerAsk, application);
// Assert that the container ask remains unchanged (1 container)
assertEquals(1, containerAsk.get(0).getNumContainers());
// Assert that there are no newly allocated containers
assertEquals(0, application.pullNewlyAllocatedContainers().size());
}
/**
* Tests the behavior when the container ask is 1 and there is one newly allocated container.
*
* @param scheduler The AbstractYarnScheduler instance to test
* @param application The SchedulerApplicationAttempt instance representing the application
* @param schedulerNode The SchedulerNode instance representing the node
* @param nodeId The NodeId of the node
* @param priority The priority of the resource request
* @param appAttemptId The ApplicationAttemptId of the application attempt
*/
private void testContainerAskAndNewlyAllocatedContainerOne(AbstractYarnScheduler scheduler,
SchedulerApplicationAttempt application,
SchedulerNode schedulerNode, NodeId nodeId,
Priority priority, ApplicationAttemptId appAttemptId) {
// Create a resource request with 1 container, 1024 MB memory, and GUARANTEED execution type
ResourceRequest resourceRequest = createResourceRequest(1024, 1, 1,
priority, 0L, ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED),
ResourceRequest.ANY);
List<ResourceRequest> containerAsk = new ArrayList<>();
containerAsk.add(resourceRequest);
// Create an RMContainer with the specified parameters
RMContainer rmContainer = createMockRMContainer(1, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
// Add the RMContainer to the newly allocated containers of the application
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer);
// Call the autoCorrectContainerAllocation method
scheduler.autoCorrectContainerAllocation(containerAsk, application);
// Assert that the container ask is updated to 0
assertEquals(0, containerAsk.get(0).getNumContainers());
// Assert that there is one newly allocated container
assertEquals(1, application.pullNewlyAllocatedContainers().size());
}
/**
* Tests the behavior when the container ask is 0 and there is one newly allocated container.
*
* @param scheduler The AbstractYarnScheduler instance to test
* @param application The SchedulerApplicationAttempt instance representing the application
* @param schedulerNode The SchedulerNode instance representing the node
* @param nodeId The NodeId of the node
* @param priority The priority of the resource request
* @param appAttemptId The ApplicationAttemptId of the application attempt
*/
private void testContainerAskZeroAndNewlyAllocatedContainerOne(AbstractYarnScheduler scheduler,
SchedulerApplicationAttempt application, SchedulerNode schedulerNode, NodeId nodeId,
Priority priority, ApplicationAttemptId appAttemptId) {
// Create a resource request with 0 containers, 1024 MB memory, and GUARANTEED execution type
ResourceRequest resourceRequest = createResourceRequest(1024, 1,
0, priority, 0L,
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
List<ResourceRequest> containerAsk = new ArrayList<>();
containerAsk.add(resourceRequest);
// Create an RMContainer with the specified parameters
RMContainer rmContainer1 = createMockRMContainer(1, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
// Add the RMContainer to the newly allocated containers of the application
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer1);
// Call the autoCorrectContainerAllocation method
scheduler.autoCorrectContainerAllocation(containerAsk, application);
// Assert that the container ask remains 0
assertEquals(0, resourceRequest.getNumContainers());
// Assert that there are no newly allocated containers
assertEquals(0, application.pullNewlyAllocatedContainers().size());
}
/**
* Tests the behavior when the container ask consists of four unique resource requests
* and there are eight newly allocated containers (two containers for each resource request type).
*
* @param scheduler The AbstractYarnScheduler instance to test
* @param application The SchedulerApplicationAttempt instance representing the application
* @param schedulerNode The SchedulerNode instance representing the node
* @param nodeId The NodeId of the node
* @param priority The priority of the resource requests
* @param appAttemptId The ApplicationAttemptId of the application attempt
*/
private void testContainerAskFourAndNewlyAllocatedContainerEight(AbstractYarnScheduler scheduler,
SchedulerApplicationAttempt application, SchedulerNode schedulerNode,
NodeId nodeId, Priority priority, ApplicationAttemptId appAttemptId) {
// Create four unique resource requests
ResourceRequest resourceRequest1 = createResourceRequest(1024, 1, 1,
priority, 0L,
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
ResourceRequest resourceRequest2 = createResourceRequest(2048, 1, 1,
priority, 0L,
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
ResourceRequest resourceRequest3 = createResourceRequest(1024, 1, 1,
priority, 1L,
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
ResourceRequest resourceRequest4 = createResourceRequest(1024, 1, 1,
priority, 0L,
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC), ResourceRequest.ANY);
// Add the resource requests to a list
List<ResourceRequest> ask4 = new ArrayList<>();
ask4.add(resourceRequest1);
ask4.add(resourceRequest2);
ask4.add(resourceRequest3);
ask4.add(resourceRequest4);
// Create eight RMContainers (two for each resource request type)
RMContainer rmContainer1 = createMockRMContainer(1, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer2 = createMockRMContainer(2, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer3 = createMockRMContainer(3, nodeId, appAttemptId,
0L, 2048, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer4 = createMockRMContainer(4, nodeId, appAttemptId,
0L, 2048, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer5 = createMockRMContainer(5, nodeId, appAttemptId,
1L, 1024, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer6 = createMockRMContainer(6, nodeId, appAttemptId,
1L, 1024, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer7 = createMockRMContainer(7, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.OPPORTUNISTIC);
RMContainer rmContainer8 = createMockRMContainer(8, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.OPPORTUNISTIC);
// Add the RMContainers to the newly allocated containers of the application
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer1);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer2);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer3);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer4);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer5);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer6);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer7);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer8);
// Call the autoCorrectContainerAllocation method
scheduler.autoCorrectContainerAllocation(ask4, application);
// Assert that all resource requests have 0 containers
for (ResourceRequest rr : ask4) {
assertEquals(0, rr.getNumContainers());
}
// Assert that there are four newly allocated containers
assertEquals(4, application.pullNewlyAllocatedContainers().size());
}
/**
* Tests the behavior when the container ask consists of two resource requests.
* i.e one for any host and one for a specific host ,
* each requesting four containers, and there are six newly allocated containers.
*
* @param scheduler The AbstractYarnScheduler instance to test
* @param application The SchedulerApplicationAttempt instance representing the application
* @param schedulerNode The SchedulerNode instance representing the node
* @param nodeId The NodeId of the node
* @param priority The priority of the resource requests
* @param appAttemptId The ApplicationAttemptId of the application attempt
*/
private void testContainerAskFourAndNewlyAllocatedContainerSix(AbstractYarnScheduler scheduler,
SchedulerApplicationAttempt application, SchedulerNode schedulerNode,
NodeId nodeId, Priority priority, ApplicationAttemptId appAttemptId) {
// Create a resource request for any host, requesting 4 containers
ResourceRequest resourceRequest1 = createResourceRequest(1024, 1, 4,
priority, 0L,
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), ResourceRequest.ANY);
// Create a resource request for a specific host, requesting 4 containers
ResourceRequest resourceRequest2 = createResourceRequest(1024, 1, 4,
priority, 0L,
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED), nodeId.getHost());
// Add the resource requests to a list
List<ResourceRequest> containerAsk = new ArrayList<>();
containerAsk.add(resourceRequest1);
containerAsk.add(resourceRequest2);
// Create six RMContainers with the specified parameters
RMContainer rmContainer1 = createMockRMContainer(1, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer2 = createMockRMContainer(2, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer3 = createMockRMContainer(3, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer4 = createMockRMContainer(4, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer5 = createMockRMContainer(5, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
RMContainer rmContainer6 = createMockRMContainer(6, nodeId, appAttemptId,
0L, 1024, priority, ExecutionType.GUARANTEED);
// Add the RMContainers to the newly allocated containers of the application
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer1);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer2);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer3);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer4);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer5);
application.addToNewlyAllocatedContainers(schedulerNode, rmContainer6);
// Call the autoCorrectContainerAllocation method
scheduler.autoCorrectContainerAllocation(containerAsk, application);
// Assert that all resource requests have 0 containers
for (ResourceRequest resourceRequest : containerAsk) {
assertEquals(0, resourceRequest.getNumContainers());
}
// Assert that there are four newly allocated containers
assertEquals(4, application.pullNewlyAllocatedContainers().size());
}
@Test @Test
public void testUpdateMaxAllocationUsesTotal() throws IOException { public void testUpdateMaxAllocationUsesTotal() throws IOException {
final int configuredMaxVCores = 20; final int configuredMaxVCores = 20;

View File

@ -196,6 +196,20 @@ public static ResourceRequest createResourceRequest(String resourceName,
return request; return request;
} }
public static ResourceRequest createResourceRequest(int memory, int vcores, int numContainers,
Priority priority, long allocationId, ExecutionTypeRequest type, String resourceName) {
ResourceRequest request =
recordFactory.newRecordInstance(ResourceRequest.class);
Resource capability = Resources.createResource(memory, vcores);
request.setNumContainers(numContainers);
request.setCapability(capability);
request.setPriority(priority);
request.setAllocationRequestId(allocationId);
request.setExecutionTypeRequest(type);
request.setResourceName(resourceName);
return request;
}
public static ResourceRequest createResourceRequest( public static ResourceRequest createResourceRequest(
String resourceName, int memory, int numContainers, boolean relaxLocality, String resourceName, int memory, int numContainers, boolean relaxLocality,
Priority priority, Priority priority,