YARN-6363. Extending SLS: Synthetic Load Generator. (Carlo Curino via wangda)

This commit is contained in:
Wangda Tan 2017-04-20 21:54:18 -07:00
parent 667966c13c
commit de69d6e811
32 changed files with 2307 additions and 296 deletions

View File

@ -23,21 +23,37 @@ public class TaskInfo {
private final long bytesOut;
private final int recsOut;
private final long maxMemory;
private final long maxVcores;
private final ResourceUsageMetrics metrics;
public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
long maxMemory) {
this(bytesIn, recsIn, bytesOut, recsOut, maxMemory,
long maxMemory) {
this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1,
new ResourceUsageMetrics());
}
public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
long maxMemory, ResourceUsageMetrics
metrics) {
this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 1, metrics);
}
public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
long maxMemory, long maxVcores) {
this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, maxVcores,
new ResourceUsageMetrics());
}
public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
long maxMemory, ResourceUsageMetrics metrics) {
long maxMemory, long maxVcores, ResourceUsageMetrics
metrics) {
this.bytesIn = bytesIn;
this.recsIn = recsIn;
this.bytesOut = bytesOut;
this.recsOut = recsOut;
this.maxMemory = maxMemory;
this.maxVcores = maxVcores;
this.metrics = metrics;
}
@ -78,6 +94,13 @@ public long getTaskMemory() {
return maxMemory;
}
/**
* @return Vcores used by the task.
*/
public long getTaskVCores() {
return maxVcores;
}
/**
* @return Resource usage metrics
*/

View File

@ -426,7 +426,7 @@ public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
if (loggedTask == null) {
// TODO insert parameters
TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0);
return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
taskNumber, locality);
}
@ -473,7 +473,7 @@ public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
LoggedTask loggedTask = getLoggedTask(taskType, taskNumber);
if (loggedTask == null) {
// TODO insert parameters
TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0);
TaskInfo taskInfo = new TaskInfo(0, 0, 0, 0, 0, 0);
return makeUpTaskAttemptInfo(taskType, taskInfo, taskAttemptNumber,
taskNumber, locality);
}
@ -639,7 +639,7 @@ private TaskAttemptInfo getTaskAttemptInfo(LoggedTask loggedTask,
private TaskInfo getTaskInfo(LoggedTask loggedTask) {
if (loggedTask == null) {
return new TaskInfo(0, 0, 0, 0, 0);
return new TaskInfo(0, 0, 0, 0, 0, 0);
}
List<LoggedTaskAttempt> attempts = loggedTask.getAttempts();
@ -688,9 +688,10 @@ private TaskInfo getTaskInfo(LoggedTask loggedTask) {
break;
}
//note: hardcoding vCores, as they are not collected
TaskInfo taskInfo =
new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
(int) outputRecords, (int) heapMegabytes,
(int) outputRecords, (int) heapMegabytes, 1,
metrics);
return taskInfo;
}

View File

