diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
new file mode 100644
index 0000000000..b9b4b02a14
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Class that tests the allocation of OPPORTUNISTIC containers through the
+ * centralized ResourceManager.
+ */
+public class TestOpportunisticContainerAllocation {
+ private static Configuration conf = null;
+ private static MiniYARNCluster yarnCluster = null;
+ private static YarnClient yarnClient = null;
+ private static List<NodeReport> nodeReports = null;
+ private static ApplicationAttemptId attemptId = null;
+ private static int nodeCount = 3;
+
+ private static final int ROLLING_INTERVAL_SEC = 13;
+ private static final long AM_EXPIRE_MS = 4000;
+
+ private static Resource capability;
+ private static Priority priority;
+ private static Priority priority2;
+ private static String node;
+ private static String rack;
+ private static String[] nodes;
+ private static String[] racks;
+ private final static int DEFAULT_ITERATION = 3;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ // start minicluster
+ conf = new YarnConfiguration();
+ conf.setLong(
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
+ ROLLING_INTERVAL_SEC);
+ conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS);
+ conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+ // set the minimum allocation so that resource decrease can go under 1024
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
+ conf.setBoolean(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+ conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+ yarnCluster =
+ new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+
+ // start rm client
+ yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+
+ // get node info
+ nodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
+
+ priority = Priority.newInstance(1);
+ priority2 = Priority.newInstance(2);
+ capability = Resource.newInstance(1024, 1);
+
+ node = nodeReports.get(0).getNodeId().getHost();
+ rack = nodeReports.get(0).getRackName();
+ nodes = new String[]{node};
+ racks = new String[]{rack};
+ }
+
+ @Before
+ public void startApp() throws Exception {
+ // submit new app
+ ApplicationSubmissionContext appContext =
+ yarnClient.createApplication().getApplicationSubmissionContext();
+ ApplicationId appId = appContext.getApplicationId();
+ // set the application name
+ appContext.setApplicationName("Test");
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(0);
+ appContext.setPriority(pri);
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue("default");
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+ Collections.<String, LocalResource>emptyMap(),
+ new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+ new HashMap<String, ByteBuffer>(), null,
+ new HashMap<ApplicationAccessType, String>());
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(Resource.newInstance(1024, 1));
+ // Create the request to send to the applications manager
+ SubmitApplicationRequest appRequest =
+ Records.newRecord(SubmitApplicationRequest.class);
+ appRequest.setApplicationSubmissionContext(appContext);
+ // Submit the application to the applications manager
+ yarnClient.submitApplication(appContext);
+
+ // wait for app to start
+ RMAppAttempt appAttempt = null;
+ while (true) {
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ if (appReport.getYarnApplicationState() ==
+ YarnApplicationState.ACCEPTED) {
+ attemptId = appReport.getCurrentApplicationAttemptId();
+ appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+ .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+ while (true) {
+ if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+ break;
+ }
+ }
+ break;
+ }
+ }
+ // Dig into the ResourceManager and get the AMRMToken, just for the sake
+ // of testing.
+ UserGroupInformation.setLoginUser(UserGroupInformation
+ .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+ // emulate RM setup of AMRM token in credentials by adding the token
+ // *before* setting the token service
+ UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+ appAttempt.getAMRMToken()
+ .setService(ClientRMProxy.getAMRMTokenService(conf));
+ }
+
+ @After
+ public void cancelApp() throws YarnException, IOException {
+ yarnClient.killApplication(attemptId.getApplicationId());
+ attemptId = null;
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (yarnClient != null &&
+ yarnClient.getServiceState() == Service.STATE.STARTED) {
+ yarnClient.stop();
+ }
+ if (yarnCluster != null &&
+ yarnCluster.getServiceState() == Service.STATE.STARTED) {
+ yarnCluster.stop();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testAMRMClient() throws YarnException, IOException {
+ AMRMClient<AMRMClient.ContainerRequest> amClient = null;
+ try {
+ // start am rm client
+ amClient = AMRMClient.createAMRMClient();
+
+ //setting an instance NMTokenCache
+ amClient.setNMTokenCache(new NMTokenCache());
+ //asserting we are not using the singleton instance cache
+ Assert.assertNotSame(NMTokenCache.getSingleton(),
+ amClient.getNMTokenCache());
+
+ amClient.init(conf);
+ amClient.start();
+
+ amClient.registerApplicationMaster("Host", 10000, "");
+
+ testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>) amClient);
+
+ amClient
+ .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+ null);
+
+ } finally {
+ if (amClient != null &&
+ amClient.getServiceState() == Service.STATE.STARTED) {
+ amClient.stop();
+ }
+ }
+ }
+
+ private void testAllocation(
+ final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
+ throws YarnException, IOException {
+ // setup container request
+
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ amClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+
+ int containersRequestedNode = amClient.getTable(0).get(priority,
+ node, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedRack = amClient.getTable(0).get(priority,
+ rack, ExecutionType.GUARANTEED, capability).remoteRequest
+ .getNumContainers();
+ int containersRequestedAny = amClient.getTable(0).get(priority,
+ ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+ .remoteRequest.getNumContainers();
+ int oppContainersRequestedAny =
+ amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+ ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+ .getNumContainers();
+
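+ // Of the 4 GUARANTEED requests added above, 2 were removed, leaving 2
+ // outstanding at the node, rack and ANY levels; of the 2 OPPORTUNISTIC
+ // requests, 1 was removed, leaving 1 outstanding at the ANY level.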
+ assertEquals(2, containersRequestedNode);
+ assertEquals(2, containersRequestedRack);
+ assertEquals(2, containersRequestedAny);
+ assertEquals(1, oppContainersRequestedAny);
+
+ assertEquals(4, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ // RM should allocate all requested containers within a few calls to
+ // allocate()
+ int allocatedContainerCount = 0;
+ int allocatedOpportContainerCount = 0;
+ int iterationsLeft = 10;
+ Set<ContainerId> releases = new TreeSet<>();
+
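+ // start from an empty NMToken cache so that the tokens received below
+ // are exactly the ones issued for these allocations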
+ amClient.getNMTokenCache().clearCache();
+ Assert.assertEquals(0,
+ amClient.getNMTokenCache().numberOfTokensInCache());
+ HashMap<String, Token> receivedNMTokens = new HashMap<>();
+
+ while (allocatedContainerCount <
+ containersRequestedAny + oppContainersRequestedAny
+ && iterationsLeft-- > 0) {
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+
+ allocatedContainerCount += allocResponse.getAllocatedContainers()
+ .size();
+ for (Container container : allocResponse.getAllocatedContainers()) {
+ if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ allocatedOpportContainerCount++;
+ }
+ ContainerId rejectContainerId = container.getId();
+ releases.add(rejectContainerId);
+ }
+
+ for (NMToken token : allocResponse.getNMTokens()) {
+ String nodeID = token.getNodeId().toString();
+ receivedNMTokens.put(nodeID, token.getToken());
+ }
+
+ if (allocatedContainerCount < containersRequestedAny) {
+ // sleep to let the NMs heartbeat to the RM and trigger allocations
+ sleep(100);
+ }
+ }
+
+ assertEquals(containersRequestedAny + oppContainersRequestedAny,
+ allocatedContainerCount);
+ assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount);
+ for (ContainerId rejectContainerId : releases) {
+ amClient.releaseAssignedContainer(rejectContainerId);
+ }
+ assertEquals(3, amClient.release.size());
+ assertEquals(0, amClient.ask.size());
+
+ // need to tell the AMRMClient that we don't need these resources anymore
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
+ amClient.removeContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+ true, null,
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ assertEquals(4, amClient.ask.size());
+
+ iterationsLeft = 3;
+ // do a few iterations to ensure RM is not going to send new containers
+ while (iterationsLeft-- > 0) {
+ // inform RM of rejection
+ AllocateResponse allocResponse = amClient.allocate(0.1f);
+ // RM did not send new containers because AM does not need any
+ assertEquals(0, allocResponse.getAllocatedContainers().size());
+ if (allocResponse.getCompletedContainersStatuses().size() > 0) {
+ for (ContainerStatus cStatus : allocResponse
+ .getCompletedContainersStatuses()) {
+ if (releases.contains(cStatus.getContainerId())) {
+ assertEquals(ContainerState.COMPLETE, cStatus.getState());
+ assertEquals(-100, cStatus.getExitStatus());
+ releases.remove(cStatus.getContainerId());
+ }
+ }
+ }
+ if (iterationsLeft > 0) {
+ // sleep to give the NMs time to heartbeat to the RM
+ sleep(100);
+ }
+ }
+ assertEquals(0, amClient.ask.size());
+ assertEquals(0, amClient.release.size());
+ }
+
+ private void sleep(int sleepTime) {
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 9b2fd3857b..9c158e95b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -37,6 +37,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -144,15 +145,6 @@ public void resetContainerIdCounter(long containerIdStart) {
this.containerIdCounter.set(containerIdStart);
}
- /**
- * Sets the underlying Atomic Long. To be used when implementation needs to
- * share the underlying AtomicLong of an existing counter.
- * @param counter AtomicLong
- */
- public void setContainerIdCounter(AtomicLong counter) {
- this.containerIdCounter = counter;
- }
-
/**
* Generates a new long value. Default implementation increments the
* underlying AtomicLong. Sub classes are encouraged to over-ride this
@@ -213,6 +205,10 @@ public List<Container> allocateContainers(
PartitionedResourceRequests partitionedAsks =
partitionAskList(request.getAskList());
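+ // Nothing to do if this allocate call carries no OPPORTUNISTIC asks.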
+ if (partitionedAsks.getOpportunistic().isEmpty()) {
+ return Collections.emptyList();
+ }
+
List<ContainerId> releasedContainers = request.getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
@@ -236,8 +232,8 @@ public List<Container> allocateContainers(
appContext.getOutstandingOpReqs().descendingKeySet()) {
// Allocated containers :
// Key = Requested Capability,
- // Value = List of Containers of given Cap (The actual container size
- // might be different than what is requested.. which is why
+ // Value = List of Containers of given cap (the actual container size
+ // might be different than what is requested, which is why
// we need the requested capability (key) to match against
// the outstanding reqs)
Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
@@ -290,6 +286,10 @@ private void allocateContainersInternal(long rmIdentifier,
}
nodesForScheduling.add(nodeEntry.getValue());
}
+ if (nodesForScheduling.isEmpty()) {
+ LOG.warn("No nodes available for allocating opportunistic containers.");
+ return;
+ }
int numAllocated = 0;
int nextNodeToSchedule = 0;
for (int numCont = 0; numCont < toAllocate; numCont++) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
index 1b701eaaa3..6fcddf842e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerContext.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.yarn.server.scheduler;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -28,9 +30,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -56,15 +60,13 @@ public class OpportunisticContainerContext {
private ContainerIdGenerator containerIdGenerator =
new ContainerIdGenerator();
- private Map<String, NodeId> nodeMap = new LinkedHashMap<>();
+ private volatile List<NodeId> nodeList = new LinkedList<>();
+ private final Map<String, NodeId> nodeMap = new LinkedHashMap<>();
- // Mapping of NodeId to NodeTokens. Populated either from RM response or
- // generated locally if required.
- private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
private final Set<String> blacklist = new HashSet<>();
// This maintains a map of outstanding OPPORTUNISTIC Reqs. Key-ed by Priority,
- // Resource Name (Host/rack/any) and capability. This mapping is required
+ // Resource Name (host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
// ResourceRequest (ask).
private final TreeMap<Priority, Map<Resource, ResourceRequest>>
@@ -74,7 +76,7 @@ public Set<ContainerId> getContainersAllocated() {
return containersAllocated;
}
- public OpportunisticContainerAllocator.AllocationParams getAppParams() {
+ public AllocationParams getAppParams() {
return appParams;
}
@@ -88,11 +90,29 @@ public void setContainerIdGenerator(
}
public Map<String, NodeId> getNodeMap() {
- return nodeMap;
+ return Collections.unmodifiableMap(nodeMap);
}
- public Map<NodeId, NMToken> getNodeTokens() {
- return nodeTokens;
+ public synchronized void updateNodeList(List<NodeId> newNodeList) {
+ // This is an optimization for centralized placement. The
+ // OppContainerAllocatorAMService has a cached list of nodes which it sets
+ // here. The nodeMap needs to be updated only if the backing node list is
+ // modified.
+ if (newNodeList != nodeList) {
+ nodeList = newNodeList;
+ nodeMap.clear();
+ for (NodeId n : nodeList) {
+ nodeMap.put(n.getHost(), n);
+ }
+ }
+ }
+
+ public void updateAllocationParams(Resource minResource, Resource maxResource,
+ Resource incrResource, int containerTokenExpiryInterval) {
+ appParams.setMinResource(minResource);
+ appParams.setMaxResource(maxResource);
+ appParams.setIncrementResource(incrResource);
+ appParams.setContainerTokenExpiryInterval(containerTokenExpiryInterval);
}
public Set<String> getBlacklist() {
@@ -104,6 +124,15 @@ public Set<String> getBlacklist() {
return outstandingOpReqs;
}
+ public void updateCompletedContainers(AllocateResponse allocateResponse) {
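+ // Only OPPORTUNISTIC containers are tracked here; completed GUARANTEED
+ // containers are accounted for by the scheduler itself.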
+ for (ContainerStatus cs :
+ allocateResponse.getCompletedContainersStatuses()) {
+ if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ containersAllocated.remove(cs.getContainerId());
+ }
+ }
+ }
+
/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
* (Priority, ResourceName, Capability) and adds to the outstanding
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 7f133340f1..37f67c458a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -336,8 +336,7 @@ protected void serviceInit(Configuration conf) throws Exception {
addService(nodeHealthChecker);
boolean isDistSchedulingEnabled =
- conf.getBoolean(YarnConfiguration.
- OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED,
+ conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED,
YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
this.context = createNMContext(containerTokenSecretManager,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
index efbdfb4e8a..22fc8f6101 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/DefaultRequestInterceptor.java
@@ -152,7 +152,7 @@ public AllocateResponse allocate(final AllocateRequest request)
return ((DistributedSchedulingAMProtocol)rmClient)
.registerApplicationMasterForDistributedScheduling(request);
} else {
- throw new YarnException("Distributed Scheduling is not enabled !!");
+ throw new YarnException("Distributed Scheduling is not enabled.");
}
}
@@ -174,7 +174,7 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
}
return allocateResponse;
} else {
- throw new YarnException("Distributed Scheduling is not enabled !!");
+ throw new YarnException("Distributed Scheduling is not enabled.");
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
index 368858c43d..8a40337003 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistributedScheduler.java
@@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
@@ -32,8 +33,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -48,7 +47,9 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* The DistributedScheduler runs on the NodeManager and is modeled as an
@@ -74,6 +75,9 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
private OpportunisticContainerContext oppContainerContext =
new OpportunisticContainerContext();
+ // Mapping of NodeId to NodeTokens. Populated either from RM response or
+ // generated locally if required.
+ private Map<NodeId, NMToken> nodeTokens = new HashMap<>();
private ApplicationAttemptId applicationAttemptId;
private OpportunisticContainerAllocator containerAllocator;
private NMTokenSecretManagerInNM nmSecretManager;
@@ -157,17 +161,17 @@ public AllocateResponse allocate(AllocateRequest request) throws
}
/**
- * Check if we already have a NMToken. if Not, generate the Token and
- * add it to the response
+ * Adds all the newly allocated Containers to the allocate Response.
+ * Additionally, in case the NMToken for one of the nodes does not exist, it
+ * generates one and adds it to the response.
*/
- private void updateResponseWithNMTokens(AllocateResponse response,
+ private void updateAllocateResponse(AllocateResponse response,
List<NMToken> nmTokens, List<Container> allocatedContainers) {
List<NMToken> newTokens = new ArrayList<>();
if (allocatedContainers.size() > 0) {
response.getAllocatedContainers().addAll(allocatedContainers);
for (Container alloc : allocatedContainers) {
- if (!oppContainerContext.getNodeTokens().containsKey(
- alloc.getNodeId())) {
+ if (!nodeTokens.containsKey(alloc.getNodeId())) {
newTokens.add(nmSecretManager.generateNMToken(appSubmitter, alloc));
}
}
@@ -179,17 +183,14 @@ private void updateResponseWithNMTokens(AllocateResponse response,
private void updateParameters(
RegisterDistributedSchedulingAMResponse registerResponse) {
- oppContainerContext.getAppParams().setMinResource(
- registerResponse.getMinContainerResource());
- oppContainerContext.getAppParams().setMaxResource(
- registerResponse.getMaxContainerResource());
- oppContainerContext.getAppParams().setIncrementResource(
- registerResponse.getIncrContainerResource());
- if (oppContainerContext.getAppParams().getIncrementResource() == null) {
- oppContainerContext.getAppParams().setIncrementResource(
- oppContainerContext.getAppParams().getMinResource());
+ Resource incrementResource = registerResponse.getIncrContainerResource();
+ if (incrementResource == null) {
+ incrementResource = registerResponse.getMinContainerResource();
}
- oppContainerContext.getAppParams().setContainerTokenExpiryInterval(
+ oppContainerContext.updateAllocationParams(
+ registerResponse.getMinContainerResource(),
+ registerResponse.getMaxContainerResource(),
+ incrementResource,
registerResponse.getContainerTokenExpiryInterval());
oppContainerContext.getContainerIdGenerator()
@@ -198,14 +199,7 @@ private void updateParameters(
}
private void setNodeList(List<NodeId> nodeList) {
- oppContainerContext.getNodeMap().clear();
- addToNodeList(nodeList);
- }
-
- private void addToNodeList(List<NodeId> nodes) {
- for (NodeId n : nodes) {
- oppContainerContext.getNodeMap().put(n.getHost(), n);
- }
+ oppContainerContext.updateNodeList(nodeList);
}
@Override
@@ -243,23 +237,14 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
setNodeList(dsResp.getNodesForScheduling());
List<NMToken> nmTokens = dsResp.getAllocateResponse().getNMTokens();
for (NMToken nmToken : nmTokens) {
- oppContainerContext.getNodeTokens().put(nmToken.getNodeId(), nmToken);
+ nodeTokens.put(nmToken.getNodeId(), nmToken);
}
- List<ContainerStatus> completedContainers =
- dsResp.getAllocateResponse().getCompletedContainersStatuses();
-
- // Only account for opportunistic containers
- for (ContainerStatus cs : completedContainers) {
- if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
- oppContainerContext.getContainersAllocated()
- .remove(cs.getContainerId());
- }
- }
+ oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());
// Check if we have NM tokens for all the allocated containers. If not
// generate one and update the response.
- updateResponseWithNMTokens(
+ updateAllocateResponse(
dsResp.getAllocateResponse(), nmTokens, allocatedContainers);
if (LOG.isDebugEnabled()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index a473b14d0d..a7c0a507ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -24,9 +24,11 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
@@ -65,12 +67,14 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
/**
* The OpportunisticContainerAllocatorAMService is started instead of the
@@ -88,17 +92,20 @@ public class OpportunisticContainerAllocatorAMService
LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
private final NodeQueueLoadMonitor nodeMonitor;
+ private final OpportunisticContainerAllocator oppContainerAllocator;
- private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
- new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
- new ConcurrentHashMap<>();
private final int k;
+ private final long cacheRefreshInterval;
+ private List<NodeId> cachedNodeIds;
+ private long lastCacheUpdateTime;
+
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
YarnScheduler scheduler) {
super(OpportunisticContainerAllocatorAMService.class.getName(),
rmContext, scheduler);
+ this.oppContainerAllocator = new OpportunisticContainerAllocator(
+ rmContext.getContainerTokenSecretManager(), 0);
this.k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED_DEFAULT);
@@ -106,6 +113,8 @@ public OpportunisticContainerAllocatorAMService(RMContext rmContext,
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
YarnConfiguration.
NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
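+ // Reuse the node-sorting interval as the refresh period of the cached
+ // least-loaded node list (see getLeastLoadedNodes()).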
+ this.cacheRefreshInterval = nodeSortInterval;
+ this.lastCacheUpdateTime = System.currentTimeMillis();
NodeQueueLoadMonitor.LoadComparator comparator =
NodeQueueLoadMonitor.LoadComparator.valueOf(
rmContext.getYarnConfiguration().get(
@@ -172,6 +181,27 @@ public Server getServer(YarnRPC rpc, Configuration serverConf,
public RegisterApplicationMasterResponse registerApplicationMaster
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
+ final ApplicationAttemptId appAttemptId = getAppAttemptId();
+ SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
+ rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
+ if (appAttempt.getOpportunisticContainerContext() == null) {
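+ // Lazily create the per-attempt opportunistic context on first
+ // registration; container ids are drawn from the attempt's
+ // AppSchedulingInfo so they share one id sequence with guaranteed
+ // containers.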
+ OpportunisticContainerContext opCtx = new OpportunisticContainerContext();
+ opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
+ .ContainerIdGenerator() {
+ @Override
+ public long generateContainerId() {
+ return appAttempt.getAppSchedulingInfo().getNewContainerId();
+ }
+ });
+ int tokenExpiryInterval = getConfig()
+ .getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
+ YarnConfiguration.
+ OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT);
+ opCtx.updateAllocationParams(createMinContainerResource(),
+ createMaxContainerResource(), createIncrContainerResource(),
+ tokenExpiryInterval);
+ appAttempt.setOpportunisticContainerContext(opCtx);
+ }
return super.registerApplicationMaster(request);
}
@@ -185,7 +215,30 @@ public Server getServer(YarnRPC rpc, Configuration serverConf,
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
- return super.allocate(request);
+
+ final ApplicationAttemptId appAttemptId = getAppAttemptId();
+ SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
+ rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
+ OpportunisticContainerContext oppCtx =
+ appAttempt.getOpportunisticContainerContext();
+ oppCtx.updateNodeList(getLeastLoadedNodes());
+ List<Container> oppContainers =
+ oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
+ ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
+
+ if (!oppContainers.isEmpty()) {
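+ // These containers are allocated by the RM itself, so they are not
+ // marked as remotely allocated.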
+ handleNewContainers(oppContainers, false);
+ appAttempt.updateNMTokens(oppContainers);
+ }
+
+ // Allocate all guaranteed containers
+ AllocateResponse allocateResp = super.allocate(request);
+
+ oppCtx.updateCompletedContainers(allocateResp);
+
+ // Add all opportunistic containers
+ allocateResp.getAllocatedContainers().addAll(oppContainers);
+ return allocateResp;
}
@Override
@@ -198,39 +251,9 @@ public AllocateResponse allocate(AllocateRequest request) throws
RegisterDistributedSchedulingAMResponse dsResp = recordFactory
.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
dsResp.setRegisterResponse(response);
- dsResp.setMinContainerResource(
- Resource.newInstance(
- getConfig().getInt(
- YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
- YarnConfiguration.
- OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
- getConfig().getInt(
- YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
- YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
- )
- );
- dsResp.setMaxContainerResource(
- Resource.newInstance(
- getConfig().getInt(
- YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
- YarnConfiguration
- .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
- getConfig().getInt(
- YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
- YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
- )
- );
- dsResp.setIncrContainerResource(
- Resource.newInstance(
- getConfig().getInt(
- YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
- YarnConfiguration.
- OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
- getConfig().getInt(
- YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
- YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
- )
- );
+ dsResp.setMinContainerResource(createMinContainerResource());
+ dsResp.setMaxContainerResource(createMaxContainerResource());
+ dsResp.setIncrContainerResource(createIncrContainerResource());
dsResp.setContainerTokenExpiryInterval(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
@@ -240,8 +263,7 @@ public AllocateResponse allocate(AllocateRequest request) throws
this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
// Set nodes to be used for scheduling
- dsResp.setNodesForScheduling(
- this.nodeMonitor.selectLeastLoadedNodes(this.k));
+ dsResp.setNodesForScheduling(getLeastLoadedNodes());
return dsResp;
}
@@ -250,47 +272,30 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
List<Container> distAllocContainers = request.getAllocatedContainers();
- for (Container container : distAllocContainers) {
+ handleNewContainers(distAllocContainers, true);
+ AllocateResponse response = allocate(request.getAllocateRequest());
+ DistributedSchedulingAllocateResponse dsResp = recordFactory
+ .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+ dsResp.setAllocateResponse(response);
+ dsResp.setNodesForScheduling(getLeastLoadedNodes());
+ return dsResp;
+ }
+
+ private void handleNewContainers(List<Container> allocContainers,
+ boolean isRemotelyAllocated) {
+ for (Container container : allocContainers) {
// Create RMContainer
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler) rmContext.getScheduler())
.getCurrentAttemptForContainer(container.getId());
RMContainer rmContainer = new RMContainerImpl(container,
appAttempt.getApplicationAttemptId(), container.getNodeId(),
- appAttempt.getUser(), rmContext, true);
+ appAttempt.getUser(), rmContext, isRemotelyAllocated);
appAttempt.addRMContainer(container.getId(), rmContainer);
rmContainer.handle(
new RMContainerEvent(container.getId(),
RMContainerEventType.LAUNCHED));
}
- AllocateResponse response = allocate(request.getAllocateRequest());
- DistributedSchedulingAllocateResponse dsResp = recordFactory
- .newRecordInstance(DistributedSchedulingAllocateResponse.class);
- dsResp.setAllocateResponse(response);
- dsResp.setNodesForScheduling(
- this.nodeMonitor.selectLeastLoadedNodes(this.k));
- return dsResp;
- }
-
- private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
- String rackName, NodeId nodeId) {
- if (rackName != null) {
- mapping.putIfAbsent(rackName, new HashSet<NodeId>());
- Set<NodeId> nodeIds = mapping.get(rackName);
- synchronized (nodeIds) {
- nodeIds.add(nodeId);
- }
- }
- }
-
- private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
- String rackName, NodeId nodeId) {
- if (rackName != null) {
- Set<NodeId> nodeIds = mapping.get(rackName);
- synchronized (nodeIds) {
- nodeIds.remove(nodeId);
- }
- }
}
@Override
@@ -303,10 +308,6 @@ public void handle(SchedulerEvent event) {
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
nodeAddedEvent.getAddedRMNode());
- addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
- nodeAddedEvent.getAddedRMNode().getNodeID());
- addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
- nodeAddedEvent.getAddedRMNode().getNodeID());
break;
case NODE_REMOVED:
if (!(event instanceof NodeRemovedSchedulerEvent)) {
@@ -315,12 +316,6 @@ public void handle(SchedulerEvent event) {
NodeRemovedSchedulerEvent nodeRemovedEvent =
(NodeRemovedSchedulerEvent) event;
nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
- removeFromMapping(rackToNode,
- nodeRemovedEvent.getRemovedRMNode().getRackName(),
- nodeRemovedEvent.getRemovedRMNode().getNodeID());
- removeFromMapping(hostToNode,
- nodeRemovedEvent.getRemovedRMNode().getHostName(),
- nodeRemovedEvent.getRemovedRMNode().getNodeID());
break;
case NODE_UPDATE:
if (!(event instanceof NodeUpdateSchedulerEvent)) {
@@ -364,4 +359,58 @@ public void handle(SchedulerEvent event) {
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
return nodeMonitor.getThresholdCalculator();
}
+
+ private Resource createIncrContainerResource() {
+ return Resource.newInstance(
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
+ YarnConfiguration.
+ OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
+ );
+ }
+
+ private synchronized List<NodeId> getLeastLoadedNodes() {
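+ // Serve the cached list while it is younger than cacheRefreshInterval;
+ // callers may therefore see a slightly stale ordering.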
+ long currTime = System.currentTimeMillis();
+ if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
+ || cachedNodeIds == null) {
+ cachedNodeIds = this.nodeMonitor.selectLeastLoadedNodes(this.k);
+ lastCacheUpdateTime = currTime;
+ }
+ return cachedNodeIds;
+ }
+
+ private Resource createMaxContainerResource() {
+ return Resource.newInstance(
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
+ YarnConfiguration
+ .OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
+ );
+ }
+
+ private Resource createMinContainerResource() {
+ return Resource.newInstance(
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
+ YarnConfiguration.
+ OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
+ getConfig().getInt(
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
+ YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
+ );
+ }
+
+ private static ApplicationAttemptId getAppAttemptId() throws YarnException {
+ AMRMTokenIdentifier amrmTokenIdentifier =
+ YarnServerSecurityUtils.authorizeRequest();
+ ApplicationAttemptId applicationAttemptId =
+ amrmTokenIdentifier.getApplicationAttemptId();
+ return applicationAttemptId;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 5e9bece33c..d2d706d05a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1184,6 +1184,13 @@ protected ApplicationMasterService createApplicationMasterService() {
Configuration config = this.rmContext.getYarnConfiguration();
if (YarnConfiguration.isOpportunisticContainerAllocationEnabled(config)
|| YarnConfiguration.isDistSchedulingEnabled(config)) {
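+ // Distributed scheduling builds on opportunistic container allocation,
+ // so the latter must be enabled as well.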
+ if (YarnConfiguration.isDistSchedulingEnabled(config) &&
+ !YarnConfiguration
+ .isOpportunisticContainerAllocationEnabled(config)) {
+ throw new YarnRuntimeException(
+ "Invalid parameters: opportunistic container allocation has to " +
+ "be enabled when distributed scheduling is enabled.");
+ }
OpportunisticContainerAllocatorAMService
oppContainerAllocatingAMService =
new OpportunisticContainerAllocatorAMService(this.rmContext,
@@ -1193,9 +1200,8 @@ protected ApplicationMasterService createApplicationMasterService() {
OpportunisticContainerAllocatorAMService.class.getName());
// Add an event dispatcher for the
// OpportunisticContainerAllocatorAMService to handle node
- // updates/additions and removals.
- // Since the SchedulerEvent is currently a super set of theses,
- // we register interest for it..
+ // additions, updates and removals. Since the SchedulerEvent is currently
+ // a superset of these, we register interest for it.
addService(oppContainerAllocEventDispatcher);
rmDispatcher.register(SchedulerEventType.class,
oppContainerAllocEventDispatcher);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index adc3a97c77..9675fac5ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -68,6 +69,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -114,6 +117,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private boolean isAttemptRecovering;
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
+ /** Resource usage of opportunistic containers. */
+ protected ResourceUsage attemptOpportunisticResourceUsage =
+ new ResourceUsage();
/** Scheduled by a remote scheduler. */
protected ResourceUsage attemptResourceUsageAllocatedRemotely =
new ResourceUsage();
@@ -132,6 +138,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// by NM should not be recovered.
private Set<ContainerId> pendingRelease = null;
+ private OpportunisticContainerContext oppContainerContext;
+
/**
* Count how many times the application has been given an opportunity to
* schedule a task at each priority. Each time the scheduler asks the
@@ -199,7 +207,17 @@ public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
readLock = lock.readLock();
writeLock = lock.writeLock();
}
-
+
+ public void setOpportunisticContainerContext(
+ OpportunisticContainerContext oppContext) {
+ this.oppContainerContext = oppContext;
+ }
+
+ public OpportunisticContainerContext getOpportunisticContainerContext() {
+ return this.oppContainerContext;
+ }
+
/**
* Get the live containers of the application.
* @return live containers of the application
@@ -331,6 +349,10 @@ public void addRMContainer(
try {
writeLock.lock();
liveContainers.put(id, rmContainer);
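+ // Track resources of OPPORTUNISTIC containers separately from the
+ // attempt's guaranteed resource usage.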
+ if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ this.attemptOpportunisticResourceUsage.incUsed(
+ rmContainer.getAllocatedResource());
+ }
if (rmContainer.isRemotelyAllocated()) {
this.attemptResourceUsageAllocatedRemotely.incUsed(
rmContainer.getAllocatedResource());
@@ -344,9 +366,15 @@ public void removeRMContainer(ContainerId containerId) {
try {
writeLock.lock();
RMContainer rmContainer = liveContainers.remove(containerId);
- if (rmContainer != null && rmContainer.isRemotelyAllocated()) {
- this.attemptResourceUsageAllocatedRemotely.decUsed(
- rmContainer.getAllocatedResource());
+ if (rmContainer != null) {
+ if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ this.attemptOpportunisticResourceUsage
+ .decUsed(rmContainer.getAllocatedResource());
+ }
+ if (rmContainer.isRemotelyAllocated()) {
+ this.attemptResourceUsageAllocatedRemotely
+ .decUsed(rmContainer.getAllocatedResource());
+ }
}
} finally {
writeLock.unlock();
@@ -612,12 +640,7 @@ private Container updateContainerAndNMToken(RMContainer rmContainer,
container.getPriority(), rmContainer.getCreationTime(),
this.logAggregationContext, rmContainer.getNodeLabelExpression(),
containerType));
- NMToken nmToken =
- rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
- getApplicationAttemptId(), container);
- if (nmToken != null) {
- updatedNMTokens.add(nmToken);
- }
+ updateNMToken(container);
} catch (IllegalArgumentException e) {
// DNS might be down, skip returning this container.
LOG.error("Error trying to assign container token and NM token to"
@@ -635,6 +658,21 @@ private Container updateContainerAndNMToken(RMContainer rmContainer,
return container;
}
+ public void updateNMTokens(Collection<Container> containers) {
+ for (Container container : containers) {
+ updateNMToken(container);
+ }
+ }
+
+ private void updateNMToken(Container container) {
+ NMToken nmToken =
+ rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
+ getApplicationAttemptId(), container);
+ if (nmToken != null) {
+ updatedNMTokens.add(nmToken);
+ }
+ }
+
// Create container token and update NMToken altogether, if either of them fails for
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index 017a256e04..b80a17cdf7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -37,6 +38,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
@@ -103,16 +105,23 @@ public ClusterNode updateTimestamp() {
new ConcurrentHashMap<>();
private final LoadComparator comparator;
private QueueLimitCalculator thresholdCalculator;
+ private ReentrantReadWriteLock sortedNodesLock = new ReentrantReadWriteLock();
+ private ReentrantReadWriteLock clusterNodesLock =
+ new ReentrantReadWriteLock();
Runnable computeTask = new Runnable() {
@Override
public void run() {
- synchronized (sortedNodes) {
+ ReentrantReadWriteLock.WriteLock writeLock = sortedNodesLock.writeLock();
+ writeLock.lock();
+ try {
sortedNodes.clear();
sortedNodes.addAll(sortNodes());
if (thresholdCalculator != null) {
thresholdCalculator.update();
}
+ } finally {
+ writeLock.unlock();
}
}
};
@@ -166,9 +175,16 @@ public void addNode(List containerStatuses, RMNode
@Override
public void removeNode(RMNode removedRMNode) {
LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
- synchronized (this.clusterNodes) {
- if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
- this.clusterNodes.remove(removedRMNode.getNodeID());
+ ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
+ writeLock.lock();
+ ClusterNode node;
+ try {
+ node = this.clusterNodes.remove(removedRMNode.getNodeID());
+ } finally {
+ writeLock.unlock();
+ }
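+ // Log outside the write lock to keep the critical section short.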
+ if (LOG.isDebugEnabled()) {
+ if (node != null) {
LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
} else {
LOG.debug("Node not in list!");
@@ -186,7 +202,9 @@ public void updateNode(RMNode rmNode) {
int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
// Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
// UNLESS comparator is based on queue length.
- synchronized (this.clusterNodes) {
+ ReentrantReadWriteLock.WriteLock writeLock = clusterNodesLock.writeLock();
+ writeLock.lock();
+ try {
ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
if (currentNode == null) {
if (estimatedQueueWaitTime != -1
@@ -222,6 +240,8 @@ public void updateNode(RMNode rmNode) {
"wait queue length [" + currentNode.queueLength + "]");
}
}
+ } finally {
+ writeLock.unlock();
}
}
@@ -245,15 +265,22 @@ public List<NodeId> selectNodes() {
* @return ordered list of nodes
*/
public List<NodeId> selectLeastLoadedNodes(int k) {
- synchronized (this.sortedNodes) {
- return ((k < this.sortedNodes.size()) && (k >= 0)) ?
+ ReentrantReadWriteLock.ReadLock readLock = sortedNodesLock.readLock();
+ readLock.lock();
+ try {
+ List<NodeId> retVal = ((k < this.sortedNodes.size()) && (k >= 0)) ?
new ArrayList<>(this.sortedNodes).subList(0, k) :
new ArrayList<>(this.sortedNodes);
+ return Collections.unmodifiableList(retVal);
+ } finally {
+ readLock.unlock();
}
}
private List<NodeId> sortNodes() {
- synchronized (this.clusterNodes) {
+ ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
+ readLock.lock();
+ try {
ArrayList<ClusterNode> aList = new ArrayList<>(this.clusterNodes.values());
List<NodeId> retList = new ArrayList<>();
Object[] nodes = aList.toArray();
@@ -267,6 +294,8 @@ private List<NodeId> sortNodes() {
retList.add(((ClusterNode)nodes[j]).nodeId);
}
return retList;
+ } finally {
+ readLock.unlock();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 07c6b54f1f..207f5ba0e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.Assert;
import org.junit.Test;
@@ -97,6 +98,11 @@ public AMLivelinessMonitor getAMLivelinessMonitor() {
public Configuration getYarnConfiguration() {
return new YarnConfiguration();
}
+
+ @Override
+ public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+ return new RMContainerTokenSecretManager(conf);
+ }
};
Container c = factory.newRecordInstance(Container.class);
c.setExecutionType(ExecutionType.OPPORTUNISTIC);
@@ -117,8 +123,8 @@ public Configuration getYarnConfiguration() {
Server server = service.getServer(rpc, conf, addr, null);
server.start();
- // Verify that the DistrubutedSchedulingService can handle vanilla
- // ApplicationMasterProtocol clients
+ // Verify that the OpportunisticContainerAllocatorAMService can handle
+ // vanilla ApplicationMasterProtocol clients
RPC.setProtocolEngine(conf, ApplicationMasterProtocolPB.class,
ProtobufRpcEngine.class);
ApplicationMasterProtocolPB ampProxy =