diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 8307f88c55..418efbe2f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -59,9 +60,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; @@ -202,12 +205,17 @@ public class TestClientRMService { .getRecordFactory(null); private String appType = "MockApp"; - + private final static String QUEUE_1 = "Q-1"; private final static String QUEUE_2 = "Q-2"; private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo"; private File resourceTypesFile = null; + private Configuration conf; + private ResourceManager resourceManager; + private YarnRPC rpc; + private ApplicationClientProtocol client; + @Test public void testGetDecommissioningClusterNodes() throws Exception { MockRM rm = new MockRM() { @@ -218,6 +226,7 @@ protected ClientRMService createClientRMService() { this.getRMContext().getRMDelegationTokenSecretManager()); }; }; + resourceManager = rm; rm.start(); int nodeMemory = 1024; @@ -230,13 +239,12 @@ protected ClientRMService createClientRMService() { rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING); // Create a client. - Configuration conf = new Configuration(); - YarnRPC rpc = YarnRPC.create(conf); + conf = new Configuration(); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = - (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); // Make call List nodeReports = client.getClusterNodes( @@ -247,9 +255,6 @@ protected ClientRMService createClientRMService() { NodeReport nr = nodeReports.iterator().next(); Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout()); Assert.assertNull(nr.getNodeUpdateType()); - - rpc.stopProxy(client, conf); - rm.close(); } @Test @@ -261,6 +266,7 @@ protected ClientRMService createClientRMService() { this.getRMContext().getRMDelegationTokenSecretManager()); }; }; + resourceManager = rm; rm.start(); RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); @@ -272,7 +278,7 @@ protected ClientRMService createClientRMService() { labelsMgr.replaceLabelsOnNode(map); rm.sendNodeStarted(node); node.nodeHeartbeat(true); - + // Add and lose a node with label = y MockNM lostNode = rm.registerNode("host2:1235", 1024); rm.sendNodeStarted(lostNode); @@ -281,13 +287,12 @@ protected ClientRMService createClientRMService() { rm.sendNodeLost(lostNode); // Create a client. - Configuration conf = new Configuration(); - YarnRPC rpc = YarnRPC.create(conf); + conf = new Configuration(); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = - (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); // Make call GetClusterNodesRequest request = @@ -297,7 +302,7 @@ protected ClientRMService createClientRMService() { Assert.assertEquals(1, nodeReports.size()); Assert.assertNotSame("Node is expected to be healthy!", NodeState.UNHEALTHY, nodeReports.get(0).getNodeState()); - + // Check node's label = x Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("x")); Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); @@ -311,34 +316,34 @@ protected ClientRMService createClientRMService() { nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals("Unhealthy nodes should not show up by default", 0, nodeReports.size()); - + // Change label of host1 to y map = new HashMap>(); map.put(node.getNodeId(), ImmutableSet.of("y")); labelsMgr.replaceLabelsOnNode(map); - + // Now query for UNHEALTHY nodes request = GetClusterNodesRequest.newInstance(EnumSet.of(NodeState.UNHEALTHY)); nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals(1, nodeReports.size()); Assert.assertEquals("Node is expected to be unhealthy!", NodeState.UNHEALTHY, nodeReports.get(0).getNodeState()); - + Assert.assertTrue(nodeReports.get(0).getNodeLabels().contains("y")); Assert.assertNull(nodeReports.get(0).getDecommissioningTimeout()); Assert.assertNull(nodeReports.get(0).getNodeUpdateType()); - + // Remove labels of host1 map = new HashMap>(); map.put(node.getNodeId(), ImmutableSet.of("y")); labelsMgr.removeLabelsFromNode(map); - + // Query all states should return all nodes rm.registerNode("host3:1236", 1024); request = GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); nodeReports = client.getClusterNodes(request).getNodeReports(); Assert.assertEquals(3, nodeReports.size()); - + // All host1-3's label should be empty (instead of null) for (NodeReport report : nodeReports) { Assert.assertTrue(report.getNodeLabels() != null @@ -346,11 +351,8 @@ protected ClientRMService createClientRMService() { Assert.assertNull(report.getDecommissioningTimeout()); Assert.assertNull(report.getNodeUpdateType()); } - - rpc.stopProxy(client, conf); - rm.close(); } - + @Test public void testNonExistingApplicationReport() throws YarnException { RMContext rmContext = mock(RMContext.class); @@ -358,7 +360,6 @@ public void testNonExistingApplicationReport() throws YarnException { new ConcurrentHashMap()); ClientRMService rmService = new ClientRMService(rmContext, null, null, null, null, null); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); GetApplicationReportRequest request = recordFactory .newRecordInstance(GetApplicationReportRequest.class); request.setApplicationId(ApplicationId.newInstance(0, 0)); @@ -373,7 +374,7 @@ public void testNonExistingApplicationReport() throws YarnException { } } - @Test + @Test public void testGetApplicationReport() throws Exception { ResourceScheduler scheduler = mock(ResourceScheduler.class); RMContext rmContext = mock(RMContext.class); @@ -389,14 +390,13 @@ public void testGetApplicationReport() throws Exception { ClientRMService rmService = new ClientRMService(rmContext, scheduler, null, mockAclsManager, null, null); try { - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); GetApplicationReportRequest request = recordFactory .newRecordInstance(GetApplicationReportRequest.class); request.setApplicationId(appId1); - GetApplicationReportResponse response = + GetApplicationReportResponse response = rmService.getApplicationReport(request); ApplicationReport report = response.getApplicationReport(); - ApplicationResourceUsageReport usageReport = + ApplicationResourceUsageReport usageReport = report.getApplicationResourceUsageReport(); Assert.assertEquals(10, usageReport.getMemorySeconds()); Assert.assertEquals(3, usageReport.getVcoreSeconds()); @@ -443,12 +443,11 @@ public void testGetApplicationReport() throws Exception { rmService.close(); } } - + @Test public void testGetApplicationAttemptReport() throws YarnException, IOException { ClientRMService rmService = createRMService(); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); GetApplicationAttemptReportRequest request = recordFactory .newRecordInstance(GetApplicationAttemptReportRequest.class); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( @@ -477,7 +476,7 @@ public void testGetApplicationResourceUsageReportDummy() throws YarnException, public void handle(Event event) { } }); - ApplicationSubmissionContext asContext = + ApplicationSubmissionContext asContext = mock(ApplicationSubmissionContext.class); YarnConfiguration config = new YarnConfiguration(); RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId, @@ -490,7 +489,6 @@ public void handle(Event event) { @Test public void testGetApplicationAttempts() throws YarnException, IOException { ClientRMService rmService = createRMService(); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); GetApplicationAttemptsRequest request = recordFactory .newRecordInstance(GetApplicationAttemptsRequest.class); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( @@ -512,7 +510,6 @@ public void testGetApplicationAttempts() throws YarnException, IOException { @Test public void testGetContainerReport() throws YarnException, IOException { ClientRMService rmService = createRMService(); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); GetContainerReportRequest request = recordFactory .newRecordInstance(GetContainerReportRequest.class); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( @@ -533,7 +530,6 @@ public void testGetContainerReport() throws YarnException, IOException { @Test public void testGetContainers() throws YarnException, IOException { ClientRMService rmService = createRMService(); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); GetContainersRequest request = recordFactory .newRecordInstance(GetContainersRequest.class); ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( @@ -598,12 +594,13 @@ public void testForceKillNonExistingApplication() throws YarnException { @Test public void testApplicationTagsValidation() throws IOException { - YarnConfiguration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); int maxtags = 3, appMaxTagLength = 5; conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAGS, maxtags); conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAG_LENGTH, appMaxTagLength); MockRM rm = new MockRM(conf); + resourceManager = rm; rm.init(conf); rm.start(); @@ -625,7 +622,6 @@ public void testApplicationTagsValidation() throws IOException { tags = Arrays.asList("tãg1", "tag2#"); validateApplicationTag(rmService, tags, "A tag can only have ASCII characters! Invalid tag - tãg1"); - rm.close(); } private void validateApplicationTag(ClientRMService rmService, @@ -643,9 +639,10 @@ private void validateApplicationTag(ClientRMService rmService, @Test public void testForceKillApplication() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); conf.setBoolean(MockRM.ENABLE_WEBAPP, true); MockRM rm = new MockRM(conf); + resourceManager = rm; rm.init(conf); rm.start(); @@ -701,7 +698,7 @@ public void testForceKillApplication() throws Exception { assertEquals("Incorrect number of apps in the RM", 2, rmService.getApplications(getRequest).getApplicationList().size()); } - + @Test (expected = ApplicationNotFoundException.class) public void testMoveAbsentApplication() throws YarnException { RMContext rmContext = mock(RMContext.class); @@ -1205,7 +1202,7 @@ private void setupCurrentCall(String hostName) throws UnknownHostException { new byte[]{123, 123, 123, 123})); Server.getCurCall().set(mockCall); } - + @Test (timeout = 30000) @SuppressWarnings ("rawtypes") public void testAppSubmit() throws Exception { @@ -1353,7 +1350,7 @@ public void handle(Event event) {} ApplicationId[] appIds = {getApplicationId(101), getApplicationId(102), getApplicationId(103)}; List tags = Arrays.asList("Tag1", "Tag2", "Tag3"); - + long[] submitTimeMillis = new long[3]; // Submit applications for (int i = 0; i < appIds.length; i++) { @@ -1380,23 +1377,23 @@ public void handle(Event event) {} request.setLimit(1L); assertEquals("Failed to limit applications", 1, rmService.getApplications(request).getApplicationList().size()); - + // Check start range request = GetApplicationsRequest.newInstance(); request.setStartRange(submitTimeMillis[0] + 1, System.currentTimeMillis()); - + // 2 applications are submitted after first timeMills - assertEquals("Incorrect number of matching start range", + assertEquals("Incorrect number of matching start range", 2, rmService.getApplications(request).getApplicationList().size()); - + // 1 application is submitted after the second timeMills request.setStartRange(submitTimeMillis[1] + 1, System.currentTimeMillis()); - assertEquals("Incorrect number of matching start range", + assertEquals("Incorrect number of matching start range", 1, rmService.getApplications(request).getApplicationList().size()); - + // no application is submitted after the third timeMills request.setStartRange(submitTimeMillis[2] + 1, System.currentTimeMillis()); - assertEquals("Incorrect number of matching start range", + assertEquals("Incorrect number of matching start range", 0, rmService.getApplications(request).getApplicationList().size()); // Check queue @@ -1468,7 +1465,7 @@ public void handle(Event event) {} assertEquals("Incorrect number of applications for the scope", 3, rmService.getApplications(request).getApplicationList().size()); } - + @Test(timeout=4000) public void testConcurrentAppSubmit() throws IOException, InterruptedException, BrokenBarrierException, @@ -1487,7 +1484,7 @@ public void testConcurrentAppSubmit() appId1, null, null); final SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest( appId2, null, null); - + final CyclicBarrier startBarrier = new CyclicBarrier(2); final CyclicBarrier endBarrier = new CyclicBarrier(2); @@ -1529,7 +1526,7 @@ public void run() { } }; t.start(); - + // submit another app, so go through while the first app is blocked startBarrier.await(); rmService.submitApplication(submitRequest2); @@ -1615,21 +1612,21 @@ private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext) private ConcurrentHashMap getRMApps( RMContext rmContext, YarnScheduler yarnScheduler) { - ConcurrentHashMap apps = - new ConcurrentHashMap(); + ConcurrentHashMap apps = + new ConcurrentHashMap(); ApplicationId applicationId1 = getApplicationId(1); ApplicationId applicationId2 = getApplicationId(2); ApplicationId applicationId3 = getApplicationId(3); YarnConfiguration config = new YarnConfiguration(); apps.put(applicationId1, getRMApp(rmContext, yarnScheduler, applicationId1, - config, "testqueue", 10, 3,null,null)); + config, "testqueue", 10, 3, null, null)); apps.put(applicationId2, getRMApp(rmContext, yarnScheduler, applicationId2, - config, "a", 20, 2,null,"")); + config, "a", 20, 2, null, "")); apps.put(applicationId3, getRMApp(rmContext, yarnScheduler, applicationId3, - config, "testqueue", 40, 5,"high-mem","high-mem")); + config, "testqueue", 40, 5, "high-mem", "high-mem")); return apps; } - + private List getSchedulerApps( Map apps) { List schedApps = new ArrayList(); @@ -1642,7 +1639,7 @@ private List getSchedulerApps( private static ApplicationId getApplicationId(int id) { return ApplicationId.newInstance(123456, id); } - + private static ApplicationAttemptId getApplicationAttemptId(int id) { return ApplicationAttemptId.newInstance(getApplicationId(id), 1); } @@ -1667,7 +1664,7 @@ public ApplicationReport createAndGetApplicationReport( String clientUserName, boolean allowAccess) { ApplicationReport report = super.createAndGetApplicationReport( clientUserName, allowAccess); - ApplicationResourceUsageReport usageReport = + ApplicationResourceUsageReport usageReport = report.getApplicationResourceUsageReport(); usageReport.setMemorySeconds(memorySeconds); usageReport.setVcoreSeconds(vcoreSeconds); @@ -1687,8 +1684,7 @@ public ApplicationReport createAndGetApplicationReport( RMContainerImpl containerimpl = spy(new RMContainerImpl(container, SchedulerRequestKey.extractFrom(container), attemptId, null, "", rmContext)); - Map attempts = - new HashMap(); + Map attempts = new HashMap<>(); attempts.put(attemptId, rmAppAttemptImpl); when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl); when(app.getAppAttempts()).thenReturn(attempts); @@ -1750,6 +1746,7 @@ private ResourceManager setupResourceManager() { ResourceScheduler.class); conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); MockRM rm = new MockRM(conf); + resourceManager = rm; rm.start(); try { rm.registerNode("127.0.0.1:1", 102400, 100); @@ -1790,8 +1787,8 @@ private ReservationSubmissionRequest submitReservationTestHelper( @Test public void testCreateReservation() { - ResourceManager rm = setupResourceManager(); - ClientRMService clientService = rm.getClientRMService(); + resourceManager = setupResourceManager(); + ClientRMService clientService = resourceManager.getClientRMService(); Clock clock = new UTCClock(); long arrival = clock.getTime(); long duration = 60000; @@ -1821,14 +1818,12 @@ public void testCreateReservation() { } catch (Exception e) { Assert.assertTrue(e instanceof YarnException); } - - rm.stop(); } @Test public void testUpdateReservation() { - ResourceManager rm = setupResourceManager(); - ClientRMService clientService = rm.getClientRMService(); + resourceManager = setupResourceManager(); + ClientRMService clientService = resourceManager.getClientRMService(); Clock clock = new UTCClock(); long arrival = clock.getTime(); long duration = 60000; @@ -1857,14 +1852,12 @@ public void testUpdateReservation() { } Assert.assertNotNull(uResponse); System.out.println("Update reservation response: " + uResponse); - - rm.stop(); } @Test public void testListReservationsByReservationId() { - ResourceManager rm = setupResourceManager(); - ClientRMService clientService = rm.getClientRMService(); + resourceManager = setupResourceManager(); + ClientRMService clientService = resourceManager.getClientRMService(); Clock clock = new UTCClock(); long arrival = clock.getTime(); long duration = 60000; @@ -1888,14 +1881,12 @@ public void testListReservationsByReservationId() { .getReservationId().getId(), reservationID.getId()); Assert.assertEquals(response.getReservationAllocationState().get(0) .getResourceAllocationRequests().size(), 0); - - rm.stop(); } @Test public void testListReservationsByTimeInterval() { - ResourceManager rm = setupResourceManager(); - ClientRMService clientService = rm.getClientRMService(); + resourceManager = setupResourceManager(); + ClientRMService clientService = resourceManager.getClientRMService(); Clock clock = new UTCClock(); long arrival = clock.getTime(); long duration = 60000; @@ -1947,14 +1938,12 @@ public void testListReservationsByTimeInterval() { reservationRequests.getInterpreter().toString()); Assert.assertTrue(reservationRequests.getReservationResources().get(0) .getDuration() == duration); - - rm.stop(); } @Test public void testListReservationsByInvalidTimeInterval() { - ResourceManager rm = setupResourceManager(); - ClientRMService clientService = rm.getClientRMService(); + resourceManager = setupResourceManager(); + ClientRMService clientService = resourceManager.getClientRMService(); Clock clock = new UTCClock(); long arrival = clock.getTime(); long duration = 60000; @@ -1991,14 +1980,12 @@ public void testListReservationsByInvalidTimeInterval() { Assert.assertEquals(1, response.getReservationAllocationState().size()); Assert.assertEquals(response.getReservationAllocationState().get(0) .getReservationId().getId(), sRequest.getReservationId().getId()); - - rm.stop(); } @Test public void testListReservationsByTimeIntervalContainingNoReservations() { - ResourceManager rm = setupResourceManager(); - ClientRMService clientService = rm.getClientRMService(); + resourceManager = setupResourceManager(); + ClientRMService clientService = resourceManager.getClientRMService(); Clock clock = new UTCClock(); long arrival = clock.getTime(); long duration = 60000; @@ -2073,14 +2060,12 @@ public void testListReservationsByTimeIntervalContainingNoReservations() { // Ensure all reservations are filtered out. Assert.assertNotNull(response); assertThat(response.getReservationAllocationState()).isEmpty(); - - rm.stop(); } @Test public void testReservationDelete() { - ResourceManager rm = setupResourceManager(); - ClientRMService clientService = rm.getClientRMService(); + resourceManager = setupResourceManager(); + ClientRMService clientService = resourceManager.getClientRMService(); Clock clock = new UTCClock(); long arrival = clock.getTime(); long duration = 60000; @@ -2114,8 +2099,6 @@ public void testReservationDelete() { } Assert.assertNotNull(response); Assert.assertEquals(0, response.getReservationAllocationState().size()); - - rm.stop(); } @Test @@ -2128,6 +2111,7 @@ protected ClientRMService createClientRMService() { .getRMDelegationTokenSecretManager()); }; }; + resourceManager = rm; rm.start(); NodeLabel labelX = NodeLabel.newInstance("x", false); NodeLabel labelY = NodeLabel.newInstance("y"); @@ -2142,12 +2126,12 @@ protected ClientRMService createClientRMService() { labelsMgr.replaceLabelsOnNode(map); // Create a client. - Configuration conf = new Configuration(); - YarnRPC rpc = YarnRPC.create(conf); + conf = new Configuration(); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); // Get node labels collection GetClusterNodeLabelsResponse response = client @@ -2168,9 +2152,6 @@ protected ClientRMService createClientRMService() { // Below label "x" is not present in the response as exclusivity is true Assert.assertFalse(nodeToLabels.get(node1).containsAll( Arrays.asList(NodeLabel.newInstance("x")))); - - rpc.stopProxy(client, conf); - rm.stop(); } @Test @@ -2183,6 +2164,7 @@ protected ClientRMService createClientRMService() { .getRMDelegationTokenSecretManager()); }; }; + resourceManager = rm; rm.start(); NodeLabel labelX = NodeLabel.newInstance("x", false); @@ -2205,12 +2187,12 @@ protected ClientRMService createClientRMService() { labelsMgr.replaceLabelsOnNode(map); // Create a client. - Configuration conf = new Configuration(); - YarnRPC rpc = YarnRPC.create(conf); + conf = new Configuration(); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); // Get node labels collection GetClusterNodeLabelsResponse response = client @@ -2244,9 +2226,6 @@ protected ClientRMService createClientRMService() { Assert.assertTrue(labelsToNodes.get(labelZ.getName()).containsAll( Arrays.asList(node1B, node3B))); assertThat(labelsToNodes.get(labelY.getName())).isNull(); - - rpc.stopProxy(client, conf); - rm.close(); } @Test(timeout = 120000) @@ -2259,6 +2238,7 @@ protected ClientRMService createClientRMService() { this.getRMContext().getRMDelegationTokenSecretManager()); } }; + resourceManager = rm; rm.start(); NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager(); @@ -2278,12 +2258,12 @@ protected ClientRMService createClientRMService() { nodes.put(host2.getHost(), ImmutableSet.of(docker)); mgr.addNodeAttributes(nodes); // Create a client. - Configuration conf = new Configuration(); - YarnRPC rpc = YarnRPC.create(conf); + conf = new Configuration(); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); GetClusterNodeAttributesRequest request = GetClusterNodeAttributesRequest.newInstance(); @@ -2295,8 +2275,6 @@ protected ClientRMService createClientRMService() { Assert.assertTrue(attributes.contains(NodeAttributeInfo.newInstance(os))); Assert .assertTrue(attributes.contains(NodeAttributeInfo.newInstance(docker))); - rpc.stopProxy(client, conf); - rm.close(); } @Test(timeout = 120000) @@ -2309,6 +2287,7 @@ protected ClientRMService createClientRMService() { this.getRMContext().getRMDelegationTokenSecretManager()); } }; + resourceManager = rm; rm.start(); NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager(); @@ -2331,12 +2310,12 @@ protected ClientRMService createClientRMService() { nodes.put(node2, ImmutableSet.of(docker, dist)); mgr.addNodeAttributes(nodes); // Create a client. - Configuration conf = new Configuration(); - YarnRPC rpc = YarnRPC.create(conf); + conf = new Configuration(); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); GetAttributesToNodesRequest request = GetAttributesToNodesRequest.newInstance(); @@ -2377,8 +2356,6 @@ protected ClientRMService createClientRMService() { attrs3.get(os.getAttributeKey()))); Assert.assertTrue(findHostnameAndValInMapping(node2, "docker0", attrs3.get(docker.getAttributeKey()))); - rpc.stopProxy(client, conf); - rm.close(); } private boolean findHostnameAndValInMapping(String hostname, String attrVal, @@ -2401,6 +2378,7 @@ protected ClientRMService createClientRMService() { this.getRMContext().getRMDelegationTokenSecretManager()); } }; + resourceManager = rm; rm.start(); NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager(); @@ -2423,12 +2401,12 @@ protected ClientRMService createClientRMService() { nodes.put(node2, ImmutableSet.of(docker, dist)); mgr.addNodeAttributes(nodes); // Create a client. - Configuration conf = new Configuration(); - YarnRPC rpc = YarnRPC.create(conf); + conf = new Configuration(); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); // Specify null for hostnames. GetNodesToAttributesRequest request1 = @@ -2471,8 +2449,6 @@ protected ClientRMService createClientRMService() { client.getNodesToAttributes(request4); hostToAttrs = response4.getNodeToAttributes(); Assert.assertEquals(0, hostToAttrs.size()); - rpc.stopProxy(client, conf); - rm.close(); } @Test(timeout = 120000) @@ -2480,13 +2456,14 @@ public void testUpdatePriorityAndKillAppWithZeroClusterResource() throws Exception { int maxPriority = 10; int appPriority = 5; - YarnConfiguration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); Assume.assumeFalse("FairScheduler does not support Application Priorities", conf.get(YarnConfiguration.RM_SCHEDULER) .equals(FairScheduler.class.getName())); conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, maxPriority); MockRM rm = new MockRM(conf); + resourceManager = rm; rm.init(conf); rm.start(); MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder @@ -2498,20 +2475,20 @@ public void testUpdatePriorityAndKillAppWithZeroClusterResource() testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority); rm.killApp(app1.getApplicationId()); rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); - rm.stop(); } @Test(timeout = 120000) public void testUpdateApplicationPriorityRequest() throws Exception { int maxPriority = 10; int appPriority = 5; - YarnConfiguration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); Assume.assumeFalse("FairScheduler does not support Application Priorities", conf.get(YarnConfiguration.RM_SCHEDULER) .equals(FairScheduler.class.getName())); conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, maxPriority); MockRM rm = new MockRM(conf); + resourceManager = rm; rm.init(conf); rm.start(); rm.registerNode("host1:1234", 1024); @@ -2555,8 +2532,6 @@ public void testUpdateApplicationPriorityRequest() throws Exception { Assert.assertEquals("Incorrect priority has been set to application", appPriority, rmService.updateApplicationPriority(updateRequest) .getApplicationPriority().getPriority()); - - rm.stop(); } private void testApplicationPriorityUpdation(ClientRMService rmService, @@ -2576,55 +2551,53 @@ private void testApplicationPriorityUpdation(ClientRMService rmService, updateApplicationPriority.getApplicationPriority().getPriority()); } - private void createExcludeFile(String filename) throws IOException { - File file = new File(filename); - if (file.exists()) { - file.delete(); + private File createExcludeFile(File testDir) throws IOException { + File excludeFile = new File(testDir, "excludeFile"); + try (FileOutputStream out = new FileOutputStream(excludeFile)) { + out.write("decommisssionedHost".getBytes(UTF_8)); } - - FileOutputStream out = new FileOutputStream(file); - out.write("decommisssionedHost".getBytes()); - out.close(); + return excludeFile; } @Test public void testRMStartWithDecommissionedNode() throws Exception { - String excludeFile = "excludeFile"; - createExcludeFile(excludeFile); - YarnConfiguration conf = new YarnConfiguration(); - conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, - excludeFile); - MockRM rm = new MockRM(conf) { - protected ClientRMService createClientRMService() { - return new ClientRMService(this.rmContext, scheduler, - this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, - this.getRMContext().getRMDelegationTokenSecretManager()); + File testDir = GenericTestUtils.getRandomizedTestDir(); + assertTrue("Failed to create test directory: " + testDir.getAbsolutePath(), testDir.mkdirs()); + try { + File excludeFile = createExcludeFile(testDir); + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + excludeFile.getAbsolutePath()); + MockRM rm = new MockRM(conf) { + protected ClientRMService createClientRMService() { + return new ClientRMService(this.rmContext, scheduler, + this.rmAppManager, this.applicationACLsManager, this.queueACLsManager, + this.getRMContext().getRMDelegationTokenSecretManager()); + }; }; - }; - rm.start(); + resourceManager = rm; + rm.start(); - YarnRPC rpc = YarnRPC.create(conf); - InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); - LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = - (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + rpc = YarnRPC.create(conf); + InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); + LOG.info("Connecting to ResourceManager at " + rmAddress); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); - // Make call - GetClusterNodesRequest request = - GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); - List nodeReports = client.getClusterNodes(request).getNodeReports(); - Assert.assertEquals(1, nodeReports.size()); - - rm.stop(); - rpc.stopProxy(client, conf); - new File(excludeFile).delete(); + // Make call + GetClusterNodesRequest request = + GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); + List nodeReports = client.getClusterNodes(request).getNodeReports(); + assertEquals(1, nodeReports.size()); + } finally { + FileUtil.fullyDelete(testDir); + } } @Test public void testGetResourceTypesInfoWhenResourceProfileDisabled() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); MockRM rm = new MockRM(conf) { protected ClientRMService createClientRMService() { return new ClientRMService(this.rmContext, scheduler, @@ -2632,14 +2605,14 @@ protected ClientRMService createClientRMService() { this.getRMContext().getRMDelegationTokenSecretManager()); } }; + resourceManager = rm; rm.start(); - YarnRPC rpc = YarnRPC.create(conf); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = - (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); // Make call GetAllResourceTypeInfoRequest request = @@ -2659,9 +2632,6 @@ protected ClientRMService createClientRMService() { response.getResourceTypeInfo().get(1).getName()); Assert.assertEquals(ResourceInformation.VCORES.getUnits(), response.getResourceTypeInfo().get(1).getDefaultUnit()); - - rm.stop(); - rpc.stopProxy(client, conf); } @Test @@ -2755,6 +2725,7 @@ protected ClientRMService createClientRMService() { this.getRMContext().getRMDelegationTokenSecretManager()); }; }; + resourceManager = rm; rm.start(); Resource resource = BuilderUtils.newResource(1024, 1); @@ -2769,13 +2740,12 @@ protected ClientRMService createClientRMService() { node.nodeHeartbeat(true); // Create a client. - Configuration conf = new Configuration(); - YarnRPC rpc = YarnRPC.create(conf); + conf = new Configuration(); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = - (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); // Make call GetClusterNodesRequest request = @@ -2807,9 +2777,6 @@ protected ClientRMService createClientRMService() { getResourceInformation("memory-mb").getUnits()); Assert.assertEquals(976562, nodeReports.get(0).getCapability(). getResourceInformation("memory-mb").getValue()); - - rpc.stopProxy(client, conf); - rm.close(); } @Test @@ -2821,6 +2788,7 @@ protected ClientRMService createClientRMService() { this.getRMContext().getRMDelegationTokenSecretManager()); }; }; + resourceManager = rm; rm.start(); ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); @@ -2833,13 +2801,12 @@ protected ClientRMService createClientRMService() { repeat(7, clusterMetrics::incrNumShutdownNMs); // Create a client. - Configuration conf = new Configuration(); - YarnRPC rpc = YarnRPC.create(conf); + conf = new Configuration(); + rpc = YarnRPC.create(conf); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); LOG.info("Connecting to ResourceManager at " + rmAddress); - ApplicationClientProtocol client = - (ApplicationClientProtocol) rpc - .getProxy(ApplicationClientProtocol.class, rmAddress, conf); + client = (ApplicationClientProtocol) rpc.getProxy( + ApplicationClientProtocol.class, rmAddress, conf); YarnClusterMetrics ymetrics = client.getClusterMetrics( GetClusterMetricsRequest.newInstance()).getClusterMetrics(); @@ -2852,18 +2819,21 @@ protected ClientRMService createClientRMService() { Assert.assertEquals(5, ymetrics.getNumUnhealthyNodeManagers()); Assert.assertEquals(6, ymetrics.getNumRebootedNodeManagers()); Assert.assertEquals(7, ymetrics.getNumShutdownNodeManagers()); - - rpc.stopProxy(client, conf); - rm.close(); } @After - public void tearDown(){ + public void tearDown() throws Exception { if (resourceTypesFile != null && resourceTypesFile.exists()) { resourceTypesFile.delete(); } ClusterMetrics.destroy(); DefaultMetricsSystem.shutdown(); + if (conf != null && client != null && rpc != null) { + rpc.stopProxy(client, conf); + } + if (resourceManager != null) { + resourceManager.close(); + } } private static void repeat(int n, Runnable r) {