YARN-5117. QueuingContainerManager does not start GUARANTEED Container even if Resources are available. (Konstantinos Karanasos via asuresh)
This commit is contained in:
parent
21890c4239
commit
4fc09a897b
@ -769,12 +769,14 @@ public void subtractNodeResourcesFromResourceUtilization(
|
|||||||
(int) (getVmemAllocatedForContainers() >> 20), 1.0f);
|
(int) (getVmemAllocatedForContainers() >> 20), 1.0f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculates the vCores CPU usage that is assigned to the given
|
||||||
|
* {@link ProcessTreeInfo}. In particular, it takes into account the number of
|
||||||
|
* vCores that are allowed to be used by the NM and returns the CPU usage
|
||||||
|
* as a normalized value between {@literal >=} 0 and {@literal <=} 1.
|
||||||
|
*/
|
||||||
private float allocatedCpuUsage(ProcessTreeInfo pti) {
|
private float allocatedCpuUsage(ProcessTreeInfo pti) {
|
||||||
float cpuUsagePercentPerCore = pti.getCpuVcores() * 100.0f;
|
return (float) pti.getCpuVcores() / getVCoresAllocatedForContainers();
|
||||||
float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore
|
|
||||||
/ resourceCalculatorPlugin.getNumProcessors();
|
|
||||||
return (cpuUsageTotalCoresPercentage * 1000 *
|
|
||||||
maxVCoresAllottedForContainers / nodeCpuPercentageForYARN) / 1000.0f;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -127,17 +127,19 @@ protected void startContainerInternal(
|
|||||||
hasResourcesAvailable(allocatedContInfo.getPti())) {
|
hasResourcesAvailable(allocatedContInfo.getPti())) {
|
||||||
startAllocatedContainer(allocatedContInfo);
|
startAllocatedContainer(allocatedContInfo);
|
||||||
} else {
|
} else {
|
||||||
this.context.getNMStateStore().storeContainer(containerTokenIdentifier
|
ContainerId cIdToStart = containerTokenIdentifier.getContainerID();
|
||||||
.getContainerID(), request);
|
this.context.getNMStateStore().storeContainer(cIdToStart, request);
|
||||||
this.context.getNMStateStore().storeContainerQueued(
|
this.context.getNMStateStore().storeContainerQueued(cIdToStart);
|
||||||
containerTokenIdentifier.getContainerID());
|
LOG.info("No available resources for container {} to start its execution "
|
||||||
|
+ "immediately.", cIdToStart);
|
||||||
if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
|
if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
|
||||||
queuedGuaranteedContainers.add(allocatedContInfo);
|
queuedGuaranteedContainers.add(allocatedContInfo);
|
||||||
// Kill running opportunistic containers to make space for
|
// Kill running opportunistic containers to make space for
|
||||||
// guaranteed container.
|
// guaranteed container.
|
||||||
killOpportunisticContainers(allocatedContInfo);
|
killOpportunisticContainers(allocatedContInfo);
|
||||||
} else {
|
} else {
|
||||||
|
LOG.info("Opportunistic container {} will be queued at the NM.",
|
||||||
|
cIdToStart);
|
||||||
queuedOpportunisticContainers.add(allocatedContInfo);
|
queuedOpportunisticContainers.add(allocatedContInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -401,7 +403,6 @@ private ResourceUtilization resourcesToFreeUp(
|
|||||||
// Subtract the overall node resources.
|
// Subtract the overall node resources.
|
||||||
getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
|
getContainersMonitor().subtractNodeResourcesFromResourceUtilization(
|
||||||
resourceAllocationToFreeUp);
|
resourceAllocationToFreeUp);
|
||||||
|
|
||||||
return resourceAllocationToFreeUp;
|
return resourceAllocationToFreeUp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,7 +30,6 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
@ -43,7 +42,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
@ -83,12 +81,11 @@ public TestQueuingContainerManager() throws UnsupportedFileSystemException {
|
|||||||
LOG = LogFactory.getLog(TestQueuingContainerManager.class);
|
LOG = LogFactory.getLog(TestQueuingContainerManager.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
HasResources hasResources = null;
|
|
||||||
boolean shouldDeleteWait = false;
|
boolean shouldDeleteWait = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ContainerManagerImpl
|
protected ContainerManagerImpl createContainerManager(
|
||||||
createContainerManager(DeletionService delSrvc) {
|
DeletionService delSrvc) {
|
||||||
return new QueuingContainerManagerImpl(context, exec, delSrvc,
|
return new QueuingContainerManagerImpl(context, exec, delSrvc,
|
||||||
nodeStatusUpdater, metrics, dirsHandler) {
|
nodeStatusUpdater, metrics, dirsHandler) {
|
||||||
|
|
||||||
@ -123,21 +120,35 @@ protected UserGroupInformation getRemoteUgi() throws YarnException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
|
protected void authorizeGetAndStopContainerRequest(
|
||||||
Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
|
ContainerId containerId, Container container, boolean stopRequest,
|
||||||
if(container == null || container.getUser().equals("Fail")){
|
NMTokenIdentifier identifier) throws YarnException {
|
||||||
|
if (container == null || container.getUser().equals("Fail")) {
|
||||||
throw new YarnException("Reject this container");
|
throw new YarnException("Reject this container");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ContainersMonitor createContainersMonitor(ContainerExecutor
|
protected ContainersMonitor createContainersMonitor(
|
||||||
exec) {
|
ContainerExecutor exec) {
|
||||||
return new ContainersMonitorImpl(exec, dispatcher, this.context) {
|
return new ContainersMonitorImpl(exec, dispatcher, this.context) {
|
||||||
|
// Define resources available for containers to be executed.
|
||||||
@Override
|
@Override
|
||||||
public boolean hasResourcesAvailable(
|
public long getPmemAllocatedForContainers() {
|
||||||
ContainersMonitorImpl.ProcessTreeInfo pti) {
|
return 2048 * 1024 * 1024L;
|
||||||
return hasResources.decide(this.context, pti.getContainerId());
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getVmemAllocatedForContainers() {
|
||||||
|
float pmemRatio = getConfig().getFloat(
|
||||||
|
YarnConfiguration.NM_VMEM_PMEM_RATIO,
|
||||||
|
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
|
||||||
|
return (long) (pmemRatio * getPmemAllocatedForContainers());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getVCoresAllocatedForContainers() {
|
||||||
|
return 2;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -172,12 +183,6 @@ public void delete(String user, Path subDir, Path... baseDirs) {
|
|||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
super.setup();
|
super.setup();
|
||||||
shouldDeleteWait = false;
|
shouldDeleteWait = false;
|
||||||
hasResources = new HasResources() {
|
|
||||||
@Override
|
|
||||||
public boolean decide(Context context, ContainerId cId) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -256,31 +261,16 @@ public void testSimpleOpportunisticContainer() throws Exception {
|
|||||||
containerLaunchContext,
|
containerLaunchContext,
|
||||||
createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
|
createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
|
||||||
context.getNodeId(),
|
context.getNodeId(),
|
||||||
user, context.getContainerTokenSecretManager())));
|
user, BuilderUtils.newResource(1024, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.GUARANTEED)));
|
||||||
StartContainersRequest allRequests =
|
StartContainersRequest allRequests =
|
||||||
StartContainersRequest.newInstance(list);
|
StartContainersRequest.newInstance(list);
|
||||||
|
|
||||||
// Plugin to simulate that the Node is full
|
|
||||||
// It only allows 1 container to run at a time.
|
|
||||||
hasResources = new HasResources() {
|
|
||||||
@Override
|
|
||||||
public boolean decide(Context context, ContainerId cId) {
|
|
||||||
int nOpp = ((QueuingContainerManagerImpl) containerManager)
|
|
||||||
.getNumAllocatedOpportunisticContainers();
|
|
||||||
int nGuar = ((QueuingContainerManagerImpl) containerManager)
|
|
||||||
.getNumAllocatedGuaranteedContainers();
|
|
||||||
boolean val = (nOpp + nGuar < 1);
|
|
||||||
System.out.println("\nHasResources : [" + cId + "]," +
|
|
||||||
"Opp[" + nOpp + "], Guar[" + nGuar + "], [" + val + "]\n");
|
|
||||||
return val;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
containerManager.startContainers(allRequests);
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
BaseContainerManagerTest.waitForContainerState(containerManager,
|
Thread.sleep(10000);
|
||||||
createContainerId(3),
|
|
||||||
ContainerState.COMPLETE, 40);
|
|
||||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
statList.add(createContainerId(i));
|
statList.add(createContainerId(i));
|
||||||
|
Loading…
Reference in New Issue
Block a user