From f9da5cdb2b2dd071fd60fc01ea1edf0f79c0819b Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 16 Oct 2015 15:26:27 -0700 Subject: [PATCH] YARN-4170. AM need to be notified with priority in AllocateResponse. Contributed by Sunil G --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../api/protocolrecords/AllocateResponse.java | 14 ++++++ .../src/main/proto/yarn_service_protos.proto | 1 + .../impl/pb/AllocateResponsePBImpl.java | 40 ++++++++++++++++- .../ApplicationMasterService.java | 4 ++ .../TestApplicationMasterService.java | 43 +++++++++++++++++++ .../capacity/TestCapacityScheduler.java | 17 +++----- 7 files changed, 108 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 851870b472..93c07d87ca 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -510,6 +510,9 @@ Release 2.8.0 - UNRELEASED YARN-4162. CapacityScheduler: Add resource usage by partition and queue capacity by partition to REST API. (Naganarasimha G R via wangda) + YARN-4170. AM need to be notified with priority in AllocateResponse. + (Sunil G via jianhe) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index c3630704c5..d1b2a3a34d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.util.Records; @@ -314,4 +315,17 @@ public abstract void setDecreasedContainers( @Private @Unstable public abstract void setAMRMToken(Token amRMToken); + + /** + * Priority of the application + * + * @return get application priority + */ + @Public + @Unstable + public abstract Priority getApplicationPriority(); + + @Private + @Unstable + public abstract void setApplicationPriority(Priority priority); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a4b9c376ef..8924ebaf6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -88,6 +88,7 @@ message AllocateResponseProto { repeated ContainerProto increased_containers = 10; repeated ContainerProto decreased_containers = 11; optional hadoop.common.TokenProto am_rm_token = 12; + optional PriorityProto application_priority = 13; } enum SchedulerResourceTypes { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index dd7d1a9ede..bd460f6fa4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; @@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto; +import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; @@ -72,7 +75,8 @@ public class AllocateResponsePBImpl extends AllocateResponse { private List updatedNodes = null; private PreemptionMessage preempt; private Token amrmToken = null; - + private Priority appPriority = null; + public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); } @@ -154,6 +158,9 @@ private synchronized void mergeLocalToBuilder() { if (this.amrmToken != null) { builder.setAmRmToken(convertToProtoFormat(this.amrmToken)); } + if (this.appPriority != null) { + builder.setApplicationPriority(convertToProtoFormat(this.appPriority)); + } } private synchronized void mergeLocalToProto() { @@ -378,6 +385,27 @@ public synchronized void setAMRMToken(Token amRMToken) { this.amrmToken = amRMToken; } + @Override + public Priority getApplicationPriority() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.appPriority != null) { + return this.appPriority; + } + if (!p.hasApplicationPriority()) { + return null; + } + this.appPriority = convertFromProtoFormat(p.getApplicationPriority()); + return this.appPriority; + } + + @Override + public void setApplicationPriority(Priority priority) { + maybeInitBuilder(); + if (priority == null) + builder.clearApplicationPriority(); + this.appPriority = priority; + } + private synchronized void initLocalIncreasedContainerList() { if (this.increasedContainers != null) { return; @@ -644,4 +672,12 @@ private TokenPBImpl convertFromProtoFormat(TokenProto p) { private TokenProto convertToProtoFormat(Token t) { return ((TokenPBImpl)t).getProto(); } -} + + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { + return new PriorityPBImpl(p); + } + + private PriorityProto convertToProtoFormat(Priority t) { + return ((PriorityPBImpl)t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 87c7bfab5e..ab94175f90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -563,6 +563,10 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); + // Set application priority + allocateResponse.setApplicationPriority(app + .getApplicationSubmissionContext().getPriority()); + // update AMRMToken if the token is rolled-up MasterKeyData nextMasterKey = this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 8fa1ad2d20..cef1b5f937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -437,6 +438,48 @@ public void testInvalidIncreaseDecreaseRequest() throws Exception { } } + @Test(timeout = 300000) + public void testPriorityInAllocatedResponse() throws Exception { + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + // Set Max Application Priority as 10 + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + MockRM rm = new MockRM(conf); + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + Priority appPriority1 = Priority.newInstance(5); + RMApp app1 = rm.submitApp(2048, appPriority1); + + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl(); + List release = new ArrayList(); + List ask = new ArrayList(); + allocateRequest.setReleaseList(release); + allocateRequest.setAskList(ask); + + AllocateResponse response1 = am1.allocate(allocateRequest); + Assert.assertEquals(appPriority1, response1.getApplicationPriority()); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + // Change the priority of App1 to 8 + Priority appPriority2 = Priority.newInstance(8); + cs.updateApplicationPriority(appPriority2, app1.getApplicationId()); + + AllocateResponse response2 = am1.allocate(allocateRequest); + Assert.assertEquals(appPriority2, response2.getApplicationPriority()); + rm.stop(); + } + private static class MyResourceManager extends MockRM { public MyResourceManager(YarnConfiguration conf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index d81b8ccbc4..f0a1d03275 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -976,16 +976,6 @@ public void testAsyncScheduling() throws Exception { CapacityScheduler.schedule(cs); } } - - private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) - throws Exception { - RMAppAttempt attempt = app.getCurrentAppAttempt(); - nm.nodeHeartbeat(true); - MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); - am.registerAppAttempt(); - rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); - return am; - } private void waitForAppPreemptionInfo(RMApp app, Resource preempted, int numAMPreempted, int numTaskPreempted, @@ -1156,7 +1146,8 @@ public void testPreemptionInfo() throws Exception { // create app and launch the AM RMApp app0 = rm1.submitApp(CONTAINER_MEMORY); - MockAM am0 = launchAM(app0, rm1, nm1); + MockAM am0 = MockRM.launchAM(app0, rm1, nm1); + am0.registerAppAttempt(); // get scheduler app FiCaSchedulerApp schedulerAppAttempt = @@ -1190,7 +1181,9 @@ public void testPreemptionInfo() throws Exception { Resource.newInstance(0, 0), false, 0); // launch app0-attempt1 - MockAM am1 = launchAM(app0, rm1, nm1); + MockAM am1 = MockRM.launchAM(app0, rm1, nm1); + am1.registerAppAttempt(); + schedulerAppAttempt = cs.getSchedulerApplications().get(app0.getApplicationId()) .getCurrentAppAttempt();