YARN-787. Removed minimum resource from RegisterApplicationMasterResponse. Contributed by Alejandro Abdelnur.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1493509 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
18a4cb4872
commit
98d97d316c
@ -122,6 +122,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
@ -1245,9 +1246,9 @@ private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
|
||||
int slotMemoryReq =
|
||||
taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
|
||||
|
||||
int minSlotMemSize =
|
||||
taskAttempt.appContext.getClusterInfo().getMinContainerCapability()
|
||||
.getMemory();
|
||||
int minSlotMemSize = taskAttempt.conf.getInt(
|
||||
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
|
||||
|
||||
int simSlotsRequired =
|
||||
minSlotMemSize == 0 ? 0 : (int) Math.ceil((float) slotMemoryReq
|
||||
|
@ -220,13 +220,9 @@ protected void serviceInit(Configuration conf) throws Exception {
|
||||
|
||||
super.serviceInit(conf);
|
||||
if (this.clusterInfo != null) {
|
||||
getContext().getClusterInfo().setMinContainerCapability(
|
||||
this.clusterInfo.getMinContainerCapability());
|
||||
getContext().getClusterInfo().setMaxContainerCapability(
|
||||
this.clusterInfo.getMaxContainerCapability());
|
||||
} else {
|
||||
getContext().getClusterInfo().setMinContainerCapability(
|
||||
Resource.newInstance(1024, 1));
|
||||
getContext().getClusterInfo().setMaxContainerCapability(
|
||||
Resource.newInstance(10240, 1));
|
||||
}
|
||||
|
@ -204,8 +204,6 @@ protected AMRMProtocol createSchedulerProxy() {
|
||||
throws IOException {
|
||||
RegisterApplicationMasterResponse response =
|
||||
Records.newRecord(RegisterApplicationMasterResponse.class);
|
||||
response.setMinimumResourceCapability(Resource.newInstance(
|
||||
1024, 1));
|
||||
response.setMaximumResourceCapability(Resource.newInstance(
|
||||
10240, 1));
|
||||
return response;
|
||||
|
@ -1329,8 +1329,7 @@ private static AppContext createAppContext(
|
||||
when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
|
||||
when(context.getJob(isA(JobId.class))).thenReturn(job);
|
||||
when(context.getClusterInfo()).thenReturn(
|
||||
new ClusterInfo(Resource.newInstance(1024, 1), Resource.newInstance(
|
||||
10240, 1)));
|
||||
new ClusterInfo(Resource.newInstance(10240, 1)));
|
||||
when(context.getEventHandler()).thenReturn(new EventHandler() {
|
||||
@Override
|
||||
public void handle(Event event) {
|
||||
|
@ -1424,8 +1424,6 @@ private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
|
||||
when(minContainerRequirements.getMemory()).thenReturn(1000);
|
||||
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
when(clusterInfo.getMinContainerCapability()).thenReturn(
|
||||
minContainerRequirements);
|
||||
AppContext appContext = mock(AppContext.class);
|
||||
when(appContext.getClusterInfo()).thenReturn(clusterInfo);
|
||||
|
||||
|
@ -82,6 +82,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.junit.Test;
|
||||
@ -197,8 +198,9 @@ public void verifySlotMillis(int mapMemMb, int reduceMemMb,
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
|
||||
conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
|
||||
app.setClusterInfo(new ClusterInfo(Resource
|
||||
.newInstance(minContainerSize, 1), Resource.newInstance(10240, 1)));
|
||||
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||
minContainerSize);
|
||||
app.setClusterInfo(new ClusterInfo(Resource.newInstance(10240, 1)));
|
||||
|
||||
Job job = app.submit(conf);
|
||||
app.waitForState(job, JobState.RUNNING);
|
||||
@ -392,7 +394,6 @@ public void testContainerCleanedWhileRunning() throws Exception {
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
Resource resource = mock(Resource.class);
|
||||
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
|
||||
when(resource.getMemory()).thenReturn(1024);
|
||||
|
||||
TaskAttemptImpl taImpl =
|
||||
@ -450,7 +451,6 @@ public void testContainerCleanedWhileCommitting() throws Exception {
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
Resource resource = mock(Resource.class);
|
||||
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
|
||||
when(resource.getMemory()).thenReturn(1024);
|
||||
|
||||
TaskAttemptImpl taImpl =
|
||||
@ -511,7 +511,6 @@ public void testDoubleTooManyFetchFailure() throws Exception {
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
Resource resource = mock(Resource.class);
|
||||
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
|
||||
when(resource.getMemory()).thenReturn(1024);
|
||||
|
||||
TaskAttemptImpl taImpl =
|
||||
@ -579,7 +578,6 @@ public void testAppDiognosticEventOnUnassignedTask() throws Exception {
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
Resource resource = mock(Resource.class);
|
||||
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
|
||||
when(resource.getMemory()).thenReturn(1024);
|
||||
|
||||
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
|
||||
@ -629,7 +627,6 @@ public void testAppDiognosticEventOnNewTask() throws Exception {
|
||||
ClusterInfo clusterInfo = mock(ClusterInfo.class);
|
||||
Resource resource = mock(Resource.class);
|
||||
when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
|
||||
when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
|
||||
when(resource.getMemory()).thenReturn(1024);
|
||||
|
||||
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
|
||||
|
@ -117,8 +117,7 @@ private static AppContext createAppContext() {
|
||||
when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
|
||||
when(ctx.getJob(isA(JobId.class))).thenReturn(job);
|
||||
when(ctx.getClusterInfo()).thenReturn(
|
||||
new ClusterInfo(Resource.newInstance(1024, 1), Resource.newInstance(
|
||||
10240, 1)));
|
||||
new ClusterInfo(Resource.newInstance(10240, 1)));
|
||||
when(ctx.getEventHandler()).thenReturn(eventHandler);
|
||||
return ctx;
|
||||
}
|
||||
|
@ -149,6 +149,9 @@ Release 2.1.0-beta - UNRELEASED
|
||||
YARN-821. Renamed setFinishApplicationStatus to setFinalApplicationStatus in
|
||||
FinishApplicationMasterRequest for consistency. (Jian He via vinodk)
|
||||
|
||||
YARN-787. Removed minimum resource from RegisterApplicationMasterResponse.
|
||||
(tucu via acmurthy)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
YARN-482. FS: Extend SchedulingMode to intermediate queues.
|
||||
|
@ -51,24 +51,10 @@ public static RegisterApplicationMasterResponse newInstance(
|
||||
Map<ApplicationAccessType, String> acls) {
|
||||
RegisterApplicationMasterResponse response =
|
||||
Records.newRecord(RegisterApplicationMasterResponse.class);
|
||||
response.setMinimumResourceCapability(minCapability);
|
||||
response.setMaximumResourceCapability(maxCapability);
|
||||
response.setApplicationACLs(acls);
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the minimum capability for any {@link Resource} allocated by the
|
||||
* <code>ResourceManager</code> in the cluster.
|
||||
* @return minimum capability of allocated resources in the cluster
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract Resource getMinimumResourceCapability();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setMinimumResourceCapability(Resource capability);
|
||||
|
||||
/**
|
||||
* Get the maximum capability for any {@link Resource} allocated by the
|
||||
|
@ -42,7 +42,6 @@ public class RegisterApplicationMasterResponsePBImpl extends
|
||||
RegisterApplicationMasterResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private Resource minimumResourceCapability;
|
||||
private Resource maximumResourceCapability;
|
||||
private Map<ApplicationAccessType, String> applicationACLS = null;
|
||||
|
||||
@ -91,10 +90,6 @@ private void mergeLocalToProto() {
|
||||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.minimumResourceCapability != null) {
|
||||
builder.setMinimumCapability(
|
||||
convertToProtoFormat(this.minimumResourceCapability));
|
||||
}
|
||||
if (this.maximumResourceCapability != null) {
|
||||
builder.setMaximumCapability(
|
||||
convertToProtoFormat(this.maximumResourceCapability));
|
||||
@ -127,21 +122,6 @@ public Resource getMaximumResourceCapability() {
|
||||
return this.maximumResourceCapability;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getMinimumResourceCapability() {
|
||||
if (this.minimumResourceCapability != null) {
|
||||
return this.minimumResourceCapability;
|
||||
}
|
||||
|
||||
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasMinimumCapability()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
this.minimumResourceCapability = convertFromProtoFormat(p.getMinimumCapability());
|
||||
return this.minimumResourceCapability;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaximumResourceCapability(Resource capability) {
|
||||
maybeInitBuilder();
|
||||
@ -151,16 +131,6 @@ public void setMaximumResourceCapability(Resource capability) {
|
||||
this.maximumResourceCapability = capability;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMinimumResourceCapability(Resource capability) {
|
||||
maybeInitBuilder();
|
||||
if(minimumResourceCapability == null) {
|
||||
builder.clearMinimumCapability();
|
||||
}
|
||||
this.minimumResourceCapability = capability;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<ApplicationAccessType, String> getApplicationACLs() {
|
||||
initApplicationACLs();
|
||||
|
@ -35,9 +35,8 @@ message RegisterApplicationMasterRequestProto {
|
||||
}
|
||||
|
||||
message RegisterApplicationMasterResponseProto {
|
||||
optional ResourceProto minimumCapability = 1;
|
||||
optional ResourceProto maximumCapability = 2;
|
||||
repeated ApplicationACLMapProto application_ACLs = 3;
|
||||
optional ResourceProto maximumCapability = 1;
|
||||
repeated ApplicationACLMapProto application_ACLs = 2;
|
||||
}
|
||||
|
||||
message FinishApplicationMasterRequestProto {
|
||||
|
@ -463,22 +463,11 @@ public boolean run() throws YarnException, IOException {
|
||||
appMasterTrackingUrl);
|
||||
// Dump out information about cluster capability as seen by the
|
||||
// resource manager
|
||||
int minMem = response.getMinimumResourceCapability().getMemory();
|
||||
int maxMem = response.getMaximumResourceCapability().getMemory();
|
||||
LOG.info("Min mem capabililty of resources in this cluster " + minMem);
|
||||
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
|
||||
|
||||
// A resource ask has to be atleast the minimum of the capability of the
|
||||
// cluster, the value has to be a multiple of the min value and cannot
|
||||
// exceed the max.
|
||||
// If it is not an exact multiple of min, the RM will allocate to the
|
||||
// nearest multiple of min
|
||||
if (containerMemory < minMem) {
|
||||
LOG.info("Container memory specified below min threshold of cluster."
|
||||
+ " Using min value." + ", specified=" + containerMemory + ", min="
|
||||
+ minMem);
|
||||
containerMemory = minMem;
|
||||
} else if (containerMemory > maxMem) {
|
||||
// A resource ask cannot exceed the max.
|
||||
if (containerMemory > maxMem) {
|
||||
LOG.info("Container memory specified above max threshold of cluster."
|
||||
+ " Using max value." + ", specified=" + containerMemory + ", max="
|
||||
+ maxMem);
|
||||
|
@ -22,27 +22,16 @@
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
public class ClusterInfo {
|
||||
private Resource minContainerCapability;
|
||||
private Resource maxContainerCapability;
|
||||
|
||||
public ClusterInfo() {
|
||||
this.minContainerCapability = Records.newRecord(Resource.class);
|
||||
this.maxContainerCapability = Records.newRecord(Resource.class);
|
||||
}
|
||||
|
||||
public ClusterInfo(Resource minCapability, Resource maxCapability) {
|
||||
this.minContainerCapability = minCapability;
|
||||
public ClusterInfo(Resource maxCapability) {
|
||||
this.maxContainerCapability = maxCapability;
|
||||
}
|
||||
|
||||
public Resource getMinContainerCapability() {
|
||||
return minContainerCapability;
|
||||
}
|
||||
|
||||
public void setMinContainerCapability(Resource minContainerCapability) {
|
||||
this.minContainerCapability = minContainerCapability;
|
||||
}
|
||||
|
||||
public Resource getMaxContainerCapability() {
|
||||
return maxContainerCapability;
|
||||
}
|
||||
|
@ -210,8 +210,6 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
// Pick up min/max resource from scheduler...
|
||||
RegisterApplicationMasterResponse response = recordFactory
|
||||
.newRecordInstance(RegisterApplicationMasterResponse.class);
|
||||
response.setMinimumResourceCapability(rScheduler
|
||||
.getMinimumResourceCapability());
|
||||
response.setMaximumResourceCapability(rScheduler
|
||||
.getMaximumResourceCapability());
|
||||
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
|
||||
|
Loading…
Reference in New Issue
Block a user