YARN-6505. Define the strings used in SLS JSON input file format. (Contributed by Gergely Novak)

This commit is contained in:
Yufei Gu 2017-10-27 14:41:37 -07:00
parent 5c799ecf09
commit 99880d0a16
4 changed files with 90 additions and 47 deletions

View File

@ -173,8 +173,8 @@ private void init(Configuration tempConf) throws ClassNotFoundException {
// <AMType, Class> map
for (Map.Entry e : tempConf) {
String key = e.getKey().toString();
if (key.startsWith(SLSConfiguration.AM_TYPE)) {
String amType = key.substring(SLSConfiguration.AM_TYPE.length());
if (key.startsWith(SLSConfiguration.AM_TYPE_PREFIX)) {
String amType = key.substring(SLSConfiguration.AM_TYPE_PREFIX.length());
amClassMap.put(amType, Class.forName(tempConf.get(key)));
}
}
@ -384,33 +384,36 @@ private void startAMFromSLSTrace(String inputTrace) throws IOException {
}
private void createAMForJob(Map jsonJob) throws YarnException {
long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
long jobStartTime = Long.parseLong(
jsonJob.get(SLSConfiguration.JOB_START_MS).toString());
long jobFinishTime = 0;
if (jsonJob.containsKey("job.end.ms")) {
jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
if (jsonJob.containsKey(SLSConfiguration.JOB_END_MS)) {
jobFinishTime = Long.parseLong(
jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
}
String user = (String) jsonJob.get("job.user");
String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
if (user == null) {
user = "default";
}
String queue = jsonJob.get("job.queue.name").toString();
String queue = jsonJob.get(SLSConfiguration.JOB_QUEUE_NAME).toString();
increaseQueueAppNum(queue);
String amType = (String)jsonJob.get("am.type");
String amType = (String)jsonJob.get(SLSConfiguration.AM_TYPE);
if (amType == null) {
amType = SLSUtils.DEFAULT_JOB_TYPE;
}
int jobCount = 1;
if (jsonJob.containsKey("job.count")) {
jobCount = Integer.parseInt(jsonJob.get("job.count").toString());
if (jsonJob.containsKey(SLSConfiguration.JOB_COUNT)) {
jobCount = Integer.parseInt(
jsonJob.get(SLSConfiguration.JOB_COUNT).toString());
}
jobCount = Math.max(jobCount, 1);
String oldAppId = (String)jsonJob.get("job.id");
String oldAppId = (String)jsonJob.get(SLSConfiguration.JOB_ID);
// Job id is generated automatically if this job configuration allows
// multiple job instances
if(jobCount > 1) {
@ -426,7 +429,7 @@ private void createAMForJob(Map jsonJob) throws YarnException {
private List<ContainerSimulator> getTaskContainers(Map jsonJob)
throws YarnException {
List<ContainerSimulator> containers = new ArrayList<>();
List tasks = (List) jsonJob.get("job.tasks");
List tasks = (List) jsonJob.get(SLSConfiguration.JOB_TASKS);
if (tasks == null || tasks.size() == 0) {
throw new YarnException("No task for the job!");
}
@ -434,17 +437,22 @@ private List<ContainerSimulator> getTaskContainers(Map jsonJob)
for (Object o : tasks) {
Map jsonTask = (Map) o;
String hostname = (String) jsonTask.get("container.host");
String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
long duration = 0;
if (jsonTask.containsKey("duration.ms")) {
duration = Integer.parseInt(jsonTask.get("duration.ms").toString());
} else if (jsonTask.containsKey("container.start.ms") &&
jsonTask.containsKey("container.end.ms")) {
long taskStart = Long.parseLong(jsonTask.get("container.start.ms")
.toString());
long taskFinish = Long.parseLong(jsonTask.get("container.end.ms")
.toString());
if (jsonTask.containsKey(SLSConfiguration.TASK_DURATION_MS)) {
duration = Integer.parseInt(
jsonTask.get(SLSConfiguration.TASK_DURATION_MS).toString());
} else if (jsonTask.containsKey(SLSConfiguration.DURATION_MS)) {
// Also support "duration.ms" for backward compatibility
duration = Integer.parseInt(
jsonTask.get(SLSConfiguration.DURATION_MS).toString());
} else if (jsonTask.containsKey(SLSConfiguration.TASK_START_MS) &&
jsonTask.containsKey(SLSConfiguration.TASK_END_MS)) {
long taskStart = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_START_MS).toString());
long taskFinish = Long.parseLong(
jsonTask.get(SLSConfiguration.TASK_END_MS).toString());
duration = taskFinish - taskStart;
}
if (duration <= 0) {
@ -453,32 +461,33 @@ private List<ContainerSimulator> getTaskContainers(Map jsonJob)
}
Resource res = getDefaultContainerResource();
if (jsonTask.containsKey("container.memory")) {
int containerMemory =
Integer.parseInt(jsonTask.get("container.memory").toString());
if (jsonTask.containsKey(SLSConfiguration.TASK_MEMORY)) {
int containerMemory = Integer.parseInt(
jsonTask.get(SLSConfiguration.TASK_MEMORY).toString());
res.setMemorySize(containerMemory);
}
if (jsonTask.containsKey("container.vcores")) {
int containerVCores =
Integer.parseInt(jsonTask.get("container.vcores").toString());
if (jsonTask.containsKey(SLSConfiguration.CONTAINER_VCORES)) {
int containerVCores = Integer.parseInt(
jsonTask.get(SLSConfiguration.CONTAINER_VCORES).toString());
res.setVirtualCores(containerVCores);
}
int priority = DEFAULT_MAPPER_PRIORITY;
if (jsonTask.containsKey("container.priority")) {
priority = Integer.parseInt(jsonTask.get("container.priority")
.toString());
if (jsonTask.containsKey(SLSConfiguration.TASK_PRIORITY)) {
priority = Integer.parseInt(
jsonTask.get(SLSConfiguration.TASK_PRIORITY).toString());
}
String type = "map";
if (jsonTask.containsKey("container.type")) {
type = jsonTask.get("container.type").toString();
if (jsonTask.containsKey(SLSConfiguration.TASK_TYPE)) {
type = jsonTask.get(SLSConfiguration.TASK_TYPE).toString();
}
int count = 1;
if (jsonTask.containsKey("count")) {
count = Integer.parseInt(jsonTask.get("count").toString());
if (jsonTask.containsKey(SLSConfiguration.COUNT)) {
count = Integer.parseInt(
jsonTask.get(SLSConfiguration.COUNT).toString());
}
count = Math.max(count, 1);
@ -708,14 +717,14 @@ private Resource getAMContainerResource(Map jsonJob) {
return amContainerResource;
}
if (jsonJob.containsKey("am.memory")) {
if (jsonJob.containsKey(SLSConfiguration.AM_MEMORY)) {
amContainerResource.setMemorySize(
Long.parseLong(jsonJob.get("am.memory").toString()));
Long.parseLong(jsonJob.get(SLSConfiguration.AM_MEMORY).toString()));
}
if (jsonJob.containsKey("am.vcores")) {
if (jsonJob.containsKey(SLSConfiguration.AM_VCORES)) {
amContainerResource.setVirtualCores(
Integer.parseInt(jsonJob.get("am.vcores").toString()));
Integer.parseInt(jsonJob.get(SLSConfiguration.AM_VCORES).toString()));
}
return amContainerResource;
}

View File

@ -62,12 +62,15 @@ public class SLSConfiguration {
public static final String AM_HEARTBEAT_INTERVAL_MS = AM_PREFIX
+ "heartbeat.interval.ms";
public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
public static final String AM_TYPE = AM_PREFIX + "type.";
public static final String AM_TYPE = AM_PREFIX + "type";
public static final String AM_TYPE_PREFIX = AM_TYPE + ".";
public static final String AM_MEMORY = AM_PREFIX + "memory";
public static final String AM_CONTAINER_MEMORY = AM_PREFIX +
"container.memory";
public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024;
public static final String AM_VCORES = AM_PREFIX + "vcores";
public static final String AM_CONTAINER_VCORES = AM_PREFIX +
"container.vcores";
public static final int AM_CONTAINER_VCORES_DEFAULT = 1;
@ -85,4 +88,33 @@ public static Resource getAMContainerResource(Configuration conf) {
conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT),
conf.getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT));
}
// input file
// nodes
public static final String NUM_NODES = "num.nodes";
public static final String NUM_RACKS = "num.racks";
// job
public static final String JOB_PREFIX = "job.";
public static final String JOB_ID = JOB_PREFIX + "id";
public static final String JOB_START_MS = JOB_PREFIX + "start.ms";
public static final String JOB_END_MS = JOB_PREFIX + "end.ms";
public static final String JOB_QUEUE_NAME = JOB_PREFIX + "queue.name";
public static final String JOB_USER = JOB_PREFIX + "user";
public static final String JOB_COUNT = JOB_PREFIX + "count";
public static final String JOB_TASKS = JOB_PREFIX + "tasks";
// task
public static final String COUNT = "count";
public static final String TASK_CONTAINER = "container.";
public static final String TASK_HOST = TASK_CONTAINER + "host";
public static final String TASK_START_MS = TASK_CONTAINER + "start.ms";
public static final String TASK_END_MS = TASK_CONTAINER + "end.ms";
public static final String DURATION_MS = "duration.ms";
public static final String TASK_DURATION_MS = TASK_CONTAINER + DURATION_MS;
public static final String TASK_PRIORITY = TASK_CONTAINER + "priority";
public static final String TASK_TYPE = TASK_CONTAINER + "type";
public static final String TASK_MEMORY = TASK_CONTAINER + "memory";
}

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.LoggedTask;
import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
@Private
@Unstable
@ -118,21 +119,22 @@ public static Set<String> parseNodesFromSLSTrace(String jobTrace)
}
private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
if (jsonEntry.containsKey("num.nodes")) {
int numNodes = Integer.parseInt(jsonEntry.get("num.nodes").toString());
if (jsonEntry.containsKey(SLSConfiguration.NUM_NODES)) {
int numNodes = Integer.parseInt(
jsonEntry.get(SLSConfiguration.NUM_NODES).toString());
int numRacks = 1;
if (jsonEntry.containsKey("num.racks")) {
if (jsonEntry.containsKey(SLSConfiguration.NUM_RACKS)) {
numRacks = Integer.parseInt(
jsonEntry.get("num.racks").toString());
jsonEntry.get(SLSConfiguration.NUM_RACKS).toString());
}
nodeSet.addAll(generateNodes(numNodes, numRacks));
}
if (jsonEntry.containsKey("job.tasks")) {
List tasks = (List) jsonEntry.get("job.tasks");
if (jsonEntry.containsKey(SLSConfiguration.JOB_TASKS)) {
List tasks = (List) jsonEntry.get(SLSConfiguration.JOB_TASKS);
for (Object o : tasks) {
Map jsonTask = (Map) o;
String hostname = (String) jsonTask.get("container.host");
String hostname = (String) jsonTask.get(SLSConfiguration.TASK_HOST);
if (hostname != null) {
nodeSet.add(hostname);
}

View File

@ -344,7 +344,7 @@ Here we provide an example format of the sls json file, which contains 2 jobs. T
"container.host" : "/default-rack/node1", // host the container asks for
"container.start.ms" : 6664, // container start time, optional
"container.end.ms" : 23707, // container finish time, optional
"duration.ms": 50000, // duration of the container, optional if start and end time is specified
"container.duration.ms": 50000, // duration of the container, optional if start and end time is specified
"container.priority" : 20, // priority of the container, optional, the default value is 20
"container.type" : "map" // type of the container, could be "map" or "reduce", optional, the default value is "map"
}, {