@ -132,6 +132,9 @@
<exclude>src/test/resources/simulate.html.template</exclude>
<exclude>src/test/resources/simulate.info.html.template</exclude>
<exclude>src/test/resources/track.html.template</exclude>
<exclude>src/test/resources/syn.json</exclude>
<exclude>src/test/resources/inputsls.json</exclude>
<exclude>src/test/resources/nodes.json</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -16,7 +16,9 @@
function hadoop_usage()
{
echo "Usage: slsrun.sh <OPTIONS> "
echo " --input-rumen=<FILE1,FILE2,...> | --input-sls=<FILE1,FILE2,...>"
echo " --tracetype=<SYNTH | SLS | RUMEN>"
echo " --tracelocation=<FILE1,FILE2,...>"
echo " (deprecated --input-rumen=<FILE1,FILE2,...> | --input-sls=<FILE1,FILE2,...>)"
echo " --output-dir=<SLS_SIMULATION_OUTPUT_DIRECTORY>"
echo " [--nodes=<SLS_NODES_FILE>]"
echo " [--track-jobs=<JOBID1,JOBID2,...>]"
@ -33,6 +35,12 @@ function parse_args()
--input-sls=*)
inputsls=${i#*=}
;;
--tracetype=*)
tracetype=${i#*=}
;;
--tracelocation=*)
tracelocation=${i#*=}
;;
--output-dir=*)
outputdir=${i#*=}
;;
@ -52,14 +60,12 @@ function parse_args()
esac
done
if [[ -z "${inputrumen}" && -z "${inputsls}" ]] ; then
hadoop_error "ERROR: Either --input-rumen or --input-sls must be specified."
hadoop_exit_with_usage 1
if [[ -z "${inputrumen}" && -z "${inputsls}" && -z "${tracetype}" ]] ; then
hadoop_error "ERROR: Either --input-rumen, --input-sls, or --tracetype (with --tracelocation) must be specified."
fi
if [[ -n "${inputrumen}" && -n "${inputsls}" ]] ; then
hadoop_error "ERROR: Only specify one of --input-rumen or --input-sls."
hadoop_exit_with_usage 1
if [[ -n "${inputrumen}" && -n "${inputsls}" && -n "${tracetype}" ]] ; then
hadoop_error "ERROR: Only specify one of --input-rumen, --input-sls, or --tracetype (with --tracelocation)"
fi
if [[ -z "${outputdir}" ]] ; then
@ -74,11 +80,17 @@ function calculate_classpath
}
function run_simulation() {
if [[ "${inputsls}" == "" ]] ; then
hadoop_add_param args -inputrumen "-inputrumen ${inputrumen}"
else
hadoop_add_param args -inputsls "-inputsls ${inputsls}"
fi
if [[ "${inputsls}" != "" ]] ; then
hadoop_add_param args -inputsls "-inputsls ${inputsls}"
fi
if [[ "${inputrumen}" != "" ]] ; then
hadoop_add_param args -inputrumen "-inputrumen ${inputrumen}"
fi
if [[ "${tracetype}" != "" ]] ; then
hadoop_add_param args -tracetype "-tracetype ${tracetype}"
hadoop_add_param args -tracelocation "-tracelocation ${tracelocation}"
fi
hadoop_add_param args -output "-output ${outputdir}"

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.*;
import java.util.ArrayList;
import java.util.List;
/**
* Simple support class, used to create reservation requests.
*/
public final class ReservationClientUtil {
private ReservationClientUtil(){
//avoid instantiation
}
/**
* Creates a request that envelopes a MR jobs, picking max number of maps and
* reducers, max durations, and max resources per container.
*
* @param reservationId the id of the reservation
* @param name the name of a reservation
* @param maxMapRes maximum resources used by any mapper
* @param numberMaps number of mappers
* @param maxMapDur maximum duration of any mapper
* @param maxRedRes maximum resources used by any reducer
* @param numberReduces number of reducers
* @param maxRedDur maximum duration of any reducer
* @param arrival start time of valid range for reservation
* @param deadline deadline for this reservation
* @param queueName queue to submit to
* @return a submission request
*/
@SuppressWarnings("checkstyle:parameternumber")
public static ReservationSubmissionRequest createMRReservation(
ReservationId reservationId, String name, Resource maxMapRes,
int numberMaps, long maxMapDur, Resource maxRedRes, int numberReduces,
long maxRedDur, long arrival, long deadline, String queueName) {
ReservationRequest mapRR = ReservationRequest.newInstance(maxMapRes,
numberMaps, numberMaps, maxMapDur);
ReservationRequest redRR = ReservationRequest.newInstance(maxRedRes,
numberReduces, numberReduces, maxRedDur);
List<ReservationRequest> listResReq = new ArrayList<ReservationRequest>();
listResReq.add(mapRR);
listResReq.add(redRR);
ReservationRequests reservationRequests = ReservationRequests
.newInstance(listResReq, ReservationRequestInterpreter.R_ORDER_NO_GAP);
ReservationDefinition resDef = ReservationDefinition.newInstance(arrival,
deadline, reservationRequests, name);
// outermost request
ReservationSubmissionRequest request = ReservationSubmissionRequest
.newInstance(resDef, queueName, reservationId);
return request;
}
}

View File

@ -41,17 +41,25 @@
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.JobTraceReader;
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.LoggedTask;
import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -67,25 +75,27 @@
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.resourcemanager.MockAMLauncher;
import org.apache.hadoop.yarn.sls.scheduler.*;
import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.log4j.Logger;
@Private
@Unstable
public class SLSRunner {
public class SLSRunner extends Configured implements Tool {
// RM, Runner
private ResourceManager rm;
private static TaskRunner runner = new TaskRunner();
private String[] inputTraces;
private Configuration conf;
private Map<String, Integer> queueAppNumMap;
// NM simulator
private HashMap<NodeId, NMSimulator> nmMap;
private int nmMemoryMB, nmVCores;
private String nodeFile;
// AM simulator
private int AM_ID;
private Map<String, AMSimulator> amMap;
@ -106,43 +116,64 @@ public class SLSRunner {
// logger
public final static Logger LOG = Logger.getLogger(SLSRunner.class);
// input traces, input-rumen or input-sls
private boolean isSLS;
public SLSRunner(boolean isSLS, String inputTraces[], String nodeFile,
String outputDir, Set<String> trackedApps,
boolean printsimulation)
throws IOException, ClassNotFoundException {
this.isSLS = isSLS;
this.inputTraces = inputTraces.clone();
this.nodeFile = nodeFile;
this.trackedApps = trackedApps;
this.printSimulation = printsimulation;
metricsOutputDir = outputDir;
/**
* The type of trace in input.
*/
public enum TraceType {
SLS, RUMEN, SYNTH
}
private TraceType inputType;
private SynthTraceJobProducer stjp;
public SLSRunner() throws ClassNotFoundException {
Configuration tempConf = new Configuration(false);
init(tempConf);
}
public SLSRunner(Configuration tempConf) throws ClassNotFoundException {
init(tempConf);
}
private void init(Configuration tempConf) throws ClassNotFoundException {
nmMap = new HashMap<>();
queueAppNumMap = new HashMap<>();
amMap = new ConcurrentHashMap<>();
amClassMap = new HashMap<>();
// runner configuration
conf = new Configuration(false);
conf.addResource("sls-runner.xml");
tempConf.addResource("sls-runner.xml");
super.setConf(tempConf);
// runner
int poolSize = conf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
SLSRunner.runner.setQueueSize(poolSize);
// <AMType, Class> map
for (Map.Entry e : conf) {
for (Map.Entry e : tempConf) {
String key = e.getKey().toString();
if (key.startsWith(SLSConfiguration.AM_TYPE)) {
String amType = key.substring(SLSConfiguration.AM_TYPE.length());
amClassMap.put(amType, Class.forName(conf.get(key)));
amClassMap.put(amType, Class.forName(tempConf.get(key)));
}
}
}
public void start() throws Exception {
public void setSimulationParams(TraceType inType, String[] inTraces,
String nodes, String outDir, Set<String> trackApps,
boolean printsimulation) throws IOException, ClassNotFoundException {
this.inputType = inType;
this.inputTraces = inTraces.clone();
this.nodeFile = nodes;
this.trackedApps = trackApps;
this.printSimulation = printsimulation;
metricsOutputDir = outDir;
}
public void start() throws IOException, ClassNotFoundException, YarnException,
InterruptedException {
// start resource manager
startRM();
// start node managers
@ -151,9 +182,9 @@ public void start() throws Exception {
startAM();
// set queue & tracked apps information
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
.setQueueSet(this.queueAppNumMap.keySet());
.setQueueSet(this.queueAppNumMap.keySet());
((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
.setTrackedAppSet(this.trackedApps);
.setTrackedAppSet(this.trackedApps);
// print out simulation info
printSimulationInfo();
// blocked until all nodes RUNNING
@ -162,23 +193,23 @@ public void start() throws Exception {
runner.start();
}
private void startRM() throws Exception {
Configuration rmConf = new YarnConfiguration();
private void startRM() throws ClassNotFoundException, YarnException {
Configuration rmConf = new YarnConfiguration(getConf());
String schedulerClass = rmConf.get(YarnConfiguration.RM_SCHEDULER);
// For CapacityScheduler we use a sub-classing instead of wrapping
// to allow scheduler-specific invocations from monitors to work
// this can be used for other schedulers as well if we care to
// exercise/track behaviors that are not common to the scheduler api
if(Class.forName(schedulerClass) == CapacityScheduler.class) {
if (Class.forName(schedulerClass) == CapacityScheduler.class) {
rmConf.set(YarnConfiguration.RM_SCHEDULER,
SLSCapacityScheduler.class.getName());
} else if (Class.forName(schedulerClass) == FairScheduler.class) {
rmConf.set(YarnConfiguration.RM_SCHEDULER,
SLSFairScheduler.class.getName());
} else if (Class.forName(schedulerClass) == FifoScheduler.class){
} else if (Class.forName(schedulerClass) == FifoScheduler.class) {
// TODO add support for FifoScheduler
throw new Exception("Fifo Scheduler is not supported yet.");
throw new YarnException("Fifo Scheduler is not supported yet.");
}
rmConf.set(SLSConfiguration.METRICS_OUTPUT_DIR, metricsOutputDir);
@ -196,37 +227,47 @@ protected ApplicationMasterLauncher createAMLauncher() {
private void startNM() throws YarnException, IOException {
// nm configuration
nmMemoryMB = conf.getInt(SLSConfiguration.NM_MEMORY_MB,
SLSConfiguration.NM_MEMORY_MB_DEFAULT);
nmVCores = conf.getInt(SLSConfiguration.NM_VCORES,
SLSConfiguration.NM_VCORES_DEFAULT);
int heartbeatInterval = conf.getInt(
SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
nmMemoryMB = getConf().getInt(SLSConfiguration.NM_MEMORY_MB,
SLSConfiguration.NM_MEMORY_MB_DEFAULT);
nmVCores = getConf().getInt(SLSConfiguration.NM_VCORES,
SLSConfiguration.NM_VCORES_DEFAULT);
int heartbeatInterval =
getConf().getInt(SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS_DEFAULT);
// nm information (fetch from topology file, or from sls/rumen json file)
Set<String> nodeSet = new HashSet<String>();
if (nodeFile.isEmpty()) {
if (isSLS) {
for (String inputTrace : inputTraces) {
for (String inputTrace : inputTraces) {
switch (inputType) {
case SLS:
nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace));
}
} else {
for (String inputTrace : inputTraces) {
break;
case RUMEN:
nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace));
break;
case SYNTH:
stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
nodeSet.addAll(SLSUtils.generateNodesFromSynth(stjp.getNumNodes(),
stjp.getNodesPerRack()));
break;
default:
throw new YarnException("Input configuration not recognized, "
+ "trace type should be SLS, RUMEN, or SYNTH");
}
}
} else {
nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile));
}
// create NM simulators
Random random = new Random();
Set<String> rackSet = new HashSet<String>();
for (String hostName : nodeSet) {
// we randomize the heartbeat start time from zero to 1 interval
NMSimulator nm = new NMSimulator();
nm.init(hostName, nmMemoryMB, nmVCores,
random.nextInt(heartbeatInterval), heartbeatInterval, rm);
nm.init(hostName, nmMemoryMB, nmVCores, random.nextInt(heartbeatInterval),
heartbeatInterval, rm);
nmMap.put(nm.getNode().getNodeID(), nm);
runner.schedule(nm);
rackSet.add(nm.getNode().getRackName());
@ -241,39 +282,50 @@ private void waitForNodesRunning() throws InterruptedException {
int numRunningNodes = 0;
for (RMNode node : rm.getRMContext().getRMNodes().values()) {
if (node.getState() == NodeState.RUNNING) {
numRunningNodes ++;
numRunningNodes++;
}
}
if (numRunningNodes == numNMs) {
break;
}
LOG.info(MessageFormat.format("SLSRunner is waiting for all " +
"nodes RUNNING. {0} of {1} NMs initialized.",
numRunningNodes, numNMs));
LOG.info(MessageFormat.format(
"SLSRunner is waiting for all "
+ "nodes RUNNING. {0} of {1} NMs initialized.",
numRunningNodes, numNMs));
Thread.sleep(1000);
}
LOG.info(MessageFormat.format("SLSRunner takes {0} ms to launch all nodes.",
(System.currentTimeMillis() - startTimeMS)));
(System.currentTimeMillis() - startTimeMS)));
}
@SuppressWarnings("unchecked")
private void startAM() throws YarnException, IOException {
// application/container configuration
int heartbeatInterval = conf.getInt(
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
int heartbeatInterval =
getConf().getInt(SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
int containerMemoryMB = conf.getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
int containerMemoryMB =
getConf().getInt(SLSConfiguration.CONTAINER_MEMORY_MB,
SLSConfiguration.CONTAINER_MEMORY_MB_DEFAULT);
int containerVCores = conf.getInt(SLSConfiguration.CONTAINER_VCORES,
SLSConfiguration.CONTAINER_VCORES_DEFAULT);
int containerVCores = getConf().getInt(SLSConfiguration.CONTAINER_VCORES,
SLSConfiguration.CONTAINER_VCORES_DEFAULT);
Resource containerResource =
BuilderUtils.newResource(containerMemoryMB, containerVCores);
BuilderUtils.newResource(containerMemoryMB, containerVCores);
// application workload
if (isSLS) {
switch (inputType) {
case SLS:
startAMFromSLSTraces(containerResource, heartbeatInterval);
} else {
break;
case RUMEN:
startAMFromRumenTraces(containerResource, heartbeatInterval);
break;
case SYNTH:
startAMFromSynthGenerator(heartbeatInterval);
break;
default:
throw new YarnException("Input configuration not recognized, "
+ "trace type should be SLS, RUMEN, or SYNTH");
}
numAMs = amMap.size();
remainingApps = numAMs;
@ -284,7 +336,7 @@ private void startAM() throws YarnException, IOException {
*/
@SuppressWarnings("unchecked")
private void startAMFromSLSTraces(Resource containerResource,
int heartbeatInterval) throws IOException {
int heartbeatInterval) throws IOException {
// parse from sls traces
JsonFactory jsonF = new JsonFactory();
ObjectMapper mapper = new ObjectMapper();
@ -292,26 +344,28 @@ private void startAMFromSLSTraces(Resource containerResource,
Reader input =
new InputStreamReader(new FileInputStream(inputTrace), "UTF-8");
try {
Iterator<Map> i = mapper.readValues(jsonF.createParser(input),
Map.class);
Iterator<Map> i =
mapper.readValues(jsonF.createParser(input), Map.class);
while (i.hasNext()) {
Map jsonJob = i.next();
// load job information
long jobStartTime = Long.parseLong(
jsonJob.get("job.start.ms").toString());
long jobFinishTime = Long.parseLong(
jsonJob.get("job.end.ms").toString());
long jobStartTime =
Long.parseLong(jsonJob.get("job.start.ms").toString());
long jobFinishTime =
Long.parseLong(jsonJob.get("job.end.ms").toString());
String user = (String) jsonJob.get("job.user");
if (user == null) user = "default";
if (user == null) {
user = "default";
}
String queue = jsonJob.get("job.queue.name").toString();
String oldAppId = jsonJob.get("job.id").toString();
boolean isTracked = trackedApps.contains(oldAppId);
int queueSize = queueAppNumMap.containsKey(queue) ?
queueAppNumMap.get(queue) : 0;
queueSize ++;
int queueSize =
queueAppNumMap.containsKey(queue) ? queueAppNumMap.get(queue) : 0;
queueSize++;
queueAppNumMap.put(queue, queueSize);
// tasks
List tasks = (List) jsonJob.get("job.tasks");
@ -319,45 +373,45 @@ private void startAMFromSLSTraces(Resource containerResource,
continue;
}
List<ContainerSimulator> containerList =
new ArrayList<ContainerSimulator>();
new ArrayList<ContainerSimulator>();
for (Object o : tasks) {
Map jsonTask = (Map) o;
String hostname = jsonTask.get("container.host").toString();
long taskStart = Long.parseLong(
jsonTask.get("container.start.ms").toString());
long taskFinish = Long.parseLong(
jsonTask.get("container.end.ms").toString());
long taskStart =
Long.parseLong(jsonTask.get("container.start.ms").toString());
long taskFinish =
Long.parseLong(jsonTask.get("container.end.ms").toString());
long lifeTime = taskFinish - taskStart;
// Set memory and vcores from job trace file
Resource res = Resources.clone(containerResource);
if (jsonTask.containsKey("container.memory")) {
int containerMemory = Integer.parseInt(
jsonTask.get("container.memory").toString());
int containerMemory =
Integer.parseInt(jsonTask.get("container.memory").toString());
res.setMemorySize(containerMemory);
}
if (jsonTask.containsKey("container.vcores")) {
int containerVCores = Integer.parseInt(
jsonTask.get("container.vcores").toString());
int containerVCores =
Integer.parseInt(jsonTask.get("container.vcores").toString());
res.setVirtualCores(containerVCores);
}
int priority = Integer.parseInt(
jsonTask.get("container.priority").toString());
int priority =
Integer.parseInt(jsonTask.get("container.priority").toString());
String type = jsonTask.get("container.type").toString();
containerList.add(new ContainerSimulator(res,
lifeTime, hostname, priority, type));
containerList.add(new ContainerSimulator(res, lifeTime, hostname,
priority, type));
}
// create a new AM
String amType = jsonJob.get("am.type").toString();
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(amType), new Configuration());
AMSimulator amSim = (AMSimulator) ReflectionUtils
.newInstance(amClassMap.get(amType), new Configuration());
if (amSim != null) {
amSim.init(AM_ID++, heartbeatInterval, containerList, rm,
this, jobStartTime, jobFinishTime, user, queue,
isTracked, oldAppId);
amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
jobStartTime, jobFinishTime, user, queue, isTracked, oldAppId,
null, runner.getStartTimeMS());
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTime);
numTasks += containerList.size();
@ -375,22 +429,21 @@ private void startAMFromSLSTraces(Resource containerResource,
*/
@SuppressWarnings("unchecked")
private void startAMFromRumenTraces(Resource containerResource,
int heartbeatInterval)
throws IOException {
int heartbeatInterval) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
long baselineTimeMS = 0;
for (String inputTrace : inputTraces) {
File fin = new File(inputTrace);
JobTraceReader reader = new JobTraceReader(
new Path(fin.getAbsolutePath()), conf);
JobTraceReader reader =
new JobTraceReader(new Path(fin.getAbsolutePath()), conf);
try {
LoggedJob job = null;
while ((job = reader.getNext()) != null) {
// only support MapReduce currently
String jobType = "mapreduce";
String user = job.getUser() == null ?
"default" : job.getUser().getValue();
String user =
job.getUser() == null ? "default" : job.getUser().getValue();
String jobQueue = job.getQueue().getValue();
String oldJobId = job.getJobID().toString();
long jobStartTimeMS = job.getSubmitTime();
@ -407,48 +460,48 @@ private void startAMFromRumenTraces(Resource containerResource,
}
boolean isTracked = trackedApps.contains(oldJobId);
int queueSize = queueAppNumMap.containsKey(jobQueue) ?
queueAppNumMap.get(jobQueue) : 0;
queueSize ++;
int queueSize = queueAppNumMap.containsKey(jobQueue)
? queueAppNumMap.get(jobQueue) : 0;
queueSize++;
queueAppNumMap.put(jobQueue, queueSize);
List<ContainerSimulator> containerList =
new ArrayList<ContainerSimulator>();
new ArrayList<ContainerSimulator>();
// map tasks
for(LoggedTask mapTask : job.getMapTasks()) {
for (LoggedTask mapTask : job.getMapTasks()) {
if (mapTask.getAttempts().size() == 0) {
continue;
}
LoggedTaskAttempt taskAttempt = mapTask.getAttempts()
.get(mapTask.getAttempts().size() - 1);
LoggedTaskAttempt taskAttempt =
mapTask.getAttempts().get(mapTask.getAttempts().size() - 1);
String hostname = taskAttempt.getHostName().getValue();
long containerLifeTime = taskAttempt.getFinishTime()
- taskAttempt.getStartTime();
long containerLifeTime =
taskAttempt.getFinishTime() - taskAttempt.getStartTime();
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, 10, "map"));
containerLifeTime, hostname, 10, "map"));
}
// reduce tasks
for(LoggedTask reduceTask : job.getReduceTasks()) {
for (LoggedTask reduceTask : job.getReduceTasks()) {
if (reduceTask.getAttempts().size() == 0) {
continue;
}
LoggedTaskAttempt taskAttempt = reduceTask.getAttempts()
.get(reduceTask.getAttempts().size() - 1);
.get(reduceTask.getAttempts().size() - 1);
String hostname = taskAttempt.getHostName().getValue();
long containerLifeTime = taskAttempt.getFinishTime()
- taskAttempt.getStartTime();
long containerLifeTime =
taskAttempt.getFinishTime() - taskAttempt.getStartTime();
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, 20, "reduce"));
containerLifeTime, hostname, 20, "reduce"));
}
// create a new AM
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(jobType), conf);
AMSimulator amSim = (AMSimulator) ReflectionUtils
.newInstance(amClassMap.get(jobType), conf);
if (amSim != null) {
amSim.init(AM_ID ++, heartbeatInterval, containerList,
rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue,
isTracked, oldJobId);
amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked,
oldJobId, null, runner.getStartTimeMS());
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
numTasks += containerList.size();
@ -460,34 +513,168 @@ private void startAMFromRumenTraces(Resource containerResource,
}
}
}
/**
* parse workload information from synth-generator trace files.
*/
@SuppressWarnings("unchecked")
private void startAMFromSynthGenerator(int heartbeatInterval)
throws IOException {
Configuration localConf = new Configuration();
localConf.set("fs.defaultFS", "file:///");
long baselineTimeMS = 0;
// reservations use wall clock time, so need to have a reference for that
UTCClock clock = new UTCClock();
long now = clock.getTime();
try {
// if we use the nodeFile this could have been not initialized yet.
if (stjp == null) {
stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
}
SynthJob job = null;
// we use stjp, a reference to the job producer instantiated during node
// creation
while ((job = (SynthJob) stjp.getNextJob()) != null) {
// only support MapReduce currently
String jobType = "mapreduce";
String user = job.getUser();
String jobQueue = job.getQueueName();
String oldJobId = job.getJobID().toString();
long jobStartTimeMS = job.getSubmissionTime();
// CARLO: Finish time is only used for logging, omit for now
long jobFinishTimeMS = -1L;
if (baselineTimeMS == 0) {
baselineTimeMS = jobStartTimeMS;
}
jobStartTimeMS -= baselineTimeMS;
jobFinishTimeMS -= baselineTimeMS;
if (jobStartTimeMS < 0) {
LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
jobStartTimeMS = 0;
}
boolean isTracked = trackedApps.contains(oldJobId);
int queueSize = queueAppNumMap.containsKey(jobQueue)
? queueAppNumMap.get(jobQueue) : 0;
queueSize++;
queueAppNumMap.put(jobQueue, queueSize);
List<ContainerSimulator> containerList =
new ArrayList<ContainerSimulator>();
ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
Random rand = new Random(stjp.getSeed());
Resource maxMapRes = Resource.newInstance(0, 0);
long maxMapDur = 0;
// map tasks
for (int i = 0; i < job.getNumberMaps(); i++) {
TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0);
RMNode node = nmMap
.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode();
String hostname = "/" + node.getRackName() + "/" + node.getHostName();
long containerLifeTime = tai.getRuntime();
Resource containerResource =
Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
(int) tai.getTaskInfo().getTaskVCores());
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, 10, "map"));
maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource);
maxMapDur =
containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur;
}
Resource maxRedRes = Resource.newInstance(0, 0);
long maxRedDur = 0;
// reduce tasks
for (int i = 0; i < job.getNumberReduces(); i++) {
TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
RMNode node = nmMap
.get(keyAsArray.get(rand.nextInt(keyAsArray.size()))).getNode();
String hostname = "/" + node.getRackName() + "/" + node.getHostName();
long containerLifeTime = tai.getRuntime();
Resource containerResource =
Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
(int) tai.getTaskInfo().getTaskVCores());
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, 20, "reduce"));
maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource);
maxRedDur =
containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur;
}
// generating reservations for the jobs that require them
ReservationSubmissionRequest rr = null;
if (job.hasDeadline()) {
ReservationId reservationId =
ReservationId.newInstance(this.rm.getStartTime(), AM_ID);
rr = ReservationClientUtil.createMRReservation(reservationId,
"reservation_" + AM_ID, maxMapRes, job.getNumberMaps(), maxMapDur,
maxRedRes, job.getNumberReduces(), maxRedDur,
now + jobStartTimeMS, now + job.getDeadline(),
job.getQueueName());
}
// create a new AM
AMSimulator amSim = (AMSimulator) ReflectionUtils
.newInstance(amClassMap.get(jobType), localConf);
if (amSim != null) {
amSim.init(AM_ID++, heartbeatInterval, containerList, rm, this,
jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked,
oldJobId, rr, runner.getStartTimeMS());
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
numTasks += containerList.size();
amMap.put(oldJobId, amSim);
}
}
} finally {
stjp.close();
}
}
private void printSimulationInfo() {
if (printSimulation) {
// node
LOG.info("------------------------------------");
LOG.info(MessageFormat.format("# nodes = {0}, # racks = {1}, capacity " +
"of each node {2} MB memory and {3} vcores.",
numNMs, numRacks, nmMemoryMB, nmVCores));
LOG.info(MessageFormat.format(
"# nodes = {0}, # racks = {1}, capacity "
+ "of each node {2} MB memory and {3} vcores.",
numNMs, numRacks, nmMemoryMB, nmVCores));
LOG.info("------------------------------------");
// job
LOG.info(MessageFormat.format("# applications = {0}, # total " +
"tasks = {1}, average # tasks per application = {2}",
numAMs, numTasks, (int)(Math.ceil((numTasks + 0.0) / numAMs))));
LOG.info(MessageFormat.format(
"# applications = {0}, # total "
+ "tasks = {1}, average # tasks per application = {2}",
numAMs, numTasks, (int) (Math.ceil((numTasks + 0.0) / numAMs))));
LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks");
for (Map.Entry<String, AMSimulator> entry : amMap.entrySet()) {
AMSimulator am = entry.getValue();
LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType()
LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType()
+ "\t" + am.getDuration() + "\t" + am.getNumTasks());
}
LOG.info("------------------------------------");
// queue
LOG.info(MessageFormat.format("number of queues = {0} average " +
"number of apps = {1}", queueAppNumMap.size(),
(int)(Math.ceil((numAMs + 0.0) / queueAppNumMap.size()))));
LOG.info(MessageFormat.format(
"number of queues = {0} average " + "number of apps = {1}",
queueAppNumMap.size(),
(int) (Math.ceil((numAMs + 0.0) / queueAppNumMap.size()))));
LOG.info("------------------------------------");
// runtime
LOG.info(MessageFormat.format("estimated simulation time is {0}" +
" seconds", (long)(Math.ceil(maxRuntime / 1000.0))));
LOG.info(
MessageFormat.format("estimated simulation time is {0}" + " seconds",
(long) (Math.ceil(maxRuntime / 1000.0))));
LOG.info("------------------------------------");
}
// package these information in the simulateInfoMap used by other places
@ -510,69 +697,121 @@ public HashMap<NodeId, NMSimulator> getNmMap() {
return nmMap;
}
public static TaskRunner getRunner() {
return runner;
}
public static void decreaseRemainingApps() {
remainingApps --;
remainingApps--;
if (remainingApps == 0) {
LOG.info("SLSRunner tears down.");
System.exit(0);
}
}
public static void main(String args[]) throws Exception {
public void stop() throws InterruptedException {
rm.stop();
runner.stop();
}
public int run(final String[] argv) throws IOException, InterruptedException,
ParseException, ClassNotFoundException, YarnException {
Options options = new Options();
// Left for compatibility
options.addOption("inputrumen", true, "input rumen files");
options.addOption("inputsls", true, "input sls files");
// New more general format
options.addOption("tracetype", true, "the type of trace");
options.addOption("tracelocation", true, "input trace files");
options.addOption("nodes", true, "input topology");
options.addOption("output", true, "output directory");
options.addOption("trackjobs", true,
"jobs to be tracked during simulating");
"jobs to be tracked during simulating");
options.addOption("printsimulation", false,
"print out simulation information");
CommandLineParser parser = new GnuParser();
CommandLine cmd = parser.parse(options, args);
"print out simulation information");
CommandLineParser parser = new GnuParser();
CommandLine cmd = parser.parse(options, argv);
String traceType = null;
String traceLocation = null;
// compatibility with old commandline
if (cmd.hasOption("inputrumen")) {
traceType = "RUMEN";
traceLocation = cmd.getOptionValue("inputrumen");
}
if (cmd.hasOption("inputsls")) {
traceType = "SLS";
traceLocation = cmd.getOptionValue("inputsls");
}
if (cmd.hasOption("tracetype")) {
traceType = cmd.getOptionValue("tracetype");
traceLocation = cmd.getOptionValue("tracelocation");
}
String inputRumen = cmd.getOptionValue("inputrumen");
String inputSLS = cmd.getOptionValue("inputsls");
String output = cmd.getOptionValue("output");
if ((inputRumen == null && inputSLS == null) || output == null) {
System.err.println();
System.err.println("ERROR: Missing input or output file");
System.err.println();
System.err.println("Options: -inputrumen|-inputsls FILE,FILE... " +
"-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] " +
"[-printsimulation]");
System.err.println();
System.exit(1);
}
File outputFile = new File(output);
if (! outputFile.exists()
&& ! outputFile.mkdirs()) {
if (!outputFile.exists() && !outputFile.mkdirs()) {
System.err.println("ERROR: Cannot create output directory "
+ outputFile.getAbsolutePath());
System.exit(1);
+ outputFile.getAbsolutePath());
throw new YarnException("Cannot create output directory");
}
Set<String> trackedJobSet = new HashSet<String>();
if (cmd.hasOption("trackjobs")) {
String trackjobs = cmd.getOptionValue("trackjobs");
String jobIds[] = trackjobs.split(",");
trackedJobSet.addAll(Arrays.asList(jobIds));
}
String nodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
boolean isSLS = inputSLS != null;
String inputFiles[] = isSLS ? inputSLS.split(",") : inputRumen.split(",");
SLSRunner sls = new SLSRunner(isSLS, inputFiles, nodeFile, output,
String tempNodeFile =
cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
TraceType tempTraceType = TraceType.SLS;
switch (traceType) {
case "SLS":
tempTraceType = TraceType.SLS;
break;
case "RUMEN":
tempTraceType = TraceType.RUMEN;
break;
case "SYNTH":
tempTraceType = TraceType.SYNTH;
break;
default:
printUsage();
throw new YarnException("Misconfigured input");
}
String[] inputFiles = traceLocation.split(",");
setSimulationParams(tempTraceType, inputFiles, tempNodeFile, output,
trackedJobSet, cmd.hasOption("printsimulation"));
sls.start();
start();
return 0;
}
public static void main(String[] argv) throws Exception {
ToolRunner.run(new Configuration(), new SLSRunner(), argv);
}
static void printUsage() {
System.err.println();
System.err.println("ERROR: Wrong tracetype");
System.err.println();
System.err.println(
"Options: -tracetype " + "SLS|RUMEN|SYNTH -tracelocation FILE,FILE... "
+ "(deprecated alternative options --inputsls FILE, FILE,... "
+ " | --inputrumen FILE,FILE,...)"
+ "-output FILE [-nodes FILE] [-trackjobs JobId,JobId...] "
+ "[-printsimulation]");
System.err.println();
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.sls.appmaster;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.text.MessageFormat;
@ -37,6 +38,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
@ -55,6 +57,7 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -97,6 +100,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
// am type
protected String amtype;
// job start/end time
private long baselineTimeMS;
protected long traceStartTimeMS;
protected long traceFinishTimeMS;
protected long simulateStartTimeMS;
@ -117,25 +121,30 @@ public abstract class AMSimulator extends TaskRunner.Task {
private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
private ReservationSubmissionRequest reservationRequest;
public AMSimulator() {
this.responseQueue = new LinkedBlockingQueue<>();
}
public void init(int id, int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId) {
super.init(traceStartTime, traceStartTime + 1000000L * heartbeatInterval,
heartbeatInterval);
this.user = user;
this.rm = rm;
this.se = se;
this.user = user;
this.queue = queue;
this.oldAppId = oldAppId;
this.isTracked = isTracked;
this.traceStartTimeMS = traceStartTime;
this.traceFinishTimeMS = traceFinishTime;
@SuppressWarnings("checkstyle:parameternumber")
public void init(int id, int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp,
ReservationSubmissionRequest rr, long baseTimeMS) {
super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval);
this.user = simUser;
this.rm = resourceManager;
this.se = slsRunnner;
this.queue = simQueue;
this.oldAppId = oldApp;
this.isTracked = tracked;
this.baselineTimeMS = baseTimeMS;
this.traceStartTimeMS = startTime;
this.traceFinishTimeMS = finishTime;
this.reservationRequest = rr;
}
/**
@ -143,11 +152,21 @@ public void init(int id, int heartbeatInterval,
*/
@Override
public void firstStep() throws Exception {
simulateStartTimeMS = System.currentTimeMillis() -
SLSRunner.getRunner().getStartTimeMS();
simulateStartTimeMS = System.currentTimeMillis() - baselineTimeMS;
ReservationId reservationId = null;
// submit a reservation if one is required, exceptions naturally happen
// when the reservation does not fit, catch, log, and move on running job
// without reservation.
try {
reservationId = submitReservationWhenSpecified();
} catch (UndeclaredThrowableException y) {
LOG.warn("Unable to place reservation: " + y.getMessage());
}
// submit application, waiting until ACCEPTED
submitApp();
submitApp(reservationId);
// track app metrics
trackApp();
@ -161,6 +180,26 @@ public synchronized void notifyAMContainerLaunched(Container masterContainer)
isAMContainerRunning = true;
}
private ReservationId submitReservationWhenSpecified()
throws IOException, InterruptedException {
if (reservationRequest != null) {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws YarnException, IOException {
rm.getClientRMService().submitReservation(reservationRequest);
LOG.info("RESERVATION SUCCESSFULLY SUBMITTED "
+ reservationRequest.getReservationId());
return null;
}
});
return reservationRequest.getReservationId();
} else {
return null;
}
}
@Override
public void middleStep() throws Exception {
if (isAMContainerRunning) {
@ -217,14 +256,13 @@ public Object run() throws Exception {
}
});
simulateFinishTimeMS = System.currentTimeMillis() -
SLSRunner.getRunner().getStartTimeMS();
simulateFinishTimeMS = System.currentTimeMillis() - baselineTimeMS;
// record job running information
SchedulerMetrics schedulerMetrics =
((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
((SchedulerWrapper)rm.getResourceScheduler()).getSchedulerMetrics();
if (schedulerMetrics != null) {
schedulerMetrics.addAMRuntime(appId, traceStartTimeMS, traceFinishTimeMS,
simulateStartTimeMS, simulateFinishTimeMS);
simulateStartTimeMS, simulateFinishTimeMS);
}
}
@ -261,7 +299,7 @@ protected AllocateRequest createAllocateRequest(List<ResourceRequest> ask) {
protected abstract void checkStop();
private void submitApp()
private void submitApp(ReservationId reservationId)
throws YarnException, InterruptedException, IOException {
// ask for new application
GetNewApplicationRequest newAppRequest =
@ -291,6 +329,11 @@ private void submitApp()
appSubContext.setResource(Resources
.createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
MR_AM_CONTAINER_RESOURCE_VCORES));
if(reservationId != null) {
appSubContext.setReservationID(reservationId);
}
subAppRequest.setApplicationSubmissionContext(appSubContext);
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
ugi.doAs(new PrivilegedExceptionAction<Object>() {

View File

@ -27,13 +27,13 @@
import java.util.List;
import java.util.Map;
import org.apache.avro.Protocol;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -42,7 +42,6 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.SLSRunner;
@ -114,13 +113,15 @@ scheduled when all maps have finished (not support slow-start currently).
public final Logger LOG = Logger.getLogger(MRAMSimulator.class);
@SuppressWarnings("checkstyle:parameternumber")
public void init(int id, int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId) {
boolean isTracked, String oldAppId, ReservationSubmissionRequest rr,
long baselineStartTimeMS) {
super.init(id, heartbeatInterval, containerList, rm, se,
traceStartTime, traceFinishTime, user, queue,
isTracked, oldAppId);
isTracked, oldAppId, rr, baselineStartTimeMS);
amtype = "mapreduce";
// get map/reduce tasks

View File

@ -65,6 +65,11 @@ protected void serviceStart() throws Exception {
// Do nothing
}
@Override
protected void serviceStop() throws Exception {
// Do nothing
}
private void setupAMRMToken(RMAppAttempt appAttempt) {
// Setup AMRMToken
Token<AMRMTokenIdentifier> amrmToken =

View File

@ -28,7 +28,6 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@ -52,7 +51,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.codahale.metrics.Timer;
@ -96,16 +94,6 @@ public void setConf(Configuration conf) {
} catch (Exception e) {
e.printStackTrace();
}
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override public void run() {
try {
schedulerMetrics.tearDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}, SLSUtils.SHUTDOWN_HOOK_PRIORITY);
}
}
@ -344,7 +332,6 @@ private void initQueueMetrics(CSQueue queue) {
initQueueMetrics(child);
}
}
@Override
public void serviceInit(Configuration configuration) throws Exception {
super.serviceInit(configuration);
@ -354,6 +341,17 @@ public void serviceInit(Configuration configuration) throws Exception {
}
}
@Override
public void serviceStop() throws Exception {
try {
schedulerMetrics.tearDown();
} catch (Exception e) {
e.printStackTrace();
}
super.serviceStop();
}
public SchedulerMetrics getSchedulerMetrics() {
return schedulerMetrics;
}

View File

@ -22,7 +22,6 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@ -44,7 +43,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
@ -90,16 +88,6 @@ public void setConf(Configuration conf) {
} catch (Exception e) {
e.printStackTrace();
}
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override public void run() {
try {
schedulerMetrics.tearDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}, SLSUtils.SHUTDOWN_HOOK_PRIORITY);
}
}
@ -335,5 +323,15 @@ public void serviceInit(Configuration conf) throws Exception {
initQueueMetrics(getQueueManager().getRootQueue());
}
}
@Override
public void serviceStop() throws Exception {
try {
schedulerMetrics.tearDown();
} catch (Exception e) {
e.printStackTrace();
}
super.serviceStop();
}
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.yarn.sls.scheduler;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Queue;
import java.util.concurrent.DelayQueue;
@ -27,7 +26,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.exceptions.YarnException;
@Private
@Unstable
@ -148,8 +146,8 @@ public void setQueueSize(int threadPoolSize) {
@SuppressWarnings("unchecked")
public void start() {
if (executor != null) {
throw new IllegalStateException("Already started");
if (executor != null && !executor.isTerminated()) {
throw new IllegalStateException("Executor already running");
}
DelayQueue preStartQueue = queue;
@ -164,8 +162,9 @@ public void start() {
}
}
public void stop() {
public void stop() throws InterruptedException {
executor.shutdownNow();
executor.awaitTermination(20, TimeUnit.SECONDS);
}
@SuppressWarnings("unchecked")

View File

@ -0,0 +1,306 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.synthetic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.distribution.LogNormalDistribution;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.*;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.mapreduce.MRJobConfig.QUEUE_NAME;
/**
* Generates random task data for a synthetic job.
*/
public class SynthJob implements JobStory {
@SuppressWarnings("StaticVariableName")
private static Log LOG = LogFactory.getLog(SynthJob.class);
private final Configuration conf;
private final int id;
@SuppressWarnings("ConstantName")
private static final AtomicInteger sequence = new AtomicInteger(0);
private final String name;
private final String queueName;
private final SynthJobClass jobClass;
// job timing
private final long submitTime;
private final long duration;
private final long deadline;
private final int numMapTasks;
private final int numRedTasks;
private final long mapMaxMemory;
private final long reduceMaxMemory;
private final long mapMaxVcores;
private final long reduceMaxVcores;
private final long[] mapRuntime;
private final float[] reduceRuntime;
private long totMapRuntime;
private long totRedRuntime;
public SynthJob(JDKRandomGenerator rand, Configuration conf,
SynthJobClass jobClass, long actualSubmissionTime) {
this.conf = conf;
this.jobClass = jobClass;
this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS);
this.numMapTasks = jobClass.getMtasks();
this.numRedTasks = jobClass.getRtasks();
// sample memory distributions, correct for sub-minAlloc sizes
long tempMapMaxMemory = jobClass.getMapMaxMemory();
this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB
? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory;
long tempReduceMaxMemory = jobClass.getReduceMaxMemory();
this.reduceMaxMemory =
tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB
? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory;
// sample vcores distributions, correct for sub-minAlloc sizes
long tempMapMaxVCores = jobClass.getMapMaxVcores();
this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES
? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores;
long tempReduceMaxVcores = jobClass.getReduceMaxVcores();
this.reduceMaxVcores =
tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES
? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores;
if (numMapTasks > 0) {
conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory);
conf.set(MRJobConfig.MAP_JAVA_OPTS,
"-Xmx" + (this.mapMaxMemory - 100) + "m");
}
if (numRedTasks > 0) {
conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory);
conf.set(MRJobConfig.REDUCE_JAVA_OPTS,
"-Xmx" + (this.reduceMaxMemory - 100) + "m");
}
boolean hasDeadline =
(rand.nextDouble() <= jobClass.jobClass.chance_of_reservation);
LogNormalDistribution deadlineFactor =
SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg,
jobClass.jobClass.deadline_factor_stddev);
double deadlineFactorSample =
(deadlineFactor != null) ? deadlineFactor.sample() : -1;
this.queueName = jobClass.workload.getQueueName();
this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS);
this.deadline =
hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS)
+ (long) Math.ceil(deadlineFactorSample * duration) : -1;
conf.set(QUEUE_NAME, queueName);
// name and initialize job randomness
final long seed = rand.nextLong();
rand.setSeed(seed);
id = sequence.getAndIncrement();
name = String.format(jobClass.getClassName() + "_%06d", id);
LOG.debug(name + " (" + seed + ")");
LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime
+ " deadline:" + deadline + " duration:" + duration
+ " deadline-submission: " + (deadline - submitTime));
// generate map and reduce runtimes
mapRuntime = new long[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
mapRuntime[i] = jobClass.getMapTimeSample();
totMapRuntime += mapRuntime[i];
}
reduceRuntime = new float[numRedTasks];
for (int i = 0; i < numRedTasks; i++) {
reduceRuntime[i] = jobClass.getReduceTimeSample();
totRedRuntime += (long) Math.ceil(reduceRuntime[i]);
}
}
public boolean hasDeadline() {
return deadline > 0;
}
@Override
public String getName() {
return name;
}
@Override
public String getUser() {
return jobClass.getUserName();
}
@Override
public JobID getJobID() {
return new JobID("job_mock_" + name, id);
}
@Override
public Values getOutcome() {
return Values.SUCCESS;
}
@Override
public long getSubmissionTime() {
return submitTime;
}
@Override
public int getNumberMaps() {
return numMapTasks;
}
@Override
public int getNumberReduces() {
return numRedTasks;
}
@Override
public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
switch (taskType) {
case MAP:
return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores);
case REDUCE:
return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores);
default:
throw new IllegalArgumentException("Not interested");
}
}
@Override
public InputSplit[] getInputSplits() {
throw new UnsupportedOperationException();
}
@Override
public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
int taskAttemptNumber) {
switch (taskType) {
case MAP:
return new MapTaskAttemptInfo(State.SUCCEEDED,
getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null);
case REDUCE:
// We assume uniform split between pull/sort/reduce
// aligned with naive progress reporting assumptions
return new ReduceTaskAttemptInfo(State.SUCCEEDED,
getTaskInfo(taskType, taskNumber),
(long) Math.round((reduceRuntime[taskNumber] / 3)),
(long) Math.round((reduceRuntime[taskNumber] / 3)),
(long) Math.round((reduceRuntime[taskNumber] / 3)), null);
default:
break;
}
throw new UnsupportedOperationException();
}
@Override
public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
int taskAttemptNumber, int locality) {
throw new UnsupportedOperationException();
}
@Override
public org.apache.hadoop.mapred.JobConf getJobConf() {
return new JobConf(conf);
}
@Override
public String getQueueName() {
return queueName;
}
@Override
public String toString() {
return "SynthJob [\n" + " workload=" + jobClass.getWorkload().getId()
+ "\n" + " jobClass="
+ jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n"
+ " conf=" + conf + ",\n" + " id=" + id + ",\n" + " name=" + name
+ ",\n" + " mapRuntime=" + Arrays.toString(mapRuntime) + ",\n"
+ " reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n"
+ " submitTime=" + submitTime + ",\n" + " numMapTasks=" + numMapTasks
+ ",\n" + " numRedTasks=" + numRedTasks + ",\n" + " mapMaxMemory="
+ mapMaxMemory + ",\n" + " reduceMaxMemory=" + reduceMaxMemory + ",\n"
+ " queueName=" + queueName + "\n" + "]";
}
public SynthJobClass getJobClass() {
return jobClass;
}
public long getTotalSlotTime() {
return totMapRuntime + totRedRuntime;
}
public long getDuration() {
return duration;
}
public long getDeadline() {
return deadline;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof SynthJob)) {
return false;
}
SynthJob o = (SynthJob) other;
return Arrays.equals(mapRuntime, o.mapRuntime)
&& Arrays.equals(reduceRuntime, o.reduceRuntime)
&& submitTime == o.submitTime && numMapTasks == o.numMapTasks
&& numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory
&& reduceMaxMemory == o.reduceMaxMemory
&& mapMaxVcores == o.mapMaxVcores
&& reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName)
&& jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime
&& totRedRuntime == o.totRedRuntime;
}
@Override
public int hashCode() {
// could have a bad distr; investigate if a relevant use case exists
return jobClass.hashCode() * (int) submitTime;
}
}

