From 84cea0011ffe510d24cf9f2952944f7a6fe622cf Mon Sep 17 00:00:00 2001 From: Carlo Curino Date: Tue, 20 Feb 2018 17:00:34 -0800 Subject: [PATCH] YARN-7732. Support Generic AM Simulator from SynthGenerator. (Contributed by Young Chen via curino) --- hadoop-tools/hadoop-sls/pom.xml | 2 + .../org/apache/hadoop/yarn/sls/SLSRunner.java | 141 ++--- .../yarn/sls/appmaster/AMSimulator.java | 2 +- .../yarn/sls/appmaster/MRAMSimulator.java | 7 +- .../yarn/sls/appmaster/StreamAMSimulator.java | 273 ++++++++++ .../yarn/sls/appmaster/package-info.java | 21 + .../hadoop/yarn/sls/synthetic/SynthJob.java | 379 +++++++------ .../yarn/sls/synthetic/SynthJobClass.java | 180 ------- .../sls/synthetic/SynthTraceJobProducer.java | 505 +++++++++++++++--- .../yarn/sls/synthetic/SynthWorkload.java | 121 ----- .../hadoop/yarn/sls/BaseSLSRunnerTest.java | 2 +- .../hadoop/yarn/sls/TestSLSGenericSynth.java | 76 +++ .../hadoop/yarn/sls/TestSLSStreamAMSynth.java | 76 +++ .../yarn/sls/TestSynthJobGeneration.java | 215 +++++++- .../yarn/sls/appmaster/TestAMSimulator.java | 2 +- .../src/test/resources/sls-runner.xml | 4 + .../hadoop-sls/src/test/resources/syn.json | 2 +- .../src/test/resources/syn_generic.json | 54 ++ .../src/test/resources/syn_stream.json | 46 ++ 19 files changed, 1448 insertions(+), 660 deletions(-) create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java create mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java delete mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java delete mode 100644 hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java create mode 100644 hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java create mode 100644 hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java create mode 100644 hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json create mode 100644 hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml index a7cb9b2179..ef5ac547b5 100644 --- a/hadoop-tools/hadoop-sls/pom.xml +++ b/hadoop-tools/hadoop-sls/pom.xml @@ -133,6 +133,8 @@ src/test/resources/simulate.info.html.template src/test/resources/track.html.template src/test/resources/syn.json + src/test/resources/syn_generic.json + src/test/resources/syn_stream.json src/test/resources/inputsls.json src/test/resources/nodes.json src/test/resources/exit-invariants.txt diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java index 456602f19a..951c09d2c5 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java @@ -47,13 +47,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.tools.rumen.JobTraceReader; import org.apache.hadoop.tools.rumen.LoggedJob; import org.apache.hadoop.tools.rumen.LoggedTask; import org.apache.hadoop.tools.rumen.LoggedTaskAttempt; -import org.apache.hadoop.tools.rumen.TaskAttemptInfo; import 
org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -627,89 +625,66 @@ private void startAMFromSynthGenerator() throws YarnException, IOException { localConf.set("fs.defaultFS", "file:///"); long baselineTimeMS = 0; - try { + // if we use the nodeFile this could have been not initialized yet. + if (stjp == null) { + stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + } - // if we use the nodeFile this could have been not initialized yet. - if (stjp == null) { - stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); + SynthJob job = null; + // we use stjp, a reference to the job producer instantiated during node + // creation + while ((job = (SynthJob) stjp.getNextJob()) != null) { + // only support MapReduce currently + String user = job.getUser(); + String jobQueue = job.getQueueName(); + String oldJobId = job.getJobID().toString(); + long jobStartTimeMS = job.getSubmissionTime(); + + // CARLO: Finish time is only used for logging, omit for now + long jobFinishTimeMS = jobStartTimeMS + job.getDuration(); + + if (baselineTimeMS == 0) { + baselineTimeMS = jobStartTimeMS; + } + jobStartTimeMS -= baselineTimeMS; + jobFinishTimeMS -= baselineTimeMS; + if (jobStartTimeMS < 0) { + LOG.warn("Warning: reset job {} start time to 0.", oldJobId); + jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; + jobStartTimeMS = 0; } - SynthJob job = null; - // we use stjp, a reference to the job producer instantiated during node - // creation - while ((job = (SynthJob) stjp.getNextJob()) != null) { - // only support MapReduce currently - String user = job.getUser(); - String jobQueue = job.getQueueName(); - String oldJobId = job.getJobID().toString(); - long jobStartTimeMS = job.getSubmissionTime(); + increaseQueueAppNum(jobQueue); - // CARLO: Finish time is only used for logging, omit for now - long jobFinishTimeMS = -1L; - - if (baselineTimeMS == 0) { - baselineTimeMS = jobStartTimeMS; - } - jobStartTimeMS -= baselineTimeMS; - jobFinishTimeMS -= baselineTimeMS; - if (jobStartTimeMS < 0) { - LOG.warn("Warning: reset job {} start time to 0.", oldJobId); - jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS; - jobStartTimeMS = 0; - } - - increaseQueueAppNum(jobQueue); - - List containerList = - new ArrayList(); - ArrayList keyAsArray = new ArrayList(nmMap.keySet()); - Random rand = new Random(stjp.getSeed()); - - // map tasks - for (int i = 0; i < job.getNumberMaps(); i++) { - TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0); - RMNode node = - nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))) - .getNode(); - String hostname = "/" + node.getRackName() + "/" + node.getHostName(); - long containerLifeTime = tai.getRuntime(); - Resource containerResource = - Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), - (int) tai.getTaskInfo().getTaskVCores()); - containerList.add(new ContainerSimulator(containerResource, - containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map")); - } - - // reduce tasks - for (int i = 0; i < job.getNumberReduces(); i++) { - TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0); - RMNode node = - nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))) - .getNode(); - String hostname = "/" + node.getRackName() + "/" + node.getHostName(); - long containerLifeTime = tai.getRuntime(); - Resource containerResource = - Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(), - (int) tai.getTaskInfo().getTaskVCores()); - 
containerList.add( - new ContainerSimulator(containerResource, containerLifeTime, - hostname, DEFAULT_REDUCER_PRIORITY, "reduce")); - } - - ReservationId reservationId = null; - - if (job.hasDeadline()) { - reservationId = - ReservationId.newInstance(this.rm.getStartTime(), AM_ID); - } - - runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId, - jobStartTimeMS, jobFinishTimeMS, containerList, reservationId, - job.getDeadline(), getAMContainerResource(null)); + List containerList = + new ArrayList(); + ArrayList keyAsArray = new ArrayList(nmMap.keySet()); + Random rand = new Random(stjp.getSeed()); + for (SynthJob.SynthTask task : job.getTasks()) { + RMNode node = nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))) + .getNode(); + String hostname = "/" + node.getRackName() + "/" + node.getHostName(); + long containerLifeTime = task.getTime(); + Resource containerResource = Resource + .newInstance((int) task.getMemory(), (int) task.getVcores()); + containerList.add( + new ContainerSimulator(containerResource, containerLifeTime, + hostname, task.getPriority(), task.getType())); } - } finally { - stjp.close(); + + + ReservationId reservationId = null; + + if(job.hasDeadline()){ + reservationId = ReservationId + .newInstance(this.rm.getStartTime(), AM_ID); + } + + runNewAM(job.getType(), user, jobQueue, oldJobId, + jobStartTimeMS, jobFinishTimeMS, containerList, reservationId, + job.getDeadline(), getAMContainerResource(null), + job.getParams()); } } @@ -753,14 +728,14 @@ private void runNewAM(String jobType, String user, Resource amContainerResource) { runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, jobFinishTimeMS, containerList, null, -1, - amContainerResource); + amContainerResource, null); } private void runNewAM(String jobType, String user, String jobQueue, String oldJobId, long jobStartTimeMS, long jobFinishTimeMS, List containerList, - ReservationId reservationId, long deadline, - Resource amContainerResource) { + ReservationId reservationId, long deadline, Resource amContainerResource, + Map params) { AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( amClassMap.get(jobType), new Configuration()); @@ -777,7 +752,7 @@ private void runNewAM(String jobType, String user, AM_ID++; amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, - runner.getStartTimeMS(), amContainerResource); + runner.getStartTimeMS(), amContainerResource, params); if(reservationId != null) { // if we have a ReservationId, delegate reservation creation to // AMSim (reservation shape is impl specific) diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java index 5727b5f37b..bf85fff3b1 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java @@ -121,7 +121,7 @@ public void init(int heartbeatInterval, List containerList, ResourceManager resourceManager, SLSRunner slsRunnner, long startTime, long finishTime, String simUser, String simQueue, boolean tracked, String oldApp, long baseTimeMS, - Resource amResource) { + Resource amResource, Map params) { super.init(startTime, startTime + 1000000L * heartbeatInterval, heartbeatInterval); this.user = simUser; diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java index 18a155cb2a..6f0f85ff90 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java @@ -65,6 +65,9 @@ public class MRAMSimulator extends AMSimulator { scheduled when all maps have finished (not support slow-start currently). */ + public static final String MAP_TYPE = "map"; + public static final String REDUCE_TYPE = "reduce"; + private static final int PRIORITY_REDUCE = 10; private static final int PRIORITY_MAP = 20; @@ -123,10 +126,10 @@ public void init(int heartbeatInterval, List containerList, ResourceManager rm, SLSRunner se, long traceStartTime, long traceFinishTime, String user, String queue, boolean isTracked, String oldAppId, long baselineStartTimeMS, - Resource amContainerResource) { + Resource amContainerResource, Map params) { super.init(heartbeatInterval, containerList, rm, se, traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, - baselineStartTimeMS, amContainerResource); + baselineStartTimeMS, amContainerResource, params); amtype = "mapreduce"; // get map/reduce tasks diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java new file mode 100644 index 0000000000..b41f5f2029 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.sls.appmaster; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.sls.SLSRunner; +import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * AMSimulator that simulates streaming services - it keeps tasks + * running and resubmits them whenever they fail or complete. It finishes + * when the specified duration expires. + */ + +@Private +@Unstable +public class StreamAMSimulator extends AMSimulator { + /* + Vocabulary Used: + pending -> requests which are NOT yet sent to RM + scheduled -> requests which are sent to RM but not yet assigned + assigned -> requests which are assigned to a container + completed -> request corresponding to which container has completed + + streams are constantly scheduled. 
If a streaming job is killed, we restart it + */ + + private static final int PRIORITY_MAP = 20; + + // pending streams + private LinkedList pendingStreams = + new LinkedList<>(); + + // scheduled streams + private LinkedList scheduledStreams = + new LinkedList(); + + // assigned streams + private Map assignedStreams = + new HashMap(); + + // all streams + private LinkedList allStreams = + new LinkedList(); + + // finished + private boolean isFinished = false; + private long duration = 0; + + private static final Logger LOG = + LoggerFactory.getLogger(StreamAMSimulator.class); + + @SuppressWarnings("checkstyle:parameternumber") + public void init(int heartbeatInterval, + List containerList, ResourceManager rm, SLSRunner se, + long traceStartTime, long traceFinishTime, String user, String queue, + boolean isTracked, String oldAppId, long baselineStartTimeMS, + Resource amContainerResource, Map params) { + super.init(heartbeatInterval, containerList, rm, se, traceStartTime, + traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, + amContainerResource, params); + amtype = "stream"; + + allStreams.addAll(containerList); + + duration = traceFinishTime - traceStartTime; + + LOG.info("Added new job with {} streams, running for {}", + allStreams.size(), duration); + } + + @Override + public synchronized void notifyAMContainerLaunched(Container masterContainer) + throws Exception { + if (null != masterContainer) { + restart(); + super.notifyAMContainerLaunched(masterContainer); + } + } + + @Override + @SuppressWarnings("unchecked") + protected void processResponseQueue() throws Exception { + while (!responseQueue.isEmpty()) { + AllocateResponse response = responseQueue.take(); + + // check completed containers + if (!response.getCompletedContainersStatuses().isEmpty()) { + for (ContainerStatus cs : response.getCompletedContainersStatuses()) { + ContainerId containerId = cs.getContainerId(); + if(assignedStreams.containsKey(containerId)){ + // One of our containers completed. Regardless of reason, + // we want to maintain our streaming process + LOG.debug("Application {} has one streamer finished ({}).", appId, + containerId); + pendingStreams.add(assignedStreams.remove(containerId)); + } else if (amContainer.getId().equals(containerId)){ + // Our am container completed + if(cs.getExitStatus() == ContainerExitStatus.SUCCESS){ + // am container released event (am container completed on success) + isAMContainerRunning = false; + isFinished = true; + LOG.info("Application {} goes to finish.", appId); + } else { + // am container killed - wait for re allocation + LOG.info("Application {}'s AM is " + + "going to be killed. Waiting for rescheduling...", appId); + isAMContainerRunning = false; + } + } + } + } + + // check finished + if (isAMContainerRunning && + (System.currentTimeMillis() - simulateStartTimeMS >= duration)) { + LOG.debug("Application {} sends out event to clean up" + + " its AM container.", appId); + isAMContainerRunning = false; + isFinished = true; + break; + } + + // check allocated containers + for (Container container : response.getAllocatedContainers()) { + if (!scheduledStreams.isEmpty()) { + ContainerSimulator cs = scheduledStreams.remove(); + LOG.debug("Application {} starts to launch a stream ({}).", appId, + container.getId()); + assignedStreams.put(container.getId(), cs); + se.getNmMap().get(container.getNodeId()).addNewContainer(container, + cs.getLifeTime()); + } + } + } + } + + /** + * restart running because of the am container killed. 
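+   * (Illustrative summary, not part of the original patch: restart() simply
+   * re-queues every stream from allStreams into pendingStreams, so the next
+   * sendContainerRequest() asks the RM for all of the streams again.)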
+ */ + private void restart() + throws YarnException, IOException, InterruptedException { + // clear + isFinished = false; + pendingStreams.clear(); + pendingStreams.addAll(allStreams); + + amContainer = null; + } + + private List mergeLists(List left, + List right) { + List list = new ArrayList<>(); + list.addAll(left); + list.addAll(right); + return list; + } + + @Override + protected void sendContainerRequest() + throws YarnException, IOException, InterruptedException { + + // send out request + List ask = new ArrayList<>(); + List release = new ArrayList<>(); + if (!isFinished) { + if (!pendingStreams.isEmpty()) { + ask = packageRequests(mergeLists(pendingStreams, scheduledStreams), + PRIORITY_MAP); + LOG.debug("Application {} sends out request for {} streams.", + appId, pendingStreams.size()); + scheduledStreams.addAll(pendingStreams); + pendingStreams.clear(); + } + } + + if(isFinished){ + release.addAll(assignedStreams.keySet()); + ask.clear(); + } + + final AllocateRequest request = createAllocateRequest(ask, release); + if (totalContainers == 0) { + request.setProgress(1.0f); + } else { + request.setProgress((float) finishedContainers / totalContainers); + } + + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(appAttemptId.toString()); + Token token = rm.getRMContext().getRMApps() + .get(appAttemptId.getApplicationId()) + .getRMAppAttempt(appAttemptId).getAMRMToken(); + ugi.addTokenIdentifier(token.decodeIdentifier()); + AllocateResponse response = ugi.doAs( + new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + return rm.getApplicationMasterService().allocate(request); + } + }); + if (response != null) { + responseQueue.put(response); + } + } + + @Override + public void initReservation( + ReservationId reservationId, long deadline, long now){ + // Streaming AM currently doesn't do reservations + setReservationRequest(null); + } + + @Override + protected void checkStop() { + if (isFinished) { + super.setEndTime(System.currentTimeMillis()); + } + } + + @Override + public void lastStep() throws Exception { + super.lastStep(); + + // clear data structures + allStreams.clear(); + assignedStreams.clear(); + pendingStreams.clear(); + scheduledStreams.clear(); + responseQueue.clear(); + } +} diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java new file mode 100644 index 0000000000..ead315be5b --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Application Master simulators for the SLS. 
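+ * It contains the base AMSimulator, the MapReduce-specific MRAMSimulator,
+ * and the StreamAMSimulator for long-running streaming services.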
+ */ +package org.apache.hadoop.yarn.sls.appmaster; \ No newline at end of file diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java index 3ed81e1f2c..27156c7403 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java @@ -19,19 +19,25 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.math3.distribution.LogNormalDistribution; import org.apache.commons.math3.random.JDKRandomGenerator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskStatus.State; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.tools.rumen.*; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo; +import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo; +import org.apache.hadoop.tools.rumen.TaskAttemptInfo; +import org.apache.hadoop.tools.rumen.TaskInfo; import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; +import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -46,6 +52,9 @@ public class SynthJob implements JobStory { @SuppressWarnings("StaticVariableName") private static Log LOG = LogFactory.getLog(SynthJob.class); + private static final long MIN_MEMORY = 1024; + private static final long MIN_VCORES = 1; + private final Configuration conf; private final int id; @@ -53,75 +62,93 @@ public class SynthJob implements JobStory { private static final AtomicInteger sequence = new AtomicInteger(0); private final String name; private final String queueName; - private final SynthJobClass jobClass; + private final SynthTraceJobProducer.JobDefinition jobDef; + + private String type; // job timing private final long submitTime; private final long duration; private final long deadline; - private final int numMapTasks; - private final int numRedTasks; - private final long mapMaxMemory; - private final long reduceMaxMemory; - private final long mapMaxVcores; - private final long reduceMaxVcores; - private final long[] mapRuntime; - private final float[] reduceRuntime; - private long totMapRuntime; - private long totRedRuntime; + private Map params; - public SynthJob(JDKRandomGenerator rand, Configuration conf, - SynthJobClass jobClass, long actualSubmissionTime) { + private long totalSlotTime = 0; + + // task information + private List tasks = new ArrayList<>(); + private Map> taskByType = new HashMap<>(); + private Map taskCounts = new HashMap<>(); + private Map taskMemory = new HashMap<>(); + private Map taskVcores = new HashMap<>(); + + /** + * Nested class used to represent a task instance in a job. Each task + * corresponds to one container allocation for the job. 
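+   * (For orientation, not part of the original patch: SLSRunner maps each
+   * SynthTask to one ContainerSimulator, using getTime() as the container
+   * lifetime and getMemory()/getVcores() to build its Resource.)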
+ */ + public static final class SynthTask{ + private String type; + private long time; + private long maxMemory; + private long maxVcores; + private int priority; + + private SynthTask(String type, long time, long maxMemory, long maxVcores, + int priority){ + this.type = type; + this.time = time; + this.maxMemory = maxMemory; + this.maxVcores = maxVcores; + this.priority = priority; + } + + public String getType(){ + return type; + } + + public long getTime(){ + return time; + } + + public long getMemory(){ + return maxMemory; + } + + public long getVcores(){ + return maxVcores; + } + + public int getPriority(){ + return priority; + } + + @Override + public String toString(){ + return String.format("[task]\ttype: %1$-10s\ttime: %2$3s\tmemory: " + + "%3$4s\tvcores: %4$2s%n", getType(), getTime(), getMemory(), + getVcores()); + } + } + + + protected SynthJob(JDKRandomGenerator rand, Configuration conf, + SynthTraceJobProducer.JobDefinition jobDef, + String queue, long actualSubmissionTime) { this.conf = conf; - this.jobClass = jobClass; + this.jobDef = jobDef; - this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS); - this.numMapTasks = jobClass.getMtasks(); - this.numRedTasks = jobClass.getRtasks(); + this.queueName = queue; - // sample memory distributions, correct for sub-minAlloc sizes - long tempMapMaxMemory = jobClass.getMapMaxMemory(); - this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB - ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory; - long tempReduceMaxMemory = jobClass.getReduceMaxMemory(); - this.reduceMaxMemory = - tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB - ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory; - - // sample vcores distributions, correct for sub-minAlloc sizes - long tempMapMaxVCores = jobClass.getMapMaxVcores(); - this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES - ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores; - long tempReduceMaxVcores = jobClass.getReduceMaxVcores(); - this.reduceMaxVcores = - tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES - ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores; - - if (numMapTasks > 0) { - conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory); - conf.set(MRJobConfig.MAP_JAVA_OPTS, - "-Xmx" + (this.mapMaxMemory - 100) + "m"); - } - - if (numRedTasks > 0) { - conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory); - conf.set(MRJobConfig.REDUCE_JAVA_OPTS, - "-Xmx" + (this.reduceMaxMemory - 100) + "m"); - } + this.duration = MILLISECONDS.convert(jobDef.duration.getInt(), + SECONDS); boolean hasDeadline = - (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation); + (rand.nextDouble() <= jobDef.reservation.getDouble()); - LogNormalDistribution deadlineFactor = - SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg, - jobClass.jobClass.deadline_factor_stddev); + double deadlineFactorSample = jobDef.deadline_factor.getDouble(); - double deadlineFactorSample = - (deadlineFactor != null) ? deadlineFactor.sample() : -1; - - this.queueName = jobClass.workload.getQueueName(); + this.type = jobDef.type; this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS); @@ -129,6 +156,8 @@ public SynthJob(JDKRandomGenerator rand, Configuration conf, hasDeadline ? 
MILLISECONDS.convert(actualSubmissionTime, SECONDS) + (long) Math.ceil(deadlineFactorSample * duration) : -1; + this.params = jobDef.params; + conf.set(QUEUE_NAME, queueName); // name and initialize job randomness @@ -136,75 +165,145 @@ public SynthJob(JDKRandomGenerator rand, Configuration conf, rand.setSeed(seed); id = sequence.getAndIncrement(); - name = String.format(jobClass.getClassName() + "_%06d", id); + name = String.format(jobDef.class_name + "_%06d", id); LOG.debug(name + " (" + seed + ")"); LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime + " deadline:" + deadline + " duration:" + duration + " deadline-submission: " + (deadline - submitTime)); - // generate map and reduce runtimes - mapRuntime = new long[numMapTasks]; - for (int i = 0; i < numMapTasks; i++) { - mapRuntime[i] = jobClass.getMapTimeSample(); - totMapRuntime += mapRuntime[i]; - } - reduceRuntime = new float[numRedTasks]; - for (int i = 0; i < numRedTasks; i++) { - reduceRuntime[i] = jobClass.getReduceTimeSample(); - totRedRuntime += (long) Math.ceil(reduceRuntime[i]); + // Expand tasks + for(SynthTraceJobProducer.TaskDefinition task : jobDef.tasks){ + int num = task.count.getInt(); + String taskType = task.type; + long memory = task.max_memory.getLong(); + memory = memory < MIN_MEMORY ? MIN_MEMORY: memory; + long vcores = task.max_vcores.getLong(); + vcores = vcores < MIN_VCORES ? MIN_VCORES : vcores; + int priority = task.priority; + + // Save task information by type + taskByType.put(taskType, new ArrayList<>()); + taskCounts.put(taskType, num); + taskMemory.put(taskType, memory); + taskVcores.put(taskType, vcores); + + for(int i = 0; i < num; ++i){ + long time = task.time.getLong(); + totalSlotTime += time; + SynthTask t = new SynthTask(taskType, time, memory, vcores, + priority); + tasks.add(t); + taskByType.get(taskType).add(t); + } } + + } + + public String getType(){ + return type; + } + + public List getTasks(){ + return tasks; } public boolean hasDeadline() { return deadline > 0; } - @Override public String getName() { return name; } - @Override public String getUser() { - return jobClass.getUserName(); + return jobDef.user_name; } - @Override public JobID getJobID() { return new JobID("job_mock_" + name, id); } - @Override - public Values getOutcome() { - return Values.SUCCESS; - } - - @Override public long getSubmissionTime() { return submitTime; } + public String getQueueName() { + return queueName; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + String res = "\nSynthJob [" + jobDef.class_name + "]: \n" + + "\tname: " + getName() + "\n" + + "\ttype: " + getType() + "\n" + + "\tid: " + id + "\n" + + "\tqueue: " + getQueueName() + "\n" + + "\tsubmission: " + getSubmissionTime() + "\n" + + "\tduration: " + getDuration() + "\n" + + "\tdeadline: " + getDeadline() + "\n"; + sb.append(res); + int taskno = 0; + for(SynthJob.SynthTask t : getTasks()){ + sb.append("\t"); + sb.append(taskno); + sb.append(": \t"); + sb.append(t.toString()); + taskno++; + } + return sb.toString(); + } + + public long getTotalSlotTime() { + return totalSlotTime; + } + + public long getDuration() { + return duration; + } + + public long getDeadline() { + return deadline; + } + + public Map getParams() { + return params; + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SynthJob)) { + return false; + } + SynthJob o = (SynthJob) other; + return tasks.equals(o.tasks) + && submitTime == o.submitTime + && type.equals(o.type) + && 
queueName.equals(o.queueName) + && jobDef.class_name.equals(o.jobDef.class_name); + } + + @Override + public int hashCode() { + return jobDef.class_name.hashCode() + * (int) submitTime * (int) duration; + } + + + @Override + public JobConf getJobConf() { + return new JobConf(conf); + } + @Override public int getNumberMaps() { - return numMapTasks; + return taskCounts.get(MRAMSimulator.MAP_TYPE); } @Override public int getNumberReduces() { - return numRedTasks; - } - - @Override - public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { - switch (taskType) { - case MAP: - return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores); - case REDUCE: - return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores); - default: - throw new IllegalArgumentException("Not interested"); - } + return taskCounts.get(MRAMSimulator.REDUCE_TYPE); } @Override @@ -212,23 +311,43 @@ public InputSplit[] getInputSplits() { throw new UnsupportedOperationException(); } + @Override + public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) { + switch(taskType){ + case MAP: + return new TaskInfo(-1, -1, -1, -1, + taskMemory.get(MRAMSimulator.MAP_TYPE), + taskVcores.get(MRAMSimulator.MAP_TYPE)); + case REDUCE: + return new TaskInfo(-1, -1, -1, -1, + taskMemory.get(MRAMSimulator.REDUCE_TYPE), + taskVcores.get(MRAMSimulator.REDUCE_TYPE)); + default: + break; + } + throw new UnsupportedOperationException(); + } + @Override public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber, int taskAttemptNumber) { switch (taskType) { case MAP: return new MapTaskAttemptInfo(State.SUCCEEDED, - getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null); - + getTaskInfo(taskType, taskNumber), + taskByType.get(MRAMSimulator.MAP_TYPE).get(taskNumber).time, + null); case REDUCE: // We assume uniform split between pull/sort/reduce // aligned with naive progress reporting assumptions return new ReduceTaskAttemptInfo(State.SUCCEEDED, getTaskInfo(taskType, taskNumber), - (long) Math.round((reduceRuntime[taskNumber] / 3)), - (long) Math.round((reduceRuntime[taskNumber] / 3)), - (long) Math.round((reduceRuntime[taskNumber] / 3)), null); - + taskByType.get(MRAMSimulator.MAP_TYPE) + .get(taskNumber).time / 3, + taskByType.get(MRAMSimulator.MAP_TYPE) + .get(taskNumber).time / 3, + taskByType.get(MRAMSimulator.MAP_TYPE) + .get(taskNumber).time / 3, null); default: break; } @@ -242,65 +361,7 @@ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber, } @Override - public org.apache.hadoop.mapred.JobConf getJobConf() { - return new JobConf(conf); - } - - @Override - public String getQueueName() { - return queueName; - } - - @Override - public String toString() { - return "SynthJob [\n" + " workload=" + jobClass.getWorkload().getId() - + "\n" + " jobClass=" - + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n" - + " conf=" + conf + ",\n" + " id=" + id + ",\n" + " name=" + name - + ",\n" + " mapRuntime=" + Arrays.toString(mapRuntime) + ",\n" - + " reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n" - + " submitTime=" + submitTime + ",\n" + " numMapTasks=" + numMapTasks - + ",\n" + " numRedTasks=" + numRedTasks + ",\n" + " mapMaxMemory=" - + mapMaxMemory + ",\n" + " reduceMaxMemory=" + reduceMaxMemory + ",\n" - + " queueName=" + queueName + "\n" + "]"; - } - - public SynthJobClass getJobClass() { - return jobClass; - } - - public long getTotalSlotTime() { - return totMapRuntime + totRedRuntime; - } - - public long getDuration() { - return duration; - } - - public long 
getDeadline() { - return deadline; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof SynthJob)) { - return false; - } - SynthJob o = (SynthJob) other; - return Arrays.equals(mapRuntime, o.mapRuntime) - && Arrays.equals(reduceRuntime, o.reduceRuntime) - && submitTime == o.submitTime && numMapTasks == o.numMapTasks - && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory - && reduceMaxMemory == o.reduceMaxMemory - && mapMaxVcores == o.mapMaxVcores - && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName) - && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime - && totRedRuntime == o.totRedRuntime; - } - - @Override - public int hashCode() { - // could have a bad distr; investigate if a relevant use case exists - return jobClass.hashCode() * (int) submitTime; + public Values getOutcome() { + return Values.SUCCESS; } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java deleted file mode 100644 index 439698f8a4..0000000000 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.sls.synthetic; - -import org.apache.commons.math3.distribution.AbstractRealDistribution; -import org.apache.commons.math3.distribution.LogNormalDistribution; -import org.apache.commons.math3.random.JDKRandomGenerator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.tools.rumen.JobStory; -import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass; -import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; - -/** - * This is a class that represent a class of Jobs. It is used to generate an - * individual job, by picking random durations, task counts, container size, - * etc. 
- */ -public class SynthJobClass { - - private final JDKRandomGenerator rand; - private final LogNormalDistribution dur; - private final LogNormalDistribution mapRuntime; - private final LogNormalDistribution redRuntime; - private final LogNormalDistribution mtasks; - private final LogNormalDistribution rtasks; - private final LogNormalDistribution mapMem; - private final LogNormalDistribution redMem; - private final LogNormalDistribution mapVcores; - private final LogNormalDistribution redVcores; - - private final Trace trace; - @SuppressWarnings("VisibilityModifier") - protected final SynthWorkload workload; - @SuppressWarnings("VisibilityModifier") - protected final JobClass jobClass; - - public SynthJobClass(JDKRandomGenerator rand, Trace trace, - SynthWorkload workload, int classId) { - - this.trace = trace; - this.workload = workload; - this.rand = new JDKRandomGenerator(); - this.rand.setSeed(rand.nextLong()); - jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId); - - this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg, - jobClass.dur_stddev); - this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg, - jobClass.mtime_stddev); - this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg, - jobClass.rtime_stddev); - this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg, - jobClass.mtasks_stddev); - this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg, - jobClass.rtasks_stddev); - - this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg, - jobClass.map_max_memory_stddev); - this.redMem = SynthUtils.getLogNormalDist(rand, - jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev); - this.mapVcores = SynthUtils.getLogNormalDist(rand, - jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev); - this.redVcores = SynthUtils.getLogNormalDist(rand, - jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev); - } - - public JobStory getJobStory(Configuration conf, long actualSubmissionTime) { - return new SynthJob(rand, conf, this, actualSubmissionTime); - } - - @Override - public String toString() { - return "SynthJobClass [workload=" + workload.getName() + ", class=" - + jobClass.class_name + " job_count=" + jobClass.class_weight + ", dur=" - + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime=" - + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0) - + ", redRuntime=" - + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0) - + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0) - + ", rtasks=" + ((rtasks != null) ? 
rtasks.getNumericalMean() : 0)
-        + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n";
-
-  }
-
-  public double getClassWeight() {
-    return jobClass.class_weight;
-  }
-
-  public long getDur() {
-    return genLongSample(dur);
-  }
-
-  public int getMtasks() {
-    return genIntSample(mtasks);
-  }
-
-  public int getRtasks() {
-    return genIntSample(rtasks);
-  }
-
-  public long getMapMaxMemory() {
-    return genLongSample(mapMem);
-  }
-
-  public long getReduceMaxMemory() {
-    return genLongSample(redMem);
-  }
-
-  public long getMapMaxVcores() {
-    return genLongSample(mapVcores);
-  }
-
-  public long getReduceMaxVcores() {
-    return genLongSample(redVcores);
-  }
-
-  public SynthWorkload getWorkload() {
-    return workload;
-  }
-
-  public int genIntSample(AbstractRealDistribution dist) {
-    if (dist == null) {
-      return 0;
-    }
-    double baseSample = dist.sample();
-    if (baseSample < 0) {
-      baseSample = 0;
-    }
-    return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample));
-  }
-
-  public long genLongSample(AbstractRealDistribution dist) {
-    return dist != null ? (long) Math.ceil(dist.sample()) : 0;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof SynthJobClass)) {
-      return false;
-    }
-    SynthJobClass o = (SynthJobClass) other;
-    return workload.equals(o.workload);
-  }
-
-  @Override
-  public int hashCode() {
-    return workload.hashCode() * workload.getId();
-  }
-
-  public String getClassName() {
-    return jobClass.class_name;
-  }
-
-  public long getMapTimeSample() {
-    return genLongSample(mapRuntime);
-  }
-
-  public long getReduceTimeSample() {
-    return genLongSample(redRuntime);
-  }
-
-  public String getUserName() {
-    return jobClass.user_name;
-  }
-}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index c89e4e26a5..09bc9b97d6 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -19,6 +19,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -26,7 +27,11 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
+import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;

 import javax.xml.bind.annotation.XmlRootElement;
@@ -39,7 +44,7 @@
 /**
  * This is a JobStoryProducer that operates from a distribution of different
  * workloads. The .json input file is used to determine how many jobs, which
  * size, number of maps/reducers and their duration, as well as the temporal
  * distribution of submissions. For each parameter we control avg and stdev, and
  * generate values via normal or log-normal distributions.
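For orientation, the generic trace this producer now accepts looks roughly like the sketch below. This is illustrative only: it is not the syn_generic.json resource added by the patch (whose contents are not reproduced here), all values are invented, and the top-level num_jobs and rand_seed names are inferred from the trace.num_jobs and trace.rand_seed accesses rather than from annotations shown here; the remaining names come from the @JsonProperty annotations on Trace, Workload, JobDefinition, TaskDefinition, Sample, and TimeSample. Each Sample-typed field takes either a constant ({"val": x}) or a distribution ({"val": avg, "std": stdev} with an optional "dist" of "LOGNORM" or "NORM"), or a discrete list with optional weights. The "type" field selects which AM simulator SLSRunner launches (e.g. "mapreduce"; the new TestSLSStreamAMSynth suggests a "stream" type for StreamAMSimulator, though the exact registration in sls-runner.xml is not shown here).

    {
      "num_jobs": 4,
      "rand_seed": 2,
      "workloads": [{
        "workload_name": "workload_a",
        "workload_weight": 1.0,
        "queue_name": "sls_queue_1",
        "job_classes": [{
          "class_name": "class_1",
          "user_name": "foobar",
          "class_weight": 1.0,
          "type": "mapreduce",
          "duration": {"val": 60, "std": 5},
          "reservation": {"val": 0.5},
          "deadline_factor": {"val": 10},
          "tasks": [
            {"type": "map", "priority": 20,
             "count": {"val": 5, "std": 1},
             "time": {"val": 10},
             "max_memory": {"val": 1024},
             "max_vcores": {"val": 1}},
            {"type": "reduce", "priority": 10,
             "count": {"val": 2},
             "time": {"val": 20, "std": 4, "dist": "NORM"},
             "max_memory": {"val": 2048},
             "max_vcores": {"val": 2}}
          ]
        }],
        "time_distribution": [
          {"time": 1, "weight": 100},
          {"time": 60, "weight": 0}
        ]
      }]
    }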
@@ -55,8 +60,6 @@ public class SynthTraceJobProducer implements JobStoryProducer { private final long seed; private int totalWeight; - private final List weightList; - private final Map workloads; private final Queue listStoryParams; @@ -65,6 +68,9 @@ public class SynthTraceJobProducer implements JobStoryProducer { public static final String SLS_SYNTHETIC_TRACE_FILE = "sls.synthetic" + ".trace_file"; + private final static int DEFAULT_MAPPER_PRIORITY = 20; + private final static int DEFAULT_REDUCER_PRIORITY = 10; + public SynthTraceJobProducer(Configuration conf) throws IOException { this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE))); } @@ -76,8 +82,6 @@ public SynthTraceJobProducer(Configuration conf, Path path) this.conf = conf; this.rand = new JDKRandomGenerator(); - workloads = new HashMap(); - weightList = new ArrayList(); ObjectMapper mapper = new ObjectMapper(); mapper.configure(INTERN_FIELD_NAMES, true); @@ -86,44 +90,132 @@ public SynthTraceJobProducer(Configuration conf, Path path) FileSystem ifs = path.getFileSystem(conf); FSDataInputStream fileIn = ifs.open(path); + // Initialize the random generator and the seed this.trace = mapper.readValue(fileIn, Trace.class); - seed = trace.rand_seed; - rand.setSeed(seed); + this.seed = trace.rand_seed; + this.rand.setSeed(seed); + // Initialize the trace + this.trace.init(rand); this.numJobs = new AtomicInteger(trace.num_jobs); - for (int workloadId = 0; workloadId < trace.workloads - .size(); workloadId++) { - SynthWorkload workload = new SynthWorkload(workloadId, trace); - for (int classId = - 0; classId < trace.workloads.get(workloadId).job_classes - .size(); classId++) { - SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId); - workload.add(cls); - } - workloads.put(workloadId, workload); - } - - for (int i = 0; i < workloads.size(); i++) { - double w = workloads.get(i).getWorkloadWeight(); + for (Double w : trace.workload_weights) { totalWeight += w; - weightList.add(w); } + // Initialize our story parameters + listStoryParams = createStory(); + + LOG.info("Generated " + listStoryParams.size() + " deadlines for " + + this.numJobs.get() + " jobs"); + } + + // StoryParams hold the minimum amount of information needed to completely + // specify a job run: job definition, start time, and queue. 
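+  // (Illustrative aside, not part of the original patch: if one job is
+  // generated with actualSubmissionTime=30s and another with 10s, poll()
+  // hands back the 10s job first, since the comparator in createStory()
+  // sorts StoryParams by ascending submission time.)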
// This allows us to create "jobs" and then order them according to start time
+  static class StoryParams {
+    // Time the job gets submitted
+    private long actualSubmissionTime;
+    // The queue the job gets submitted to
+    private String queue;
+    // Definition to construct the job from
+    private JobDefinition jobDef;
+
+    StoryParams(long actualSubmissionTime, String queue, JobDefinition jobDef) {
+      this.actualSubmissionTime = actualSubmissionTime;
+      this.queue = queue;
+      this.jobDef = jobDef;
+    }
+  }
+
+  private Queue<StoryParams> createStory() {
     // create priority queue to keep start-time sorted
-    listStoryParams =
-        new PriorityQueue<StoryParams>(10, new Comparator<StoryParams>() {
+    Queue<StoryParams> storyQueue =
+        new PriorityQueue<>(this.numJobs.get(), new Comparator<StoryParams>() {
          @Override
          public int compare(StoryParams o1, StoryParams o2) {
            return Math
-                .toIntExact(o2.actualSubmissionTime - o1.actualSubmissionTime);
+                .toIntExact(o1.actualSubmissionTime - o2.actualSubmissionTime);
          }
        });
+    for (int i = 0; i < numJobs.get(); i++) {
+      // Generate a workload
+      Workload wl = trace.generateWorkload();
+      // Save all the parameters needed to completely define a job
+      long actualSubmissionTime = wl.generateSubmissionTime();
+      String queue = wl.queue_name;
+      JobDefinition job = wl.generateJobDefinition();
+      storyQueue.add(new StoryParams(actualSubmissionTime, queue, job));
+    }
+    return storyQueue;
+  }
+
+  @Override
+  public JobStory getNextJob() throws IOException {
+    if (numJobs.decrementAndGet() < 0) {
+      return null;
+    }
+    StoryParams storyParams = listStoryParams.poll();
+    return new SynthJob(rand, conf, storyParams.jobDef, storyParams.queue,
+        storyParams.actualSubmissionTime);
+  }
+
+  @Override
+  public void close(){
+  }
+
+  @Override
+  public String toString() {
+    return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs
+        + ", r=" + rand + ", totalWeight="
+        + totalWeight + ", workloads=" + trace.workloads + "]";
+  }
+
+  public int getNumJobs() {
+    return trace.num_jobs;
+  }
+
+  // Helper to parse and maintain backwards compatibility with
+  // syn json formats
+  private static void validateJobDef(JobDefinition jobDef){
+    if(jobDef.tasks == null) {
+      LOG.info("Detected old JobDefinition format. 
Converting."); + try { + jobDef.tasks = new ArrayList<>(); + jobDef.type = "mapreduce"; + jobDef.deadline_factor = new Sample(jobDef.deadline_factor_avg, + jobDef.deadline_factor_stddev); + jobDef.duration = new Sample(jobDef.dur_avg, + jobDef.dur_stddev); + jobDef.reservation = new Sample(jobDef.chance_of_reservation); + + TaskDefinition map = new TaskDefinition(); + map.type = MRAMSimulator.MAP_TYPE; + map.count = new Sample(jobDef.mtasks_avg, jobDef.mtasks_stddev); + map.time = new Sample(jobDef.mtime_avg, jobDef.mtime_stddev); + map.max_memory = new Sample((double) jobDef.map_max_memory_avg, + jobDef.map_max_memory_stddev); + map.max_vcores = new Sample((double) jobDef.map_max_vcores_avg, + jobDef.map_max_vcores_stddev); + map.priority = DEFAULT_MAPPER_PRIORITY; + + jobDef.tasks.add(map); + TaskDefinition reduce = new TaskDefinition(); + reduce.type = MRAMSimulator.REDUCE_TYPE; + reduce.count = new Sample(jobDef.rtasks_avg, jobDef.rtasks_stddev); + reduce.time = new Sample(jobDef.rtime_avg, jobDef.rtime_stddev); + reduce.max_memory = new Sample((double) jobDef.reduce_max_memory_avg, + jobDef.reduce_max_memory_stddev); + reduce.max_vcores = new Sample((double) jobDef.reduce_max_vcores_avg, + jobDef.reduce_max_vcores_stddev); + reduce.priority = DEFAULT_REDUCER_PRIORITY; + + jobDef.tasks.add(reduce); + } catch (JsonMappingException e) { + LOG.warn("Error converting old JobDefinition format", e); + } + } } public long getSeed() { @@ -159,6 +251,25 @@ public static class Trace { @JsonProperty("workloads") List workloads; + List workload_weights; + JDKRandomGenerator rand; + + public void init(JDKRandomGenerator random){ + this.rand = random; + // Pass rand forward + for(Workload w : workloads){ + w.init(rand); + } + // Initialize workload weights + workload_weights = new ArrayList<>(); + for(Workload w : workloads){ + workload_weights.add(w.workload_weight); + } + } + + Workload generateWorkload(){ + return workloads.get(SynthUtils.getWeighted(workload_weights, rand)); + } } /** @@ -174,16 +285,67 @@ public static class Workload { @JsonProperty("queue_name") String queue_name; @JsonProperty("job_classes") - List job_classes; + List job_classes; @JsonProperty("time_distribution") List time_distribution; + + JDKRandomGenerator rand; + + List job_weights; + List time_weights; + + public void init(JDKRandomGenerator random){ + this.rand = random; + // Validate and pass rand forward + for(JobDefinition def : job_classes){ + validateJobDef(def); + def.init(rand); + } + + // Initialize job weights + job_weights = new ArrayList<>(); + job_weights = new ArrayList<>(); + for(JobDefinition j : job_classes){ + job_weights.add(j.class_weight); + } + + // Initialize time weights + time_weights = new ArrayList<>(); + for(TimeSample ts : time_distribution){ + time_weights.add(ts.weight); + } + } + + public long generateSubmissionTime(){ + int index = SynthUtils.getWeighted(time_weights, rand); + // Retrieve the lower and upper bounds for this time "bucket" + int start = time_distribution.get(index).time; + // Get the beginning of the next time sample (if it exists) + index = (index+1)0 ? rand.nextInt(range) : 0); + } + + public JobDefinition generateJobDefinition(){ + return job_classes.get(SynthUtils.getWeighted(job_weights, rand)); + } + + @Override + public String toString(){ + return "\nWorkload " + workload_name + ", weight: " + workload_weight + + ", queue: " + queue_name + " " + + job_classes.toString().replace("\n", "\n\t"); + } } /** * Class used to parse a job class from file. 
*/ @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) - public static class JobClass { + public static class JobDefinition { @JsonProperty("class_name") String class_name; @@ -194,6 +356,23 @@ public static class JobClass { @JsonProperty("class_weight") double class_weight; + // am type to launch + @JsonProperty("type") + String type; + @JsonProperty("deadline_factor") + Sample deadline_factor; + @JsonProperty("duration") + Sample duration; + @JsonProperty("reservation") + Sample reservation; + + @JsonProperty("tasks") + List tasks; + + @JsonProperty("params") + Map params; + + // Old JSON fields for backwards compatibility // reservation related params @JsonProperty("chance_of_reservation") double chance_of_reservation; @@ -246,6 +425,215 @@ public static class JobClass { @JsonProperty("reduce_max_vcores_stddev") double reduce_max_vcores_stddev; + public void init(JDKRandomGenerator rand){ + deadline_factor.init(rand); + duration.init(rand); + reservation.init(rand); + + for(TaskDefinition t : tasks){ + t.count.init(rand); + t.time.init(rand); + t.max_memory.init(rand); + t.max_vcores.init(rand); + } + } + + @Override + public String toString(){ + return "\nJobDefinition " + class_name + ", weight: " + class_weight + + ", type: " + type + " " + + tasks.toString().replace("\n", "\n\t"); + } + } + + /** + * A task representing a type of container - e.g. "map" in mapreduce + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class TaskDefinition { + + @JsonProperty("type") + String type; + @JsonProperty("count") + Sample count; + @JsonProperty("time") + Sample time; + @JsonProperty("max_memory") + Sample max_memory; + @JsonProperty("max_vcores") + Sample max_vcores; + @JsonProperty("priority") + int priority; + + @Override + public String toString(){ + return "\nTaskDefinition " + type + + " Count[" + count + "] Time[" + time + "] Memory[" + max_memory + + "] Vcores[" + max_vcores + "] Priority[" + priority + "]"; + } + } + + /** + * Class used to parse value sample information. + */ + @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" }) + public static class Sample { + private static final Dist DEFAULT_DIST = Dist.LOGNORM; + + private final double val; + private final double std; + private final Dist dist; + private AbstractRealDistribution dist_instance; + private final List discrete; + private final List weights; + private final Mode mode; + + private JDKRandomGenerator rand; + + private enum Mode{ + CONST, + DIST, + DISC + } + + private enum Dist{ + LOGNORM, + NORM + } + + public Sample(Double val) throws JsonMappingException{ + this(val, null); + } + + public Sample(Double val, Double std) throws JsonMappingException{ + this(val, std, null, null, null); + } + + @JsonCreator + public Sample(@JsonProperty("val") Double val, + @JsonProperty("std") Double std, @JsonProperty("dist") String dist, + @JsonProperty("discrete") List discrete, + @JsonProperty("weights") List weights) + throws JsonMappingException{ + // Different Modes + // - Constant: val must be specified, all else null. Sampling will + // return val. + // - Distribution: val, std specified, dist optional (defaults to + // LogNormal). 
Sampling will sample from the appropriate distribution + // - Discrete: discrete must be set to a list of strings or numbers, + // weights optional (defaults to uniform) + + if(val!=null){ + if(std==null){ + // Constant + if(dist!=null || discrete!=null || weights!=null){ + throw new JsonMappingException("Instantiation of " + Sample.class + + " failed"); + } + mode = Mode.CONST; + this.val = val; + this.std = 0; + this.dist = null; + this.discrete = null; + this.weights = null; + } else { + // Distribution + if(discrete!=null || weights != null){ + throw new JsonMappingException("Instantiation of " + Sample.class + + " failed"); + } + mode = Mode.DIST; + this.val = val; + this.std = std; + this.dist = dist!=null ? Dist.valueOf(dist) : DEFAULT_DIST; + this.discrete = null; + this.weights = null; + } + } else { + // Discrete + if(discrete==null){ + throw new JsonMappingException("Instantiation of " + Sample.class + + " failed"); + } + mode = Mode.DISC; + this.val = 0; + this.std = 0; + this.dist = null; + this.discrete = discrete; + if(weights == null){ + weights = new ArrayList<>(Collections.nCopies( + discrete.size(), 1.0)); + } + if(weights.size() != discrete.size()){ + throw new JsonMappingException("Instantiation of " + Sample.class + + " failed"); + } + this.weights = weights; + } + } + + public void init(JDKRandomGenerator random){ + if(this.rand != null){ + throw new YarnRuntimeException("init called twice"); + } + this.rand = random; + if(mode == Mode.DIST){ + switch(this.dist){ + case LOGNORM: + this.dist_instance = SynthUtils.getLogNormalDist(rand, val, std); + return; + case NORM: + this.dist_instance = SynthUtils.getNormalDist(rand, val, std); + return; + default: + throw new YarnRuntimeException("Unknown distribution " + dist.name()); + } + } + } + + public int getInt(){ + return Math.toIntExact(getLong()); + } + + public long getLong(){ + return Math.round(getDouble()); + } + + public double getDouble(){ + return Double.parseDouble(getString()); + } + + public String getString(){ + if(this.rand == null){ + throw new YarnRuntimeException("getValue called without init"); + } + switch(mode){ + case CONST: + return Double.toString(val); + case DIST: + return Double.toString(dist_instance.sample()); + case DISC: + return this.discrete.get(SynthUtils.getWeighted(this.weights, rand)); + default: + throw new YarnRuntimeException("Unknown sampling mode " + mode.name()); + } + } + + @Override + public String toString(){ + switch(mode){ + case CONST: + return "value: " + Double.toString(val); + case DIST: + return "value: " + this.val + " std: " + this.std + " dist: " + + this.dist.name(); + case DISC: + return "discrete: " + this.discrete + ", weights: " + this.weights; + default: + throw new YarnRuntimeException("Unknown sampling mode " + mode.name()); + } + } + } /** @@ -258,59 +646,6 @@ public static class TimeSample { @JsonProperty("time") int time; @JsonProperty("weight") - double jobs; + double weight; } - - static class StoryParams { - private SynthJobClass pickedJobClass; - private long actualSubmissionTime; - - StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) { - this.pickedJobClass = pickedJobClass; - this.actualSubmissionTime = actualSubmissionTime; - } - } - - - void createStoryParams() { - - for (int i = 0; i < numJobs.get(); i++) { - int workload = SynthUtils.getWeighted(weightList, rand); - SynthWorkload pickedWorkload = workloads.get(workload); - long jobClass = - SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand); - SynthJobClass 
pickedJobClass = - pickedWorkload.getClassList().get((int) jobClass); - long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand); - // long actualSubmissionTime = (i + 1) * 10; - listStoryParams - .add(new StoryParams(pickedJobClass, actualSubmissionTime)); - } - } - - @Override - public JobStory getNextJob() throws IOException { - if (numJobs.decrementAndGet() < 0) { - return null; - } - StoryParams storyParams = listStoryParams.poll(); - return storyParams.pickedJobClass.getJobStory(conf, - storyParams.actualSubmissionTime); - } - - @Override - public void close() { - } - - @Override - public String toString() { - return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs - + ", weightList=" + weightList + ", r=" + rand + ", totalWeight=" - + totalWeight + ", workloads=" + workloads + "]"; - } - - public int getNumJobs() { - return trace.num_jobs; - } - } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java deleted file mode 100644 index 9e5fd4ef74..0000000000 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.sls.synthetic; - -import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace; - -import java.util.*; - -/** - * This class represent a workload (made up of multiple SynthJobClass(es)). It - * also stores the temporal distributions of jobs in this workload. 
- */ -public class SynthWorkload { - - private final int id; - private final List classList; - private final Trace trace; - private final SortedMap timeWeights; - - public SynthWorkload(int identifier, Trace inTrace) { - classList = new ArrayList(); - this.id = identifier; - this.trace = inTrace; - timeWeights = new TreeMap(); - for (SynthTraceJobProducer.TimeSample ts : trace.workloads - .get(id).time_distribution) { - timeWeights.put(ts.time, ts.jobs); - } - } - - public boolean add(SynthJobClass s) { - return classList.add(s); - } - - public List getWeightList() { - ArrayList ret = new ArrayList(); - for (SynthJobClass s : classList) { - ret.add(s.getClassWeight()); - } - return ret; - } - - public int getId() { - return id; - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof SynthWorkload)) { - return false; - } - // assume ID determines job classes by construction - return getId() == ((SynthWorkload) other).getId(); - } - - @Override - public int hashCode() { - return getId(); - } - - @Override - public String toString() { - return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n" - + classList + "]\n"; - } - - public String getName() { - return trace.workloads.get(id).workload_name; - } - - public double getWorkloadWeight() { - return trace.workloads.get(id).workload_weight; - } - - public String getQueueName() { - return trace.workloads.get(id).queue_name; - } - - public long getBaseSubmissionTime(Random rand) { - - // pick based on weights the "bucket" for this start time - int position = SynthUtils.getWeighted(timeWeights.values(), rand); - - int[] time = new int[timeWeights.keySet().size()]; - int index = 0; - for (Integer i : timeWeights.keySet()) { - time[index++] = i; - } - - // uniformly pick a time between start and end time of this bucket - int startRange = time[position]; - int endRange = startRange; - // if there is no subsequent bucket pick startRange - if (position < timeWeights.keySet().size() - 1) { - endRange = time[position + 1]; - return startRange + rand.nextInt((endRange - startRange)); - } else { - return startRange; - } - } - - public List getClassList() { - return classList; - } - -} diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java index 6b369f2a6f..668be145d7 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java @@ -125,7 +125,7 @@ public void uncaughtException(Thread t, Throwable e) { if (!exceptionList.isEmpty()) { sls.stop(); Assert.fail("TestSLSRunner catched exception from child thread " - + "(TaskRunner.Task): " + exceptionList); + + "(TaskRunner.TaskDefinition): " + exceptionList); break; } timeout--; diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java new file mode 100644 index 0000000000..79ebe219bf --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the generic syn json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSGenericSynth extends BaseSLSRunnerTest {
+
+  @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+  public static Collection<Object[]> data() {
+
+    String capScheduler = CapacityScheduler.class.getCanonicalName();
+    String fairScheduler = FairScheduler.class.getCanonicalName();
+    String synthTraceFile = "src/test/resources/syn_generic.json";
+    String nodeFile = "src/test/resources/nodes.json";
+
+    // Test with both schedulers
+    return Arrays.asList(new Object[][] {
+
+        // covering the no nodeFile case
+        {capScheduler, "SYNTH", synthTraceFile, null },
+
+        // covering new command line and CapacityScheduler
+        {capScheduler, "SYNTH", synthTraceFile, nodeFile },
+
+        // covering FairScheduler
+        {fairScheduler, "SYNTH", synthTraceFile, nodeFile },
+    });
+  }
+
+  @Before
+  public void setup() {
+    ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+    exitInvariantFile = "src/test/resources/exit-invariants.txt";
+  }
+
+  @Test(timeout = 90000)
+  @SuppressWarnings("all")
+  public void testSimulatorRunning() throws Exception {
+    Configuration conf = new Configuration(false);
+    long timeTillShutdownInsec = 20L;
+    runSLS(conf, timeTillShutdownInsec);
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
new file mode 100644
index 0000000000..a5d30e02d8
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the stream syn json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSStreamAMSynth extends BaseSLSRunnerTest {
+
+  @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+  public static Collection<Object[]> data() {
+
+    String capScheduler = CapacityScheduler.class.getCanonicalName();
+    String fairScheduler = FairScheduler.class.getCanonicalName();
+    String synthTraceFile = "src/test/resources/syn_stream.json";
+    String nodeFile = "src/test/resources/nodes.json";
+
+    // Test with both schedulers
+    return Arrays.asList(new Object[][] {
+
+        // covering the no nodeFile case
+        {capScheduler, "SYNTH", synthTraceFile, null },
+
+        // covering new command line and CapacityScheduler
+        {capScheduler, "SYNTH", synthTraceFile, nodeFile },
+
+        // covering FairScheduler
+        {fairScheduler, "SYNTH", synthTraceFile, nodeFile },
+    });
+  }
+
+  @Before
+  public void setup() {
+    ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+    exitInvariantFile = "src/test/resources/exit-invariants.txt";
+  }
+
+  @Test(timeout = 90000)
+  @SuppressWarnings("all")
+  public void testSimulatorRunning() throws Exception {
+    Configuration conf = new Configuration(false);
+    long timeTillShutdownInsec = 20L;
+    runSLS(conf, timeTillShutdownInsec);
+  }
+}
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
index 2b1971a8ec..794cd47646 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSynthJobGeneration.java
@@ -17,20 +17,25 @@
  */
 package org.apache.hadoop.yarn.sls;
 
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
 import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import static org.junit.Assert.assertTrue;
 
+import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES;
+import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES;
+
 /**
  * Simple test class driving the {@code SynthTraceJobProducer}, and validating
  * jobs produce are within expected range. 
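Note: the Sample modes exercised by the tests below (constant, distribution, discrete) can also be driven directly with the same Jackson 1.x ObjectMapper. The following is a minimal sketch, not part of the patch, using only APIs that appear elsewhere in this change; the class name SampleSketch is hypothetical:

// Hypothetical helper, not part of the patch.
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;

public class SampleSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // a log-normal "time" sample around val=10 with std=2, matching the
    // "time" entries of syn_generic.json further down in this patch
    SynthTraceJobProducer.Sample time = mapper.readValue(
        "{\"val\": 10, \"std\": 2}", SynthTraceJobProducer.Sample.class);
    JDKRandomGenerator rand = new JDKRandomGenerator();
    rand.setSeed(2);
    // init wires in the seeded RNG and may only be called once
    time.init(rand);
    System.out.println(time.getLong()); // a value near 10, seed-dependent
  }
}
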
@@ -38,10 +43,60 @@ public class TestSynthJobGeneration { public final static Logger LOG = - Logger.getLogger(TestSynthJobGeneration.class); + LoggerFactory.getLogger(TestSynthJobGeneration.class); @Test - public void test() throws IllegalArgumentException, IOException { + public void testWorkloadGenerateTime() + throws IllegalArgumentException, IOException { + + String workloadJson = "{\"job_classes\": [], \"time_distribution\":[" + + "{\"time\": 0, \"weight\": 1}, " + "{\"time\": 30, \"weight\": 0}," + + "{\"time\": 60, \"weight\": 2}," + "{\"time\": 90, \"weight\": 1}" + + "]}"; + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(INTERN_FIELD_NAMES, true); + mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + SynthTraceJobProducer.Workload wl = + mapper.readValue(workloadJson, SynthTraceJobProducer.Workload.class); + + JDKRandomGenerator rand = new JDKRandomGenerator(); + rand.setSeed(0); + + wl.init(rand); + + int bucket0 = 0; + int bucket1 = 0; + int bucket2 = 0; + int bucket3 = 0; + for (int i = 0; i < 1000; ++i) { + long time = wl.generateSubmissionTime(); + LOG.info("Generated time " + time); + if (time < 30) { + bucket0++; + } else if (time < 60) { + bucket1++; + } else if (time < 90) { + bucket2++; + } else { + bucket3++; + } + } + + Assert.assertTrue(bucket0 > 0); + Assert.assertTrue(bucket1 == 0); + Assert.assertTrue(bucket2 > 0); + Assert.assertTrue(bucket3 > 0); + Assert.assertTrue(bucket2 > bucket0); + Assert.assertTrue(bucket2 > bucket3); + + LOG.info("bucket0 {}, bucket1 {}, bucket2 {}, bucket3 {}", bucket0, bucket1, + bucket2, bucket3); + + } + + @Test + public void testMapReduce() throws IllegalArgumentException, IOException { Configuration conf = new Configuration(); @@ -50,47 +105,155 @@ public void test() throws IllegalArgumentException, IOException { SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf); + LOG.info(stjp.toString()); + SynthJob js = (SynthJob) stjp.getNextJob(); int jobCount = 0; while (js != null) { - LOG.info((jobCount++) + " " + js.getQueueName() + " -- " - + js.getJobClass().getClassName() + " (conf: " - + js.getJobConf().get(MRJobConfig.QUEUE_NAME) + ") " + " submission: " - + js.getSubmissionTime() + ", " + " duration: " + js.getDuration() - + " numMaps: " + js.getNumberMaps() + " numReduces: " - + js.getNumberReduces()); - + LOG.info(js.toString()); validateJob(js); js = (SynthJob) stjp.getNextJob(); + jobCount++; } Assert.assertEquals(stjp.getNumJobs(), jobCount); } + @Test + public void testGeneric() throws IllegalArgumentException, IOException { + Configuration conf = new Configuration(); + + conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE, + "src/test/resources/syn_generic.json"); + + SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf); + + LOG.info(stjp.toString()); + + SynthJob js = (SynthJob) stjp.getNextJob(); + + int jobCount = 0; + + while (js != null) { + LOG.info(js.toString()); + validateJob(js); + js = (SynthJob) stjp.getNextJob(); + jobCount++; + } + + Assert.assertEquals(stjp.getNumJobs(), jobCount); + } + + @Test + public void testStream() throws IllegalArgumentException, IOException { + Configuration conf = new Configuration(); + + conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE, + "src/test/resources/syn_stream.json"); + + SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf); + + LOG.info(stjp.toString()); + + SynthJob js = (SynthJob) stjp.getNextJob(); + + int jobCount = 0; + + while (js != null) { + LOG.info(js.toString()); + validateJob(js); + js = (SynthJob) 
stjp.getNextJob(); + jobCount++; + } + + Assert.assertEquals(stjp.getNumJobs(), jobCount); + } + + @Test + public void testSample() throws IOException { + + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(INTERN_FIELD_NAMES, true); + mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + + JDKRandomGenerator rand = new JDKRandomGenerator(); + rand.setSeed(0); + + String valJson = "{\"val\" : 5 }"; + SynthTraceJobProducer.Sample valSample = + mapper.readValue(valJson, SynthTraceJobProducer.Sample.class); + valSample.init(rand); + int val = valSample.getInt(); + Assert.assertEquals(5, val); + + String distJson = "{\"val\" : 5, \"std\" : 1 }"; + SynthTraceJobProducer.Sample distSample = + mapper.readValue(distJson, SynthTraceJobProducer.Sample.class); + distSample.init(rand); + double dist = distSample.getDouble(); + Assert.assertTrue(dist > 2 && dist < 8); + + String normdistJson = "{\"val\" : 5, \"std\" : 1, \"dist\": \"NORM\" }"; + SynthTraceJobProducer.Sample normdistSample = + mapper.readValue(normdistJson, SynthTraceJobProducer.Sample.class); + normdistSample.init(rand); + double normdist = normdistSample.getDouble(); + Assert.assertTrue(normdist > 2 && normdist < 8); + + String discreteJson = "{\"discrete\" : [2, 4, 6, 8]}"; + SynthTraceJobProducer.Sample discreteSample = + mapper.readValue(discreteJson, SynthTraceJobProducer.Sample.class); + discreteSample.init(rand); + int discrete = discreteSample.getInt(); + Assert.assertTrue( + Arrays.asList(new Integer[] {2, 4, 6, 8}).contains(discrete)); + + String discreteWeightsJson = + "{\"discrete\" : [2, 4, 6, 8], " + "\"weights\": [0, 0, 0, 1]}"; + SynthTraceJobProducer.Sample discreteWeightsSample = mapper + .readValue(discreteWeightsJson, SynthTraceJobProducer.Sample.class); + discreteWeightsSample.init(rand); + int discreteWeights = discreteWeightsSample.getInt(); + Assert.assertEquals(8, discreteWeights); + + String invalidJson = "{\"val\" : 5, \"discrete\" : [2, 4, 6, 8], " + + "\"weights\": [0, 0, 0, 1]}"; + try { + mapper.readValue(invalidJson, SynthTraceJobProducer.Sample.class); + Assert.fail(); + } catch (JsonMappingException e) { + Assert.assertTrue(e.getMessage().startsWith("Instantiation of")); + } + + String invalidDistJson = + "{\"val\" : 5, \"std\" : 1, " + "\"dist\": \"INVALID\" }"; + try { + mapper.readValue(invalidDistJson, SynthTraceJobProducer.Sample.class); + Assert.fail(); + } catch (JsonMappingException e) { + Assert.assertTrue(e.getMessage().startsWith("Instantiation of")); + } + } + private void validateJob(SynthJob js) { assertTrue(js.getSubmissionTime() > 0); assertTrue(js.getDuration() > 0); - assertTrue(js.getNumberMaps() >= 0); - assertTrue(js.getNumberReduces() >= 0); - assertTrue(js.getNumberMaps() + js.getNumberReduces() > 0); assertTrue(js.getTotalSlotTime() >= 0); - for (int i = 0; i < js.getNumberMaps(); i++) { - TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.MAP, i, 0); - assertTrue(tai.getRuntime() > 0); - } - - for (int i = 0; i < js.getNumberReduces(); i++) { - TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.REDUCE, i, 0); - assertTrue(tai.getRuntime() > 0); - } - if (js.hasDeadline()) { assertTrue(js.getDeadline() > js.getSubmissionTime() + js.getDuration()); } + assertTrue(js.getTasks().size() > 0); + + for (SynthJob.SynthTask t : js.getTasks()) { + assertTrue(t.getType() != null); + assertTrue(t.getTime() > 0); + assertTrue(t.getMemory() > 0); + assertTrue(t.getVcores() > 0); + } } } diff --git 
a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java index a67845bb8e..bfc7d0c6c3 100644 --- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java +++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java @@ -139,7 +139,7 @@ public void testAMSimulator() throws Exception { String queue = "default"; List containers = new ArrayList<>(); app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true, - appId, 0, SLSConfiguration.getAMContainerResource(conf)); + appId, 0, SLSConfiguration.getAMContainerResource(conf), null); app.firstStep(); verifySchedulerMetrics(appId); diff --git a/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml b/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml index 2f076c27ab..344024a8dc 100644 --- a/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml +++ b/hadoop-tools/hadoop-sls/src/test/resources/sls-runner.xml @@ -45,6 +45,10 @@ yarn.sls.am.type.mapreduce org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator + + yarn.sls.am.type.stream + org.apache.hadoop.yarn.sls.appmaster.StreamAMSimulator + diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn.json b/hadoop-tools/hadoop-sls/src/test/resources/syn.json index 8479d23c31..c6e2c9236e 100644 --- a/hadoop-tools/hadoop-sls/src/test/resources/syn.json +++ b/hadoop-tools/hadoop-sls/src/test/resources/syn.json @@ -45,7 +45,7 @@ }, { "time": 60, - "jobs": 0 + "weight": 0 } ] } diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json b/hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json new file mode 100644 index 0000000000..bde4cd0a69 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/resources/syn_generic.json @@ -0,0 +1,54 @@ +{ + "description": "tiny jobs workload", + "num_nodes": 20, + "nodes_per_rack": 4, + "num_jobs": 10, + "rand_seed": 2, + "workloads": [ + { + "workload_name": "tiny-test", + "workload_weight": 0.5, + "description": "Sort jobs", + "queue_name": "sls_queue_1", + "job_classes": [ + { + "class_name": "class_1", + "user_name": "foobar", + "class_weight": 1.0, + "type": "mapreduce", + "deadline_factor": {"val": 10}, + "duration": {"val": 60, "std": 5}, + "reservation": {"val": 0.5}, + "tasks":[ + { + "type": "map", + "priority": 20, + "count": { "val": 5, "std": 1}, + "time": {"val": 10, "std": 2}, + "max_memory": {"val": 1024}, + "max_vcores": {"val": 1} + }, + { + "type": "reduce", + "priority": 10, + "count": { "val": 5, "std": 1}, + "time": {"val": 20, "std": 4}, + "max_memory": {"val": 2048}, + "max_vcores": {"val": 2} + } + ] + } + ], + "time_distribution": [ + { + "time": 1, + "weight": 100 + }, + { + "time": 60, + "weight": 0 + } + ] + } + ] +} diff --git a/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json b/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json new file mode 100644 index 0000000000..a85065b5c9 --- /dev/null +++ b/hadoop-tools/hadoop-sls/src/test/resources/syn_stream.json @@ -0,0 +1,46 @@ +{ + "description": "stream workload", + "num_nodes": 20, + "nodes_per_rack": 4, + "num_jobs": 5, + "rand_seed": 2, + "workloads": [ + { + "workload_name": "tiny-test", + "workload_weight": 1, + "description": "long lived streaming jobs", + "queue_name": "sls_queue_1", + "job_classes": [ + { + "class_name": "class_1", + "user_name": "foobar", + "class_weight": 1.0, + "type": "stream", + 
"deadline_factor": {"val": 10}, + "duration": {"val": 30, "std": 5}, + "reservation": {"val": 0.5}, + "tasks":[ + { + "type": "stream", + "priority": 20, + "count": { "val": 2}, + "time": {"val": 60000}, + "max_memory": {"val": 4096}, + "max_vcores": {"val": 4} + } + ] + } + ], + "time_distribution": [ + { + "time": 1, + "weight": 100 + }, + { + "time": 2, + "weight": 0 + } + ] + } + ] +}