From 134ae8fc8045e2ae1ed7ca54df95f14ffc863d09 Mon Sep 17 00:00:00 2001 From: bibinchundatt Date: Thu, 14 Feb 2019 22:56:52 +0530 Subject: [PATCH] YARN-9293. Optimize MockAMLauncher event handling. Contributed by Bibin A Chundatt. --- .../org/apache/hadoop/yarn/sls/SLSRunner.java | 11 +++-- .../yarn/sls/appmaster/AMSimulator.java | 10 ++++- .../yarn/sls/appmaster/MRAMSimulator.java | 9 ++-- .../yarn/sls/appmaster/StreamAMSimulator.java | 5 ++- .../sls/resourcemanager/MockAMLauncher.java | 44 +++++++++---------- .../yarn/sls/appmaster/TestAMSimulator.java | 10 +++-- 6 files changed, 52 insertions(+), 37 deletions(-) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 1fadd42c3d..b775d8bd98 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -60,6 +60,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; @@ -113,6 +114,7 @@ public class SLSRunner extends Configured implements Tool { // AM simulator private int AM_ID; private Map amMap; + private Map appIdAMSim; private Set trackedApps; private Map amClassMap; private static int remainingApps = 0; @@ -170,7 +172,7 @@ private void init(Configuration tempConf) throws ClassNotFoundException { queueAppNumMap = new HashMap<>(); amMap = new ConcurrentHashMap<>(); amClassMap = new HashMap<>(); - + appIdAMSim = new ConcurrentHashMap<>(); // runner configuration setConf(tempConf); @@ -277,7 +279,7 @@ private void startRM() throws ClassNotFoundException, YarnException { rm = new ResourceManager() { @Override protected ApplicationMasterLauncher createAMLauncher() { - return new MockAMLauncher(se, this.rmContext, amMap); + return new MockAMLauncher(se, this.rmContext, appIdAMSim); } }; @@ -587,7 +589,7 @@ private void startAMFromRumenTrace(String inputTrace, long baselineTimeMS) try { createAMForJob(job, baselineTimeMS); } catch (Exception e) { - LOG.error("Failed to create an AM: {}", e.getMessage()); + LOG.error("Failed to create an AM", e); } job = reader.getNext(); @@ -808,7 +810,8 @@ private void runNewAM(String jobType, String user, AM_ID++; amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, - runner.getStartTimeMS(), amContainerResource, labelExpr, params); + runner.getStartTimeMS(), amContainerResource, labelExpr, params, + appIdAMSim); if(reservationId != null) { // if we have a ReservationId, delegate reservation creation to // AMSim (reservation shape is impl specific) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 5f34cfccfb..ac83ab258f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -116,6 +116,8 @@ public abstract class AMSimulator extends TaskRunner.Task { private ReservationSubmissionRequest reservationRequest; + private Map appIdToAMSim; + public AMSimulator() { this.responseQueue = new LinkedBlockingQueue<>(); } @@ -125,8 +127,8 @@ public void init(int heartbeatInterval, List containerList, ResourceManager resourceManager, SLSRunner slsRunnner, long startTime, long finishTime, String simUser, String simQueue, boolean tracked, String oldApp, long baseTimeMS, - Resource amResource, String nodeLabelExpr, - Map params) { + Resource amResource, String nodeLabelExpr, Map params, + Map appIdAMSim) { super.init(startTime, startTime + 1000000L * heartbeatInterval, heartbeatInterval); this.user = simUser; @@ -140,6 +142,7 @@ public void init(int heartbeatInterval, this.traceFinishTimeMS = finishTime; this.amContainerResource = amResource; this.nodeLabelExpression = nodeLabelExpr; + this.appIdToAMSim = appIdAMSim; } /** @@ -163,6 +166,9 @@ public void firstStep() throws Exception { // submit application, waiting until ACCEPTED submitApp(reservationId); + // add submitted app to mapping + appIdToAMSim.put(appId, this); + // track app metrics trackApp(); } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 71fc5b2772..586c671afe 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -127,10 +128,10 @@ public void init(int heartbeatInterval, long traceStartTime, long traceFinishTime, String user, String queue, boolean isTracked, String oldAppId, long baselineStartTimeMS, Resource amContainerResource, String nodeLabelExpr, - Map params) { - super.init(heartbeatInterval, containerList, rm, se, - traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, - baselineStartTimeMS, amContainerResource, nodeLabelExpr, params); + Map params, Map appIdAMSim) { + super.init(heartbeatInterval, containerList, rm, se, traceStartTime, + traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, + amContainerResource, nodeLabelExpr, params, appIdAMSim); amtype = "mapreduce"; // get map/reduce tasks diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java index 862e5ec0ac..46bc90a337 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java @@ -24,6 +24,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -97,10 +98,10 @@ public void init(int heartbeatInterval, long traceStartTime, long traceFinishTime, String user, String queue, boolean isTracked, String oldAppId, long baselineStartTimeMS, Resource amContainerResource, String nodeLabelExpr, - Map params) { + Map params, Map appIdAMSim) { super.init(heartbeatInterval, containerList, rm, se, traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, - amContainerResource, nodeLabelExpr, params); + amContainerResource, nodeLabelExpr, params, appIdAMSim); amtype = "stream"; allStreams.addAll(containerList); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java index 9fb83ec24d..208629afdb 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java @@ -45,13 +45,14 @@ public class MockAMLauncher extends ApplicationMasterLauncher private static final Logger LOG = LoggerFactory.getLogger( MockAMLauncher.class); - Map amMap; + private Map appIdAMSim; + SLSRunner se; public MockAMLauncher(SLSRunner se, RMContext rmContext, - Map amMap) { + Map appIdAMSim) { super(rmContext); - this.amMap = amMap; + this.appIdAMSim = appIdAMSim; this.se = se; } @@ -86,30 +87,28 @@ public void handle(AMLauncherEvent event) { event.getAppAttempt().getAppAttemptId().getApplicationId(); // find AMSimulator - for (AMSimulator ams : amMap.values()) { - if (ams.getApplicationId() != null && ams.getApplicationId().equals( - appId)) { - try { - Container amContainer = event.getAppAttempt().getMasterContainer(); + AMSimulator ams = appIdAMSim.get(appId); + if (ams != null) { + try { + Container amContainer = event.getAppAttempt().getMasterContainer(); - setupAMRMToken(event.getAppAttempt()); + setupAMRMToken(event.getAppAttempt()); - // Notify RMAppAttempt to change state - super.context.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(), - RMAppAttemptEventType.LAUNCHED)); + // Notify RMAppAttempt to change state + super.context.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(), + RMAppAttemptEventType.LAUNCHED)); - ams.notifyAMContainerLaunched( - event.getAppAttempt().getMasterContainer()); - LOG.info("Notify AM launcher launched:" + amContainer.getId()); + ams.notifyAMContainerLaunched( + event.getAppAttempt().getMasterContainer()); + LOG.info("Notify AM launcher launched:" + amContainer.getId()); - se.getNmMap().get(amContainer.getNodeId()) - .addNewContainer(amContainer, 100000000L); + se.getNmMap().get(amContainer.getNodeId()) + .addNewContainer(amContainer, 100000000L); - return; - } catch (Exception e) { - throw new YarnRuntimeException(e); - } + return; + } catch (Exception e) { + throw new YarnRuntimeException(e); } } @@ -117,4 +116,5 @@ public void handle(AMLauncherEvent event) { "Didn't find any AMSimulator for applicationId=" + appId); } } + } diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index 2efa846441..cef41d6244 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.sls.appmaster; import com.codahale.metrics.MetricRegistry; +import java.util.HashMap; import org.apache.commons.io.FileUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; @@ -144,8 +145,10 @@ public void testAMSimulator() throws Exception { String appId = "app1"; String queue = "default"; List containers = new ArrayList<>(); + HashMap map = new HashMap<>(); app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null); + appId, 0, SLSConfiguration.getAMContainerResource(conf), null, null, + map); app.firstStep(); verifySchedulerMetrics(appId); @@ -169,9 +172,10 @@ public void testAMSimulatorWithNodeLabels() throws Exception { String appId = "app1"; String queue = "default"; List containers = new ArrayList<>(); + HashMap map = new HashMap<>(); app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, 0, SLSConfiguration.getAMContainerResource(conf), - "label1", null); + appId, 0, SLSConfiguration.getAMContainerResource(conf), "label1", + null, map); app.firstStep(); verifySchedulerMetrics(appId);