View File

@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.synthetic;
import org.apache.commons.math3.distribution.AbstractRealDistribution;
import org.apache.commons.math3.distribution.LogNormalDistribution;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
/**
* This is a class that represent a class of Jobs. It is used to generate an
* individual job, by picking random durations, task counts, container size,
* etc.
*/
public class SynthJobClass {
private final JDKRandomGenerator rand;
private final LogNormalDistribution dur;
private final LogNormalDistribution mapRuntime;
private final LogNormalDistribution redRuntime;
private final LogNormalDistribution mtasks;
private final LogNormalDistribution rtasks;
private final LogNormalDistribution mapMem;
private final LogNormalDistribution redMem;
private final LogNormalDistribution mapVcores;
private final LogNormalDistribution redVcores;
private final Trace trace;
@SuppressWarnings("VisibilityModifier")
protected final SynthWorkload workload;
@SuppressWarnings("VisibilityModifier")
protected final JobClass jobClass;
public SynthJobClass(JDKRandomGenerator rand, Trace trace,
SynthWorkload workload, int classId) {
this.trace = trace;
this.workload = workload;
this.rand = new JDKRandomGenerator();
this.rand.setSeed(rand.nextLong());
jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId);
this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg,
jobClass.dur_stddev);
this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg,
jobClass.mtime_stddev);
this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg,
jobClass.rtime_stddev);
this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg,
jobClass.mtasks_stddev);
this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg,
jobClass.rtasks_stddev);
this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg,
jobClass.map_max_memory_stddev);
this.redMem = SynthUtils.getLogNormalDist(rand,
jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev);
this.mapVcores = SynthUtils.getLogNormalDist(rand,
jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev);
this.redVcores = SynthUtils.getLogNormalDist(rand,
jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev);
}
public JobStory getJobStory(Configuration conf, long actualSubmissionTime) {
return new SynthJob(rand, conf, this, actualSubmissionTime);
}
@Override
public String toString() {
return "SynthJobClass [workload=" + workload.getName() + ", class="
+ jobClass.class_name + " job_count=" + jobClass.class_weight + ", dur="
+ ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime="
+ ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0)
+ ", redRuntime="
+ ((redRuntime != null) ? redRuntime.getNumericalMean() : 0)
+ ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0)
+ ", rtasks=" + ((rtasks != null) ? rtasks.getNumericalMean() : 0)
+ ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n";
}
public double getClassWeight() {
return jobClass.class_weight;
}
public long getDur() {
return genLongSample(dur);
}
public int getMtasks() {
return genIntSample(mtasks);
}
public int getRtasks() {
return genIntSample(rtasks);
}
public long getMapMaxMemory() {
return genLongSample(mapMem);
}
public long getReduceMaxMemory() {
return genLongSample(redMem);
}
public long getMapMaxVcores() {
return genLongSample(mapVcores);
}
public long getReduceMaxVcores() {
return genLongSample(redVcores);
}
public SynthWorkload getWorkload() {
return workload;
}
public int genIntSample(AbstractRealDistribution dist) {
if (dist == null) {
return 0;
}
double baseSample = dist.sample();
if (baseSample < 0) {
baseSample = 0;
}
return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample));
}
public long genLongSample(AbstractRealDistribution dist) {
return dist != null ? (long) Math.ceil(dist.sample()) : 0;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof SynthJobClass)) {
return false;
}
SynthJobClass o = (SynthJobClass) other;
return workload.equals(o.workload);
}
@Override
public int hashCode() {
return workload.hashCode() * workload.getId();
}
public String getClassName() {
return jobClass.class_name;
}
public long getMapTimeSample() {
return genLongSample(mapRuntime);
}
public long getReduceTimeSample() {
return genLongSample(redRuntime);
}
public String getUserName() {
return jobClass.user_name;
}
}

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.synthetic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES;
import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES;
/**
* This is a JobStoryProducer that operates from distribution of different
* workloads. The .json input file is used to determine how many jobs, which
* size, number of maps/reducers and their duration, as well as the temporal
* distributed of submissions. For each parameter we control avg and stdev, and
* generate values via normal or log-normal distributions.
*/
public class SynthTraceJobProducer implements JobStoryProducer {
@SuppressWarnings("StaticVariableName")
private static final Log LOG = LogFactory.getLog(SynthTraceJobProducer.class);
private final Configuration conf;
private final AtomicInteger numJobs;
private final Trace trace;
private final long seed;
private int totalWeight;
private final List<Double> weightList;
private final Map<Integer, SynthWorkload> workloads;
private final Queue<StoryParams> listStoryParams;
private final JDKRandomGenerator rand;
public static final String SLS_SYNTHETIC_TRACE_FILE =
"sls.synthetic" + ".trace_file";
public SynthTraceJobProducer(Configuration conf) throws IOException {
this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE)));
}
public SynthTraceJobProducer(Configuration conf, Path path)
throws IOException {
LOG.info("SynthTraceJobProducer");
this.conf = conf;
this.rand = new JDKRandomGenerator();
workloads = new HashMap<Integer, SynthWorkload>();
weightList = new ArrayList<Double>();
ObjectMapper mapper = new ObjectMapper();
mapper.configure(INTERN_FIELD_NAMES, true);
mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
FileSystem ifs = path.getFileSystem(conf);
FSDataInputStream fileIn = ifs.open(path);
this.trace = mapper.readValue(fileIn, Trace.class);
seed = trace.rand_seed;
rand.setSeed(seed);
this.numJobs = new AtomicInteger(trace.num_jobs);
for (int workloadId = 0; workloadId < trace.workloads
.size(); workloadId++) {
SynthWorkload workload = new SynthWorkload(workloadId, trace);
for (int classId =
0; classId < trace.workloads.get(workloadId).job_classes
.size(); classId++) {
SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId);
workload.add(cls);
}
workloads.put(workloadId, workload);
}
for (int i = 0; i < workloads.size(); i++) {
double w = workloads.get(i).getWorkloadWeight();
totalWeight += w;
weightList.add(w);
}
// create priority queue to keep start-time sorted
listStoryParams =
new PriorityQueue<StoryParams>(10, new Comparator<StoryParams>() {
@Override
public int compare(StoryParams o1, StoryParams o2) {
return Math
.toIntExact(o2.actualSubmissionTime - o1.actualSubmissionTime);
}
});
// initialize it
createStoryParams();
LOG.info("Generated " + listStoryParams.size() + " deadlines for "
+ this.numJobs.get() + " jobs ");
}
public long getSeed() {
return seed;
}
public int getNodesPerRack() {
return trace.nodes_per_rack;
}
public int getNumNodes() {
return trace.num_nodes;
}
/**
* Class used to parse a trace configuration file.
*/
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
@XmlRootElement
public static class Trace {
@JsonProperty("description")
String description;
@JsonProperty("num_nodes")
int num_nodes;
@JsonProperty("nodes_per_rack")
int nodes_per_rack;
@JsonProperty("num_jobs")
int num_jobs;
// in sec (selects a portion of time_distribution
@JsonProperty("rand_seed")
long rand_seed;
@JsonProperty("workloads")
List<Workload> workloads;
}
/**
* Class used to parse a workload from file.
*/
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
public static class Workload {
@JsonProperty("workload_name")
String workload_name;
// used to change probability this workload is picked for each job
@JsonProperty("workload_weight")
double workload_weight;
@JsonProperty("queue_name")
String queue_name;
@JsonProperty("job_classes")
List<JobClass> job_classes;
@JsonProperty("time_distribution")
List<TimeSample> time_distribution;
}
/**
* Class used to parse a job class from file.
*/
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
public static class JobClass {
@JsonProperty("class_name")
String class_name;
@JsonProperty("user_name")
String user_name;
// used to change probability this class is chosen
@JsonProperty("class_weight")
double class_weight;
// reservation related params
@JsonProperty("chance_of_reservation")
double chance_of_reservation;
@JsonProperty("deadline_factor_avg")
double deadline_factor_avg;
@JsonProperty("deadline_factor_stddev")
double deadline_factor_stddev;
// durations in sec
@JsonProperty("dur_avg")
double dur_avg;
@JsonProperty("dur_stddev")
double dur_stddev;
@JsonProperty("mtime_avg")
double mtime_avg;
@JsonProperty("mtime_stddev")
double mtime_stddev;
@JsonProperty("rtime_avg")
double rtime_avg;
@JsonProperty("rtime_stddev")
double rtime_stddev;
// number of tasks
@JsonProperty("mtasks_avg")
double mtasks_avg;
@JsonProperty("mtasks_stddev")
double mtasks_stddev;
@JsonProperty("rtasks_avg")
double rtasks_avg;
@JsonProperty("rtasks_stddev")
double rtasks_stddev;
// memory in MB
@JsonProperty("map_max_memory_avg")
long map_max_memory_avg;
@JsonProperty("map_max_memory_stddev")
double map_max_memory_stddev;
@JsonProperty("reduce_max_memory_avg")
long reduce_max_memory_avg;
@JsonProperty("reduce_max_memory_stddev")
double reduce_max_memory_stddev;
// vcores
@JsonProperty("map_max_vcores_avg")
long map_max_vcores_avg;
@JsonProperty("map_max_vcores_stddev")
double map_max_vcores_stddev;
@JsonProperty("reduce_max_vcores_avg")
long reduce_max_vcores_avg;
@JsonProperty("reduce_max_vcores_stddev")
double reduce_max_vcores_stddev;
}
/**
* This is used to define time-varying probability of a job start-time (e.g.,
* to simulate daily patterns).
*/
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
public static class TimeSample {
// in sec
@JsonProperty("time")
int time;
@JsonProperty("weight")
double jobs;
}
static class StoryParams {
private SynthJobClass pickedJobClass;
private long actualSubmissionTime;
StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) {
this.pickedJobClass = pickedJobClass;
this.actualSubmissionTime = actualSubmissionTime;
}
}
void createStoryParams() {
for (int i = 0; i < numJobs.get(); i++) {
int workload = SynthUtils.getWeighted(weightList, rand);
SynthWorkload pickedWorkload = workloads.get(workload);
long jobClass =
SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand);
SynthJobClass pickedJobClass =
pickedWorkload.getClassList().get((int) jobClass);
long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand);
// long actualSubmissionTime = (i + 1) * 10;
listStoryParams
.add(new StoryParams(pickedJobClass, actualSubmissionTime));
}
}
@Override
public JobStory getNextJob() throws IOException {
if (numJobs.decrementAndGet() < 0) {
return null;
}
StoryParams storyParams = listStoryParams.poll();
return storyParams.pickedJobClass.getJobStory(conf,
storyParams.actualSubmissionTime);
}
@Override
public void close() {
}
@Override
public String toString() {
return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs
+ ", weightList=" + weightList + ", r=" + rand + ", totalWeight="
+ totalWeight + ", workloads=" + workloads + "]";
}
public int getNumJobs() {
return trace.num_jobs;
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.synthetic;
import org.apache.commons.math3.distribution.LogNormalDistribution;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.apache.commons.math3.random.JDKRandomGenerator;
import java.util.Collection;
import java.util.Random;
/**
* Utils for the Synthetic generator.
*/
public final class SynthUtils {
private SynthUtils(){
//class is not meant to be instantiated
}
public static int getWeighted(Collection<Double> weights, Random rr) {
double totalWeight = 0;
for (Double i : weights) {
totalWeight += i;
}
double rand = rr.nextDouble() * totalWeight;
double cur = 0;
int ind = 0;
for (Double i : weights) {
cur += i;
if (cur > rand) {
break;
}
ind++;
}
return ind;
}
public static NormalDistribution getNormalDist(JDKRandomGenerator rand,
double average, double stdDev) {
if (average <= 0) {
return null;
}
// set default for missing param
if (stdDev == 0) {
stdDev = average / 6;
}
NormalDistribution ret = new NormalDistribution(average, stdDev,
NormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY);
ret.reseedRandomGenerator(rand.nextLong());
return ret;
}
public static LogNormalDistribution getLogNormalDist(JDKRandomGenerator rand,
double mean, double stdDev) {
if (mean <= 0) {
return null;
}
// set default for missing param
if (stdDev == 0) {
stdDev = mean / 6;
}
// derive lognormal parameters for X = LogNormal(mu, sigma)
// sigma^2 = ln (1+Var[X]/(E[X])^2)
// mu = ln(E[X]) - 1/2 * sigma^2
double var = stdDev * stdDev;
double sigmasq = Math.log1p(var / (mean * mean));
double sigma = Math.sqrt(sigmasq);
double mu = Math.log(mean) - 0.5 * sigmasq;
LogNormalDistribution ret = new LogNormalDistribution(mu, sigma,
LogNormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY);
ret.reseedRandomGenerator(rand.nextLong());
return ret;
}
}

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.synthetic;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
import java.util.*;
/**
* This class represent a workload (made up of multiple SynthJobClass(es)). It
* also stores the temporal distributions of jobs in this workload.
*/
public class SynthWorkload {
private final int id;
private final List<SynthJobClass> classList;
private final Trace trace;
private final SortedMap<Integer, Double> timeWeights;
public SynthWorkload(int identifier, Trace inTrace) {
classList = new ArrayList<SynthJobClass>();
this.id = identifier;
this.trace = inTrace;
timeWeights = new TreeMap<Integer, Double>();
for (SynthTraceJobProducer.TimeSample ts : trace.workloads
.get(id).time_distribution) {
timeWeights.put(ts.time, ts.jobs);
}
}
public boolean add(SynthJobClass s) {
return classList.add(s);
}
public List<Double> getWeightList() {
ArrayList<Double> ret = new ArrayList<Double>();
for (SynthJobClass s : classList) {
ret.add(s.getClassWeight());
}
return ret;
}
public int getId() {
return id;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof SynthWorkload)) {
return false;
}
// assume ID determines job classes by construction
return getId() == ((SynthWorkload) other).getId();
}
@Override
public int hashCode() {
return getId();
}
@Override
public String toString() {
return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n"
+ classList + "]\n";
}
public String getName() {
return trace.workloads.get(id).workload_name;
}
public double getWorkloadWeight() {
return trace.workloads.get(id).workload_weight;
}
public String getQueueName() {
return trace.workloads.get(id).queue_name;
}
public long getBaseSubmissionTime(Random rand) {
// pick based on weights the "bucket" for this start time
int position = SynthUtils.getWeighted(timeWeights.values(), rand);
int[] time = new int[timeWeights.keySet().size()];
int index = 0;
for (Integer i : timeWeights.keySet()) {
time[index++] = i;
}
// uniformly pick a time between start and end time of this bucket
int startRange = time[position];
int endRange = startRange;
// if there is no subsequent bucket pick startRange
if (position < timeWeights.keySet().size() - 1) {
endRange = time[position + 1];
return startRange + rand.nextInt((endRange - startRange));
} else {
return startRange;
}
}
public List<SynthJobClass> getClassList() {
return classList;
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Classes comprising the synthetic load generator for SLS.
*/
package org.apache.hadoop.yarn.sls.synthetic;

View File

@ -149,4 +149,13 @@ public static Set<String> parseNodesFromNodeFile(String nodeFile)
}
return nodeSet;
}
public static Set<? extends String> generateNodesFromSynth(
int numNodes, int nodesPerRack) {
Set<String> nodeSet = new HashSet<String>();
for (int i = 0; i < numNodes; i++) {
nodeSet.add("/rack" + i % nodesPerRack + "/node" + i);
}
return nodeSet;
}
}

