YARN-5486. Update OpportunisticContainerAllocatorAMService::allocate method to handle OPPORTUNISTIC container requests. (Konstantinos Karanasos via asuresh)

This commit is contained in:
Arun Suresh 2016-09-29 15:11:41 -07:00
parent 1e0ea27e96
commit 10be45986c
11 changed files with 707 additions and 168 deletions

View File

@ -0,0 +1,398 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
/**
* Class that tests the allocation of OPPORTUNISTIC containers through the
* centralized ResourceManager.
*/
public class TestOpportunisticContainerAllocation {
private static Configuration conf = null;
private static MiniYARNCluster yarnCluster = null;
private static YarnClient yarnClient = null;
private static List<NodeReport> nodeReports = null;
private static ApplicationAttemptId attemptId = null;
private static int nodeCount = 3;
private static final int ROLLING_INTERVAL_SEC = 13;
private static final long AM_EXPIRE_MS = 4000;
private static Resource capability;
private static Priority priority;
private static Priority priority2;
private static String node;
private static String rack;
private static String[] nodes;
private static String[] racks;
private final static int DEFAULT_ITERATION = 3;
@BeforeClass
public static void setup() throws Exception {
// start minicluster
conf = new YarnConfiguration();
conf.setLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
ROLLING_INTERVAL_SEC);
conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
// set the minimum allocation so that resource decrease can go under 1024
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setBoolean(
YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
yarnCluster =
new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
// start rm client
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
// get node info
nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
priority = Priority.newInstance(1);
priority2 = Priority.newInstance(2);
capability = Resource.newInstance(1024, 1);
node = nodeReports.get(0).getNodeId().getHost();
rack = nodeReports.get(0).getRackName();
nodes = new String[]{node};
racks = new String[]{rack};
}
@Before
public void startApp() throws Exception {
// submit new app
ApplicationSubmissionContext appContext =
yarnClient.createApplication().getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
// set the application name
appContext.setApplicationName("Test");
// Set the priority for the application master
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(0);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue("default");
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
Collections.<String, LocalResource>emptyMap(),
new HashMap<String, String>(), Arrays.asList("sleep", "100"),
new HashMap<String, ByteBuffer>(), null,
new HashMap<ApplicationAccessType, String>());
appContext.setAMContainerSpec(amContainer);
appContext.setResource(Resource.newInstance(1024, 1));
// Create the request to send to the applications manager
SubmitApplicationRequest appRequest =
Records.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext);
// Submit the application to the applications manager
yarnClient.submitApplication(appContext);
// wait for app to start
RMAppAttempt appAttempt = null;
while (true) {
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
if (appReport.getYarnApplicationState() ==
YarnApplicationState.ACCEPTED) {
attemptId = appReport.getCurrentApplicationAttemptId();
appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
.get(attemptId.getApplicationId()).getCurrentAppAttempt();
while (true) {
if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
break;
}
}
break;
}
}
// Just dig into the ResourceManager and get the AMRMToken just for the sake
// of testing.
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
// emulate RM setup of AMRM token in credentials by adding the token
// *before* setting the token service
UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
appAttempt.getAMRMToken()
.setService(ClientRMProxy.getAMRMTokenService(conf));
}
@After
public void cancelApp() throws YarnException, IOException {
yarnClient.killApplication(attemptId.getApplicationId());
attemptId = null;
}
@AfterClass
public static void tearDown() {
if (yarnClient != null &&
yarnClient.getServiceState() == Service.STATE.STARTED) {
yarnClient.stop();
}
if (yarnCluster != null &&
yarnCluster.getServiceState() == Service.STATE.STARTED) {
yarnCluster.stop();
}
}
@Test(timeout = 60000)
public void testAMRMClient() throws YarnException, IOException {
AMRMClient<AMRMClient.ContainerRequest> amClient = null;
try {
// start am rm client
amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient();
//setting an instance NMTokenCache
amClient.setNMTokenCache(new NMTokenCache());
//asserting we are not using the singleton instance cache
Assert.assertNotSame(NMTokenCache.getSingleton(),
amClient.getNMTokenCache());
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
amClient
.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
null);
} finally {
if (amClient != null &&
amClient.getServiceState() == Service.STATE.STARTED) {
amClient.stop();
}
}
}
private void testAllocation(
final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
throws YarnException, IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int containersRequestedNode = amClient.getTable(0).get(priority,
node, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedRack = amClient.getTable(0).get(priority,
rack, ExecutionType.GUARANTEED, capability).remoteRequest
.getNumContainers();
int containersRequestedAny = amClient.getTable(0).get(priority,
ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
.remoteRequest.getNumContainers();
int oppContainersRequestedAny =
amClient.getTable(0).get(priority2, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
assertEquals(2, containersRequestedNode);
assertEquals(2, containersRequestedRack);
assertEquals(2, containersRequestedAny);
assertEquals(1, oppContainersRequestedAny);
assertEquals(4, amClient.ask.size());
assertEquals(0, amClient.release.size());
// RM should allocate container within 2 calls to allocate()
int allocatedContainerCount = 0;
int allocatedOpportContainerCount = 0;
int iterationsLeft = 10;
Set<ContainerId> releases = new TreeSet<>();
amClient.getNMTokenCache().clearCache();
Assert.assertEquals(0,
amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<>();
while (allocatedContainerCount <
containersRequestedAny + oppContainersRequestedAny
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
allocatedContainerCount += allocResponse.getAllocatedContainers()
.size();
for (Container container : allocResponse.getAllocatedContainers()) {
if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
allocatedOpportContainerCount++;
}
ContainerId rejectContainerId = container.getId();
releases.add(rejectContainerId);
}
for (NMToken token : allocResponse.getNMTokens()) {
String nodeID = token.getNodeId().toString();
receivedNMTokens.put(nodeID, token.getToken());
}
if (allocatedContainerCount < containersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
assertEquals(allocatedContainerCount,
containersRequestedAny + oppContainersRequestedAny);
assertEquals(allocatedOpportContainerCount, oppContainersRequestedAny);
for (ContainerId rejectContainerId : releases) {
amClient.releaseAssignedContainer(rejectContainerId);
}
assertEquals(3, amClient.release.size());
assertEquals(0, amClient.ask.size());
// need to tell the AMRMClient that we don't need these resources anymore
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
amClient.removeContainerRequest(
new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
assertEquals(4, amClient.ask.size());
iterationsLeft = 3;
// do a few iterations to ensure RM is not going to send new containers
while (iterationsLeft-- > 0) {
// inform RM of rejection
AllocateResponse allocResponse = amClient.allocate(0.1f);
// RM did not send new containers because AM does not need any
assertEquals(0, allocResponse.getAllocatedContainers().size());
if (allocResponse.getCompletedContainersStatuses().size() > 0) {
for (ContainerStatus cStatus : allocResponse
.getCompletedContainersStatuses()) {
if (releases.contains(cStatus.getContainerId())) {
assertEquals(cStatus.getState(), ContainerState.COMPLETE);
assertEquals(-100, cStatus.getExitStatus());
releases.remove(cStatus.getContainerId());
}
}
}
if (iterationsLeft > 0) {
// sleep to make sure NM's heartbeat
sleep(100);
}
}
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
}
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -37,6 +37,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -144,15 +145,6 @@ public void resetContainerIdCounter(long containerIdStart) {
this.containerIdCounter.set(containerIdStart);
}
/**
* Sets the underlying Atomic Long. To be used when implementation needs to
* share the underlying AtomicLong of an existing counter.
* @param counter AtomicLong
*/
public void setContainerIdCounter(AtomicLong counter) {
this.containerIdCounter = counter;
}
/**
* Generates a new long value. Default implementation increments the
* underlying AtomicLong. Sub classes are encouraged to over-ride this
@ -213,6 +205,10 @@ public List<Container> allocateContainers(
PartitionedResourceRequests partitionedAsks =
partitionAskList(request.getAskList());
if (partitionedAsks.getOpportunistic().isEmpty()) {
return Collections.emptyList();
}
List<ContainerId> releasedContainers = request.getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
@ -236,8 +232,8 @@ public List<Container> allocateContainers(
appContext.getOutstandingOpReqs().descendingKeySet()) {
// Allocated containers :
// Key = Requested Capability,
// Value = List of Containers of given Cap (The actual container size
// might be different than what is requested.. which is why
// Value = List of Containers of given cap (the actual container size
// might be different than what is requested, which is why
// we need the requested capability (key) to match against
// the outstanding reqs)
Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
@ -290,6 +286,10 @@ private void allocateContainersInternal(long rmIdentifier,
}
nodesForScheduling.add(nodeEntry.getValue());
}
if (nodesForScheduling.isEmpty()) {
LOG.warn("No nodes available for allocating opportunistic containers.");
return;
}
int numAllocated = 0;
int nextNodeToSchedule = 0;
for (int numCont = 0; numCont < toAllocate; numCont++) {

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.yarn.server.scheduler;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -28,9 +30,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -56,15 +60,13 @@ public class OpportunisticContainerContext {
private ContainerIdGenerator containerIdGenerator =
new ContainerIdGenerator();
private Map<String, NodeId> nodeMap = new LinkedHashMap<>();
private volatile List<NodeId> nodeList = new LinkedList<>();
private final Map<String, NodeId> nodeMap = new LinkedHashMap<>();
// Mapping of NodeId to NodeTokens. Populated either from RM response or
// generated locally if required.
private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
private final Set<String> blacklist = new HashSet<>();
// This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
// Resource Name (Host/rack/any) and capability. This mapping is required
// Resource Name (host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
// ResourceRequest (ask).
private final TreeMap<Priority, Map<Resource, ResourceRequest>>
@ -74,7 +76,7 @@ public Set<ContainerId> getContainersAllocated() {
return containersAllocated;
}
public OpportunisticContainerAllocator.AllocationParams getAppParams() {
public AllocationParams getAppParams() {
return appParams;
}
@ -88,11 +90,29 @@ public void setContainerIdGenerator(
}
public Map<String, NodeId> getNodeMap() {
return nodeMap;
return Collections.unmodifiableMap(nodeMap);
}
public Map<NodeId, NMToken> getNodeTokens() {
return nodeTokens;
public synchronized void updateNodeList(List<NodeId> newNodeList) {
// This is an optimization for centralized placement. The
// OppContainerAllocatorAMService has a cached list of nodes which it sets
// here. The nodeMap needs to be updated only if the backing node list is
// modified.
if (newNodeList != nodeList) {
nodeList = newNodeList;
nodeMap.clear();
for (NodeId n : nodeList) {
nodeMap.put(n.getHost(), n);
}
}
}
public void updateAllocationParams(Resource minResource, Resource maxResource,
Resource incrResource, int containerTokenExpiryInterval) {
appParams.setMinResource(minResource);
appParams.setMaxResource(maxResource);
appParams.setIncrementResource(incrResource);
appParams.setContainerTokenExpiryInterval(containerTokenExpiryInterval);
}
public Set<String> getBlacklist() {
@ -104,6 +124,15 @@ public Set<String> getBlacklist() {
return outstandingOpReqs;
}
public void updateCompletedContainers(AllocateResponse allocateResponse) {
for (ContainerStatus cs :
allocateResponse.getCompletedContainersStatuses()) {
if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
containersAllocated.remove(cs.getContainerId());
}
}
}
/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
* (Priority, ResourceName, Capability) and adds to the outstanding

View File

@ -336,8 +336,7 @@ protected void serviceInit(Configuration conf) throws Exception {
addService(nodeHealthChecker);
boolean isDistSchedulingEnabled =
conf.getBoolean(YarnConfiguration.
OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
this.context = createNMContext(containerTokenSecretManager,

View File

@ -152,7 +152,7 @@ public AllocateResponse allocate(final AllocateRequest request)
return ((DistributedSchedulingAMProtocol)rmClient)
.registerApplicationMasterForDistributedScheduling(request);
} else {
throw new YarnException("Distributed Scheduling is not enabled !!");
throw new YarnException("Distributed Scheduling is not enabled.");
}
}
@ -174,7 +174,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
}
return allocateResponse;
} else {
throw new YarnException("Distributed Scheduling is not enabled !!");
throw new YarnException("Distributed Scheduling is not enabled.");
}
}

View File

@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
@ -32,8 +33,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -48,7 +47,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* <p>The DistributedScheduler runs on the NodeManager and is modeled as an
@ -74,6 +75,9 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
private OpportunisticContainerContext oppContainerContext =
new OpportunisticContainerContext();
// Mapping of NodeId to NodeTokens. Populated either from RM response or
// generated locally if required.
private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
private ApplicationAttemptId applicationAttemptId;
private OpportunisticContainerAllocator containerAllocator;
private NMTokenSecretManagerInNM nmSecretManager;
@ -157,17 +161,17 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
/**
* Check if we already have a NMToken. if Not, generate the Token and
* add it to the response
* Adds all the newly allocated Containers to the allocate Response.
* Additionally, in case the NMToken for one of the nodes does not exist, it
* generates one and adds it to the response.
*/
private void updateResponseWithNMTokens(AllocateResponse response,
private void updateAllocateResponse(AllocateResponse response,
List<NMToken> nmTokens, List<Container> allocatedContainers) {
List<NMToken> newTokens = new ArrayList<>();
if (allocatedContainers.size() > 0) {
response.getAllocatedContainers().addAll(allocatedContainers);
for (Container alloc : allocatedContainers) {
if (!oppContainerContext.getNodeTokens().containsKey(
alloc.getNodeId())) {
if (!nodeTokens.containsKey(alloc.getNodeId())) {
newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
}
}
@ -179,17 +183,14 @@ private void updateResponseWithNMTokens(AllocateResponse response,
private void updateParameters(
RegisterDistributedSchedulingAMResponse registerResponse) {
oppContainerContext.getAppParams().setMinResource(
registerResponse.getMinContainerResource());
oppContainerContext.getAppParams().setMaxResource(
registerResponse.getMaxContainerResource());
oppContainerContext.getAppParams().setIncrementResource(
registerResponse.getIncrContainerResource());
if (oppContainerContext.getAppParams().getIncrementResource() == null) {
oppContainerContext.getAppParams().setIncrementResource(
oppContainerContext.getAppParams().getMinResource());
Resource incrementResource = registerResponse.getIncrContainerResource();
if (incrementResource == null) {
incrementResource = registerResponse.getMinContainerResource();
}
oppContainerContext.getAppParams().setContainerTokenExpiryInterval(
oppContainerContext.updateAllocationParams(
registerResponse.getMinContainerResource(),
registerResponse.getMaxContainerResource(),
incrementResource,
registerResponse.getContainerTokenExpiryInterval());
oppContainerContext.getContainerIdGenerator()
@ -198,14 +199,7 @@ private void updateParameters(
}
private void setNodeList(List<NodeId> nodeList) {
oppContainerContext.getNodeMap().clear();
addToNodeList(nodeList);
}
private void addToNodeList(List<NodeId> nodes) {
for (NodeId n : nodes) {
oppContainerContext.getNodeMap().put(n.getHost(), n);
}
oppContainerContext.updateNodeList(nodeList);
}
@Override
@ -243,23 +237,14 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
setNodeList(dsResp.getNodesForScheduling());
List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
for (NMToken nmToken : nmTokens) {
oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken);
nodeTokens.put(nmToken.getNodeId(), nmToken);
}
List<ContainerStatus> completedContainers =
dsResp.getAllocateResponse().getCompletedContainersStatuses();
// Only account for opportunistic containers
for (ContainerStatus cs : completedContainers) {
if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
oppContainerContext.getContainersAllocated()
.remove(cs.getContainerId());
}
}
oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());
// Check if we have NM tokens for all the allocated containers. If not
// generate one and update the response.
updateResponseWithNMTokens(
updateAllocateResponse(
dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
if (LOG.isDebugEnabled()) {

View File

@ -24,9 +24,11 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
@ -65,12 +67,14 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* The OpportunisticContainerAllocatorAMService is started instead of the
@ -88,17 +92,20 @@ public class OpportunisticContainerAllocatorAMService
LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
private final NodeQueueLoadMonitor nodeMonitor;
private final OpportunisticContainerAllocator oppContainerAllocator;
private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
new ConcurrentHashMap<>();
private final int k;
private final long cacheRefreshInterval;
private List<NodeId> cachedNodeIds;
private long lastCacheUpdateTime;
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
YarnScheduler scheduler) {
super(OpportunisticContainerAllocatorAMService.class.getName(),
rmContext, scheduler);
this.oppContainerAllocator = new OpportunisticContainerAllocator(
rmContext.getContainerTokenSecretManager(), 0);
this.k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
@ -106,6 +113,8 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext,
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
YarnConfiguration.
NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
this.cacheRefreshInterval = nodeSortInterval;
this.lastCacheUpdateTime = System.currentTimeMillis();
NodeQueueLoadMonitor.LoadComparator comparator =
NodeQueueLoadMonitor.LoadComparator.valueOf(
rmContext.getYarnConfiguration().get(
@ -172,6 +181,27 @@ public Server getServer(YarnRPC rpc, Configuration serverConf,
public RegisterApplicationMasterResponse registerApplicationMaster
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
final ApplicationAttemptId appAttemptId = getAppAttemptId();
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
if (appAttempt.getOpportunisticContainerContext() == null) {
OpportunisticContainerContext opCtx = new OpportunisticContainerContext();
opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
.ContainerIdGenerator() {
@Override
public long generateContainerId() {
return appAttempt.getAppSchedulingInfo().getNewContainerId();
}
});
int tokenExpiryInterval = getConfig()
.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
YarnConfiguration.
OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT);
opCtx.updateAllocationParams(createMinContainerResource(),
createMaxContainerResource(), createIncrContainerResource(),
tokenExpiryInterval);
appAttempt.setOpportunisticContainerContext(opCtx);
}
return super.registerApplicationMaster(request);
}
@ -185,7 +215,30 @@ public Server getServer(YarnRPC rpc, Configuration serverConf,
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
return super.allocate(request);
final ApplicationAttemptId appAttemptId = getAppAttemptId();
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
OpportunisticContainerContext oppCtx =
appAttempt.getOpportunisticContainerContext();
oppCtx.updateNodeList(getLeastLoadedNodes());
List<Container> oppContainers =
oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
if (!oppContainers.isEmpty()) {
handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers);
}
// Allocate all guaranteed containers
AllocateResponse allocateResp = super.allocate(request);
oppCtx.updateCompletedContainers(allocateResp);
// Add all opportunistic containers
allocateResp.getAllocatedContainers().addAll(oppContainers);
return allocateResp;
}
@Override
@ -198,39 +251,9 @@ public AllocateResponse allocate(AllocateRequest request) throws
RegisterDistributedSchedulingAMResponse dsResp = recordFactory
.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
dsResp.setRegisterResponse(response);
dsResp.setMinContainerResource(
Resource.newInstance(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
YarnConfiguration.
OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
)
);
dsResp.setMaxContainerResource(
Resource.newInstance(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
YarnConfiguration
.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
)
);
dsResp.setIncrContainerResource(
Resource.newInstance(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
YarnConfiguration.
OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
)
);
dsResp.setMinContainerResource(createMinContainerResource());
dsResp.setMaxContainerResource(createMaxContainerResource());
dsResp.setIncrContainerResource(createIncrContainerResource());
dsResp.setContainerTokenExpiryInterval(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
@ -240,8 +263,7 @@ public AllocateResponse allocate(AllocateRequest request) throws
this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
// Set nodes to be used for scheduling
dsResp.setNodesForScheduling(
this.nodeMonitor.selectLeastLoadedNodes(this.k));
dsResp.setNodesForScheduling(getLeastLoadedNodes());
return dsResp;
}
@ -250,47 +272,30 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
List<Container> distAllocContainers = request.getAllocatedContainers();
for (Container container : distAllocContainers) {
handleNewContainers(distAllocContainers, true);
AllocateResponse response = allocate(request.getAllocateRequest());
DistributedSchedulingAllocateResponse dsResp = recordFactory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
dsResp.setAllocateResponse(response);
dsResp.setNodesForScheduling(getLeastLoadedNodes());
return dsResp;
}
private void handleNewContainers(List<Container> allocContainers,
boolean isRemotelyAllocated) {
for (Container container : allocContainers) {
// Create RMContainer
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler) rmContext.getScheduler())
.getCurrentAttemptForContainer(container.getId());
RMContainer rmContainer = new RMContainerImpl(container,
appAttempt.getApplicationAttemptId(), container.getNodeId(),
appAttempt.getUser(), rmContext, true);
appAttempt.getUser(), rmContext, isRemotelyAllocated);
appAttempt.addRMContainer(container.getId(), rmContainer);
rmContainer.handle(
new RMContainerEvent(container.getId(),
RMContainerEventType.LAUNCHED));
}
AllocateResponse response = allocate(request.getAllocateRequest());
DistributedSchedulingAllocateResponse dsResp = recordFactory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
dsResp.setAllocateResponse(response);
dsResp.setNodesForScheduling(
this.nodeMonitor.selectLeastLoadedNodes(this.k));
return dsResp;
}
private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
String rackName, NodeId nodeId) {
if (rackName != null) {
mapping.putIfAbsent(rackName, new HashSet<NodeId>());
Set<NodeId> nodeIds = mapping.get(rackName);
synchronized (nodeIds) {
nodeIds.add(nodeId);
}
}
}
private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
String rackName, NodeId nodeId) {
if (rackName != null) {
Set<NodeId> nodeIds = mapping.get(rackName);
synchronized (nodeIds) {
nodeIds.remove(nodeId);
}
}
}
@Override
@ -303,10 +308,6 @@ public void handle(SchedulerEvent event) {
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
nodeAddedEvent.getAddedRMNode());
addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
nodeAddedEvent.getAddedRMNode().getNodeID());
addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
nodeAddedEvent.getAddedRMNode().getNodeID());
break;
case NODE_REMOVED:
if (!(event instanceof NodeRemovedSchedulerEvent)) {
@ -315,12 +316,6 @@ public void handle(SchedulerEvent event) {
NodeRemovedSchedulerEvent nodeRemovedEvent =
(NodeRemovedSchedulerEvent) event;
nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
removeFromMapping(rackToNode,
nodeRemovedEvent.getRemovedRMNode().getRackName(),
nodeRemovedEvent.getRemovedRMNode().getNodeID());
removeFromMapping(hostToNode,
nodeRemovedEvent.getRemovedRMNode().getHostName(),
nodeRemovedEvent.getRemovedRMNode().getNodeID());
break;
case NODE_UPDATE:
if (!(event instanceof NodeUpdateSchedulerEvent)) {
@ -364,4 +359,58 @@ public void handle(SchedulerEvent event) {
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
return nodeMonitor.getThresholdCalculator();
}
private Resource createIncrContainerResource() {
return Resource.newInstance(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
YarnConfiguration.
OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
);
}
private synchronized List<NodeId> getLeastLoadedNodes() {
long currTime = System.currentTimeMillis();
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
|| cachedNodeIds == null) {
cachedNodeIds = this.nodeMonitor.selectLeastLoadedNodes(this.k);
lastCacheUpdateTime = currTime;
}
return cachedNodeIds;
}
private Resource createMaxContainerResource() {
return Resource.newInstance(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
YarnConfiguration
.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
);
}
private Resource createMinContainerResource() {
return Resource.newInstance(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
YarnConfiguration.
OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
);
}
private static ApplicationAttemptId getAppAttemptId() throws YarnException {
AMRMTokenIdentifier amrmTokenIdentifier =
YarnServerSecurityUtils.authorizeRequest();
ApplicationAttemptId applicationAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();
return applicationAttemptId;
}
}

View File

@ -1184,6 +1184,13 @@ protected ApplicationMasterService createApplicationMasterService() {
Configuration config = this.rmContext.getYarnConfiguration();
if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
|| YarnConfiguration.isDistSchedulingEnabled(config)) {
if (YarnConfiguration.isDistSchedulingEnabled(config) &&
!YarnConfiguration
.isOpportunisticContainerAllocationEnabled(config)) {
throw new YarnRuntimeException(
"Invalid parameters: opportunistic container allocation has to " +
"be enabled when distributed scheduling is enabled.");
}
OpportunisticContainerAllocatorAMService
oppContainerAllocatingAMService =
new OpportunisticContainerAllocatorAMService(this.rmContext,
@ -1193,9 +1200,8 @@ protected ApplicationMasterService createApplicationMasterService() {
OpportunisticContainerAllocatorAMService.class.getName());
// Add an event dispatcher for the
// OpportunisticContainerAllocatorAMService to handle node
// updates/additions and removals.
// Since the SchedulerEvent is currently a super set of theses,
// we register interest for it..
// additions, updates and removals. Since the SchedulerEvent is currently
// a super set of theses, we register interest for it.
addService(oppContainerAllocEventDispatcher);
rmDispatcher.register(SchedulerEventType.class,
oppContainerAllocEventDispatcher);

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -68,6 +69,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -114,6 +117,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private boolean isAttemptRecovering;
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
/** Resource usage of opportunistic containers. */
protected ResourceUsage attemptOpportunisticResourceUsage =
new ResourceUsage();
/** Scheduled by a remote scheduler. */
protected ResourceUsage attemptResourceUsageAllocatedRemotely =
new ResourceUsage();
@ -132,6 +138,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// by NM should not be recovered.
private Set<ContainerId> pendingRelease = null;
private OpportunisticContainerContext oppContainerContext;
/**
* Count how many times the application has been given an opportunity to
* schedule a task at each priority. Each time the scheduler asks the
@ -199,7 +207,17 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
readLock = lock.readLock();
writeLock = lock.writeLock();
}
public void setOpportunisticContainerContext(
OpportunisticContainerContext oppContext) {
this.oppContainerContext = oppContext;
}
public OpportunisticContainerContext
getOpportunisticContainerContext() {
return this.oppContainerContext;
}
/**
* Get the live containers of the application.
* @return live containers of the application
@ -331,6 +349,10 @@ public void addRMContainer(
try {
writeLock.lock();
liveContainers.put(id, rmContainer);
if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
this.attemptOpportunisticResourceUsage.incUsed(
rmContainer.getAllocatedResource());
}
if (rmContainer.isRemotelyAllocated()) {
this.attemptResourceUsageAllocatedRemotely.incUsed(
rmContainer.getAllocatedResource());
@ -344,9 +366,15 @@ public void removeRMContainer(ContainerId containerId) {
try {
writeLock.lock();
RMContainer rmContainer = liveContainers.remove(containerId);
if (rmContainer != null && rmContainer.isRemotelyAllocated()) {
this.attemptResourceUsageAllocatedRemotely.decUsed(
rmContainer.getAllocatedResource());
if (rmContainer != null) {
if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
this.attemptOpportunisticResourceUsage
.decUsed(rmContainer.getAllocatedResource());
}
if (rmContainer.isRemotelyAllocated()) {
this.attemptResourceUsageAllocatedRemotely
.decUsed(rmContainer.getAllocatedResource());
}
}
} finally {
writeLock.unlock();
@ -612,12 +640,7 @@ private Container updateContainerAndNMToken(RMContainer rmContainer,
container.getPriority(), rmContainer.getCreationTime(),
this.logAggregationContext, rmContainer.getNodeLabelExpression(),
containerType));
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);
if (nmToken != null) {
updatedNMTokens.add(nmToken);
}
updateNMToken(container);
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.
LOG.error("Error trying to assign container token and NM token to"
@ -635,6 +658,21 @@ private Container updateContainerAndNMToken(RMContainer rmContainer,
return container;
}
public void updateNMTokens(Collection<Container> containers) {
for (Container container : containers) {
updateNMToken(container);
}
}
private void updateNMToken(Container container) {
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);
if (nmToken != null) {
updatedNMTokens.add(nmToken);
}
}
// Create container token and update NMToken altogether, if either of them fails for
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.

View File

@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@ -37,6 +38,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
@ -103,16 +105,23 @@ public ClusterNode updateTimestamp() {
new ConcurrentHashMap<>();
private final LoadComparator comparator;
private QueueLimitCalculator thresholdCalculator;
private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock clusterNodesLock =
new ReentrantReadWriteLock();
Runnable computeTask = new Runnable() {
@Override
public void run() {
synchronized (sortedNodes) {
ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock();
writeLock.lock();
try {
sortedNodes.clear();
sortedNodes.addAll(sortNodes());
if (thresholdCalculator != null) {
thresholdCalculator.update();
}
} finally {
writeLock.unlock();
}
}
};
@ -166,9 +175,16 @@ public void addNode(List<NMContainerStatus> containerStatuses, RMNode
@Override
public void removeNode(RMNode removedRMNode) {
LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
synchronized (this.clusterNodes) {
if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
this.clusterNodes.remove(removedRMNode.getNodeID());
ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
writeLock.lock();
ClusterNode node;
try {
node = this.clusterNodes.remove(removedRMNode.getNodeID());
} finally {
writeLock.unlock();
}
if (LOG.isDebugEnabled()) {
if (node != null) {
LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
} else {
LOG.debug("Node not in list!");
@ -186,7 +202,9 @@ public void updateNode(RMNode rmNode) {
int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
// Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
// UNLESS comparator is based on queue length.
synchronized (this.clusterNodes) {
ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
writeLock.lock();
try {
ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
if (currentNode == null) {
if (estimatedQueueWaitTime != -1
@ -222,6 +240,8 @@ public void updateNode(RMNode rmNode) {
"wait queue length [" + currentNode.queueLength + "]");
}
}
} finally {
writeLock.unlock();
}
}
@ -245,15 +265,22 @@ public List<NodeId> selectNodes() {
* @return ordered list of nodes
*/
public List<NodeId> selectLeastLoadedNodes(int k) {
synchronized (this.sortedNodes) {
return ((k < this.sortedNodes.size()) && (k >= 0)) ?
ReentrantReadWriteLock.ReadLock readLock = sortedNodesLock.readLock();
readLock.lock();
try {
List<NodeId> retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ?
new ArrayList<>(this.sortedNodes).subList(0, k) :
new ArrayList<>(this.sortedNodes);
return Collections.unmodifiableList(retVal);
} finally {
readLock.unlock();
}
}
private List<NodeId> sortNodes() {
synchronized (this.clusterNodes) {
ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
readLock.lock();
try {
ArrayList aList = new ArrayList<>(this.clusterNodes.values());
List<NodeId> retList = new ArrayList<>();
Object[] nodes = aList.toArray();
@ -267,6 +294,8 @@ private List<NodeId> sortNodes() {
retList.add(((ClusterNode)nodes[j]).nodeId);
}
return retList;
} finally {
readLock.unlock();
}
}

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.Assert;
import org.junit.Test;
@ -97,6 +98,11 @@ public AMLivelinessMonitor getAMLivelinessMonitor() {
public Configuration getYarnConfiguration() {
return new YarnConfiguration();
}
@Override
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return new RMContainerTokenSecretManager(conf);
}
};
Container c = factory.newRecordInstance(Container.class);
c.setExecutionType(ExecutionType.OPPORTUNISTIC);
@ -117,8 +123,8 @@ public Configuration getYarnConfiguration() {
Server server = service.getServer(rpc, conf, addr, null);
server.start();
// Verify that the DistrubutedSchedulingService can handle vanilla
// ApplicationMasterProtocol clients
// Verify that the OpportunisticContainerAllocatorAMSercvice can handle
// vanilla ApplicationMasterProtocol clients
RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
ProtobufRpcEngine.class);
ApplicationMasterProtocolPB ampProxy =