YARN-5110. Fix OpportunisticContainerAllocator to insert complete HostAddress in issued ContainerTokenIds. (Konstantinos Karanasos via asuresh)
This commit is contained in:
parent
010e6ac328
commit
1597630681
@ -41,5 +41,5 @@ public abstract class QueuedContainersStatus {
|
||||
|
||||
public abstract int getWaitQueueLength();
|
||||
|
||||
public abstract void setWaitQueueLength(int queueWaitTime);
|
||||
public abstract void setWaitQueueLength(int waitQueueLength);
|
||||
}
|
||||
|
@ -200,6 +200,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||
.getContainerTokenIdentifier().getContainerID();
|
||||
this.context.getQueuingContext().getQueuedContainers().remove(containerId);
|
||||
try {
|
||||
LOG.info("Starting container [" + containerId + "]");
|
||||
super.startContainerInternal(
|
||||
allocatedContainerInfo.getContainerTokenIdentifier(),
|
||||
allocatedContainerInfo.getStartRequest());
|
||||
|
@ -337,8 +337,10 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
|
||||
@Override
|
||||
public DistSchedAllocateResponse allocateForDistributedScheduling
|
||||
(AllocateRequest request) throws YarnException, IOException {
|
||||
LOG.info("Forwarding allocate request to the" +
|
||||
"Distributed Scheduler Service on YARN RM");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Forwarding allocate request to the" +
|
||||
"Distributed Scheduler Service on YARN RM");
|
||||
}
|
||||
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
|
||||
PartitionedResourceRequests partitionedAsks = partitionAskList(request
|
||||
.getAskList());
|
||||
|
@ -84,14 +84,14 @@ public class OpportunisticContainerAllocator {
|
||||
Map<String, NodeId> allNodes, String userName) throws YarnException {
|
||||
Map<Resource, List<Container>> containers = new HashMap<>();
|
||||
Set<String> nodesAllocated = new HashSet<>();
|
||||
int numAsks = resourceAsks.size();
|
||||
for (ResourceRequest anyAsk : resourceAsks) {
|
||||
allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
|
||||
allNodes, userName, containers, nodesAllocated, anyAsk);
|
||||
}
|
||||
if (numAsks > 0) {
|
||||
LOG.info("Opportunistic allocation requested for: " + numAsks
|
||||
+ " containers; allocated = " + containers.size());
|
||||
LOG.info("Opportunistic allocation requested for ["
|
||||
+ "priority=" + anyAsk.getPriority()
|
||||
+ ", num_containers=" + anyAsk.getNumContainers()
|
||||
+ ", capability=" + anyAsk.getCapability() + "]"
|
||||
+ " allocated = " + containers.get(anyAsk.getCapability()).size());
|
||||
}
|
||||
return containers;
|
||||
}
|
||||
@ -129,8 +129,9 @@ public class OpportunisticContainerAllocator {
|
||||
}
|
||||
cList.add(container);
|
||||
numAllocated++;
|
||||
LOG.info("Allocated " + numAllocated + " opportunistic containers.");
|
||||
LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
|
||||
}
|
||||
LOG.info("Allocated " + numAllocated + " opportunistic containers.");
|
||||
}
|
||||
|
||||
private Container buildContainer(DistSchedulerParams appParams,
|
||||
@ -146,8 +147,8 @@ public class OpportunisticContainerAllocator {
|
||||
long currTime = System.currentTimeMillis();
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
new ContainerTokenIdentifier(
|
||||
cId, nodeId.getHost(), userName, capability,
|
||||
currTime + appParams.containerTokenExpiryInterval,
|
||||
cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
|
||||
capability, currTime + appParams.containerTokenExpiryInterval,
|
||||
context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
|
||||
nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
|
||||
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security
|
||||
.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security
|
||||
.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
@ -196,9 +198,14 @@ public class TestLocalScheduler {
|
||||
}
|
||||
|
||||
private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
|
||||
allocateResponse) {
|
||||
allocateResponse) throws Exception {
|
||||
Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
|
||||
for (Container c : allocateResponse.getAllocatedContainers()) {
|
||||
ContainerTokenIdentifier cTokId = BuilderUtils
|
||||
.newContainerTokenIdentifier(c.getContainerToken());
|
||||
Assert.assertEquals(
|
||||
c.getNodeId().getHost() + ":" + c.getNodeId().getPort(),
|
||||
cTokId.getNmHostAddress());
|
||||
List<ContainerId> cIds = allocs.get(c.getNodeId());
|
||||
if (cIds == null) {
|
||||
cIds = new ArrayList<>();
|
||||
|
@ -195,11 +195,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
|
||||
new ClusterNode(rmNode.getNodeID())
|
||||
.setQueueWaitTime(estimatedQueueWaitTime)
|
||||
.setQueueLength(waitQueueLength));
|
||||
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
|
||||
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
|
||||
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
|
||||
"wait queue length [" + waitQueueLength + "]");
|
||||
} else {
|
||||
LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
|
||||
LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " +
|
||||
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
|
||||
"wait queue length [" + waitQueueLength + "]");
|
||||
}
|
||||
@ -210,12 +210,14 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
|
||||
.setQueueWaitTime(estimatedQueueWaitTime)
|
||||
.setQueueLength(waitQueueLength)
|
||||
.updateTimestamp();
|
||||
LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
|
||||
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
|
||||
"wait queue length [" + waitQueueLength + "]");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Updating ClusterNode [" + rmNode.getNodeID() + "] " +
|
||||
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
|
||||
"wait queue length [" + waitQueueLength + "]");
|
||||
}
|
||||
} else {
|
||||
this.clusterNodes.remove(rmNode.getNodeID());
|
||||
LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
|
||||
LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " +
|
||||
"with queue wait time [" + currentNode.queueWaitTime + "] and " +
|
||||
"wait queue length [" + currentNode.queueLength + "]");
|
||||
}
|
||||
|
@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||
@ -713,8 +716,14 @@ public class MiniYARNCluster extends CompositeService {
|
||||
ContainerExecutor exec, DeletionService del,
|
||||
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
|
||||
LocalDirsHandlerService dirsHandler) {
|
||||
return new CustomContainerManagerImpl(context, exec, del,
|
||||
nodeStatusUpdater, metrics, dirsHandler);
|
||||
if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
|
||||
YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
|
||||
return new CustomQueueingContainerManagerImpl(context, exec, del,
|
||||
nodeStatusUpdater, metrics, dirsHandler);
|
||||
} else {
|
||||
return new CustomContainerManagerImpl(context, exec, del,
|
||||
nodeStatusUpdater, metrics, dirsHandler);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -846,6 +855,55 @@ public class MiniYARNCluster extends CompositeService {
|
||||
}
|
||||
}
|
||||
|
||||
private class CustomQueueingContainerManagerImpl extends
|
||||
QueuingContainerManagerImpl {
|
||||
|
||||
public CustomQueueingContainerManagerImpl(Context context,
|
||||
ContainerExecutor exec, DeletionService del, NodeStatusUpdater
|
||||
nodeStatusUpdater, NodeManagerMetrics metrics,
|
||||
LocalDirsHandlerService dirsHandler) {
|
||||
super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ContainersMonitor createContainersMonitor(ContainerExecutor
|
||||
exec) {
|
||||
return new ContainersMonitorImpl(exec, dispatcher, this.context) {
|
||||
|
||||
@Override
|
||||
public void increaseContainersAllocation(ProcessTreeInfo pti) { }
|
||||
|
||||
@Override
|
||||
public void decreaseContainersAllocation(ProcessTreeInfo pti) { }
|
||||
|
||||
@Override
|
||||
public boolean hasResourcesAvailable(
|
||||
ContainersMonitorImpl.ProcessTreeInfo pti) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void createAMRMProxyService(Configuration conf) {
|
||||
this.amrmProxyEnabled =
|
||||
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
|
||||
|
||||
if (this.amrmProxyEnabled) {
|
||||
LOG.info("CustomAMRMProxyService is enabled. "
|
||||
+ "All the AM->RM requests will be intercepted by the proxy");
|
||||
AMRMProxyService amrmProxyService =
|
||||
useRpc ? new AMRMProxyService(getContext(), dispatcher)
|
||||
: new ShortCircuitedAMRMProxy(getContext(), dispatcher);
|
||||
this.setAMRMProxyService(amrmProxyService);
|
||||
addService(this.getAMRMProxyService());
|
||||
} else {
|
||||
LOG.info("CustomAMRMProxyService is disabled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class ShortCircuitedAMRMProxy extends AMRMProxyService {
|
||||
|
||||
public ShortCircuitedAMRMProxy(Context context,
|
||||
|
Loading…
x
Reference in New Issue
Block a user