MAPREDUCE-7280. MiniMRYarnCluster has hard-coded timeout waiting to start history server, with no way to disable. (#2065)
This commit is contained in:
parent
74fc13cf91
commit
7e73cad974
@ -18,9 +18,13 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Unit-test to test bringup and shutdown of Mini Map-Reduce Cluster.
|
* A Unit-test to test bringup and shutdown of Mini Map-Reduce Cluster.
|
||||||
@ -37,4 +41,20 @@ public void testBringUp() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMiniMRYarnClusterWithoutJHS() throws IOException {
|
||||||
|
MiniMRYarnCluster mr = null;
|
||||||
|
try {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.setBoolean(MiniMRYarnCluster.MR_HISTORY_MINICLUSTER_ENABLED, false);
|
||||||
|
mr = new MiniMRYarnCluster("testMiniMRYarnClusterWithoutJHS");
|
||||||
|
mr.init(conf);
|
||||||
|
mr.start();
|
||||||
|
Assert.assertEquals(null, mr.getHistoryServer());
|
||||||
|
} finally {
|
||||||
|
if (mr != null) {
|
||||||
|
mr.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -69,6 +69,10 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
|
|||||||
private JobHistoryServer historyServer;
|
private JobHistoryServer historyServer;
|
||||||
private JobHistoryServerWrapper historyServerWrapper;
|
private JobHistoryServerWrapper historyServerWrapper;
|
||||||
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
|
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
|
||||||
|
public static final String MR_HISTORY_MINICLUSTER_ENABLED =
|
||||||
|
JHAdminConfig.MR_HISTORY_PREFIX + "minicluster.enabled";
|
||||||
|
public static final String MR_HISTORY_MINICLUSTER_LAUNCH_TIMEOUT_MS =
|
||||||
|
JHAdminConfig.MR_HISTORY_PREFIX + "minicluster.launch.timeout.ms";
|
||||||
|
|
||||||
public MiniMRYarnCluster(String testName) {
|
public MiniMRYarnCluster(String testName) {
|
||||||
this(testName, 1);
|
this(testName, 1);
|
||||||
@ -77,11 +81,10 @@ public MiniMRYarnCluster(String testName) {
|
|||||||
public MiniMRYarnCluster(String testName, int noOfNMs) {
|
public MiniMRYarnCluster(String testName, int noOfNMs) {
|
||||||
this(testName, noOfNMs, false);
|
this(testName, noOfNMs, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public MiniMRYarnCluster(String testName, int noOfNMs, boolean enableAHS) {
|
public MiniMRYarnCluster(String testName, int noOfNMs, boolean enableAHS) {
|
||||||
super(testName, 1, noOfNMs, 4, 4, enableAHS);
|
super(testName, 1, noOfNMs, 4, 4, enableAHS);
|
||||||
historyServerWrapper = new JobHistoryServerWrapper();
|
|
||||||
addService(historyServerWrapper);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getResolvedMRHistoryWebAppURLWithoutScheme(
|
public static String getResolvedMRHistoryWebAppURLWithoutScheme(
|
||||||
@ -118,6 +121,11 @@ public static String getResolvedMRHistoryWebAppURLWithoutScheme(
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
|
if (conf.getBoolean(MR_HISTORY_MINICLUSTER_ENABLED, true)) {
|
||||||
|
historyServerWrapper = new JobHistoryServerWrapper();
|
||||||
|
addService(historyServerWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
|
||||||
String stagingDir = conf.get(MRJobConfig.MR_AM_STAGING_DIR);
|
String stagingDir = conf.get(MRJobConfig.MR_AM_STAGING_DIR);
|
||||||
if (stagingDir == null ||
|
if (stagingDir == null ||
|
||||||
@ -212,11 +220,13 @@ public void serviceInit(Configuration conf) throws Exception {
|
|||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
|
|
||||||
|
if (historyServer != null) {
|
||||||
//need to do this because historyServer.init creates a new Configuration
|
//need to do this because historyServer.init creates a new Configuration
|
||||||
getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
|
getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
|
||||||
historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
|
historyServer.getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
|
||||||
MRWebAppUtil.setJHSWebappURLWithoutScheme(getConfig(),
|
MRWebAppUtil.setJHSWebappURLWithoutScheme(getConfig(),
|
||||||
MRWebAppUtil.getJHSWebappURLWithoutScheme(historyServer.getConfig()));
|
MRWebAppUtil.getJHSWebappURLWithoutScheme(historyServer.getConfig()));
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("MiniMRYARN ResourceManager address: " +
|
LOG.info("MiniMRYARN ResourceManager address: " +
|
||||||
getConfig().get(YarnConfiguration.RM_ADDRESS));
|
getConfig().get(YarnConfiguration.RM_ADDRESS));
|
||||||
@ -233,7 +243,6 @@ private class JobHistoryServerWrapper extends AbstractService {
|
|||||||
public JobHistoryServerWrapper() {
|
public JobHistoryServerWrapper() {
|
||||||
super(JobHistoryServerWrapper.class.getName());
|
super(JobHistoryServerWrapper.class.getName());
|
||||||
}
|
}
|
||||||
private volatile boolean jhsStarted = false;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void serviceStart() throws Exception {
|
public synchronized void serviceStart() throws Exception {
|
||||||
@ -255,12 +264,15 @@ public synchronized void serviceStart() throws Exception {
|
|||||||
new Thread() {
|
new Thread() {
|
||||||
public void run() {
|
public void run() {
|
||||||
historyServer.start();
|
historyServer.start();
|
||||||
jhsStarted = true;
|
|
||||||
};
|
};
|
||||||
}.start();
|
}.start();
|
||||||
|
|
||||||
GenericTestUtils.waitFor(() -> jhsStarted, 1500, 60_000);
|
final int launchTimeout = getConfig().getInt(
|
||||||
|
MR_HISTORY_MINICLUSTER_LAUNCH_TIMEOUT_MS, 60_000);
|
||||||
|
GenericTestUtils.waitFor(
|
||||||
|
() -> historyServer.getServiceState() == STATE.STARTED
|
||||||
|
|| historyServer.getServiceState() == STATE.STOPPED,
|
||||||
|
100, launchTimeout);
|
||||||
if (historyServer.getServiceState() != STATE.STARTED) {
|
if (historyServer.getServiceState() != STATE.STARTED) {
|
||||||
throw new IOException("HistoryServer failed to start");
|
throw new IOException("HistoryServer failed to start");
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user