YARN-6406. Remove SchedulerRequestKeys when no more pending ResourceRequest. (Arun Suresh via wangda)

This commit is contained in:
Wangda Tan 2017-04-04 14:43:58 -07:00
parent e8071aa249
commit 87e2ef8c98
5 changed files with 266 additions and 79 deletions

View File

@ -25,12 +25,8 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@ -51,9 +47,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -86,8 +81,8 @@ public class AppSchedulingInfo {
private Set<String> requestedPartitions = new HashSet<>();
private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
schedulerKeys = new ConcurrentSkipListMap<>();
private final ConcurrentSkipListSet<SchedulerRequestKey>
schedulerKeys = new ConcurrentSkipListSet<>();
final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
@ -156,29 +151,6 @@ private void clearRequests() {
LOG.info("Application " + applicationId + " requests cleared");
}
private void incrementSchedulerKeyReference(
SchedulerRequestKey schedulerKey) {
Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
if (schedulerKeyCount == null) {
schedulerKeys.put(schedulerKey, 1);
} else {
schedulerKeys.put(schedulerKey, schedulerKeyCount + 1);
}
}
public void decrementSchedulerKeyReference(
SchedulerRequestKey schedulerKey) {
Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
if (schedulerKeyCount != null) {
if (schedulerKeyCount > 1) {
schedulerKeys.put(schedulerKey, schedulerKeyCount - 1);
} else {
schedulerKeys.remove(schedulerKey);
}
}
}
public ContainerUpdateContext getUpdateContext() {
return updateContext;
}
@ -230,6 +202,10 @@ public boolean updateResourceRequests(List<ResourceRequest> requests,
}
}
public void removePlacementSets(SchedulerRequestKey schedulerRequestKey) {
schedulerKeyToPlacementSets.remove(schedulerRequestKey);
}
boolean addToPlacementSets(
boolean recoverPreemptedRequestForAContainer,
Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
@ -268,7 +244,8 @@ private void updatePendingResources(ResourceRequest lastRequest,
(lastRequest != null) ? lastRequest.getNumContainers() : 0;
if (request.getNumContainers() <= 0) {
if (lastRequestContainers >= 0) {
decrementSchedulerKeyReference(schedulerKey);
schedulerKeys.remove(schedulerKey);
schedulerKeyToPlacementSets.remove(schedulerKey);
}
LOG.info("checking for deactivate of application :"
+ this.applicationId);
@ -276,7 +253,7 @@ private void updatePendingResources(ResourceRequest lastRequest,
} else {
// Activate application. Metrics activation is done here.
if (lastRequestContainers <= 0) {
incrementSchedulerKeyReference(schedulerKey);
schedulerKeys.add(schedulerKey);
abstractUsersManager.activateApplication(user, applicationId);
}
}
@ -366,7 +343,7 @@ public boolean getAndResetBlacklistChanged() {
}
public Collection<SchedulerRequestKey> getSchedulerKeys() {
return schedulerKeys.keySet();
return schedulerKeys;
}
/**
@ -389,7 +366,7 @@ public List<ResourceRequest> getAllResourceRequests() {
public PendingAsk getNextPendingAsk() {
try {
readLock.lock();
SchedulerRequestKey firstRequestKey = schedulerKeys.firstKey();
SchedulerRequestKey firstRequestKey = schedulerKeys.first();
return getPendingAsk(firstRequestKey, ResourceRequest.ANY);
} finally {
readLock.unlock();

View File

@ -204,15 +204,17 @@ public int getOutstandingAsksCount(String resourceName) {
private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
ResourceRequest offSwitchRequest) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
// Do not remove ANY
offSwitchRequest.setNumContainers(numOffSwitchContainers);
// Do we have any outstanding requests?
// If there is nothing, we need to deactivate this application
if (numOffSwitchContainers == 0) {
appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey);
appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey);
appSchedulingInfo.checkForDeactivation();
resourceRequestMap.remove(ResourceRequest.ANY);
if (resourceRequestMap.isEmpty()) {
appSchedulingInfo.removePlacementSets(schedulerRequestKey);
}
}
appSchedulingInfo.decPendingResource(

View File

@ -31,6 +31,7 @@
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -63,6 +64,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
@ -86,6 +88,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@ -2946,6 +2949,162 @@ private void waitContainerAllocated(MockAM am, int mem, int nContainer,
}
}
@Test
public void testSchedulerKeyGarbageCollection() throws Exception {
YarnConfiguration conf =
new YarnConfiguration(new CapacitySchedulerConfiguration());
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm = new MockRM(conf, memStore);
rm.start();
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm3.getNodeId(), nm3);
MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm4.getNodeId(), nm4);
nm1.registerNode();
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
// All nodes 1 - 4 will be applicable for scheduling.
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
Thread.sleep(1000);
AllocateResponse allocateResponse = am1.allocate(
Arrays.asList(
newResourceRequest(1, 1, ResourceRequest.ANY,
Resources.createResource(3 * GB), 1, true,
ExecutionType.GUARANTEED),
newResourceRequest(2, 2, ResourceRequest.ANY,
Resources.createResource(3 * GB), 1, true,
ExecutionType.GUARANTEED),
newResourceRequest(3, 3, ResourceRequest.ANY,
Resources.createResource(3 * GB), 1, true,
ExecutionType.GUARANTEED),
newResourceRequest(4, 4, ResourceRequest.ANY,
Resources.createResource(3 * GB), 1, true,
ExecutionType.GUARANTEED)
),
null);
List<Container> allocatedContainers = allocateResponse
.getAllocatedContainers();
Assert.assertEquals(0, allocatedContainers.size());
Collection<SchedulerRequestKey> schedulerKeys =
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
.getAppSchedulingInfo().getSchedulerKeys();
Assert.assertEquals(4, schedulerKeys.size());
// Get a Node to HB... at which point 1 container should be
// allocated
nm1.nodeHeartbeat(true);
Thread.sleep(200);
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
allocatedContainers = allocateResponse.getAllocatedContainers();
Assert.assertEquals(1, allocatedContainers.size());
// Verify 1 outstanding schedulerKey is removed
Assert.assertEquals(3, schedulerKeys.size());
List <ResourceRequest> resReqs =
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
.getAppSchedulingInfo().getAllResourceRequests();
// Verify 1 outstanding schedulerKey is removed from the
// rrMap as well
Assert.assertEquals(3, resReqs.size());
// Verify One more container Allocation on node nm2
// And ensure the outstanding schedulerKeys go down..
nm2.nodeHeartbeat(true);
Thread.sleep(200);
// Update the allocateReq to send 0 numContainer req.
// For the satisfied container...
allocateResponse = am1.allocate(Arrays.asList(
newResourceRequest(1,
allocatedContainers.get(0).getAllocationRequestId(),
ResourceRequest.ANY,
Resources.createResource(3 * GB), 0, true,
ExecutionType.GUARANTEED)
),
new ArrayList<>());
allocatedContainers = allocateResponse.getAllocatedContainers();
Assert.assertEquals(1, allocatedContainers.size());
// Verify 1 outstanding schedulerKey is removed
Assert.assertEquals(2, schedulerKeys.size());
resReqs = ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
.getAppSchedulingInfo().getAllResourceRequests();
// Verify the map size is not increased due to 0 req
Assert.assertEquals(2, resReqs.size());
// Now Verify that the AM can cancel 1 Ask:
SchedulerRequestKey sk = schedulerKeys.iterator().next();
am1.allocate(
Arrays.asList(
newResourceRequest(sk.getPriority().getPriority(),
sk.getAllocationRequestId(),
ResourceRequest.ANY, Resources.createResource(3 * GB), 0, true,
ExecutionType.GUARANTEED)
),
null);
schedulerKeys =
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
.getAppSchedulingInfo().getSchedulerKeys();
Thread.sleep(200);
// Verify 1 outstanding schedulerKey is removed because of the
// cancel ask
Assert.assertEquals(1, schedulerKeys.size());
// Now verify that after the next node heartbeat, we allocate
// the last schedulerKey
nm3.nodeHeartbeat(true);
Thread.sleep(200);
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
allocatedContainers = allocateResponse.getAllocatedContainers();
Assert.assertEquals(1, allocatedContainers.size());
// Verify no more outstanding schedulerKeys..
Assert.assertEquals(0, schedulerKeys.size());
resReqs =
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
.getAppSchedulingInfo().getAllResourceRequests();
Assert.assertEquals(0, resReqs.size());
}
private static ResourceRequest newResourceRequest(int priority,
long allocReqId, String rName, Resource resource, int numContainers,
boolean relaxLoc, ExecutionType eType) {
ResourceRequest rr = ResourceRequest.newInstance(
Priority.newInstance(priority), rName, resource, numContainers,
relaxLoc, null, ExecutionTypeRequest.newInstance(eType, true));
rr.setAllocationRequestId(allocReqId);
return rr;
}
@Test
public void testHierarchyQueuesCurrentLimits() throws Exception {
/*

View File

@ -1059,13 +1059,9 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
//test case 3
qb.finishApplication(app_0.getApplicationId(), user_0);
qb.finishApplication(app_2.getApplicationId(), user_1);
qb.releaseResource(clusterResource, app_0,
app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey)
.getPerAllocationResource(),
qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1),
null, null);
qb.releaseResource(clusterResource, app_2,
app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey)
.getPerAllocationResource(),
qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1),
null, null);
qb.setUserLimit(50);

View File

@ -24,6 +24,7 @@
import static org.junit.Assert.fail;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collection;
import javax.ws.rs.core.MediaType;
@ -46,6 +47,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@ -169,7 +172,38 @@ public void testAppsXML() throws JSONException, Exception {
assertEquals("incorrect number of elements", 1, nodesApps.getLength());
NodeList nodes = dom.getElementsByTagName("app");
assertEquals("incorrect number of elements", 1, nodes.getLength());
verifyAppsXML(nodes, app1);
verifyAppsXML(nodes, app1, false);
rm.stop();
}
@Test
public void testRunningApp() throws JSONException, Exception {
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNodeManager);
am1.allocate("*", 4096, 1, new ArrayList<>());
amNodeManager.nodeHeartbeat(true);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps").accept(MediaType.APPLICATION_XML)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_XML_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
String xml = response.getEntity(String.class);
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = dbf.newDocumentBuilder();
InputSource is = new InputSource();
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList nodesApps = dom.getElementsByTagName("apps");
assertEquals("incorrect number of elements", 1, nodesApps.getLength());
NodeList nodes = dom.getElementsByTagName("app");
assertEquals("incorrect number of elements", 1, nodes.getLength());
verifyAppsXML(nodes, app1, true);
testAppsHelper("apps/", app1, MediaType.APPLICATION_JSON, true);
rm.stop();
}
@ -203,6 +237,11 @@ public void testAppsXMLMulti() throws JSONException, Exception {
public void testAppsHelper(String path, RMApp app, String media)
throws JSONException, Exception {
testAppsHelper(path, app, media, false);
}
public void testAppsHelper(String path, RMApp app, String media,
boolean hasResourceReq) throws JSONException, Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
@ -215,7 +254,7 @@ public void testAppsHelper(String path, RMApp app, String media)
assertEquals("incorrect number of elements", 1, apps.length());
JSONArray array = apps.getJSONArray("app");
assertEquals("incorrect number of elements", 1, array.length());
verifyAppInfo(array.getJSONObject(0), app);
verifyAppInfo(array.getJSONObject(0), app, hasResourceReq);
}
@ -239,7 +278,7 @@ public void testAppsQueryState() throws JSONException, Exception {
assertEquals("incorrect number of elements", 1, apps.length());
JSONArray array = apps.getJSONArray("app");
assertEquals("incorrect number of elements", 1, array.length());
verifyAppInfo(array.getJSONObject(0), app1);
verifyAppInfo(array.getJSONObject(0), app1, false);
rm.stop();
}
@ -483,7 +522,7 @@ public void testAppsQueryFinalStatus() throws JSONException, Exception {
assertEquals("incorrect number of elements", 1, apps.length());
JSONArray array = apps.getJSONArray("app");
assertEquals("incorrect number of elements", 1, array.length());
verifyAppInfo(array.getJSONObject(0), app1);
verifyAppInfo(array.getJSONObject(0), app1, false);
rm.stop();
}
@ -1327,7 +1366,7 @@ public void testSingleAppsHelper(String path, RMApp app, String media)
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
verifyAppInfo(json.getJSONObject("app"), app);
verifyAppInfo(json.getJSONObject("app"), app, false);
}
@Test
@ -1351,11 +1390,11 @@ public void testSingleAppsXML() throws JSONException, Exception {
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("app");
assertEquals("incorrect number of elements", 1, nodes.getLength());
verifyAppsXML(nodes, app1);
verifyAppsXML(nodes, app1, false);
rm.stop();
}
public void verifyAppsXML(NodeList nodes, RMApp app)
public void verifyAppsXML(NodeList nodes, RMApp app, boolean hasResourceReq)
throws JSONException, Exception {
for (int i = 0; i < nodes.getLength(); i++) {
@ -1394,14 +1433,19 @@ public void verifyAppsXML(NodeList nodes, RMApp app)
WebServicesTestUtils.getXmlString(element, "amNodeLabelExpression"),
WebServicesTestUtils.getXmlString(element, "amRPCAddress"));
if (hasResourceReq) {
assertEquals(element.getElementsByTagName("resourceRequests").getLength(),
1);
Element resourceRequests =
(Element) element.getElementsByTagName("resourceRequests").item(0);
Element capability =
(Element) resourceRequests.getElementsByTagName("capability").item(0);
verifyResourceRequestsGeneric(app,
ResourceRequest rr =
((AbstractYarnScheduler)rm.getRMContext().getScheduler())
.getApplicationAttempt(
app.getCurrentAppAttempt().getAppAttemptId())
.getAppSchedulingInfo().getAllResourceRequests().get(0);
verifyResourceRequestsGeneric(rr,
WebServicesTestUtils.getXmlString(resourceRequests,
"nodeLabelExpression"),
WebServicesTestUtils.getXmlInt(resourceRequests, "numContainers"),
@ -1415,11 +1459,12 @@ public void verifyAppsXML(NodeList nodes, RMApp app)
"enforceExecutionType"));
}
}
}
public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
Exception {
public void verifyAppInfo(JSONObject info, RMApp app, boolean hasResourceReqs)
throws JSONException, Exception {
int expectedNumberOfElements = 35;
int expectedNumberOfElements = 34 + (hasResourceReqs ? 2 : 0);
String appNodeLabelExpression = null;
String amNodeLabelExpression = null;
if (app.getApplicationSubmissionContext()
@ -1461,8 +1506,10 @@ public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
amNodeLabelExpression,
amRPCAddress);
if (hasResourceReqs) {
verifyResourceRequests(info.getJSONArray("resourceRequests"), app);
}
}
public void verifyAppInfoGeneric(RMApp app, String id, String user,
String name, String applicationType, String queue, int prioirty,
@ -1490,8 +1537,10 @@ public void verifyAppInfoGeneric(RMApp app, String id, String user,
WebServicesTestUtils.checkStringMatch("finalStatus", app
.getFinalApplicationStatus().toString(), finalStatus);
assertEquals("progress doesn't match", 0, progress, 0.0);
if ("UNASSIGNED".equals(trackingUI)) {
WebServicesTestUtils.checkStringMatch("trackingUI", "UNASSIGNED",
trackingUI);
}
WebServicesTestUtils.checkStringEqual("diagnostics",
app.getDiagnostics().toString(), diagnostics);
assertEquals("clusterId doesn't match",
@ -1544,7 +1593,12 @@ public void verifyAppInfoGeneric(RMApp app, String id, String user,
public void verifyResourceRequests(JSONArray resourceRequest, RMApp app)
throws JSONException {
JSONObject requestInfo = resourceRequest.getJSONObject(0);
verifyResourceRequestsGeneric(app,
ResourceRequest rr =
((AbstractYarnScheduler) rm.getRMContext().getScheduler())
.getApplicationAttempt(
app.getCurrentAppAttempt().getAppAttemptId())
.getAppSchedulingInfo().getAllResourceRequests().get(0);
verifyResourceRequestsGeneric(rr,
requestInfo.getString("nodeLabelExpression"),
requestInfo.getInt("numContainers"),
requestInfo.getBoolean("relaxLocality"), requestInfo.getInt("priority"),
@ -1557,11 +1611,10 @@ public void verifyResourceRequests(JSONArray resourceRequest, RMApp app)
.getBoolean("enforceExecutionType"));
}
public void verifyResourceRequestsGeneric(RMApp app,
public void verifyResourceRequestsGeneric(ResourceRequest request,
String nodeLabelExpression, int numContainers, boolean relaxLocality,
int priority, String resourceName, long memory, long vCores,
String executionType, boolean enforceExecutionType) {
ResourceRequest request = app.getAMResourceRequests().get(0);
assertEquals("nodeLabelExpression doesn't match",
request.getNodeLabelExpression(), nodeLabelExpression);
assertEquals("numContainers doesn't match", request.getNumContainers(),