YARN-6522. Make SLS JSON input file format simple and scalable (yufeigu via rkanter)

This commit is contained in:
Robert Kanter 2017-05-04 17:21:46 -07:00
parent 07761af357
commit 3082552b3b
6 changed files with 181 additions and 70 deletions

View File

@ -119,6 +119,9 @@ public class SLSRunner extends Configured implements Tool {
// logger
public final static Logger LOG = Logger.getLogger(SLSRunner.class);
private final static int DEFAULT_MAPPER_PRIORITY = 20;
private final static int DEFAULT_REDUCER_PRIORITY = 10;
/**
* The type of trace in input.
*/
@ -247,8 +250,8 @@ private void startNM() throws YarnException, IOException {
break;
case SYNTH:
stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
nodeSet.addAll(SLSUtils.generateNodesFromSynth(stjp.getNumNodes(),
stjp.getNodesPerRack()));
nodeSet.addAll(SLSUtils.generateNodes(stjp.getNumNodes(),
stjp.getNumNodes()/stjp.getNodesPerRack()));
break;
default:
throw new YarnException("Input configuration not recognized, "
@ -259,6 +262,10 @@ private void startNM() throws YarnException, IOException {
nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(nodeFile));
}
if (nodeSet.size() == 0) {
throw new YarnException("No node! Please configure nodes.");
}
// create NM simulators
Random random = new Random();
Set<String> rackSet = new HashSet<String>();
@ -348,7 +355,11 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException {
private void createAMForJob(Map jsonJob) throws YarnException {
long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
long jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
long jobFinishTime = 0;
if (jsonJob.containsKey("job.end.ms")) {
jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
}
String user = (String) jsonJob.get("job.user");
if (user == null) {
@ -358,25 +369,49 @@ private void createAMForJob(Map jsonJob) throws YarnException {
String queue = jsonJob.get("job.queue.name").toString();
increaseQueueAppNum(queue);
String oldAppId = jsonJob.get("job.id").toString();
String oldAppId = (String)jsonJob.get("job.id");
if (oldAppId == null) {
oldAppId = Integer.toString(AM_ID);
}
// tasks
String amType = (String)jsonJob.get("am.type");
if (amType == null) {
amType = SLSUtils.DEFAULT_JOB_TYPE;
}
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
getTaskContainers(jsonJob), null);
}
private List<ContainerSimulator> getTaskContainers(Map jsonJob)
throws YarnException {
List<ContainerSimulator> containers = new ArrayList<>();
List tasks = (List) jsonJob.get("job.tasks");
if (tasks == null || tasks.size() == 0) {
throw new YarnException("No task for the job!");
}
List<ContainerSimulator> containerList = new ArrayList<>();
for (Object o : tasks) {
Map jsonTask = (Map) o;
String hostname = jsonTask.get("container.host").toString();
long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
.toString());
long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
.toString());
long lifeTime = taskFinish - taskStart;
// Set memory and vcores from job trace file
String hostname = (String) jsonTask.get("container.host");
long duration = 0;
if (jsonTask.containsKey("duration.ms")) {
duration = Integer.parseInt(jsonTask.get("duration.ms").toString());
} else if (jsonTask.containsKey("container.start.ms") &&
jsonTask.containsKey("container.end.ms")) {
long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
.toString());
long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
.toString());
duration = taskFinish - taskStart;
}
if (duration <= 0) {
throw new YarnException("Duration of a task shouldn't be less or equal"
+ " to 0!");
}
Resource res = getDefaultContainerResource();
if (jsonTask.containsKey("container.memory")) {
int containerMemory =
@ -390,17 +425,30 @@ private void createAMForJob(Map jsonJob) throws YarnException {
res.setVirtualCores(containerVCores);
}
int priority = Integer.parseInt(jsonTask.get("container.priority")
.toString());
String type = jsonTask.get("container.type").toString();
containerList.add(
new ContainerSimulator(res, lifeTime, hostname, priority, type));
int priority = DEFAULT_MAPPER_PRIORITY;
if (jsonTask.containsKey("container.priority")) {
priority = Integer.parseInt(jsonTask.get("container.priority")
.toString());
}
String type = "map";
if (jsonTask.containsKey("container.type")) {
type = jsonTask.get("container.type").toString();
}
int count = 1;
if (jsonTask.containsKey("count")) {
count = Integer.parseInt(jsonTask.get("count").toString());
}
count = Math.max(count, 1);
for (int i = 0; i < count; i++) {
containers.add(
new ContainerSimulator(res, duration, hostname, priority, type));
}
}
// create a new AM
String amType = jsonJob.get("am.type").toString();
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
containerList, null);
return containers;
}
/**
@ -463,7 +511,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
taskAttempt.getStartTime();
containerList.add(
new ContainerSimulator(getDefaultContainerResource(),
containerLifeTime, hostname, 10, "map"));
containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
}
// reducer
@ -479,7 +527,7 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
taskAttempt.getStartTime();
containerList.add(
new ContainerSimulator(getDefaultContainerResource(),
containerLifeTime, hostname, 20, "reduce"));
containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
}
// Only supports the default job type currently
@ -559,7 +607,7 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
(int) tai.getTaskInfo().getTaskVCores());
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, 10, "map"));
containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
maxMapRes = Resources.componentwiseMax(maxMapRes, containerResource);
maxMapDur =
containerLifeTime > maxMapDur ? containerLifeTime : maxMapDur;
@ -579,7 +627,7 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
(int) tai.getTaskInfo().getTaskVCores());
containerList.add(new ContainerSimulator(containerResource,
containerLifeTime, hostname, 20, "reduce"));
containerLifeTime, hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
maxRedRes = Resources.componentwiseMax(maxRedRes, containerResource);
maxRedDur =
containerLifeTime > maxRedDur ? containerLifeTime : maxRedDur;

View File

@ -400,26 +400,28 @@ protected List<ResourceRequest> packageRequests(
Map<String, ResourceRequest> nodeLocalRequestMap = new HashMap<String, ResourceRequest>();
ResourceRequest anyRequest = null;
for (ContainerSimulator cs : csList) {
String rackHostNames[] = SLSUtils.getRackHostName(cs.getHostname());
// check rack local
String rackname = "/" + rackHostNames[0];
if (rackLocalRequestMap.containsKey(rackname)) {
rackLocalRequestMap.get(rackname).setNumContainers(
rackLocalRequestMap.get(rackname).getNumContainers() + 1);
} else {
ResourceRequest request = createResourceRequest(
cs.getResource(), rackname, priority, 1);
rackLocalRequestMap.put(rackname, request);
}
// check node local
String hostname = rackHostNames[1];
if (nodeLocalRequestMap.containsKey(hostname)) {
nodeLocalRequestMap.get(hostname).setNumContainers(
nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
} else {
ResourceRequest request = createResourceRequest(
cs.getResource(), hostname, priority, 1);
nodeLocalRequestMap.put(hostname, request);
if (cs.getHostname() != null) {
String[] rackHostNames = SLSUtils.getRackHostName(cs.getHostname());
// check rack local
String rackname = "/" + rackHostNames[0];
if (rackLocalRequestMap.containsKey(rackname)) {
rackLocalRequestMap.get(rackname).setNumContainers(
rackLocalRequestMap.get(rackname).getNumContainers() + 1);
} else {
ResourceRequest request =
createResourceRequest(cs.getResource(), rackname, priority, 1);
rackLocalRequestMap.put(rackname, request);
}
// check node local
String hostname = rackHostNames[1];
if (nodeLocalRequestMap.containsKey(hostname)) {
nodeLocalRequestMap.get(hostname).setNumContainers(
nodeLocalRequestMap.get(hostname).getNumContainers() + 1);
} else {
ResourceRequest request =
createResourceRequest(cs.getResource(), hostname, priority, 1);
nodeLocalRequestMap.put(hostname, request);
}
}
// any
if (anyRequest == null) {

View File

@ -131,7 +131,7 @@ public long getSeed() {
}
public int getNodesPerRack() {
return trace.nodes_per_rack;
return trace.nodes_per_rack < 1 ? 1: trace.nodes_per_rack;
}
public int getNumNodes() {

View File

@ -101,7 +101,7 @@ public static Set<String> parseNodesFromRumenTrace(String jobTrace)
*/
public static Set<String> parseNodesFromSLSTrace(String jobTrace)
throws IOException {
Set<String> nodeSet = new HashSet<String>();
Set<String> nodeSet = new HashSet<>();
JsonFactory jsonF = new JsonFactory();
ObjectMapper mapper = new ObjectMapper();
Reader input =
@ -109,13 +109,7 @@ public static Set<String> parseNodesFromSLSTrace(String jobTrace)
try {
Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
while (i.hasNext()) {
Map jsonE = i.next();
List tasks = (List) jsonE.get("job.tasks");
for (Object o : tasks) {
Map jsonTask = (Map) o;
String hostname = jsonTask.get("container.host").toString();
nodeSet.add(hostname);
}
addNodes(nodeSet, i.next());
}
} finally {
input.close();
@ -123,6 +117,29 @@ public static Set<String> parseNodesFromSLSTrace(String jobTrace)
return nodeSet;
}
private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
if (jsonEntry.containsKey("num.nodes")) {
int numNodes = Integer.parseInt(jsonEntry.get("num.nodes").toString());
int numRacks = 1;
if (jsonEntry.containsKey("num.racks")) {
numRacks = Integer.parseInt(
jsonEntry.get("num.racks").toString());
}
nodeSet.addAll(generateNodes(numNodes, numRacks));
}
if (jsonEntry.containsKey("job.tasks")) {
List tasks = (List) jsonEntry.get("job.tasks");
for (Object o : tasks) {
Map jsonTask = (Map) o;
String hostname = (String) jsonTask.get("container.host");
if (hostname != null) {
nodeSet.add(hostname);
}
}
}
}
/**
* parse the input node file, return each host name
*/
@ -150,11 +167,19 @@ public static Set<String> parseNodesFromNodeFile(String nodeFile)
return nodeSet;
}
public static Set<? extends String> generateNodesFromSynth(
int numNodes, int nodesPerRack) {
Set<String> nodeSet = new HashSet<String>();
public static Set<? extends String> generateNodes(int numNodes,
int numRacks){
Set<String> nodeSet = new HashSet<>();
if (numRacks < 1) {
numRacks = 1;
}
if (numRacks > numNodes) {
numRacks = numNodes;
}
for (int i = 0; i < numNodes; i++) {
nodeSet.add("/rack" + i % nodesPerRack + "/node" + i);
nodeSet.add("/rack" + i % numRacks + "/node" + i);
}
return nodeSet;
}

View File

@ -328,18 +328,24 @@ Appendix
Here we provide an example format of the sls json file, which contains 2 jobs. The first job has 3 map tasks and the second one has 2 map tasks.
{
"am.type" : "mapreduce",
"job.start.ms" : 0,
"job.end.ms" : 95375,
"job.queue.name" : "sls_queue_1",
"job.id" : "job_1",
"job.user" : "default",
"num.nodes": 3, // total number of nodes in the cluster
"num.racks": 1 // total number of racks in the cluster, it divides num.nodes into the racks evenly, optional, the default value is 1
}
{
"am.type" : "mapreduce", // type of AM, optional, the default value is "mapreduce"
"job.start.ms" : 0, // job start time
"job.end.ms" : 95375, // job finish time, optional, the default value is 0
"job.queue.name" : "sls_queue_1", // the queue job will be submitted to
"job.id" : "job_1", // the job id used to track the job, optional, the default value is an zero-based integer increasing with number of jobs
"job.user" : "default", // user, optional, the default value is "default"
"job.tasks" : [ {
"container.host" : "/default-rack/node1",
"container.start.ms" : 6664,
"container.end.ms" : 23707,
"container.priority" : 20,
"container.type" : "map"
"count": 1, // number of tasks, optional, the default value is 1
"container.host" : "/default-rack/node1", // host the container asks for
"container.start.ms" : 6664, // container start time, optional
"container.end.ms" : 23707, // container finish time, optional
"duration.ms": 50000, // duration of the container, optional if start and end time is specified
"container.priority" : 20, // priority of the container, optional, the default value is 20
"container.type" : "map" // type of the container, could be "map" or "reduce", optional, the default value is "map"
}, {
"container.host" : "/default-rack/node3",
"container.start.ms" : 6665,

View File

@ -21,6 +21,9 @@
import org.junit.Assert;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
public class TestSLSUtils {
@Test
@ -36,4 +39,31 @@ public void testGetRackHostname() {
Assert.assertEquals(rackHostname[1], "node1");
}
@Test
public void testGenerateNodes() {
Set<? extends String> nodes = SLSUtils.generateNodes(3, 3);
Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
nodes = SLSUtils.generateNodes(3, 1);
Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
nodes = SLSUtils.generateNodes(3, 4);
Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
Assert.assertEquals("Number of racks is wrong.", 3, getNumRack(nodes));
nodes = SLSUtils.generateNodes(3, 0);
Assert.assertEquals("Number of nodes is wrong.", 3, nodes.size());
Assert.assertEquals("Number of racks is wrong.", 1, getNumRack(nodes));
}
private int getNumRack(Set<? extends String> nodes) {
Set<String> racks = new HashSet<>();
for (String node : nodes) {
String[] rackHostname = SLSUtils.getRackHostName(node);
racks.add(rackHostname[0]);
}
return racks.size();
}
}