diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java index 4c1a9cffc1..f6e305fab7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java @@ -33,7 +33,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -42,7 +41,6 @@ import java.util.Map; import java.util.Set; -import com.google.common.base.Supplier; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputByteBuffer; @@ -75,14 +73,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -96,13 +86,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; -import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; @@ -119,23 +103,28 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.UTCClock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.slf4j.event.Level; -public class TestYarnClient { +/** + * This class is to test class {@link YarnClient) and {@link YarnClientImpl}. + */ +public class TestYarnClient extends ParameterizedSchedulerTestBase { + + public TestYarnClient(SchedulerType type) throws IOException { + super(type); + } + + protected void configureFairScheduler(YarnConfiguration conf) {} @Before public void setup() { @@ -145,7 +134,7 @@ public void setup() { @Test public void testClientStop() { - Configuration conf = new Configuration(); + Configuration conf = getConf(); ResourceManager rm = new ResourceManager(); rm.init(conf); rm.start(); @@ -159,7 +148,7 @@ public void testClientStop() { @Test public void testStartWithTimelineV15() throws Exception { - Configuration conf = new Configuration(); + Configuration conf = getConf(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f); YarnClientImpl client = (YarnClientImpl) YarnClient.createYarnClient(); @@ -254,7 +243,7 @@ public void verifyError(Throwable e) { @SuppressWarnings("deprecation") @Test (timeout = 30000) public void testSubmitApplication() throws Exception { - Configuration conf = new Configuration(); + Configuration conf = getConf(); conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, 100); // speed up tests final YarnClient client = new MockYarnClient(); @@ -301,7 +290,7 @@ public void testSubmitApplication() throws Exception { @SuppressWarnings("deprecation") @Test (timeout = 20000) public void testSubmitApplicationInterrupted() throws IOException { - Configuration conf = new Configuration(); + Configuration conf = getConf(); int pollIntervalMs = 1000; conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS, pollIntervalMs); @@ -406,10 +395,9 @@ public void testKillApplication() throws Exception { rm.start(); RMApp app = rm.submitApp(2000); - Configuration conf = new Configuration(); @SuppressWarnings("resource") final YarnClient client = new MockYarnClient(); - client.init(conf); + client.init(getConf()); client.start(); client.killApplication(app.getApplicationId()); @@ -447,9 +435,8 @@ public void testApplicationTypeLimit() throws Exception { @Test (timeout = 10000) public void testGetApplications() throws YarnException, IOException { - Configuration conf = new Configuration(); final YarnClient client = new MockYarnClient(); - client.init(conf); + client.init(getConf()); client.start(); List expectedReports = ((MockYarnClient)client).getReports(); @@ -500,9 +487,8 @@ public void testGetApplications() throws YarnException, IOException { @Test(timeout = 10000) public void testGetApplicationAttempts() throws YarnException, IOException { - Configuration conf = new Configuration(); final YarnClient client = new MockYarnClient(); - client.init(conf); + client.init(getConf()); client.start(); ApplicationId applicationId = ApplicationId.newInstance(1234, 5); @@ -539,7 +525,7 @@ public void testGetApplicationAttempt() throws YarnException, IOException { @Test(timeout = 10000) public void testGetContainers() throws YarnException, IOException { - Configuration conf = new Configuration(); + Configuration conf = getConf(); conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true); @@ -572,7 +558,7 @@ public void testGetContainers() throws YarnException, IOException { @Test(timeout = 10000) public void testGetContainerReport() throws YarnException, IOException { - Configuration conf = new Configuration(); + Configuration conf = getConf(); conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true); final YarnClient client = new MockYarnClient(); @@ -603,9 +589,8 @@ public void testGetContainerReport() throws YarnException, IOException { @Test (timeout = 10000) public void testGetLabelsToNodes() throws YarnException, IOException { - Configuration conf = new Configuration(); final YarnClient client = new MockYarnClient(); - client.init(conf); + client.init(getConf()); client.start(); // Get labels to nodes mapping @@ -629,9 +614,8 @@ public void testGetLabelsToNodes() throws YarnException, IOException { @Test (timeout = 10000) public void testGetNodesToLabels() throws YarnException, IOException { - Configuration conf = new Configuration(); final YarnClient client = new MockYarnClient(); - client.init(conf); + client.init(getConf()); client.start(); // Get labels to nodes mapping @@ -1025,7 +1009,7 @@ public void testAMMRTokens() throws Exception { MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1); YarnClient rmClient = null; try { - cluster.init(new YarnConfiguration()); + cluster.init(getConf()); cluster.start(); final Configuration yarnConf = cluster.getConfig(); rmClient = YarnClient.createYarnClient(); @@ -1146,7 +1130,7 @@ private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout, boolean expectedTimeoutEnforcement) { YarnClientImpl client = new YarnClientImpl(); try { - Configuration conf = new Configuration(); + Configuration conf = getConf(); if (valueForTimeout != null) { conf.setLong( YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS, @@ -1165,7 +1149,7 @@ private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout, @Test public void testBestEffortTimelineDelegationToken() throws Exception { - Configuration conf = new YarnConfiguration(); + Configuration conf = getConf(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); @@ -1199,7 +1183,7 @@ TimelineClient createTimelineClient() throws IOException, YarnException { @Test public void testAutomaticTimelineDelegationTokenLoading() throws Exception { - Configuration conf = new YarnConfiguration(); + Configuration conf = getConf(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf); TimelineDelegationTokenIdentifier timelineDT = @@ -1289,7 +1273,7 @@ public boolean isSecurityEnabled() { public void testParseTimelineDelegationTokenRenewer() throws Exception { // Client side YarnClientImpl client = (YarnClientImpl) YarnClient.createYarnClient(); - Configuration conf = new YarnConfiguration(); + Configuration conf = getConf(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.set(YarnConfiguration.RM_PRINCIPAL, "rm/_HOST@EXAMPLE.COM"); conf.set( @@ -1303,387 +1287,9 @@ public void testParseTimelineDelegationTokenRenewer() throws Exception { } } - private MiniYARNCluster setupMiniYARNCluster() throws Exception { - CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); - ReservationSystemTestUtil.setupQueueConfiguration(conf); - conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, - ResourceScheduler.class); - conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); - MiniYARNCluster cluster = - new MiniYARNCluster("testReservationAPIs", 2, 1, 1); - - cluster.init(conf); - cluster.start(); - - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - return cluster.getResourceManager().getRMContext() - .getReservationSystem() - .getPlan(ReservationSystemTestUtil.reservationQ) - .getTotalCapacity().getMemorySize() > 6000; - } - }, 10, 10000); - - return cluster; - } - - private YarnClient setupYarnClient(MiniYARNCluster cluster) { - final Configuration yarnConf = cluster.getConfig(); - YarnClient client = YarnClient.createYarnClient(); - client.init(yarnConf); - client.start(); - return client; - } - - private ReservationSubmissionRequest submitReservationTestHelper( - YarnClient client, long arrival, long deadline, long duration) - throws IOException, YarnException { - ReservationId reservationID = client.createReservation().getReservationId(); - ReservationSubmissionRequest sRequest = createSimpleReservationRequest( - reservationID, 4, arrival, deadline, duration); - ReservationSubmissionResponse sResponse = - client.submitReservation(sRequest); - Assert.assertNotNull(sResponse); - Assert.assertNotNull(reservationID); - System.out.println("Submit reservation response: " + reservationID); - - return sRequest; - } - - @Test - public void testCreateReservation() throws Exception { - MiniYARNCluster cluster = setupMiniYARNCluster(); - YarnClient client = setupYarnClient(cluster); - try { - Clock clock = new UTCClock(); - long arrival = clock.getTime(); - long duration = 60000; - long deadline = (long) (arrival + 1.05 * duration); - ReservationSubmissionRequest sRequest = - submitReservationTestHelper(client, arrival, deadline, duration); - - // Submit the reservation again with the same request and make sure it - // passes. - client.submitReservation(sRequest); - - // Submit the reservation with the same reservation id but different - // reservation definition, and ensure YarnException is thrown. - arrival = clock.getTime(); - ReservationDefinition rDef = sRequest.getReservationDefinition(); - rDef.setArrival(arrival + duration); - sRequest.setReservationDefinition(rDef); - try { - client.submitReservation(sRequest); - Assert.fail("Reservation submission should fail if a duplicate " - + "reservation id is used, but the reservation definition has been " - + "updated."); - } catch (Exception e) { - Assert.assertTrue(e instanceof YarnException); - } - } finally { - // clean-up - if (client != null) { - client.stop(); - } - cluster.stop(); - } - } - - @Test - public void testUpdateReservation() throws Exception { - MiniYARNCluster cluster = setupMiniYARNCluster(); - YarnClient client = setupYarnClient(cluster); - try { - Clock clock = new UTCClock(); - long arrival = clock.getTime(); - long duration = 60000; - long deadline = (long) (arrival + 1.05 * duration); - ReservationSubmissionRequest sRequest = - submitReservationTestHelper(client, arrival, deadline, duration); - - ReservationDefinition rDef = sRequest.getReservationDefinition(); - ReservationRequest rr = - rDef.getReservationRequests().getReservationResources().get(0); - ReservationId reservationID = sRequest.getReservationId(); - rr.setNumContainers(5); - arrival = clock.getTime(); - duration = 30000; - deadline = (long) (arrival + 1.05 * duration); - rr.setDuration(duration); - rDef.setArrival(arrival); - rDef.setDeadline(deadline); - ReservationUpdateRequest uRequest = - ReservationUpdateRequest.newInstance(rDef, reservationID); - ReservationUpdateResponse uResponse = client.updateReservation(uRequest); - Assert.assertNotNull(uResponse); - System.out.println("Update reservation response: " + uResponse); - } finally { - // clean-up - if (client != null) { - client.stop(); - } - cluster.stop(); - } - } - - @Test - public void testListReservationsByReservationId() throws Exception{ - MiniYARNCluster cluster = setupMiniYARNCluster(); - YarnClient client = setupYarnClient(cluster); - try { - Clock clock = new UTCClock(); - long arrival = clock.getTime(); - long duration = 60000; - long deadline = (long) (arrival + 1.05 * duration); - ReservationSubmissionRequest sRequest = - submitReservationTestHelper(client, arrival, deadline, duration); - - ReservationId reservationID = sRequest.getReservationId(); - ReservationListRequest request = ReservationListRequest.newInstance( - ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1, - -1, false); - ReservationListResponse response = client.listReservations(request); - Assert.assertNotNull(response); - Assert.assertEquals(1, response.getReservationAllocationState().size()); - Assert.assertEquals(response.getReservationAllocationState().get(0) - .getReservationId().getId(), reservationID.getId()); - Assert.assertEquals(response.getReservationAllocationState().get(0) - .getResourceAllocationRequests().size(), 0); - } finally { - // clean-up - if (client != null) { - client.stop(); - } - cluster.stop(); - } - } - - @Test - public void testListReservationsByTimeInterval() throws Exception { - MiniYARNCluster cluster = setupMiniYARNCluster(); - YarnClient client = setupYarnClient(cluster); - try { - Clock clock = new UTCClock(); - long arrival = clock.getTime(); - long duration = 60000; - long deadline = (long) (arrival + 1.05 * duration); - ReservationSubmissionRequest sRequest = - submitReservationTestHelper(client, arrival, deadline, duration); - - // List reservations, search by a point in time within the reservation - // range. - arrival = clock.getTime(); - ReservationId reservationID = sRequest.getReservationId(); - ReservationListRequest request = ReservationListRequest.newInstance( - ReservationSystemTestUtil.reservationQ, "", arrival + duration / 2, - arrival + duration / 2, true); - - ReservationListResponse response = client.listReservations(request); - Assert.assertNotNull(response); - Assert.assertEquals(1, response.getReservationAllocationState().size()); - Assert.assertEquals(response.getReservationAllocationState().get(0) - .getReservationId().getId(), reservationID.getId()); - // List reservations, search by time within reservation interval. - request = ReservationListRequest.newInstance( - ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE, true); - - response = client.listReservations(request); - Assert.assertNotNull(response); - Assert.assertEquals(1, response.getReservationAllocationState().size()); - Assert.assertEquals(response.getReservationAllocationState().get(0) - .getReservationId().getId(), reservationID.getId()); - // Verify that the full resource allocations exist. - Assert.assertTrue(response.getReservationAllocationState().get(0) - .getResourceAllocationRequests().size() > 0); - - // Verify that the full RDL is returned. - ReservationRequests reservationRequests = - response.getReservationAllocationState().get(0) - .getReservationDefinition().getReservationRequests(); - Assert.assertEquals("R_ALL", - reservationRequests.getInterpreter().toString()); - Assert.assertTrue(reservationRequests.getReservationResources().get(0) - .getDuration() == duration); - } finally { - // clean-up - if (client != null) { - client.stop(); - } - cluster.stop(); - } - } - - @Test - public void testListReservationsByInvalidTimeInterval() throws Exception { - MiniYARNCluster cluster = setupMiniYARNCluster(); - YarnClient client = setupYarnClient(cluster); - try { - Clock clock = new UTCClock(); - long arrival = clock.getTime(); - long duration = 60000; - long deadline = (long) (arrival + 1.05 * duration); - ReservationSubmissionRequest sRequest = - submitReservationTestHelper(client, arrival, deadline, duration); - - // List reservations, search by invalid end time == -1. - ReservationListRequest request = ReservationListRequest - .newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -1, true); - - ReservationListResponse response = client.listReservations(request); - Assert.assertNotNull(response); - Assert.assertEquals(1, response.getReservationAllocationState().size()); - Assert.assertEquals(response.getReservationAllocationState().get(0) - .getReservationId().getId(), sRequest.getReservationId().getId()); - - // List reservations, search by invalid end time < -1. - request = ReservationListRequest.newInstance( - ReservationSystemTestUtil.reservationQ, "", 1, -10, true); - - response = client.listReservations(request); - Assert.assertNotNull(response); - Assert.assertEquals(1, response.getReservationAllocationState().size()); - Assert.assertEquals(response.getReservationAllocationState().get(0) - .getReservationId().getId(), sRequest.getReservationId().getId()); - } finally { - // clean-up - if (client != null) { - client.stop(); - } - cluster.stop(); - } - } - - @Test - public void testListReservationsByTimeIntervalContainingNoReservations() - throws Exception { - MiniYARNCluster cluster = setupMiniYARNCluster(); - YarnClient client = setupYarnClient(cluster); - try { - Clock clock = new UTCClock(); - long arrival = clock.getTime(); - long duration = 60000; - long deadline = (long) (arrival + 1.05 * duration); - ReservationSubmissionRequest sRequest = - submitReservationTestHelper(client, arrival, deadline, duration); - - // List reservations, search by very large start time. - ReservationListRequest request = ReservationListRequest.newInstance( - ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE, -1, - false); - - ReservationListResponse response = client.listReservations(request); - - // Ensure all reservations are filtered out. - Assert.assertNotNull(response); - Assert.assertEquals(response.getReservationAllocationState().size(), 0); - - duration = 30000; - deadline = sRequest.getReservationDefinition().getDeadline(); - - // List reservations, search by start time after the reservation - // end time. - request = ReservationListRequest.newInstance( - ReservationSystemTestUtil.reservationQ, "", deadline + duration, - deadline + 2 * duration, false); - - response = client.listReservations(request); - - // Ensure all reservations are filtered out. - Assert.assertNotNull(response); - Assert.assertEquals(response.getReservationAllocationState().size(), 0); - - arrival = clock.getTime(); - // List reservations, search by end time before the reservation start - // time. - request = ReservationListRequest.newInstance( - ReservationSystemTestUtil.reservationQ, "", 0, arrival - duration, - false); - - response = client.listReservations(request); - - // Ensure all reservations are filtered out. - Assert.assertNotNull(response); - Assert.assertEquals(response.getReservationAllocationState().size(), 0); - - // List reservations, search by very small end time. - request = ReservationListRequest - .newInstance(ReservationSystemTestUtil.reservationQ, "", 0, 1, false); - - response = client.listReservations(request); - - // Ensure all reservations are filtered out. - Assert.assertNotNull(response); - Assert.assertEquals(response.getReservationAllocationState().size(), 0); - - } finally { - // clean-up - if (client != null) { - client.stop(); - } - cluster.stop(); - } - } - - @Test - public void testReservationDelete() throws Exception { - MiniYARNCluster cluster = setupMiniYARNCluster(); - YarnClient client = setupYarnClient(cluster); - try { - Clock clock = new UTCClock(); - long arrival = clock.getTime(); - long duration = 60000; - long deadline = (long) (arrival + 1.05 * duration); - ReservationSubmissionRequest sRequest = - submitReservationTestHelper(client, arrival, deadline, duration); - - ReservationId reservationID = sRequest.getReservationId(); - // Delete the reservation - ReservationDeleteRequest dRequest = - ReservationDeleteRequest.newInstance(reservationID); - ReservationDeleteResponse dResponse = client.deleteReservation(dRequest); - Assert.assertNotNull(dResponse); - System.out.println("Delete reservation response: " + dResponse); - - // List reservations, search by non-existent reservationID - ReservationListRequest request = ReservationListRequest.newInstance( - ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1, - -1, false); - - ReservationListResponse response = client.listReservations(request); - Assert.assertNotNull(response); - Assert.assertEquals(0, response.getReservationAllocationState().size()); - } finally { - // clean-up - if (client != null) { - client.stop(); - } - cluster.stop(); - } - } - - private ReservationSubmissionRequest createSimpleReservationRequest( - ReservationId reservationId, int numContainers, long arrival, - long deadline, long duration) { - // create a request with a single atomic ask - ReservationRequest r = - ReservationRequest.newInstance(Resource.newInstance(1024, 1), - numContainers, 1, duration); - ReservationRequests reqs = - ReservationRequests.newInstance(Collections.singletonList(r), - ReservationRequestInterpreter.R_ALL); - ReservationDefinition rDef = - ReservationDefinition.newInstance(arrival, deadline, reqs, - "testYarnClient#reservation"); - ReservationSubmissionRequest request = - ReservationSubmissionRequest.newInstance(rDef, - ReservationSystemTestUtil.reservationQ, reservationId); - return request; - } - @Test(timeout = 30000, expected = ApplicationNotFoundException.class) public void testShouldNotRetryForeverForNonNetworkExceptions() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); + YarnConfiguration conf = getConf(); conf.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, -1); ResourceManager rm = null; @@ -1715,10 +1321,9 @@ public void testShouldNotRetryForeverForNonNetworkExceptions() throws Exception @Test public void testSignalContainer() throws Exception { - Configuration conf = new Configuration(); @SuppressWarnings("resource") final YarnClient client = new MockYarnClient(); - client.init(conf); + client.init(getConf()); client.start(); ApplicationId applicationId = ApplicationId.newInstance(1234, 5); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( @@ -1741,7 +1346,7 @@ private void testCreateTimelineClientWithError( boolean timelineClientBestEffort, Throwable mockErr, CreateTimelineClientErrorVerifier errVerifier) throws Exception { - Configuration conf = new Configuration(); + Configuration conf = getConf(); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, timelineServiceEnabled); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClientWithReservation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClientWithReservation.java new file mode 100644 index 0000000000..0836b7b9c9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClientWithReservation.java @@ -0,0 +1,521 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.api.impl; + +import com.google.common.base.Supplier; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.UTCClock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.stream.Collectors; + + +/** + * This class is to test class {@link YarnClient) and {@link YarnClientImpl} + * with Reservation. + */ +@RunWith(Parameterized.class) +public class TestYarnClientWithReservation { + protected final static String TEST_DIR = + new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); + protected final static String FS_ALLOC_FILE = + new File(TEST_DIR, "test-fs-queues.xml").getAbsolutePath(); + + public enum SchedulerType { + CAPACITY, FAIR + } + + private SchedulerType schedulerType; + + @Parameterized.Parameters(name = "{0}") + public static Collection getParameters() { + return Arrays.stream(SchedulerType.values()).map( + type -> new Object[]{type}).collect(Collectors.toList()); + } + + public TestYarnClientWithReservation(SchedulerType scheduler) { + this.schedulerType = scheduler; + } + + + private MiniYARNCluster setupMiniYARNCluster() throws Exception { + MiniYARNCluster cluster = + new MiniYARNCluster("testReservationAPIs", 2, 1, 1); + + cluster.init(getConfigurationForReservation()); + cluster.start(); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return cluster.getResourceManager().getRMContext() + .getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ) + .getTotalCapacity().getMemorySize() > 6000; + } + }, 10, 10000); + + return cluster; + } + + private Configuration getConfigurationForReservation() { + Configuration conf = new Configuration(); + if (schedulerType == SchedulerType.FAIR) { + conf = configureReservationForFairScheduler(); + conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); + } else if (schedulerType == SchedulerType.CAPACITY) { + conf = configureReservationForCapacityScheduler(); + conf.set(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class.getName()); + } + + conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); + return conf; + } + + private Configuration configureReservationForCapacityScheduler() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + ReservationSystemTestUtil.setupQueueConfiguration(conf); + return conf; + } + + private Configuration configureReservationForFairScheduler() { + Configuration conf = new Configuration(); + try { + PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(" "); + out.println(" "); + // set weight to 10 to make sure this queue get enough steady fair share + out.println(" 10"); + out.println(" "); + out.println(""); + out.println("drf" + + ""); + out.println(""); + out.close(); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } + + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE); + return conf; + } + + private YarnClient setupYarnClient(MiniYARNCluster cluster) { + final Configuration yarnConf = cluster.getConfig(); + YarnClient client = YarnClient.createYarnClient(); + client.init(yarnConf); + client.start(); + return client; + } + + private ReservationSubmissionRequest submitReservationTestHelper( + YarnClient client, long arrival, long deadline, long duration) + throws IOException, YarnException { + ReservationId reservationID = client.createReservation().getReservationId(); + ReservationSubmissionRequest sRequest = createSimpleReservationRequest( + reservationID, 4, arrival, deadline, duration); + ReservationSubmissionResponse sResponse = + client.submitReservation(sRequest); + Assert.assertNotNull(sResponse); + Assert.assertNotNull(reservationID); + System.out.println("Submit reservation response: " + reservationID); + + return sRequest; + } + + @Before + public void setup() { + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.setMiniClusterMode(true); + } + + @Test + public void testCreateReservation() throws Exception { + MiniYARNCluster cluster = setupMiniYARNCluster(); + YarnClient client = setupYarnClient(cluster); + try { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationSubmissionRequest sRequest = + submitReservationTestHelper(client, arrival, deadline, duration); + + // Submit the reservation again with the same request and make sure it + // passes. + client.submitReservation(sRequest); + + // Submit the reservation with the same reservation id but different + // reservation definition, and ensure YarnException is thrown. + arrival = clock.getTime(); + ReservationDefinition rDef = sRequest.getReservationDefinition(); + rDef.setArrival(arrival + duration); + sRequest.setReservationDefinition(rDef); + try { + client.submitReservation(sRequest); + Assert.fail("Reservation submission should fail if a duplicate " + + "reservation id is used, but the reservation definition has been " + + "updated."); + } catch (Exception e) { + Assert.assertTrue(e instanceof YarnException); + } + } finally { + // clean-up + if (client != null) { + client.stop(); + } + cluster.stop(); + } + } + + @Test + public void testUpdateReservation() throws Exception { + MiniYARNCluster cluster = setupMiniYARNCluster(); + YarnClient client = setupYarnClient(cluster); + try { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationSubmissionRequest sRequest = + submitReservationTestHelper(client, arrival, deadline, duration); + + ReservationDefinition rDef = sRequest.getReservationDefinition(); + ReservationRequest rr = + rDef.getReservationRequests().getReservationResources().get(0); + ReservationId reservationID = sRequest.getReservationId(); + rr.setNumContainers(5); + arrival = clock.getTime(); + duration = 30000; + deadline = (long) (arrival + 1.05 * duration); + rr.setDuration(duration); + rDef.setArrival(arrival); + rDef.setDeadline(deadline); + ReservationUpdateRequest uRequest = + ReservationUpdateRequest.newInstance(rDef, reservationID); + ReservationUpdateResponse uResponse = client.updateReservation(uRequest); + Assert.assertNotNull(uResponse); + System.out.println("Update reservation response: " + uResponse); + } finally { + // clean-up + if (client != null) { + client.stop(); + } + cluster.stop(); + } + } + + private ReservationSubmissionRequest createSimpleReservationRequest( + ReservationId reservationId, int numContainers, long arrival, + long deadline, long duration) { + // create a request with a single atomic ask + ReservationRequest r = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), + numContainers, 1, duration); + ReservationRequests reqs = + ReservationRequests.newInstance(Collections.singletonList(r), + ReservationRequestInterpreter.R_ALL); + ReservationDefinition rDef = + ReservationDefinition.newInstance(arrival, deadline, reqs, + "testYarnClient#reservation"); + ReservationSubmissionRequest request = + ReservationSubmissionRequest.newInstance(rDef, + ReservationSystemTestUtil.reservationQ, reservationId); + return request; + } + + + @Test + public void testListReservationsByReservationId() throws Exception{ + MiniYARNCluster cluster = setupMiniYARNCluster(); + YarnClient client = setupYarnClient(cluster); + try { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationSubmissionRequest sRequest = + submitReservationTestHelper(client, arrival, deadline, duration); + + ReservationId reservationID = sRequest.getReservationId(); + ReservationListRequest request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1, + -1, false); + ReservationListResponse response = client.listReservations(request); + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getReservationAllocationState().size()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), reservationID.getId()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getResourceAllocationRequests().size(), 0); + } finally { + // clean-up + if (client != null) { + client.stop(); + } + cluster.stop(); + } + } + + @Test + public void testListReservationsByTimeInterval() throws Exception { + MiniYARNCluster cluster = setupMiniYARNCluster(); + YarnClient client = setupYarnClient(cluster); + try { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationSubmissionRequest sRequest = + submitReservationTestHelper(client, arrival, deadline, duration); + + // List reservations, search by a point in time within the reservation + // range. + arrival = clock.getTime(); + ReservationId reservationID = sRequest.getReservationId(); + ReservationListRequest request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", arrival + duration / 2, + arrival + duration / 2, true); + + ReservationListResponse response = client.listReservations(request); + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getReservationAllocationState().size()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), reservationID.getId()); + // List reservations, search by time within reservation interval. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE, true); + + response = client.listReservations(request); + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getReservationAllocationState().size()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), reservationID.getId()); + // Verify that the full resource allocations exist. + Assert.assertTrue(response.getReservationAllocationState().get(0) + .getResourceAllocationRequests().size() > 0); + + // Verify that the full RDL is returned. + ReservationRequests reservationRequests = + response.getReservationAllocationState().get(0) + .getReservationDefinition().getReservationRequests(); + Assert.assertEquals("R_ALL", + reservationRequests.getInterpreter().toString()); + Assert.assertTrue(reservationRequests.getReservationResources().get(0) + .getDuration() == duration); + } finally { + // clean-up + if (client != null) { + client.stop(); + } + cluster.stop(); + } + } + + @Test + public void testListReservationsByInvalidTimeInterval() throws Exception { + MiniYARNCluster cluster = setupMiniYARNCluster(); + YarnClient client = setupYarnClient(cluster); + try { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationSubmissionRequest sRequest = + submitReservationTestHelper(client, arrival, deadline, duration); + + // List reservations, search by invalid end time == -1. + ReservationListRequest request = ReservationListRequest + .newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -1, true); + + ReservationListResponse response = client.listReservations(request); + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getReservationAllocationState().size()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), sRequest.getReservationId().getId()); + + // List reservations, search by invalid end time < -1. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", 1, -10, true); + + response = client.listReservations(request); + Assert.assertNotNull(response); + Assert.assertEquals(1, response.getReservationAllocationState().size()); + Assert.assertEquals(response.getReservationAllocationState().get(0) + .getReservationId().getId(), sRequest.getReservationId().getId()); + } finally { + // clean-up + if (client != null) { + client.stop(); + } + cluster.stop(); + } + } + + @Test + public void testListReservationsByTimeIntervalContainingNoReservations() + throws Exception { + MiniYARNCluster cluster = setupMiniYARNCluster(); + YarnClient client = setupYarnClient(cluster); + try { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationSubmissionRequest sRequest = + submitReservationTestHelper(client, arrival, deadline, duration); + + // List reservations, search by very large start time. + ReservationListRequest request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE, -1, + false); + + ReservationListResponse response = client.listReservations(request); + + // Ensure all reservations are filtered out. + Assert.assertNotNull(response); + Assert.assertEquals(response.getReservationAllocationState().size(), 0); + + duration = 30000; + deadline = sRequest.getReservationDefinition().getDeadline(); + + // List reservations, search by start time after the reservation + // end time. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", deadline + duration, + deadline + 2 * duration, false); + + response = client.listReservations(request); + + // Ensure all reservations are filtered out. + Assert.assertNotNull(response); + Assert.assertEquals(response.getReservationAllocationState().size(), 0); + + arrival = clock.getTime(); + // List reservations, search by end time before the reservation start + // time. + request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, "", 0, arrival - duration, + false); + + response = client.listReservations(request); + + // Ensure all reservations are filtered out. + Assert.assertNotNull(response); + Assert.assertEquals(response.getReservationAllocationState().size(), 0); + + // List reservations, search by very small end time. + request = ReservationListRequest + .newInstance(ReservationSystemTestUtil.reservationQ, "", 0, 1, false); + + response = client.listReservations(request); + + // Ensure all reservations are filtered out. + Assert.assertNotNull(response); + Assert.assertEquals(response.getReservationAllocationState().size(), 0); + + } finally { + // clean-up + if (client != null) { + client.stop(); + } + cluster.stop(); + } + } + + @Test + public void testReservationDelete() throws Exception { + MiniYARNCluster cluster = setupMiniYARNCluster(); + YarnClient client = setupYarnClient(cluster); + try { + Clock clock = new UTCClock(); + long arrival = clock.getTime(); + long duration = 60000; + long deadline = (long) (arrival + 1.05 * duration); + ReservationSubmissionRequest sRequest = + submitReservationTestHelper(client, arrival, deadline, duration); + + ReservationId reservationID = sRequest.getReservationId(); + // Delete the reservation + ReservationDeleteRequest dRequest = + ReservationDeleteRequest.newInstance(reservationID); + ReservationDeleteResponse dResponse = client.deleteReservation(dRequest); + Assert.assertNotNull(dResponse); + System.out.println("Delete reservation response: " + dResponse); + + // List reservations, search by non-existent reservationID + ReservationListRequest request = ReservationListRequest.newInstance( + ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1, + -1, false); + + ReservationListResponse response = client.listReservations(request); + Assert.assertNotNull(response); + Assert.assertEquals(0, response.getReservationAllocationState().size()); + } finally { + // clean-up + if (client != null) { + client.stop(); + } + cluster.stop(); + } + } + +}