YARN-5067 Support specifying resources for AM containers in SLS. (Yufei Gu via Haibo Chen)

This commit is contained in:
Haibo Chen 2017-06-30 16:50:06 -07:00
parent 38996fdcf0
commit 147df300bf
5 changed files with 68 additions and 39 deletions

View File

@ -406,7 +406,7 @@ private void createAMForJob(Map jsonJob) throws YarnException {
}
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
getTaskContainers(jsonJob), null);
getTaskContainers(jsonJob), null, getAMContainerResource(jsonJob));
}
private List<ContainerSimulator> getTaskContainers(Map jsonJob)
@ -558,7 +558,8 @@ private void createAMForJob(LoggedJob job, long baselineTimeMs)
// Only supports the default job type currently
runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, null);
jobStartTimeMS, jobFinishTimeMS, containerList, null,
getAMContainerResource(null));
}
private Resource getDefaultContainerResource() {
@ -676,7 +677,8 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
}
runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, rr);
jobStartTimeMS, jobFinishTimeMS, containerList, rr,
getAMContainerResource(null));
}
} finally {
stjp.close();
@ -684,6 +686,26 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {
}
private Resource getAMContainerResource(Map jsonJob) {
Resource amContainerResource =
SLSConfiguration.getAMContainerResource(getConf());
if (jsonJob == null) {
return amContainerResource;
}
if (jsonJob.containsKey("am.memory")) {
amContainerResource.setMemorySize(
Long.parseLong(jsonJob.get("am.memory").toString()));
}
if (jsonJob.containsKey("am.vcores")) {
amContainerResource.setVirtualCores(
Integer.parseInt(jsonJob.get("am.vcores").toString()));
}
return amContainerResource;
}
private void increaseQueueAppNum(String queue) throws YarnException {
SchedulerWrapper wrapper = (SchedulerWrapper)rm.getResourceScheduler();
String queueName = wrapper.getRealQueueName(queue);
@ -700,7 +722,7 @@ private void increaseQueueAppNum(String queue) throws YarnException {
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationSubmissionRequest rr) {
ReservationSubmissionRequest rr, Resource amContainerResource) {
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(jobType), new Configuration());
@ -710,9 +732,11 @@ private void runNewAM(String jobType, String user,
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS,
SLSConfiguration.AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
boolean isTracked = trackedApps.contains(oldJobId);
amSim.init(AM_ID++, heartbeatInterval, containerList,
rm, this, jobStartTimeMS, jobFinishTimeMS, user, jobQueue,
isTracked, oldJobId, rr, runner.getStartTimeMS());
AM_ID++;
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, rr,
runner.getStartTimeMS(), amContainerResource);
runner.schedule(amSim);
maxRuntime = Math.max(maxRuntime, jobFinishTimeMS);
numTasks += containerList.size();

View File

@ -20,7 +20,6 @@
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
@ -35,18 +34,13 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords
.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -54,7 +48,6 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -66,7 +59,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerMetrics;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.SchedulerWrapper;
import org.apache.hadoop.yarn.sls.SLSRunner;
@ -116,9 +108,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
private static final Logger LOG = LoggerFactory.getLogger(AMSimulator.class);
// resource for AM container
private final static int MR_AM_CONTAINER_RESOURCE_MEMORY_MB = 1024;
private final static int MR_AM_CONTAINER_RESOURCE_VCORES = 1;
private Resource amContainerResource;
private ReservationSubmissionRequest reservationRequest;
@ -127,11 +117,12 @@ public AMSimulator() {
}
@SuppressWarnings("checkstyle:parameternumber")
public void init(int id, int heartbeatInterval,
public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp,
ReservationSubmissionRequest rr, long baseTimeMS) {
ReservationSubmissionRequest rr, long baseTimeMS,
Resource amContainerResource) {
super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval);
this.user = simUser;
@ -144,6 +135,7 @@ public void init(int id, int heartbeatInterval,
this.traceStartTimeMS = startTime;
this.traceFinishTimeMS = finishTime;
this.reservationRequest = rr;
this.amContainerResource = amContainerResource;
}
/**
@ -318,16 +310,13 @@ private void submitApp(ReservationId reservationId)
appSubContext.setPriority(Priority.newInstance(0));
ContainerLaunchContext conLauContext =
Records.newRecord(ContainerLaunchContext.class);
conLauContext.setApplicationACLs(
new HashMap<ApplicationAccessType, String>());
conLauContext.setCommands(new ArrayList<String>());
conLauContext.setEnvironment(new HashMap<String, String>());
conLauContext.setLocalResources(new HashMap<String, LocalResource>());
conLauContext.setServiceData(new HashMap<String, ByteBuffer>());
conLauContext.setApplicationACLs(new HashMap<>());
conLauContext.setCommands(new ArrayList<>());
conLauContext.setEnvironment(new HashMap<>());
conLauContext.setLocalResources(new HashMap<>());
conLauContext.setServiceData(new HashMap<>());
appSubContext.setAMContainerSpec(conLauContext);
appSubContext.setResource(Resources
.createResource(MR_AM_CONTAINER_RESOURCE_MEMORY_MB,
MR_AM_CONTAINER_RESOURCE_VCORES));
appSubContext.setResource(amContainerResource);
if(reservationId != null) {
appSubContext.setReservationID(reservationId);

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -114,14 +115,14 @@ scheduled when all maps have finished (not support slow-start currently).
LoggerFactory.getLogger(MRAMSimulator.class);
@SuppressWarnings("checkstyle:parameternumber")
public void init(int id, int heartbeatInterval,
public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, ReservationSubmissionRequest rr,
long baselineStartTimeMS) {
super.init(id, heartbeatInterval, containerList, rm, se,
traceStartTime, traceFinishTime, user, queue,
isTracked, oldAppId, rr, baselineStartTimeMS);
long baselineStartTimeMS, Resource amContainerResource) {
super.init(heartbeatInterval, containerList, rm, se,
traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
rr, baselineStartTimeMS, amContainerResource);
amtype = "mapreduce";
// get map/reduce tasks

View File

@ -20,6 +20,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
@Private
@Unstable
@ -62,6 +64,14 @@ public class SLSConfiguration {
public static final int AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
public static final String AM_TYPE = AM_PREFIX + "type.";
public static final String AM_CONTAINER_MEMORY = AM_PREFIX +
"container.memory";
public static final int AM_CONTAINER_MEMORY_DEFAULT = 1024;
public static final String AM_CONTAINER_VCORES = AM_PREFIX +
"container.vcores";
public static final int AM_CONTAINER_VCORES_DEFAULT = 1;
// container
public static final String CONTAINER_PREFIX = PREFIX + "container.";
public static final String CONTAINER_MEMORY_MB = CONTAINER_PREFIX
@ -70,4 +80,9 @@ public class SLSConfiguration {
public static final String CONTAINER_VCORES = CONTAINER_PREFIX + "vcores";
public static final int CONTAINER_VCORES_DEFAULT = 1;
public static Resource getAMContainerResource(Configuration conf) {
return Resource.newInstance(
conf.getLong(AM_CONTAINER_MEMORY, AM_CONTAINER_MEMORY_DEFAULT),
conf.getInt(AM_CONTAINER_VCORES, AM_CONTAINER_VCORES_DEFAULT));
}
}

View File

@ -133,8 +133,8 @@ public void testAMSimulator() throws Exception {
String appId = "app1";
String queue = "default";
List<ContainerSimulator> containers = new ArrayList<>();
app.init(1, 1000, containers, rm, null, 0, 1000000L, "user1", queue,
true, appId, null, 0);
app.init(1000, containers, rm, null, 0, 1000000L, "user1", queue, true,
appId, null, 0, SLSConfiguration.getAMContainerResource(conf));
app.firstStep();
verifySchedulerMetrics(appId);