diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 46ca4bd090..780110968a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -63,10 +63,10 @@ public static void recordRejectedAppActivityFromLeafQueue( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, Priority priority, String diagnostic) { - NodeId nodeId = getRecordingNodeId(activitiesManager, node); - if (nodeId == null) { + if (activitiesManager == null) { return; } + NodeId nodeId = getRecordingNodeId(activitiesManager, node); if (activitiesManager.shouldRecordThisNode(nodeId)) { recordActivity(activitiesManager, nodeId, application.getQueueName(), application.getApplicationId().toString(), priority, @@ -85,10 +85,10 @@ public static void recordAppActivityWithoutAllocation( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, Priority priority, String diagnostic, ActivityState appState) { - NodeId nodeId = getRecordingNodeId(activitiesManager, node); - if (nodeId == null) { + if (activitiesManager == null) { return; } + NodeId nodeId = getRecordingNodeId(activitiesManager, node); if (activitiesManager.shouldRecordThisNode(nodeId)) { String type = "container"; // Add application-container activity into specific node allocation. @@ -123,10 +123,10 @@ public static void recordAppActivityWithAllocation( ActivitiesManager activitiesManager, SchedulerNode node, SchedulerApplicationAttempt application, RMContainer updatedContainer, ActivityState activityState) { - NodeId nodeId = getRecordingNodeId(activitiesManager, node); - if (nodeId == null) { + if (activitiesManager == null) { return; } + NodeId nodeId = getRecordingNodeId(activitiesManager, node); if (activitiesManager.shouldRecordThisNode(nodeId)) { String type = "container"; // Add application-container activity into specific node allocation. @@ -163,10 +163,10 @@ public static void startAppAllocationRecording( ActivitiesManager activitiesManager, FiCaSchedulerNode node, long currentTime, SchedulerApplicationAttempt application) { - NodeId nodeId = getRecordingNodeId(activitiesManager, node); - if (nodeId == null) { + if (activitiesManager == null) { return; } + NodeId nodeId = getRecordingNodeId(activitiesManager, node); activitiesManager .startAppAllocationRecording(nodeId, currentTime, application); @@ -214,10 +214,10 @@ public static class QUEUE { public static void recordQueueActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentQueueName, String queueName, ActivityState state, String diagnostic) { - NodeId nodeId = getRecordingNodeId(activitiesManager, node); - if (nodeId == null) { + if (activitiesManager == null) { return; } + NodeId nodeId = getRecordingNodeId(activitiesManager, node); if (activitiesManager.shouldRecordThisNode(nodeId)) { recordActivity(activitiesManager, nodeId, parentQueueName, queueName, null, state, diagnostic, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java index 740e974340..99ee48ab82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.util.SystemClock; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.List; import java.util.Set; @@ -57,9 +59,10 @@ public class ActivitiesManager extends AbstractService { private Set activeRecordedNodes; private ConcurrentMap recordingAppActivitiesUntilSpecifiedTime; - private ConcurrentMap appsAllocation; - private ConcurrentMap> - completedAppAllocations; + private ThreadLocal> + appsAllocation; + @VisibleForTesting + ConcurrentMap> completedAppAllocations; private boolean recordNextAvailableNode = false; private List lastAvailableNodeActivities = null; private Thread cleanUpThread; @@ -71,7 +74,7 @@ public ActivitiesManager(RMContext rmContext) { super(ActivitiesManager.class.getName()); recordingNodesAllocation = ThreadLocal.withInitial(() -> new HashMap()); completedNodeAllocations = new ConcurrentHashMap<>(); - appsAllocation = new ConcurrentHashMap<>(); + appsAllocation = ThreadLocal.withInitial(() -> new HashMap()); completedAppAllocations = new ConcurrentHashMap<>(); activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>(); @@ -79,11 +82,15 @@ public ActivitiesManager(RMContext rmContext) { } public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) { - if (rmContext.getRMApps().get(applicationId).getFinalApplicationStatus() + RMApp app = rmContext.getRMApps().get(applicationId); + if (app != null && app.getFinalApplicationStatus() == FinalApplicationStatus.UNDEFINED) { - List allocations = completedAppAllocations.get( - applicationId); - + Queue curAllocations = + completedAppAllocations.get(applicationId); + List allocations = null; + if (curAllocations != null) { + allocations = new ArrayList(curAllocations); + } return new AppActivitiesInfo(allocations, applicationId); } else { return new AppActivitiesInfo( @@ -135,13 +142,13 @@ public void run() { } } - Iterator>> iteApp = + Iterator>> iteApp = completedAppAllocations.entrySet().iterator(); while (iteApp.hasNext()) { - Map.Entry> appAllocation = + Map.Entry> appAllocation = iteApp.next(); - if (rmContext.getRMApps().get(appAllocation.getKey()) - .getFinalApplicationStatus() + RMApp rmApp = rmContext.getRMApps().get(appAllocation.getKey()); + if (rmApp == null || rmApp.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) { iteApp.remove(); } @@ -191,18 +198,16 @@ void startAppAllocationRecording(NodeId nodeID, long currTS, SchedulerApplicationAttempt application) { ApplicationId applicationId = application.getApplicationId(); - if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) - && recordingAppActivitiesUntilSpecifiedTime.get(applicationId) - > currTS) { - appsAllocation.put(applicationId, - new AppAllocation(application.getPriority(), nodeID, - application.getQueueName())); - } - - if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) - && recordingAppActivitiesUntilSpecifiedTime.get(applicationId) - <= currTS) { - turnOffActivityMonitoringForApp(applicationId); + Long turnOffTimestamp = + recordingAppActivitiesUntilSpecifiedTime.get(applicationId); + if (turnOffTimestamp != null) { + if (turnOffTimestamp > currTS) { + appsAllocation.get().put(applicationId, + new AppAllocation(application.getPriority(), nodeID, + application.getQueueName())); + } else { + turnOffActivityMonitoringForApp(applicationId); + } } } @@ -223,7 +228,7 @@ void addSchedulingActivityForApp(ApplicationId applicationId, ContainerId containerId, String priority, ActivityState state, String diagnostic, String type) { if (shouldRecordThisApp(applicationId)) { - AppAllocation appAllocation = appsAllocation.get(applicationId); + AppAllocation appAllocation = appsAllocation.get().get(applicationId); appAllocation.addAppAllocationActivity(containerId == null ? "Container-Id-Not-Assigned" : containerId.toString(), priority, state, diagnostic, type); @@ -245,24 +250,27 @@ void finishAppAllocationRecording(ApplicationId applicationId, ContainerId containerId, ActivityState appState, String diagnostic) { if (shouldRecordThisApp(applicationId)) { long currTS = SystemClock.getInstance().getTime(); - AppAllocation appAllocation = appsAllocation.remove(applicationId); + AppAllocation appAllocation = appsAllocation.get().remove(applicationId); appAllocation.updateAppContainerStateAndTime(containerId, appState, currTS, diagnostic); - List appAllocations; - if (completedAppAllocations.containsKey(applicationId)) { - appAllocations = completedAppAllocations.get(applicationId); - } else { - appAllocations = new ArrayList<>(); - completedAppAllocations.put(applicationId, appAllocations); + Queue appAllocations = + completedAppAllocations.get(applicationId); + if (appAllocations == null) { + appAllocations = new ConcurrentLinkedQueue<>(); + Queue curAppAllocations = + completedAppAllocations.putIfAbsent(applicationId, appAllocations); + if (curAppAllocations != null) { + appAllocations = curAppAllocations; + } } if (appAllocations.size() == 1000) { - appAllocations.remove(0); + appAllocations.poll(); } appAllocations.add(appAllocation); - - if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId) - <= currTS) { + Long stopTime = + recordingAppActivitiesUntilSpecifiedTime.get(applicationId); + if (stopTime != null && stopTime <= currTS) { turnOffActivityMonitoringForApp(applicationId); } } @@ -292,8 +300,12 @@ void finishNodeUpdateRecording(NodeId nodeID) { } boolean shouldRecordThisApp(ApplicationId applicationId) { + if (recordingAppActivitiesUntilSpecifiedTime.isEmpty() + || appsAllocation.get().isEmpty()) { + return false; + } return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) - && appsAllocation.containsKey(applicationId); + && appsAllocation.get().containsKey(applicationId); } boolean shouldRecordThisNode(NodeId nodeID) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java index 15850c0412..1903ae7843 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java @@ -68,7 +68,7 @@ public void addAppAllocationActivity(String containerId, String priority, } public String getNodeId() { - return nodeId.toString(); + return nodeId == null ? null : nodeId.toString(); } public String getQueueName() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index e4c81dbacc..2e422e6def 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -746,6 +746,7 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, return appActivitiesInfo; } catch (Exception e) { String errMessage = "Cannot find application with given appId"; + LOG.error(errMessage, e); return new AppActivitiesInfo(errMessage, appId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java index 5216a21684..bc81e615d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.SystemClock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -189,6 +192,55 @@ public void testRecordingSchedulerActivitiesForMultiNodesInMultiThreads() Assert.assertEquals(1, activitiesManager.historyNodeAllocations.size()); } + + /** + * Test recording app activities in multiple threads, + * only one activity info should be recorded by one of these threads. + */ + @Test + public void testRecordingAppActivitiesInMultiThreads() + throws Exception { + Random rand = new Random(); + // start recording activities for a random app + SchedulerApplicationAttempt randomApp = apps.get(rand.nextInt(NUM_APPS)); + activitiesManager + .turnOnAppActivitiesRecording(randomApp.getApplicationId(), 3); + List> futures = new ArrayList<>(); + // generate app activities + int nTasks = 20; + for (int i=0; i task = () -> { + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + (FiCaSchedulerNode) nodes.get(0), + SystemClock.getInstance().getTime(), randomApp); + for (SchedulerNode node : nodes) { + ActivitiesLogger.APP + .recordAppActivityWithoutAllocation(activitiesManager, node, + randomApp, Priority.newInstance(0), + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, + ActivityState.REJECTED); + } + ActivitiesLogger.APP + .finishAllocatedAppAllocationRecording(activitiesManager, + randomApp.getApplicationId(), null, ActivityState.SKIPPED, + ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES); + return null; + }; + futures.add(threadPoolExecutor.submit(task)); + } + // Check activities for multi-nodes should be recorded only once + for (Future future : futures) { + future.get(); + } + Queue appAllocations = + activitiesManager.completedAppAllocations + .get(randomApp.getApplicationId()); + Assert.assertEquals(nTasks, appAllocations.size()); + for(AppAllocation aa : appAllocations) { + Assert.assertEquals(NUM_NODES, aa.getAllocationAttempts().size()); + } + } + /** * Testing activities manager which can record all history information about * node allocations. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java index 724d592d6f..7bc8634215 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java @@ -21,6 +21,7 @@ import com.google.inject.servlet.ServletModule; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import com.sun.jersey.test.framework.WebAppDescriptor; import org.apache.hadoop.http.JettyUtils; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GuiceServletConfig; @@ -84,6 +86,17 @@ protected void configureServlets() { // enable multi-nodes placement conf.setBoolean( CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true); + String policyName = "resource-based"; + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES, + policyName); + conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME, + policyName); + String policyConfPrefix = + CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + "." + + policyName; + conf.set(policyConfPrefix + ".class", + ResourceUsageMultiNodeLookupPolicy.class.getName()); + conf.set(policyConfPrefix + ".sorting-interval.ms", "0"); rm = new MockRM(conf); bind(ResourceManager.class).toInstance(rm); serve("/*").with(GuiceContainer.class); @@ -204,6 +217,59 @@ public void testSchedulingWithoutPendingRequests() } } + @Test + public void testAppAssignContainer() throws Exception { + rm.start(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024); + MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * 1024); + + try { + RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.UNDEFINED, "*", Resources.createResource(3072), + 1)), null); + + //Trigger recording for this app + WebResource r = resource(); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString()); + ClientResponse response = r.path("ws").path("v1").path("cluster") + .path("scheduler/app-activities").queryParams(params) + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + JSONObject json = response.getEntity(JSONObject.class); + assertEquals("waiting for display", json.getString("diagnostic")); + + //Trigger scheduling for this app + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode)); + + //Check app activities, it should contain one allocation and + // final allocation state is ALLOCATED + response = r.path("ws").path("v1").path("cluster") + .path("scheduler/app-activities").queryParams(params) + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, + response.getType().toString()); + json = response.getEntity(JSONObject.class); + + verifyNumberOfAllocations(json, 1); + + JSONObject allocations = json.getJSONObject("allocations"); + verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED"); + JSONArray allocationAttempts = + allocations.getJSONArray("allocationAttempt"); + assertEquals(2, allocationAttempts.length()); + } finally { + rm.stop(); + } + } + private void verifyNumberOfAllocations(JSONObject json, int realValue) throws Exception { if (json.isNull("allocations")) {