YARN-5176. More test cases for queuing of containers at the NM. (Konstantinos Karanasos via asuresh)
This commit is contained in:
parent
58be55b6e0
commit
76f0800c21
@ -160,6 +160,7 @@ protected void stopContainerInternal(ContainerId containerID)
|
||||
containerTokenId.getExecutionType());
|
||||
|
||||
if (foundInQueue) {
|
||||
LOG.info("Removing queued container with ID " + containerID);
|
||||
this.context.getQueuingContext().getKilledQueuedContainers().put(
|
||||
containerTokenId,
|
||||
"Queued container request removed by ApplicationMaster.");
|
||||
@ -502,6 +503,16 @@ public int getNumAllocatedOpportunisticContainers() {
|
||||
return allocatedOpportunisticContainers.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getNumQueuedGuaranteedContainers() {
|
||||
return queuedGuaranteedContainers.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getNumQueuedOpportunisticContainers() {
|
||||
return queuedOpportunisticContainers.size();
|
||||
}
|
||||
|
||||
class QueuingApplicationEventDispatcher implements
|
||||
EventHandler<ApplicationEvent> {
|
||||
private EventHandler<ApplicationEvent> applicationEventDispatcher;
|
||||
|
@ -40,10 +40,17 @@
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
@ -51,6 +58,7 @@
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
@ -71,6 +79,7 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
@ -354,4 +363,59 @@ public static void waitForNMContainerState(ContainerManagerImpl
|
||||
Assert.assertEquals("ContainerState is not correct (timedout)",
|
||||
finalState, currentState);
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager)
|
||||
throws IOException {
|
||||
return createContainerToken(cId, rmIdentifier, nodeId, user,
|
||||
containerTokenSecretManager, null);
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
LogAggregationContext logAggregationContext)
|
||||
throws IOException {
|
||||
Resource r = BuilderUtils.newResource(1024, 1);
|
||||
return createContainerToken(cId, rmIdentifier, nodeId, user, r,
|
||||
containerTokenSecretManager, logAggregationContext);
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user, Resource resource,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
LogAggregationContext logAggregationContext)
|
||||
throws IOException {
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
|
||||
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
|
||||
Priority.newInstance(0), 0, logAggregationContext, null);
|
||||
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
|
||||
.retrievePassword(containerTokenIdentifier),
|
||||
containerTokenIdentifier);
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user, Resource resource,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
LogAggregationContext logAggregationContext, ExecutionType executionType)
|
||||
throws IOException {
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
|
||||
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
|
||||
Priority.newInstance(0), 0, logAggregationContext, null,
|
||||
ContainerType.TASK, executionType);
|
||||
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
|
||||
.retrievePassword(containerTokenIdentifier),
|
||||
containerTokenIdentifier);
|
||||
}
|
||||
|
||||
public static ContainerId createContainerId(int id) {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
|
||||
return containerId;
|
||||
}
|
||||
}
|
||||
|
@ -18,6 +18,11 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
@ -38,10 +43,10 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||
@ -58,13 +63,9 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||
@ -75,7 +76,6 @@
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
||||
@ -90,8 +90,6 @@
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -99,11 +97,6 @@
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class TestContainerManager extends BaseContainerManagerTest {
|
||||
|
||||
public TestContainerManager() throws UnsupportedFileSystemException {
|
||||
@ -119,14 +112,6 @@ public TestContainerManager() throws UnsupportedFileSystemException {
|
||||
public void setup() throws IOException {
|
||||
super.setup();
|
||||
}
|
||||
|
||||
public static ContainerId createContainerId(int id) {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
|
||||
return containerId;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainerManagerImpl
|
||||
@ -1153,53 +1138,6 @@ public void testChangeContainerResource() throws Exception {
|
||||
assertEquals(targetResource, containerStatus.getCapability());
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager)
|
||||
throws IOException {
|
||||
return createContainerToken(cId, rmIdentifier, nodeId, user,
|
||||
containerTokenSecretManager, null);
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
LogAggregationContext logAggregationContext)
|
||||
throws IOException {
|
||||
Resource r = BuilderUtils.newResource(1024, 1);
|
||||
return createContainerToken(cId, rmIdentifier, nodeId, user, r,
|
||||
containerTokenSecretManager, logAggregationContext);
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user, Resource resource,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
LogAggregationContext logAggregationContext)
|
||||
throws IOException {
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
|
||||
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
|
||||
Priority.newInstance(0), 0, logAggregationContext, null);
|
||||
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
|
||||
.retrievePassword(containerTokenIdentifier),
|
||||
containerTokenIdentifier);
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user, Resource resource,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
LogAggregationContext logAggregationContext, ExecutionType executionType)
|
||||
throws IOException {
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
|
||||
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
|
||||
Priority.newInstance(0), 0, logAggregationContext, null,
|
||||
ContainerType.TASK, executionType);
|
||||
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
|
||||
.retrievePassword(containerTokenIdentifier),
|
||||
containerTokenIdentifier);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOutputThreadDumpSignal() throws IOException,
|
||||
InterruptedException, YarnException {
|
||||
|
@ -18,6 +18,11 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -32,42 +37,27 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
|
||||
.ContainersMonitorImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TestQueuingContainerManager extends TestContainerManager {
|
||||
/**
|
||||
* Class for testing the {@link QueuingContainerManagerImpl}.
|
||||
*/
|
||||
public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
||||
|
||||
interface HasResources {
|
||||
boolean decide(Context context, ContainerId cId);
|
||||
@ -119,15 +109,6 @@ protected UserGroupInformation getRemoteUgi() throws YarnException {
|
||||
return ugi;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void authorizeGetAndStopContainerRequest(
|
||||
ContainerId containerId, Container container, boolean stopRequest,
|
||||
NMTokenIdentifier identifier) throws YarnException {
|
||||
if (container == null || container.getUser().equals("Fail")) {
|
||||
throw new YarnException("Reject this container");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainersMonitor createContainersMonitor(
|
||||
ContainerExecutor exec) {
|
||||
@ -148,7 +129,7 @@ public long getVmemAllocatedForContainers() {
|
||||
|
||||
@Override
|
||||
public long getVCoresAllocatedForContainers() {
|
||||
return 2;
|
||||
return 4;
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -186,54 +167,17 @@ public void setup() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that an OPPORTUNISTIC container is killed when
|
||||
* a GUARANTEED container arrives and all the Node Resources are used up
|
||||
*
|
||||
* For this specific test case, 4 containers are requested (last one being
|
||||
* guaranteed). Assumptions :
|
||||
* 1) The first OPPORTUNISTIC Container will start running
|
||||
* 2) The second and third OPP containers will be queued
|
||||
* 3) When the GUARANTEED container comes in, the running OPP container
|
||||
* will be killed to make room
|
||||
* 4) After the GUARANTEED container finishes, the remaining 2 OPP
|
||||
* containers will be dequeued and run.
|
||||
* 5) Only the first OPP container will be killed.
|
||||
*
|
||||
* Starting one GUARANTEED and one OPPORTUNISTIC container.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testSimpleOpportunisticContainer() throws Exception {
|
||||
public void testStartMultipleContainers() throws Exception {
|
||||
shouldDeleteWait = true;
|
||||
containerManager.start();
|
||||
|
||||
// ////// Create the resources for the container
|
||||
File dir = new File(tmpDir, "dir");
|
||||
dir.mkdirs();
|
||||
File file = new File(dir, "file");
|
||||
PrintWriter fileWriter = new PrintWriter(file);
|
||||
fileWriter.write("Hello World!");
|
||||
fileWriter.close();
|
||||
|
||||
// ////// Construct the container-spec.
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
URL resource_alpha =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
.makeQualified(new Path(file.getAbsolutePath())));
|
||||
LocalResource rsrc_alpha =
|
||||
recordFactory.newRecordInstance(LocalResource.class);
|
||||
rsrc_alpha.setResource(resource_alpha);
|
||||
rsrc_alpha.setSize(-1);
|
||||
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||
rsrc_alpha.setType(LocalResourceType.FILE);
|
||||
rsrc_alpha.setTimestamp(file.lastModified());
|
||||
String destinationFile = "dest_file";
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
localResources.put(destinationFile, rsrc_alpha);
|
||||
containerLaunchContext.setLocalResources(localResources);
|
||||
|
||||
// Start 3 OPPORTUNISTIC containers and 1 GUARANTEED container
|
||||
List<StartContainerRequest> list = new ArrayList<>();
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
@ -241,6 +185,122 @@ public void testSimpleOpportunisticContainer() throws Exception {
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(1024, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.GUARANTEED)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(1024, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
|
||||
StartContainersRequest allRequests =
|
||||
StartContainersRequest.newInstance(list);
|
||||
containerManager.startContainers(allRequests);
|
||||
|
||||
BaseContainerManagerTest.waitForContainerState(containerManager,
|
||||
createContainerId(0),
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
|
||||
BaseContainerManagerTest.waitForContainerState(containerManager,
|
||||
createContainerId(1),
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
|
||||
|
||||
// Ensure all containers are running.
|
||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
statList.add(createContainerId(i));
|
||||
}
|
||||
GetContainerStatusesRequest statRequest =
|
||||
GetContainerStatusesRequest.newInstance(statList);
|
||||
List<ContainerStatus> containerStatuses = containerManager
|
||||
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||
for (ContainerStatus status : containerStatuses) {
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||
status.getState());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit both a GUARANTEED and an OPPORTUNISTIC container, each of which
|
||||
* requires more resources than available at the node, and make sure they
|
||||
* are both queued.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testQueueMultipleContainers() throws Exception {
|
||||
shouldDeleteWait = true;
|
||||
containerManager.start();
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
List<StartContainerRequest> list = new ArrayList<>();
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(3072, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.GUARANTEED)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(3072, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
|
||||
StartContainersRequest allRequests =
|
||||
StartContainersRequest.newInstance(list);
|
||||
containerManager.startContainers(allRequests);
|
||||
|
||||
Thread.sleep(5000);
|
||||
|
||||
// Ensure both containers are queued.
|
||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
statList.add(createContainerId(i));
|
||||
}
|
||||
GetContainerStatusesRequest statRequest =
|
||||
GetContainerStatusesRequest.newInstance(statList);
|
||||
List<ContainerStatus> containerStatuses = containerManager
|
||||
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||
for (ContainerStatus status : containerStatuses) {
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
|
||||
status.getState());
|
||||
}
|
||||
|
||||
// Ensure both containers are properly queued.
|
||||
Assert.assertEquals(2, containerManager.getContext().getQueuingContext()
|
||||
.getQueuedContainers().size());
|
||||
Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager)
|
||||
.getNumQueuedGuaranteedContainers());
|
||||
Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager)
|
||||
.getNumQueuedOpportunisticContainers());
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts one OPPORTUNISTIC container that takes up the whole node's
|
||||
* resources, and submit two more that will be queued.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testStartAndQueueMultipleContainers() throws Exception {
|
||||
shouldDeleteWait = true;
|
||||
containerManager.start();
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
List<StartContainerRequest> list = new ArrayList<>();
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(2048, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
@ -256,21 +316,185 @@ public void testSimpleOpportunisticContainer() throws Exception {
|
||||
user, BuilderUtils.newResource(1024, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
// GUARANTEED
|
||||
|
||||
StartContainersRequest allRequests =
|
||||
StartContainersRequest.newInstance(list);
|
||||
containerManager.startContainers(allRequests);
|
||||
|
||||
Thread.sleep(5000);
|
||||
|
||||
// Ensure first container is running and others are queued.
|
||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
statList.add(createContainerId(i));
|
||||
}
|
||||
GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
|
||||
.newInstance(Arrays.asList(createContainerId(0)));
|
||||
List<ContainerStatus> containerStatuses = containerManager
|
||||
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||
for (ContainerStatus status : containerStatuses) {
|
||||
if (status.getContainerId().equals(createContainerId(0))) {
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||
status.getState());
|
||||
} else {
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
|
||||
status.getState());
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure two containers are properly queued.
|
||||
Assert.assertEquals(2, containerManager.getContext().getQueuingContext()
|
||||
.getQueuedContainers().size());
|
||||
Assert.assertEquals(0, ((QueuingContainerManagerImpl) containerManager)
|
||||
.getNumQueuedGuaranteedContainers());
|
||||
Assert.assertEquals(2, ((QueuingContainerManagerImpl) containerManager)
|
||||
.getNumQueuedOpportunisticContainers());
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources
|
||||
* requests by each container as such that only one can run in parallel.
|
||||
* Thus, the OPPORTUNISTIC container that started running, will be
|
||||
* killed for the GUARANTEED container to start.
|
||||
* Once the GUARANTEED container finishes its execution, the remaining
|
||||
* OPPORTUNISTIC container will be executed.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testKillOpportunisticForGuaranteedContainer() throws Exception {
|
||||
shouldDeleteWait = true;
|
||||
containerManager.start();
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
List<StartContainerRequest> list = new ArrayList<>();
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(2048, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(2048, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(2048, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.GUARANTEED)));
|
||||
|
||||
StartContainersRequest allRequests =
|
||||
StartContainersRequest.newInstance(list);
|
||||
containerManager.startContainers(allRequests);
|
||||
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(0), ContainerState.DONE, 30);
|
||||
Thread.sleep(5000);
|
||||
|
||||
// Get container statuses. Container 0 should be killed, container 1
|
||||
// should be queued and container 2 should be running.
|
||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
statList.add(createContainerId(i));
|
||||
}
|
||||
GetContainerStatusesRequest statRequest =
|
||||
GetContainerStatusesRequest.newInstance(statList);
|
||||
List<ContainerStatus> containerStatuses = containerManager
|
||||
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||
for (ContainerStatus status : containerStatuses) {
|
||||
if (status.getContainerId().equals(createContainerId(0))) {
|
||||
Assert.assertTrue(status.getDiagnostics()
|
||||
.contains("Container killed by the ApplicationMaster"));
|
||||
} else if (status.getContainerId().equals(createContainerId(1))) {
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
|
||||
status.getState());
|
||||
} else if (status.getContainerId().equals(createContainerId(2))) {
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||
status.getState());
|
||||
}
|
||||
System.out.println("\nStatus : [" + status + "]\n");
|
||||
}
|
||||
|
||||
// Make sure the remaining OPPORTUNISTIC container starts its execution.
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(2), ContainerState.DONE, 30);
|
||||
Thread.sleep(5000);
|
||||
statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList(
|
||||
createContainerId(1)));
|
||||
ContainerStatus contStatus1 = containerManager.getContainerStatuses(
|
||||
statRequest).getContainerStatuses().get(0);
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||
contStatus1.getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit three OPPORTUNISTIC containers that can run concurrently, and one
|
||||
* GUARANTEED that needs to kill two of the OPPORTUNISTIC for it to run.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testKillMultipleOpportunisticContainers() throws Exception {
|
||||
shouldDeleteWait = true;
|
||||
containerManager.start();
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
List<StartContainerRequest> list = new ArrayList<>();
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(512, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(512, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(512, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(1024, 1),
|
||||
user, BuilderUtils.newResource(1500, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.GUARANTEED)));
|
||||
|
||||
StartContainersRequest allRequests =
|
||||
StartContainersRequest.newInstance(list);
|
||||
|
||||
containerManager.startContainers(allRequests);
|
||||
|
||||
Thread.sleep(10000);
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(0), ContainerState.DONE, 30);
|
||||
Thread.sleep(5000);
|
||||
|
||||
// Get container statuses. Container 0 should be killed, container 1
|
||||
// should be queued and container 2 should be running.
|
||||
int killedContainers = 0;
|
||||
int runningContainers = 0;
|
||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||
for (int i = 0; i < 4; i++) {
|
||||
statList.add(createContainerId(i));
|
||||
@ -280,12 +504,18 @@ public void testSimpleOpportunisticContainer() throws Exception {
|
||||
List<ContainerStatus> containerStatuses = containerManager
|
||||
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||
for (ContainerStatus status : containerStatuses) {
|
||||
// Ensure that the first opportunistic container is killed
|
||||
if (status.getContainerId().equals(createContainerId(0))) {
|
||||
Assert.assertTrue(status.getDiagnostics()
|
||||
.contains("Container killed by the ApplicationMaster"));
|
||||
if (status.getDiagnostics().contains(
|
||||
"Container killed by the ApplicationMaster")) {
|
||||
killedContainers++;
|
||||
}
|
||||
if (status.getState() ==
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
|
||||
runningContainers++;
|
||||
}
|
||||
System.out.println("\nStatus : [" + status + "]\n");
|
||||
}
|
||||
|
||||
Assert.assertEquals(2, killedContainers);
|
||||
Assert.assertEquals(2, runningContainers);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user