View File

@ -27,9 +27,11 @@ Yarn Scheduler Load Simulator (SLS)
* [Metrics](#Metrics)
* [Real-time Tracking](#Real-time_Tracking)
* [Offline Analysis](#Offline_Analysis)
* [Synthetic Load Generator](#SynthGen)
* [Appendix](#Appendix)
* [Resources](#Resources)
* [SLS JSON input file format](#SLS_JSON_input_file_format)
* [SYNTH JSON input file format](#SYNTH_JSON_input_file_format)
* [Simulator input topology file format](#Simulator_input_topology_file_format)
Overview
@ -72,7 +74,7 @@ The following figure illustrates the implementation architecture of the simulato
![The architecture of the simulator](images/sls_arch.png)
The simulator takes input of workload traces, and fetches the cluster and applications information. For each NM and AM, the simulator builds a simulator to simulate their running. All NM/AM simulators run in a thread pool. The simulator reuses Yarn Resource Manager, and builds a wrapper out of the scheduler. The Scheduler Wrapper can track the scheduler behaviors and generates several logs, which are the outputs of the simulator and can be further analyzed.
The simulator takes input of workload traces, or synthetic load distributions and generaters the cluster and applications information. For each NM and AM, the simulator builds a simulator to simulate their running. All NM/AM simulators run in a thread pool. The simulator reuses Yarn Resource Manager, and builds a wrapper out of the scheduler. The Scheduler Wrapper can track the scheduler behaviors and generates several logs, which are the outputs of the simulator and can be further analyzed.
### Usecases
@ -179,17 +181,30 @@ The simulator supports two types of input files: the rumen traces and its own in
$ cd $HADOOP_ROOT/share/hadoop/tools/sls
$ bin/slsrun.sh
--input-rumen |--input-sls=<TRACE_FILE1,TRACE_FILE2,...>
--output-dir=<SLS_SIMULATION_OUTPUT_DIRECTORY> [--nodes=<SLS_NODES_FILE>]
[--track-jobs=<JOBID1,JOBID2,...>] [--print-simulation]
Usage: slsrun.sh <OPTIONS>
--tracetype=<SYNTH | SLS | RUMEN>
--tracelocation=<FILE1,FILE2,...>
(deprecated --input-rumen=<FILE1,FILE2,...> | --input-sls=<FILE1,FILE2,...>)
--output-dir=<SLS_SIMULATION_OUTPUT_DIRECTORY>
[--nodes=<SLS_NODES_FILE>]
[--track-jobs=<JOBID1,JOBID2,...>]
[--print-simulation]
* `--input-rumen`: The input rumen trace files. Users can input multiple
files, separated by comma. One example trace is provided in
`$HADOOP_ROOT/share/hadoop/tools/sls/sample-data/2jobs2min-rumen-jh.json`.
This is equivalent to `--tracetype=RUMEN --tracelocation=<path_to_trace>`.
* `--input-sls`: Simulator its own file format. The simulator also
provides a tool to convert rumen traces to sls traces (`rumen2sls.sh`).
Refer to appendix for an example of sls input json file.
This is equivalent to `--tracetype=SLS --tracelocation=<path_to_trace>`.
* `--tracetype`: This is the new way to configure the trace generation and
takes values RUMEN, SLS, or SYNTH, to trigger the three type of load generation
* `--tracelocation`: Path to the input file, matching the tracetype above.
* `--output-dir`: The output directory for generated running logs and
metrics.
@ -279,12 +294,34 @@ After the simulator finishes, all logs are saved in the output directory specifi
* Folder `metrics`: logs generated by the Metrics.
Users can also reproduce those real-time tracking charts in offline mode. Just upload the `realtimetrack.json` to `$HADOOP_ROOT/share/hadoop/tools/sls/html/showSimulationTrace.html`. For browser security problem, need to put files `realtimetrack.json` and `showSimulationTrace.html` in the same directory.
Synthetic Load Generator
------------------------
The Synthetic Load Generator complements the extensive nature of SLS-native and RUMEN traces, by providing a
distribution-driven generation of load. The load generator is organized as a JobStoryProducer
(compatible with rumen, and thus gridmix for later integration). We seed the Random number generator so
that results randomized but deterministic---hence reproducible.
We organize the jobs being generated around */workloads/job_class* hierarchy, which allow to easily
group jobs with similar behaviors and categorize them (e.g., jobs with long running containers, or maponly
computations, etc..). The user can control average and standard deviations for many of the
important parameters, such as number of mappers/reducers, duration of mapper/reducers, size
(mem/cpu) of containers, chance of reservation, etc. We use weighted-random sampling (whenever we
pick among a small number of options) or LogNormal distributions (to avoid negative values) when we
pick from wide ranges of values---see appendix on LogNormal distributions.
The SYNTH mode of SLS is very convenient to generate very large loads without the need for extensive input
files. This allows to easily explore wide range of use cases (e.g., imagine simulating 100k jobs, and in different
runs simply tune the average number of mappers, or average task duration), in an efficient and compact way.
Appendix
--------
### Resources
[YARN-1021](https://issues.apache.org/jira/browse/YARN-1021) is the main JIRA that introduces Yarn Scheduler Load Simulator to Hadoop Yarn project.
[YARN-6363](https://issues.apache.org/jira/browse/YARN-6363) is the main JIRA that introduces the Synthetic Load Generator to SLS.
### SLS JSON input file format
@ -339,6 +376,77 @@ Here we provide an example format of the sls json file, which contains 2 jobs. T
} ]
}
### SYNTH JSON input file format
Here we provide an example format of the synthetic generator json file. We use *(json-non-conforming)* inline comments to explain the use of each parameter.
{
"description" : "tiny jobs workload", //description of the meaning of this collection of workloads
"num_nodes" : 10, //total nodes in the simulated cluster
"nodes_per_rack" : 4, //number of nodes in each simulated rack
"num_jobs" : 10, // total number of jobs being simulated
"rand_seed" : 2, //the random seed used for deterministic randomized runs
// a list of “workloads”, each of which has job classes, and temporal properties
"workloads" : [
{
"workload_name" : "tiny-test", // name of the workload
"workload_weight": 0.5, // used for weighted random selection of which workload to sample from
"queue_name" : "sls_queue_1", //queue the job will be submitted to
//different classes of jobs for this workload
"job_classes" : [
{
"class_name" : "class_1", //name of the class
"class_weight" : 1.0, //used for weighted random selection of class within workload
//nextr group controls average and standard deviation of a LogNormal distribution that
//determines the number of mappers and reducers for thejob.
"mtasks_avg" : 5,
"mtasks_stddev" : 1,
"rtasks_avg" : 5,
"rtasks_stddev" : 1,
//averge and stdev input param of LogNormal distribution controlling job duration
"dur_avg" : 60,
"dur_stddev" : 5,
//averge and stdev input param of LogNormal distribution controlling mappers and reducers durations
"mtime_avg" : 10,
"mtime_stddev" : 2,
"rtime_avg" : 20,
"rtime_stddev" : 4,
//averge and stdev input param of LogNormal distribution controlling memory and cores for map and reduce
"map_max_memory_avg" : 1024,
"map_max_memory_stddev" : 0.001,
"reduce_max_memory_avg" : 2048,
"reduce_max_memory_stddev" : 0.001,
"map_max_vcores_avg" : 1,
"map_max_vcores_stddev" : 0.001,
"reduce_max_vcores_avg" : 2,
"reduce_max_vcores_stddev" : 0.001,
//probability of running this job with a reservation
"chance_of_reservation" : 0.5,
//input parameters of LogNormal distribution that determines the deadline slack (as a multiplier of job duration)
"deadline_factor_avg" : 10.0,
"deadline_factor_stddev" : 0.001,
}
],
// for each workload determines with what probability each time bucket is picked to choose the job starttime.
// In the example below the jobs have twice as much chance to start in the first minute than in the second minute
// of simulation, and then zero chance thereafter.
"time_distribution" : [
{ "time" : 1, "weight" : 66 },
{ "time" : 60, "weight" : 33 },
{ "time" : 120, "jobs" : 0 }
]
}
]
}
### Simulator input topology file format
Here is an example input topology file which has 3 nodes organized in 1 rack.
@ -353,3 +461,9 @@ Here is an example input topology file which has 3 nodes organized in 1 rack.
"node" : "node3"
}]
}
### Notes on LogNormal distribution:
LogNormal distributions represent well many of the parameters we see in practice (e.g., most jobs have
a small number of mappers, but few might be very large, and few very small, but greater than zero. It is
however worth noticing that it might be tricky to use, as the average is typically on the right side of the
peak (most common value) of the distribution, because the distribution has a one-side tail.

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import net.jcip.annotations.NotThreadSafe;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
/**
* This is a base class to ease the implementation of SLS-based tests.
*/
@RunWith(value = Parameterized.class)
@NotThreadSafe
@SuppressWarnings("VisibilityModifier")
public class BaseSLSRunnerTest {
@Parameter(value = 0)
public String schedulerType;
@Parameter(value = 1)
public String traceType;
@Parameter(value = 2)
public String traceLocation;
@Parameter(value = 3)
public String nodeFile;
protected SLSRunner sls;
@After
public void tearDown() throws InterruptedException {
sls.stop();
}
public void runSLS(Configuration conf, long timeout) throws Exception {
File tempDir = new File("target", UUID.randomUUID().toString());
final List<Throwable> exceptionList =
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.setDefaultUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
e.printStackTrace();
exceptionList.add(e);
}
});
// start the simulator
File slsOutputDir = new File(tempDir.getAbsolutePath() + "/slsoutput/");
String[] args;
switch (traceType) {
case "OLD_SLS":
args = new String[] {"-inputsls", traceLocation, "-output",
slsOutputDir.getAbsolutePath()};
break;
case "OLD_RUMEN":
args = new String[] {"-inputrumen", traceLocation, "-output",
slsOutputDir.getAbsolutePath()};
break;
default:
args = new String[] {"-tracetype", traceType, "-tracelocation",
traceLocation, "-output", slsOutputDir.getAbsolutePath()};
}
if (nodeFile != null) {
args = ArrayUtils.addAll(args, new String[] {"-nodes", nodeFile});
}
conf.set(YarnConfiguration.RM_SCHEDULER, schedulerType);
sls = new SLSRunner(conf);
sls.run(args);
// wait for timeout seconds before stop, unless there is an uncaught
// exception in which
// case fail fast.
while (timeout >= 0) {
Thread.sleep(1000);
if (!exceptionList.isEmpty()) {
sls.stop();
Assert.fail("TestSLSRunner catched exception from child thread "
+ "(TaskRunner.Task): " + exceptionList);
break;
}
timeout--;
}
}
}

View File

@ -18,53 +18,67 @@
package org.apache.hadoop.yarn.sls;
import org.junit.Assert;
import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.*;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.*;
public class TestSLSRunner {
/**
* This test performs simple runs of the SLS with different trace types and
* schedulers.
*/
@RunWith(value = Parameterized.class)
@NotThreadSafe
public class TestSLSRunner extends BaseSLSRunnerTest {
@Test
@Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
public static Collection<Object[]> data() {
String capScheduler =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler."
+ "capacity.CapacityScheduler";
String fairScheduler =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler."
+ "fair.FairScheduler";
String slsTraceFile = "src/test/resources/inputsls.json";
String rumenTraceFile = "src/main/data/2jobs2min-rumen-jh.json";
String synthTraceFile = "src/test/resources/syn.json";
String nodeFile = "src/test/resources/nodes.json";
// Test with both schedulers, and all three load producers.
return Arrays.asList(new Object[][] {
// covering old commandline in tests
{capScheduler, "OLD_RUMEN", rumenTraceFile, nodeFile },
{capScheduler, "OLD_SLS", slsTraceFile, nodeFile },
// covering the no nodeFile case
{capScheduler, "SYNTH", synthTraceFile, null },
{capScheduler, "RUMEN", rumenTraceFile, null },
{capScheduler, "SLS", slsTraceFile, null },
// covering new commandline and CapacityScheduler
{capScheduler, "SYNTH", synthTraceFile, nodeFile },
{capScheduler, "RUMEN", rumenTraceFile, nodeFile },
{capScheduler, "SLS", slsTraceFile, nodeFile },
// covering FairScheduler
{fairScheduler, "SYNTH", synthTraceFile, nodeFile },
{fairScheduler, "RUMEN", rumenTraceFile, nodeFile },
{fairScheduler, "SLS", slsTraceFile, nodeFile }
});
}
@Test(timeout = 60000)
@SuppressWarnings("all")
public void testSimulatorRunning() throws Exception {
File tempDir = new File("target", UUID.randomUUID().toString());
final List<Throwable> exceptionList =
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
exceptionList.add(e);
}
});
// start the simulator
File slsOutputDir = new File(tempDir.getAbsolutePath() + "/slsoutput/");
String args[] = new String[]{
"-inputrumen", "src/main/data/2jobs2min-rumen-jh.json",
"-output", slsOutputDir.getAbsolutePath()};
SLSRunner.main(args);
// wait for 20 seconds before stop
int count = 20;
while (count >= 0) {
Thread.sleep(1000);
if (! exceptionList.isEmpty()) {
SLSRunner.getRunner().stop();
Assert.fail("TestSLSRunner catched exception from child thread " +
"(TaskRunner.Task): " + exceptionList.get(0).getMessage());
break;
}
count--;
}
SLSRunner.getRunner().stop();
Configuration conf = new Configuration(false);
long timeTillShutdownInsec = 20L;
runSLS(conf, timeTillShutdownInsec);
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
import org.apache.hadoop.yarn.sls.synthetic.SynthJob;
import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertTrue;
/**
* Simple test class driving the {@code SynthTraceJobProducer}, and validating
* jobs produce are within expected range.
*/
public class TestSynthJobGeneration {
public final static Logger LOG =
Logger.getLogger(TestSynthJobGeneration.class);
@Test
public void test() throws IllegalArgumentException, IOException {
Configuration conf = new Configuration();
conf.set(SynthTraceJobProducer.SLS_SYNTHETIC_TRACE_FILE,
"src/test/resources/syn.json");
SynthTraceJobProducer stjp = new SynthTraceJobProducer(conf);
SynthJob js = (SynthJob) stjp.getNextJob();
int jobCount = 0;
while (js != null) {
LOG.info((jobCount++) + " " + js.getQueueName() + " -- "
+ js.getJobClass().getClassName() + " (conf: "
+ js.getJobConf().get(MRJobConfig.QUEUE_NAME) + ") " + " submission: "
+ js.getSubmissionTime() + ", " + " duration: " + js.getDuration()
+ " numMaps: " + js.getNumberMaps() + " numReduces: "
+ js.getNumberReduces());
validateJob(js);
js = (SynthJob) stjp.getNextJob();
}
Assert.assertEquals(stjp.getNumJobs(), jobCount);
}
private void validateJob(SynthJob js) {
assertTrue(js.getSubmissionTime() > 0);
assertTrue(js.getDuration() > 0);
assertTrue(js.getNumberMaps() >= 0);
assertTrue(js.getNumberReduces() >= 0);
assertTrue(js.getNumberMaps() + js.getNumberReduces() > 0);
assertTrue(js.getTotalSlotTime() >= 0);
for (int i = 0; i < js.getNumberMaps(); i++) {
TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.MAP, i, 0);
assertTrue(tai.getRuntime() > 0);
}
for (int i = 0; i < js.getNumberReduces(); i++) {
TaskAttemptInfo tai = js.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
assertTrue(tai.getRuntime() > 0);
}
if (js.hasDeadline()) {
assertTrue(js.getDeadline() > js.getSubmissionTime() + js.getDuration());
}
}
}

View File

@ -134,7 +134,7 @@ public void testAMSimulator() throws Exception {
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
app.init(1, 1000, containers, rm, null, 0, 1000000L, "user1", queue,
true, appId);
true, appId, null, 0);
app.firstStep();
verifySchedulerMetrics(appId);

View File

@ -35,7 +35,7 @@ public void setUp() {
}
@After
public void cleanUp() {
public void cleanUp() throws InterruptedException {
runner.stop();
}

View File

@ -38,6 +38,16 @@
<value>100</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.sls_queue_1.reservable</name>
<value>true</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.sls_queue_1.show-reservations-as-queues</name>
<value>true</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.sls_queue_2.capacity</name>
<value>25</value>

View File

@ -21,6 +21,7 @@
-->
<allocations>
<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>
<user name="jenkins">
<!-- Limit on running jobs for the user across all pools. If more
jobs than this are submitted, only the first <maxRunningJobs> will
@ -31,20 +32,21 @@
<userMaxAppsDefault>1000</userMaxAppsDefault>
<queue name="sls_queue_1">
<minResources>1024 mb, 1 vcores</minResources>
<schedulingMode>fair</schedulingMode>
<schedulingPolicy>drf</schedulingPolicy>
<weight>0.25</weight>
<minSharePreemptionTimeout>2</minSharePreemptionTimeout>
<reservation>true</reservation>
</queue>
<queue name="sls_queue_2">
<minResources>1024 mb, 1 vcores</minResources>
<schedulingMode>fair</schedulingMode>
<schedulingMode>drf</schedulingMode>
<weight>0.25</weight>
<minSharePreemptionTimeout>2</minSharePreemptionTimeout>
</queue>
<queue name="sls_queue_3">
<minResources>1024 mb, 1 vcores</minResources>
<weight>0.5</weight>
<schedulingMode>fair</schedulingMode>
<schedulingMode>drf</schedulingMode>
<minSharePreemptionTimeout>2</minSharePreemptionTimeout>
</queue>
</allocations>

View File

@ -0,0 +1,55 @@
{
"am.type": "mapreduce",
"job.start.ms": 0,
"job.end.ms": 95375,
"job.queue.name": "sls_queue_1",
"job.id": "job_1",
"job.user": "default",
"job.tasks": [
{
"container.host": "/default-rack/node1",
"container.start.ms": 6664,
"container.end.ms": 23707,
"container.priority": 20,
"container.type": "map"
},
{
"container.host": "/default-rack/node3",
"container.start.ms": 6665,
"container.end.ms": 21593,
"container.priority": 20,
"container.type": "map"
},
{
"container.host": "/default-rack/node2",
"container.start.ms": 68770,
"container.end.ms": 86613,
"container.priority": 20,
"container.type": "map"
}
]
}
{
"am.type": "mapreduce",
"job.start.ms": 105204,
"job.end.ms": 197256,
"job.queue.name": "sls_queue_2",
"job.id": "job_2",
"job.user": "default",
"job.tasks": [
{
"container.host": "/default-rack/node1",
"container.start.ms": 111822,
"container.end.ms": 133985,
"container.priority": 20,
"container.type": "map"
},
{
"container.host": "/default-rack/node2",
"container.start.ms": 111788,
"container.end.ms": 131377,
"container.priority": 20,
"container.type": "map"
}
]
}

View File

@ -0,0 +1,84 @@
{
"rack": "rack1",
"nodes": [
{
"node": "node1"
},
{
"node": "node2"
},
{
"node": "node3"
},
{
"node": "node4"
}
]
}
{
"rack": "rack2",
"nodes": [
{
"node": "node5"
},
{
"node": "node6"
},
{
"node": "node7"
},
{
"node": "node8"
}
]
}
{
"rack": "rack3",
"nodes": [
{
"node": "node9"
},
{
"node": "node10"
},
{
"node": "node11"
},
{
"node": "node12"
}
]
}
{
"rack": "rack4",
"nodes": [
{
"node": "node13"
},
{
"node": "node14"
},
{
"node": "node15"
},
{
"node": "node16"
}
]
}
{
"rack": "rack5",
"nodes": [
{
"node": "node17"
},
{
"node": "node18"
},
{
"node": "node19"
},
{
}
]
}

View File

@ -25,11 +25,11 @@
<!-- Nodes configuration -->
<property>
<name>yarn.sls.nm.memory.mb</name>
<value>10240</value>
<value>100240</value>
</property>
<property>
<name>yarn.sls.nm.vcores</name>
<value>10</value>
<value>100</value>
</property>
<property>
<name>yarn.sls.nm.heartbeat.interval.ms</name>
@ -77,5 +77,5 @@
<name>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</name>
<value>org.apache.hadoop.yarn.sls.scheduler.CapacitySchedulerMetrics</value>
</property>
</configuration>

View File

@ -0,0 +1,53 @@
{
"description": "tiny jobs workload",
"num_nodes": 20,
"nodes_per_rack": 4,
"num_jobs": 10,
"rand_seed": 2,
"workloads": [
{
"workload_name": "tiny-test",
"workload_weight": 0.5,
"description": "Sort jobs",
"queue_name": "sls_queue_1",
"job_classes": [
{
"class_name": "class_1",
"user_name": "foobar",
"class_weight": 1.0,
"mtasks_avg": 5,
"mtasks_stddev": 1,
"rtasks_avg": 5,
"rtasks_stddev": 1,
"dur_avg": 60,
"dur_stddev": 5,
"mtime_avg": 10,
"mtime_stddev": 2,
"rtime_avg": 20,
"rtime_stddev": 4,
"map_max_memory_avg": 1024,
"map_max_memory_stddev": 0.001,
"reduce_max_memory_avg": 2048,
"reduce_max_memory_stddev": 0.001,
"map_max_vcores_avg": 1,
"map_max_vcores_stddev": 0.001,
"reduce_max_vcores_avg": 2,
"reduce_max_vcores_stddev": 0.001,
"chance_of_reservation": 0.5,
"deadline_factor_avg": 10.0,
"deadline_factor_stddev": 0.001
}
],
"time_distribution": [
{
"time": 1,
"weight": 100
},
{
"time": 60,
"jobs": 0
}
]
}
]
}

View File

@ -17,7 +17,7 @@
<configuration>
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
<!-- <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value> -->
</property>
@ -79,4 +79,12 @@
<name>yarn.scheduler.fair.assignmultiple</name>
<value>true</value>
</property>
<property>
<description>Enable reservation system.</description>
<name>yarn.resourcemanager.reservation-system.enable</name>
<value>true</value>
</property>
</configuration>