YARN-4994. Use MiniYARNCluster with try-with-resources in tests. Contributed by Andras Bokor.
This commit is contained in:
parent
736f54b727
commit
ae401539ea
@ -563,11 +563,9 @@ public void testTimelineEventHandling() throws Exception {
|
||||
TestParams t = new TestParams(RunningAppContext.class, false);
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
MiniYARNCluster yarnCluster = null;
|
||||
long currentTime = System.currentTimeMillis();
|
||||
try {
|
||||
yarnCluster = new MiniYARNCluster(
|
||||
TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1);
|
||||
try (MiniYARNCluster yarnCluster = new MiniYARNCluster(
|
||||
TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1)) {
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
Configuration confJHEH = new YarnConfiguration(conf);
|
||||
@ -720,10 +718,6 @@ public void testTimelineEventHandling() throws Exception {
|
||||
tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
|
||||
Assert.assertEquals(TaskType.MAP.toString(),
|
||||
tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE"));
|
||||
} finally {
|
||||
if (yarnCluster != null) {
|
||||
yarnCluster.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -165,13 +165,11 @@ public void testCheckMaxEligible() throws Exception {
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testFilterAppsByAggregatedStatus() throws Exception {
|
||||
MiniYARNCluster yarnCluster = null;
|
||||
try {
|
||||
try (MiniYARNCluster yarnCluster =
|
||||
new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(),
|
||||
1, 1, 1, 1)) {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
yarnCluster =
|
||||
new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1,
|
||||
1, 1, 1);
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
conf = yarnCluster.getConfig();
|
||||
@ -237,10 +235,6 @@ public void testFilterAppsByAggregatedStatus() throws Exception {
|
||||
Assert.assertTrue(hal.eligibleApplications.contains(app4));
|
||||
Assert.assertTrue(hal.eligibleApplications.contains(app7));
|
||||
Assert.assertTrue(hal.eligibleApplications.contains(app8));
|
||||
} finally {
|
||||
if (yarnCluster != null) {
|
||||
yarnCluster.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -52,16 +52,14 @@ public class TestHadoopArchiveLogsRunner {
|
||||
|
||||
@Test(timeout = 50000)
|
||||
public void testHadoopArchiveLogs() throws Exception {
|
||||
MiniYARNCluster yarnCluster = null;
|
||||
MiniDFSCluster dfsCluster = null;
|
||||
FileSystem fs = null;
|
||||
try {
|
||||
try (MiniYARNCluster yarnCluster =
|
||||
new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
|
||||
1, 2, 1, 1)) {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||
yarnCluster =
|
||||
new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
|
||||
1, 2, 1, 1);
|
||||
yarnCluster.init(conf);
|
||||
yarnCluster.start();
|
||||
conf = yarnCluster.getConfig();
|
||||
@ -133,9 +131,6 @@ public int compare(FileStatus o1, FileStatus o2) {
|
||||
harLogs[2].getOwner());
|
||||
Assert.assertEquals(0, fs.listStatus(workingDir).length);
|
||||
} finally {
|
||||
if (yarnCluster != null) {
|
||||
yarnCluster.stop();
|
||||
}
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
}
|
||||
|
@ -35,8 +35,6 @@ public class TestHedgingRequestRMFailoverProxyProvider {
|
||||
|
||||
@Test
|
||||
public void testHedgingRequestProxyProvider() throws Exception {
|
||||
final MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1);
|
||||
Configuration conf = new YarnConfiguration();
|
||||
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
@ -49,41 +47,44 @@ public void testHedgingRequestProxyProvider() throws Exception {
|
||||
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
|
||||
2000);
|
||||
|
||||
HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
|
||||
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||
try (MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1)) {
|
||||
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
|
||||
HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
|
||||
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||
|
||||
final YarnClient client = YarnClient.createYarnClient();
|
||||
client.init(conf);
|
||||
client.start();
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
|
||||
// Transition rm5 to active;
|
||||
long start = System.currentTimeMillis();
|
||||
makeRMActive(cluster, 4);
|
||||
final YarnClient client = YarnClient.createYarnClient();
|
||||
client.init(conf);
|
||||
client.start();
|
||||
|
||||
validateActiveRM(client);
|
||||
// Transition rm5 to active;
|
||||
long start = System.currentTimeMillis();
|
||||
makeRMActive(cluster, 4);
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println("Client call succeeded at " + end);
|
||||
// should return the response fast
|
||||
Assert.assertTrue(end - start <= 10000);
|
||||
validateActiveRM(client);
|
||||
|
||||
// transition rm5 to standby
|
||||
cluster.getResourceManager(4).getRMContext().getRMAdminService()
|
||||
.transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
|
||||
long end = System.currentTimeMillis();
|
||||
System.out.println("Client call succeeded at " + end);
|
||||
// should return the response fast
|
||||
Assert.assertTrue(end - start <= 10000);
|
||||
|
||||
makeRMActive(cluster, 2);
|
||||
// transition rm5 to standby
|
||||
cluster.getResourceManager(4).getRMContext().getRMAdminService()
|
||||
.transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
|
||||
|
||||
validateActiveRM(client);
|
||||
makeRMActive(cluster, 2);
|
||||
|
||||
cluster.stop();
|
||||
validateActiveRM(client);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void validateActiveRM(YarnClient client) throws IOException {
|
||||
|
@ -60,11 +60,11 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
|
||||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testAMRMProxyE2E() throws Exception {
|
||||
MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
|
||||
YarnClient rmClient = null;
|
||||
ApplicationMasterProtocol client;
|
||||
|
||||
try {
|
||||
try (MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E",
|
||||
1, 1, 1);
|
||||
YarnClient rmClient = YarnClient.createYarnClient()) {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
|
||||
cluster.init(conf);
|
||||
@ -75,7 +75,6 @@ public void testAMRMProxyE2E() throws Exception {
|
||||
|
||||
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
|
||||
rmClient = YarnClient.createYarnClient();
|
||||
rmClient.init(yarnConf);
|
||||
rmClient.start();
|
||||
|
||||
@ -135,11 +134,6 @@ public void testAMRMProxyE2E() throws Exception {
|
||||
Thread.sleep(500);
|
||||
Assert.assertNotEquals(RMAppState.FINISHED, rmApp.getState());
|
||||
|
||||
} finally {
|
||||
if (rmClient != null) {
|
||||
rmClient.stop();
|
||||
}
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@ -150,12 +144,11 @@ public void testAMRMProxyE2E() throws Exception {
|
||||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testE2ETokenRenewal() throws Exception {
|
||||
MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
|
||||
YarnClient rmClient = null;
|
||||
ApplicationMasterProtocol client;
|
||||
|
||||
try {
|
||||
try (MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
|
||||
YarnClient rmClient = YarnClient.createYarnClient()) {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
|
||||
conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1500);
|
||||
@ -170,7 +163,6 @@ public void testE2ETokenRenewal() throws Exception {
|
||||
final Configuration yarnConf = cluster.getConfig();
|
||||
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
|
||||
rmClient = YarnClient.createYarnClient();
|
||||
rmClient.init(yarnConf);
|
||||
rmClient.start();
|
||||
|
||||
@ -216,11 +208,6 @@ public void testE2ETokenRenewal() throws Exception {
|
||||
client.finishApplicationMaster(FinishApplicationMasterRequest
|
||||
.newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
|
||||
|
||||
} finally {
|
||||
if (rmClient != null) {
|
||||
rmClient.stop();
|
||||
}
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@ -230,11 +217,11 @@ public void testE2ETokenRenewal() throws Exception {
|
||||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testE2ETokenSwap() throws Exception {
|
||||
MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
|
||||
YarnClient rmClient = null;
|
||||
ApplicationMasterProtocol client;
|
||||
|
||||
try {
|
||||
try (MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap",
|
||||
1, 1, 1);
|
||||
YarnClient rmClient = YarnClient.createYarnClient()) {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
|
||||
cluster.init(conf);
|
||||
@ -242,7 +229,6 @@ public void testE2ETokenSwap() throws Exception {
|
||||
|
||||
// the client will connect to the RM with the token provided by AMRMProxy
|
||||
final Configuration yarnConf = cluster.getConfig();
|
||||
rmClient = YarnClient.createYarnClient();
|
||||
rmClient.init(yarnConf);
|
||||
rmClient.start();
|
||||
|
||||
@ -260,11 +246,6 @@ public void testE2ETokenSwap() throws Exception {
|
||||
e.getMessage().startsWith("Invalid AMRMToken from appattempt_"));
|
||||
}
|
||||
|
||||
} finally {
|
||||
if (rmClient != null) {
|
||||
rmClient.stop();
|
||||
}
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1721,15 +1721,13 @@ public void testGetQueueInfoPreemptionDisabled() throws Exception {
|
||||
+ "ProportionalCapacityPreemptionPolicy");
|
||||
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
|
||||
conf.setBoolean(PREFIX + "root.a.a1.disable_preemption", true);
|
||||
MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
|
||||
|
||||
YarnClient yarnClient = null;
|
||||
try {
|
||||
try (MiniYARNCluster cluster =
|
||||
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
|
||||
YarnClient yarnClient = YarnClient.createYarnClient()) {
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
final Configuration yarnConf = cluster.getConfig();
|
||||
yarnClient = YarnClient.createYarnClient();
|
||||
yarnClient.init(yarnConf);
|
||||
yarnClient.start();
|
||||
|
||||
@ -1742,13 +1740,6 @@ public void testGetQueueInfoPreemptionDisabled() throws Exception {
|
||||
assertEquals(0, result);
|
||||
Assert.assertTrue(sysOutStream.toString()
|
||||
.contains("Preemption : disabled"));
|
||||
} finally {
|
||||
// clean-up
|
||||
if (yarnClient != null) {
|
||||
yarnClient.stop();
|
||||
}
|
||||
cluster.stop();
|
||||
cluster.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import java.io.IOException;
|
||||
|
||||
public class TestMiniYarnCluster {
|
||||
|
||||
@ -41,10 +42,11 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
|
||||
*/
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
|
||||
enableAHS = false;
|
||||
MiniYARNCluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
||||
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS);
|
||||
try (MiniYARNCluster cluster =
|
||||
new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
||||
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
|
||||
enableAHS)) {
|
||||
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
|
||||
@ -52,11 +54,6 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
|
||||
Assert.assertNull("Timeline Service should not have been started",
|
||||
cluster.getApplicationHistoryServer());
|
||||
}
|
||||
finally {
|
||||
if(cluster != null) {
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Timeline service should start if TIMELINE_SERVICE_ENABLED == true
|
||||
@ -64,10 +61,10 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
|
||||
*/
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
enableAHS = false;
|
||||
cluster = null;
|
||||
try {
|
||||
cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
||||
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS);
|
||||
try (MiniYARNCluster cluster =
|
||||
new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
||||
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
|
||||
enableAHS)) {
|
||||
cluster.init(conf);
|
||||
|
||||
// Verify that the timeline-service starts on ephemeral ports by default
|
||||
@ -87,21 +84,16 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
|
||||
Assert.assertNotNull("Timeline Service should have been started",
|
||||
cluster.getApplicationHistoryServer());
|
||||
}
|
||||
finally {
|
||||
if(cluster != null) {
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Timeline service should start if TIMELINE_SERVICE_ENABLED == false
|
||||
* and enableAHS == true
|
||||
*/
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
|
||||
enableAHS = true;
|
||||
cluster = null;
|
||||
try {
|
||||
cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
||||
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS);
|
||||
try (MiniYARNCluster cluster =
|
||||
new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
|
||||
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
|
||||
enableAHS)) {
|
||||
cluster.init(conf);
|
||||
cluster.start();
|
||||
|
||||
@ -115,15 +107,10 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
|
||||
Assert.assertNotNull("Timeline Service should have been started",
|
||||
cluster.getApplicationHistoryServer());
|
||||
}
|
||||
finally {
|
||||
if(cluster != null) {
|
||||
cluster.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiRMConf() {
|
||||
public void testMultiRMConf() throws IOException {
|
||||
String RM1_NODE_ID = "rm1", RM2_NODE_ID = "rm2";
|
||||
int RM1_PORT_BASE = 10000, RM2_PORT_BASE = 20000;
|
||||
Configuration conf = new YarnConfiguration();
|
||||
@ -137,23 +124,28 @@ public void testMultiRMConf() {
|
||||
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
|
||||
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
|
||||
|
||||
MiniYARNCluster cluster =
|
||||
try (MiniYARNCluster cluster =
|
||||
new MiniYARNCluster(TestMiniYarnCluster.class.getName(),
|
||||
2, 0, 1, 1);
|
||||
cluster.init(conf);
|
||||
Configuration conf1 = cluster.getResourceManager(0).getConfig(),
|
||||
conf2 = cluster.getResourceManager(1).getConfig();
|
||||
Assert.assertFalse(conf1 == conf2);
|
||||
Assert.assertEquals("0.0.0.0:18032",
|
||||
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
|
||||
Assert.assertEquals("0.0.0.0:28032",
|
||||
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
|
||||
Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
|
||||
2, 0, 1, 1)) {
|
||||
cluster.init(conf);
|
||||
Configuration conf1 = cluster.getResourceManager(0).getConfig(),
|
||||
conf2 = cluster.getResourceManager(1).getConfig();
|
||||
Assert.assertFalse(conf1 == conf2);
|
||||
Assert.assertEquals("0.0.0.0:18032",
|
||||
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
||||
RM1_NODE_ID)));
|
||||
Assert.assertEquals("0.0.0.0:28032",
|
||||
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
||||
RM2_NODE_ID)));
|
||||
Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
|
||||
|
||||
Assert.assertEquals("0.0.0.0:18032",
|
||||
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID)));
|
||||
Assert.assertEquals("0.0.0.0:28032",
|
||||
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID)));
|
||||
Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
|
||||
Assert.assertEquals("0.0.0.0:18032",
|
||||
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
||||
RM1_NODE_ID)));
|
||||
Assert.assertEquals("0.0.0.0:28032",
|
||||
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
|
||||
RM2_NODE_ID)));
|
||||
Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user