From e76ffbf10239b86f989be423a2e2691d0d792784 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sat, 10 Sep 2022 01:39:00 +0800 Subject: [PATCH] YARN-11297. [Federation] Improve Yarn Router Reservation Submission Code. (#4863) --- .../clientrm/FederationClientInterceptor.java | 8 +++++- .../TestFederationClientInterceptor.java | 18 +++---------- .../TestableFederationClientInterceptor.java | 25 ++++++++++++++++++- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 04452af365..ec6f5fbb0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -949,7 +949,13 @@ public ReservationSubmissionResponse submitReservation( // Second, determine whether the current ReservationId has a corresponding subCluster. // If it does not exist, add it. If it exists, update it. Boolean exists = existsReservationHomeSubCluster(reservationId); - if (!exists) { + + // We may encounter the situation of repeated submission of Reservation, + // at this time we should try to use the reservation that has been allocated + // !exists indicates that the reservation does not exist and needs to be added + // i==0, mainly to consider repeated submissions, + // so the first time to apply for reservation, try to use the original reservation + if (!exists || i == 0) { addReservationHomeSubCluster(reservationId, reservationHomeSubCluster); } else { updateReservationHomeSubCluster(subClusterId, reservationId, reservationHomeSubCluster); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 93a759bc40..ac980b4858 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -1308,13 +1308,6 @@ public void testSubmitReservation() throws Exception { GetNewReservationResponse response = interceptor.getNewReservation(request); Assert.assertNotNull(response); - // allow plan follower to synchronize, manually trigger an assignment - Map mockRMs = interceptor.getMockRMs(); - for (MockRM mockRM : mockRMs.values()) { - ReservationSystem reservationSystem = mockRM.getReservationSystem(); - reservationSystem.synchronizePlan("root.decided", true); - } - // Submit Reservation ReservationId reservationId = response.getReservationId(); ReservationDefinition rDefinition = createReservationDefinition(1024, 1); @@ -1384,13 +1377,6 @@ public void testSubmitReservationMultipleSubmission() throws Exception { GetNewReservationResponse response = interceptor.getNewReservation(request); Assert.assertNotNull(response); - // allow plan follower to synchronize, manually trigger an assignment - Map mockRMs = interceptor.getMockRMs(); - for (MockRM mockRM : mockRMs.values()) { - ReservationSystem reservationSystem = mockRM.getReservationSystem(); - reservationSystem.synchronizePlan("root.decided", true); - } - // First Submit Reservation ReservationId reservationId = response.getReservationId(); ReservationDefinition rDefinition = createReservationDefinition(1024, 1); @@ -1404,10 +1390,12 @@ public void testSubmitReservationMultipleSubmission() throws Exception { Assert.assertNotNull(subClusterId1); Assert.assertTrue(subClusters.contains(subClusterId1)); - // First Retry + // First Retry, repeat the submission ReservationSubmissionResponse submissionResponse1 = interceptor.submitReservation(rSubmissionRequest); Assert.assertNotNull(submissionResponse1); + + // Expect reserved clusters to be consistent SubClusterId subClusterId2 = stateStoreUtil.queryReservationHomeSC(reservationId); Assert.assertNotNull(subClusterId2); Assert.assertEquals(subClusterId1, subClusterId2); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index 4e4df3f123..8279899e38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -28,13 +28,16 @@ import java.util.Map; import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeAttributeType; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -43,6 +46,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; @@ -95,6 +100,7 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( mockRMs.put(subClusterId, mockRM); } initNodeAttributes(subClusterId, mockRM); + initReservationSystem(mockRM); return mockRM.getClientRMService(); } } @@ -167,6 +173,23 @@ private void initNodeAttributes(SubClusterId subClusterId, MockRM mockRM) { } } + private void initReservationSystem(MockRM mockRM) throws YarnException { + try { + // Ensure that the reserved resources of the RM#Reservation System are allocated + String planName = "root.decided"; + ReservationSystem reservationSystem = mockRM.getReservationSystem(); + reservationSystem.synchronizePlan(planName, true); + + GenericTestUtils.waitFor(() -> { + Plan plan = reservationSystem.getPlan(planName); + Resource resource = plan.getTotalCapacity(); + return (resource.getMemorySize() > 0 && resource.getVirtualCores() > 0); + }, 100, 2000); + } catch (TimeoutException | InterruptedException e) { + throw new YarnException(e); + } + } + @Override public void shutdown() { if (mockRMs != null && !mockRMs.isEmpty()) { @@ -193,4 +216,4 @@ public void shutdown() { mockRMs.clear(); super.shutdown(); } -} +} \ No newline at end of file