YARN-4170. AM need to be notified with priority in AllocateResponse. Contributed by Sunil G

This commit is contained in:
Jian He 2015-10-16 15:26:27 -07:00
parent 4337b263aa
commit f9da5cdb2b
7 changed files with 108 additions and 14 deletions

View File

@ -510,6 +510,9 @@ Release 2.8.0 - UNRELEASED
YARN-4162. CapacityScheduler: Add resource usage by partition and queue capacity YARN-4162. CapacityScheduler: Add resource usage by partition and queue capacity
by partition to REST API. (Naganarasimha G R via wangda) by partition to REST API. (Naganarasimha G R via wangda)
YARN-4170. AM need to be notified with priority in AllocateResponse.
(Sunil G via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -314,4 +315,17 @@ public abstract void setDecreasedContainers(
@Private @Private
@Unstable @Unstable
public abstract void setAMRMToken(Token amRMToken); public abstract void setAMRMToken(Token amRMToken);
/**
* Priority of the application
*
* @return get application priority
*/
@Public
@Unstable
public abstract Priority getApplicationPriority();
@Private
@Unstable
public abstract void setApplicationPriority(Priority priority);
} }

View File

@ -88,6 +88,7 @@ message AllocateResponseProto {
repeated ContainerProto increased_containers = 10; repeated ContainerProto increased_containers = 10;
repeated ContainerProto decreased_containers = 11; repeated ContainerProto decreased_containers = 11;
optional hadoop.common.TokenProto am_rm_token = 12; optional hadoop.common.TokenProto am_rm_token = 12;
optional PriorityProto application_priority = 13;
} }
enum SchedulerResourceTypes { enum SchedulerResourceTypes {

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
@ -40,6 +41,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
@ -47,6 +49,7 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto; import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
@ -72,6 +75,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
private List<NodeReport> updatedNodes = null; private List<NodeReport> updatedNodes = null;
private PreemptionMessage preempt; private PreemptionMessage preempt;
private Token amrmToken = null; private Token amrmToken = null;
private Priority appPriority = null;
public AllocateResponsePBImpl() { public AllocateResponsePBImpl() {
builder = AllocateResponseProto.newBuilder(); builder = AllocateResponseProto.newBuilder();
@ -154,6 +158,9 @@ private synchronized void mergeLocalToBuilder() {
if (this.amrmToken != null) { if (this.amrmToken != null) {
builder.setAmRmToken(convertToProtoFormat(this.amrmToken)); builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
} }
if (this.appPriority != null) {
builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
}
} }
private synchronized void mergeLocalToProto() { private synchronized void mergeLocalToProto() {
@ -378,6 +385,27 @@ public synchronized void setAMRMToken(Token amRMToken) {
this.amrmToken = amRMToken; this.amrmToken = amRMToken;
} }
@Override
public Priority getApplicationPriority() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.appPriority != null) {
return this.appPriority;
}
if (!p.hasApplicationPriority()) {
return null;
}
this.appPriority = convertFromProtoFormat(p.getApplicationPriority());
return this.appPriority;
}
@Override
public void setApplicationPriority(Priority priority) {
maybeInitBuilder();
if (priority == null)
builder.clearApplicationPriority();
this.appPriority = priority;
}
private synchronized void initLocalIncreasedContainerList() { private synchronized void initLocalIncreasedContainerList() {
if (this.increasedContainers != null) { if (this.increasedContainers != null) {
return; return;
@ -644,4 +672,12 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) {
private TokenProto convertToProtoFormat(Token t) { private TokenProto convertToProtoFormat(Token t) {
return ((TokenPBImpl)t).getProto(); return ((TokenPBImpl)t).getProto();
} }
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto();
}
} }

View File

@ -563,6 +563,10 @@ public AllocateResponse allocate(AllocateRequest request)
allocateResponse allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation)); .setPreemptionMessage(generatePreemptionMessage(allocation));
// Set application priority
allocateResponse.setApplicationPriority(app
.getApplicationSubmissionContext().getPriority());
// update AMRMToken if the token is rolled-up // update AMRMToken if the token is rolled-up
MasterKeyData nextMasterKey = MasterKeyData nextMasterKey =
this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData(); this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
@ -437,6 +438,48 @@ public void testInvalidIncreaseDecreaseRequest() throws Exception {
} }
} }
@Test(timeout = 300000)
public void testPriorityInAllocatedResponse() throws Exception {
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// Set Max Application Priority as 10
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
MockRM rm = new MockRM(conf);
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
// Submit an application
Priority appPriority1 = Priority.newInstance(5);
RMApp app1 = rm.submitApp(2048, appPriority1);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
List<ContainerId> release = new ArrayList<ContainerId>();
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
allocateRequest.setReleaseList(release);
allocateRequest.setAskList(ask);
AllocateResponse response1 = am1.allocate(allocateRequest);
Assert.assertEquals(appPriority1, response1.getApplicationPriority());
// get scheduler
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// Change the priority of App1 to 8
Priority appPriority2 = Priority.newInstance(8);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
AllocateResponse response2 = am1.allocate(allocateRequest);
Assert.assertEquals(appPriority2, response2.getApplicationPriority());
rm.stop();
}
private static class MyResourceManager extends MockRM { private static class MyResourceManager extends MockRM {
public MyResourceManager(YarnConfiguration conf) { public MyResourceManager(YarnConfiguration conf) {

View File

@ -977,16 +977,6 @@ public void testAsyncScheduling() throws Exception {
} }
} }
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
RMAppAttempt attempt = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
return am;
}
private void waitForAppPreemptionInfo(RMApp app, Resource preempted, private void waitForAppPreemptionInfo(RMApp app, Resource preempted,
int numAMPreempted, int numTaskPreempted, int numAMPreempted, int numTaskPreempted,
Resource currentAttemptPreempted, boolean currentAttemptAMPreempted, Resource currentAttemptPreempted, boolean currentAttemptAMPreempted,
@ -1156,7 +1146,8 @@ public void testPreemptionInfo() throws Exception {
// create app and launch the AM // create app and launch the AM
RMApp app0 = rm1.submitApp(CONTAINER_MEMORY); RMApp app0 = rm1.submitApp(CONTAINER_MEMORY);
MockAM am0 = launchAM(app0, rm1, nm1); MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
am0.registerAppAttempt();
// get scheduler app // get scheduler app
FiCaSchedulerApp schedulerAppAttempt = FiCaSchedulerApp schedulerAppAttempt =
@ -1190,7 +1181,9 @@ public void testPreemptionInfo() throws Exception {
Resource.newInstance(0, 0), false, 0); Resource.newInstance(0, 0), false, 0);
// launch app0-attempt1 // launch app0-attempt1
MockAM am1 = launchAM(app0, rm1, nm1); MockAM am1 = MockRM.launchAM(app0, rm1, nm1);
am1.registerAppAttempt();
schedulerAppAttempt = schedulerAppAttempt =
cs.getSchedulerApplications().get(app0.getApplicationId()) cs.getSchedulerApplications().get(app0.getApplicationId())
.getCurrentAppAttempt(); .getCurrentAppAttempt();