Merging r1534707 through r1534893 from trunk to branch HDFS-2832
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1534894 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
f794ef63b1
@ -225,6 +225,9 @@ Release 2.2.1 - UNRELEASED
|
|||||||
MAPREDUCE-5518. Fixed typo "can't read paritions file". (Albert Chu
|
MAPREDUCE-5518. Fixed typo "can't read paritions file". (Albert Chu
|
||||||
via devaraj)
|
via devaraj)
|
||||||
|
|
||||||
|
MAPREDUCE-5561. org.apache.hadoop.mapreduce.v2.app.job.impl.TestJobImpl
|
||||||
|
testcase failing on trunk (Karthik Kambatla via jlowe)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -415,7 +415,6 @@ public void testFailAbortDoesntHang() throws IOException {
|
|||||||
TaskEventType.T_ATTEMPT_FAILED));
|
TaskEventType.T_ATTEMPT_FAILED));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertJobState(job, JobStateInternal.FAIL_ABORT);
|
|
||||||
|
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
//Verify abortJob is called once and the job failed
|
//Verify abortJob is called once and the job failed
|
||||||
|
@ -82,6 +82,12 @@ Release 2.3.0 - UNRELEASED
|
|||||||
YARN-1300. SLS tests fail because conf puts YARN properties in
|
YARN-1300. SLS tests fail because conf puts YARN properties in
|
||||||
fair-scheduler.xml (Ted Yu via Sandy Ryza)
|
fair-scheduler.xml (Ted Yu via Sandy Ryza)
|
||||||
|
|
||||||
|
YARN-1183. MiniYARNCluster shutdown takes several minutes intermittently
|
||||||
|
(Andrey Klochkov via jeagles)
|
||||||
|
|
||||||
|
YARN-1305. RMHAProtocolService#serviceInit should handle HAUtil's
|
||||||
|
IllegalArgumentException (Tsuyoshi Ozawa via bikas)
|
||||||
|
|
||||||
Release 2.2.1 - UNRELEASED
|
Release 2.2.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
@ -130,6 +136,9 @@ Release 2.2.1 - UNRELEASED
|
|||||||
YARN-1331. yarn.cmd exits with NoClassDefFoundError trying to run rmadmin or
|
YARN-1331. yarn.cmd exits with NoClassDefFoundError trying to run rmadmin or
|
||||||
logs. (cnauroth)
|
logs. (cnauroth)
|
||||||
|
|
||||||
|
YARN-1330. Fair Scheduler: defaultQueueSchedulingPolicy does not take effect
|
||||||
|
(Sandy Ryza)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -42,10 +43,13 @@ public class HAUtil {
|
|||||||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
||||||
YarnConfiguration.RM_WEBAPP_ADDRESS));
|
YarnConfiguration.RM_WEBAPP_ADDRESS));
|
||||||
|
|
||||||
|
public static final String BAD_CONFIG_MESSAGE_PREFIX =
|
||||||
|
"Invalid configuration! ";
|
||||||
|
|
||||||
private HAUtil() { /* Hidden constructor */ }
|
private HAUtil() { /* Hidden constructor */ }
|
||||||
|
|
||||||
private static void throwBadConfigurationException(String msg) {
|
private static void throwBadConfigurationException(String msg) {
|
||||||
throw new YarnRuntimeException("Invalid configuration! " + msg);
|
throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -59,29 +63,137 @@ public static boolean isHAEnabled(Configuration conf) {
|
|||||||
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
|
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Collection<String> getRMHAIds(Configuration conf) {
|
/**
|
||||||
return conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
|
* Verify configuration for Resource Manager HA.
|
||||||
|
* @param conf Configuration
|
||||||
|
* @throws YarnRuntimeException
|
||||||
|
*/
|
||||||
|
public static void verifyAndSetConfiguration(Configuration conf)
|
||||||
|
throws YarnRuntimeException {
|
||||||
|
verifyAndSetRMHAIds(conf);
|
||||||
|
verifyAndSetRMHAId(conf);
|
||||||
|
verifyAndSetAllRpcAddresses(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private static void verifyAndSetRMHAIds(Configuration conf) {
|
||||||
|
Collection<String> ids =
|
||||||
|
conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
|
||||||
|
if (ids.size() <= 0) {
|
||||||
|
throwBadConfigurationException(
|
||||||
|
getInvalidValueMessage(YarnConfiguration.RM_HA_IDS,
|
||||||
|
conf.get(YarnConfiguration.RM_HA_IDS)));
|
||||||
|
} else if (ids.size() == 1) {
|
||||||
|
LOG.warn(getRMHAIdsWarningMessage(ids.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
StringBuilder setValue = new StringBuilder();
|
||||||
|
for (String id: ids) {
|
||||||
|
setValue.append(id);
|
||||||
|
setValue.append(",");
|
||||||
|
}
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS,
|
||||||
|
setValue.substring(0, setValue.length() - 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void verifyAndSetRMHAId(Configuration conf) {
|
||||||
|
String rmId = conf.getTrimmed(YarnConfiguration.RM_HA_ID);
|
||||||
|
if (rmId == null) {
|
||||||
|
throwBadConfigurationException(
|
||||||
|
getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID));
|
||||||
|
} else {
|
||||||
|
Collection<String> ids = getRMHAIds(conf);
|
||||||
|
if (!ids.contains(rmId)) {
|
||||||
|
throwBadConfigurationException(
|
||||||
|
getRMHAIdNeedToBeIncludedMessage(ids.toString(), rmId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void verifyAndSetConfValue(String prefix, Configuration conf) {
|
||||||
|
String confKey = null;
|
||||||
|
String confValue = null;
|
||||||
|
try {
|
||||||
|
confKey = getConfKeyForRMInstance(prefix, conf);
|
||||||
|
confValue = getConfValueForRMInstance(prefix, conf);
|
||||||
|
conf.set(prefix, confValue);
|
||||||
|
} catch (YarnRuntimeException yre) {
|
||||||
|
// Error at getRMHAId()
|
||||||
|
throw yre;
|
||||||
|
} catch (IllegalArgumentException iae) {
|
||||||
|
String errmsg;
|
||||||
|
if (confKey == null) {
|
||||||
|
// Error at addSuffix
|
||||||
|
errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
|
||||||
|
getRMHAId(conf));
|
||||||
|
} else {
|
||||||
|
// Error at Configuration#set.
|
||||||
|
errmsg = getNeedToSetValueMessage(confKey);
|
||||||
|
}
|
||||||
|
throwBadConfigurationException(errmsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void verifyAndSetAllRpcAddresses(Configuration conf) {
|
||||||
|
for (String confKey : RPC_ADDRESS_CONF_KEYS) {
|
||||||
|
verifyAndSetConfValue(confKey, conf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param conf Configuration
|
* @param conf Configuration. Please use getRMHAIds to check.
|
||||||
|
* @return RM Ids on success
|
||||||
|
*/
|
||||||
|
public static Collection<String> getRMHAIds(Configuration conf) {
|
||||||
|
return conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param conf Configuration. Please use verifyAndSetRMHAId to check.
|
||||||
* @return RM Id on success
|
* @return RM Id on success
|
||||||
* @throws YarnRuntimeException for configurations without a node id
|
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static String getRMHAId(Configuration conf) {
|
static String getRMHAId(Configuration conf) {
|
||||||
String rmId = conf.get(YarnConfiguration.RM_HA_ID);
|
return conf.get(YarnConfiguration.RM_HA_ID);
|
||||||
if (rmId == null) {
|
}
|
||||||
throwBadConfigurationException(YarnConfiguration.RM_HA_ID +
|
|
||||||
" needs to be set in a HA configuration");
|
@VisibleForTesting
|
||||||
}
|
static String getNeedToSetValueMessage(String confKey) {
|
||||||
return rmId;
|
return confKey + " needs to be set in a HA configuration.";
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static String getInvalidValueMessage(String confKey,
|
||||||
|
String invalidValue){
|
||||||
|
return "Invalid value of " + confKey +". "
|
||||||
|
+ "Current value is " + invalidValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static String getRMHAIdNeedToBeIncludedMessage(String ids,
|
||||||
|
String rmId) {
|
||||||
|
return YarnConfiguration.RM_HA_IDS + "("
|
||||||
|
+ ids + ") need to contain " + YarnConfiguration.RM_HA_ID + "("
|
||||||
|
+ rmId + ") in a HA configuration.";
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static String getRMHAIdsWarningMessage(String ids) {
|
||||||
|
return "Resource Manager HA is enabled, but " +
|
||||||
|
YarnConfiguration.RM_HA_IDS + " has only one id(" +
|
||||||
|
ids.toString() + ")";
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getConfKeyForRMInstance(String prefix,
|
||||||
|
Configuration conf) {
|
||||||
|
return addSuffix(prefix, getRMHAId(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getConfValueForRMInstance(String prefix,
|
private static String getConfValueForRMInstance(String prefix,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
String confKey = addSuffix(prefix, getRMHAId(conf));
|
String confKey = getConfKeyForRMInstance(prefix, conf);
|
||||||
String retVal = conf.get(confKey);
|
String retVal = conf.getTrimmed(confKey);
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("getConfValueForRMInstance: prefix = " + prefix +
|
LOG.trace("getConfValueForRMInstance: prefix = " + prefix +
|
||||||
"; confKey being looked up = " + confKey +
|
"; confKey being looked up = " + confKey +
|
||||||
@ -96,16 +208,6 @@ static String getConfValueForRMInstance(String prefix, String defaultValue,
|
|||||||
return (value == null) ? defaultValue : value;
|
return (value == null) ? defaultValue : value;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setConfValue(String prefix, Configuration conf) {
|
|
||||||
conf.set(prefix, getConfValueForRMInstance(prefix, conf));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void setAllRpcAddresses(Configuration conf) {
|
|
||||||
for (String confKey : RPC_ADDRESS_CONF_KEYS) {
|
|
||||||
setConfValue(confKey, conf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Add non empty and non null suffix to a key */
|
/** Add non empty and non null suffix to a key */
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static String addSuffix(String key, String suffix) {
|
public static String addSuffix(String key, String suffix) {
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -27,53 +28,134 @@
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class TestHAUtil {
|
public class TestHAUtil {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
||||||
private static final String RM1_ADDRESS = "1.2.3.4:8021";
|
private static final String RM1_ADDRESS_UNTRIMMED = " \t\t\n 1.2.3.4:8021 \n\t ";
|
||||||
|
private static final String RM1_ADDRESS = RM1_ADDRESS_UNTRIMMED.trim();
|
||||||
private static final String RM2_ADDRESS = "localhost:8022";
|
private static final String RM2_ADDRESS = "localhost:8022";
|
||||||
private static final String RM1_NODE_ID = "rm1";
|
private static final String RM1_NODE_ID_UNTRIMMED = "rm1 ";
|
||||||
|
private static final String RM1_NODE_ID = RM1_NODE_ID_UNTRIMMED.trim();
|
||||||
private static final String RM2_NODE_ID = "rm2";
|
private static final String RM2_NODE_ID = "rm2";
|
||||||
|
private static final String RM3_NODE_ID = "rm3";
|
||||||
|
private static final String RM_INVALID_NODE_ID = ".rm";
|
||||||
|
private static final String RM_NODE_IDS_UNTRIMMED = RM1_NODE_ID_UNTRIMMED + "," + RM2_NODE_ID;
|
||||||
|
private static final String RM_NODE_IDS = RM1_NODE_ID + "," + RM2_NODE_ID;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_IDS, RM_NODE_IDS_UNTRIMMED);
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
|
||||||
|
|
||||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||||
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
|
// configuration key itself cannot contains space/tab/return chars.
|
||||||
|
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
|
||||||
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetRMServiceId() throws Exception {
|
public void testGetRMServiceId() throws Exception {
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||||
Collection<String> rmhaIds = HAUtil.getRMHAIds(conf);
|
Collection<String> rmhaIds = HAUtil.getRMHAIds(conf);
|
||||||
assertEquals(2, rmhaIds.size());
|
assertEquals(2, rmhaIds.size());
|
||||||
|
|
||||||
|
String[] ids = rmhaIds.toArray(new String[0]);
|
||||||
|
assertEquals(RM1_NODE_ID, ids[0]);
|
||||||
|
assertEquals(RM2_NODE_ID, ids[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetRMId() throws Exception {
|
public void testGetRMId() throws Exception {
|
||||||
|
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||||
assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID,
|
assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID,
|
||||||
RM1_NODE_ID, HAUtil.getRMHAId(conf));
|
RM1_NODE_ID, HAUtil.getRMHAId(conf));
|
||||||
conf = new YarnConfiguration();
|
|
||||||
try {
|
conf.clear();
|
||||||
HAUtil.getRMHAId(conf);
|
assertNull("Return null when " + YarnConfiguration.RM_HA_ID
|
||||||
fail("getRMHAId() fails to throw an exception when RM_HA_ID is not set");
|
+ " is not set", HAUtil.getRMHAId(conf));
|
||||||
} catch (YarnRuntimeException yre) {
|
|
||||||
// do nothing
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSetGetRpcAddresses() throws Exception {
|
public void testVerifyAndSetConfiguration() throws Exception {
|
||||||
HAUtil.setAllRpcAddresses(conf);
|
try {
|
||||||
|
HAUtil.verifyAndSetConfiguration(conf);
|
||||||
|
} catch (YarnRuntimeException e) {
|
||||||
|
fail("Should not throw any exceptions.");
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals("Should be saved as Trimmed collection",
|
||||||
|
StringUtils.getStringCollection(RM_NODE_IDS), HAUtil.getRMHAIds(conf));
|
||||||
|
assertEquals("Should be saved as Trimmed string",
|
||||||
|
RM1_NODE_ID, HAUtil.getRMHAId(conf));
|
||||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||||
assertEquals("RPC address not set for " + confKey,
|
assertEquals("RPC address not set for " + confKey,
|
||||||
RM1_ADDRESS, conf.get(confKey));
|
RM1_ADDRESS, conf.get(confKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
conf.clear();
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
|
||||||
|
try {
|
||||||
|
HAUtil.verifyAndSetConfiguration(conf);
|
||||||
|
} catch (YarnRuntimeException e) {
|
||||||
|
assertEquals("YarnRuntimeException by getRMId()",
|
||||||
|
HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
|
||||||
|
HAUtil.getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID),
|
||||||
|
e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
conf.clear();
|
||||||
|
conf.set(YarnConfiguration.RM_HA_ID, RM_INVALID_NODE_ID);
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
|
||||||
|
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||||
|
// simulate xml with invalid node id
|
||||||
|
conf.set(confKey + RM_INVALID_NODE_ID, RM_INVALID_NODE_ID);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
HAUtil.verifyAndSetConfiguration(conf);
|
||||||
|
} catch (YarnRuntimeException e) {
|
||||||
|
assertEquals("YarnRuntimeException by addSuffix()",
|
||||||
|
HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
|
||||||
|
HAUtil.getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
|
||||||
|
RM_INVALID_NODE_ID),
|
||||||
|
e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
conf.clear();
|
||||||
|
// simulate the case HAUtil.RPC_ADDRESS_CONF_KEYS are not set
|
||||||
|
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
|
||||||
|
try {
|
||||||
|
HAUtil.verifyAndSetConfiguration(conf);
|
||||||
|
fail("Should throw YarnRuntimeException. by Configuration#set()");
|
||||||
|
} catch (YarnRuntimeException e) {
|
||||||
|
String confKey =
|
||||||
|
HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID);
|
||||||
|
assertEquals("YarnRuntimeException by Configuration#set()",
|
||||||
|
HAUtil.BAD_CONFIG_MESSAGE_PREFIX + HAUtil.getNeedToSetValueMessage(confKey),
|
||||||
|
e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// simulate the case YarnConfiguration.RM_HA_IDS doesn't contain
|
||||||
|
// the value of YarnConfiguration.RM_HA_ID
|
||||||
|
conf.clear();
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, RM2_NODE_ID + "," + RM3_NODE_ID);
|
||||||
|
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
|
||||||
|
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
||||||
|
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
|
||||||
|
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
HAUtil.verifyAndSetConfiguration(conf);
|
||||||
|
} catch (YarnRuntimeException e) {
|
||||||
|
assertEquals("YarnRuntimeException by getRMId()'s validation",
|
||||||
|
HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
|
||||||
|
HAUtil.getRMHAIdNeedToBeIncludedMessage("[rm2, rm3]", RM1_NODE_ID),
|
||||||
|
e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@ protected synchronized void serviceInit(Configuration conf) throws
|
|||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
haEnabled = HAUtil.isHAEnabled(this.conf);
|
haEnabled = HAUtil.isHAEnabled(this.conf);
|
||||||
if (haEnabled) {
|
if (haEnabled) {
|
||||||
HAUtil.setAllRpcAddresses(this.conf);
|
HAUtil.verifyAndSetConfiguration(conf);
|
||||||
rm.setConf(this.conf);
|
rm.setConf(this.conf);
|
||||||
}
|
}
|
||||||
rm.createAndInitActiveServices();
|
rm.createAndInitActiveServices();
|
||||||
|
@ -378,22 +378,24 @@ public void reloadAllocs() throws IOException, ParserConfigurationException,
|
|||||||
queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
|
queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
|
||||||
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
|
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
|
||||||
|
|
||||||
// Update metrics
|
// Make sure all queues exist
|
||||||
for (FSQueue queue : queues.values()) {
|
|
||||||
FSQueueMetrics queueMetrics = queue.getMetrics();
|
|
||||||
queueMetrics.setMinShare(queue.getMinShare());
|
|
||||||
queueMetrics.setMaxShare(queue.getMaxShare());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create all queus
|
|
||||||
for (String name: queueNamesInAllocFile) {
|
for (String name: queueNamesInAllocFile) {
|
||||||
getLeafQueue(name, true);
|
getLeafQueue(name, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set custom policies as specified
|
for (FSQueue queue : queues.values()) {
|
||||||
for (Map.Entry<String, SchedulingPolicy> entry : queuePolicies.entrySet()) {
|
// Update queue metrics
|
||||||
queues.get(entry.getKey()).setPolicy(entry.getValue());
|
FSQueueMetrics queueMetrics = queue.getMetrics();
|
||||||
|
queueMetrics.setMinShare(queue.getMinShare());
|
||||||
|
queueMetrics.setMaxShare(queue.getMaxShare());
|
||||||
|
// Set scheduling policies
|
||||||
|
if (queuePolicies.containsKey(queue.getName())) {
|
||||||
|
queue.setPolicy(queuePolicies.get(queue.getName()));
|
||||||
|
} else {
|
||||||
|
queue.setPolicy(SchedulingPolicy.getDefault());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,6 +88,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
|
||||||
@ -807,6 +808,7 @@ public void testAllocationFileParsing() throws Exception {
|
|||||||
out.println("<queue name=\"queueB\">");
|
out.println("<queue name=\"queueB\">");
|
||||||
out.println("<minResources>2048mb,0vcores</minResources>");
|
out.println("<minResources>2048mb,0vcores</minResources>");
|
||||||
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
|
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
|
||||||
|
out.println("<schedulingPolicy>fair</schedulingPolicy>");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
// Give queue C no minimum
|
// Give queue C no minimum
|
||||||
out.println("<queue name=\"queueC\">");
|
out.println("<queue name=\"queueC\">");
|
||||||
@ -833,6 +835,8 @@ public void testAllocationFileParsing() throws Exception {
|
|||||||
+ "</defaultMinSharePreemptionTimeout>");
|
+ "</defaultMinSharePreemptionTimeout>");
|
||||||
// Set fair share preemption timeout to 5 minutes
|
// Set fair share preemption timeout to 5 minutes
|
||||||
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
|
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
|
||||||
|
// Set default scheduling policy to DRF
|
||||||
|
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
@ -894,6 +898,18 @@ public void testAllocationFileParsing() throws Exception {
|
|||||||
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
|
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
|
||||||
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
|
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
|
||||||
assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
|
assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
|
||||||
|
|
||||||
|
// Verify existing queues have default scheduling policy
|
||||||
|
assertEquals(DominantResourceFairnessPolicy.NAME,
|
||||||
|
queueManager.getQueue("root").getPolicy().getName());
|
||||||
|
assertEquals(DominantResourceFairnessPolicy.NAME,
|
||||||
|
queueManager.getQueue("root.queueA").getPolicy().getName());
|
||||||
|
// Verify default is overriden if specified explicitly
|
||||||
|
assertEquals(FairSharePolicy.NAME,
|
||||||
|
queueManager.getQueue("root.queueB").getPolicy().getName());
|
||||||
|
// Verify new queue gets default scheduling policy
|
||||||
|
assertEquals(DominantResourceFairnessPolicy.NAME,
|
||||||
|
queueManager.getLeafQueue("root.newqueue", true).getPolicy().getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -34,8 +36,10 @@
|
|||||||
import org.apache.hadoop.service.CompositeService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
@ -52,6 +56,10 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -83,6 +91,9 @@ public class MiniYARNCluster extends CompositeService {
|
|||||||
|
|
||||||
private ResourceManagerWrapper resourceManagerWrapper;
|
private ResourceManagerWrapper resourceManagerWrapper;
|
||||||
|
|
||||||
|
private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
|
||||||
|
new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
|
||||||
|
|
||||||
private File testWorkDir;
|
private File testWorkDir;
|
||||||
|
|
||||||
// Number of nm-local-dirs per nodemanager
|
// Number of nm-local-dirs per nodemanager
|
||||||
@ -210,6 +221,16 @@ protected void doSecureLogin() throws IOException {
|
|||||||
};
|
};
|
||||||
};
|
};
|
||||||
resourceManager.init(conf);
|
resourceManager.init(conf);
|
||||||
|
resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class,
|
||||||
|
new EventHandler<RMAppAttemptEvent>() {
|
||||||
|
public void handle(RMAppAttemptEvent event) {
|
||||||
|
if (event instanceof RMAppAttemptRegistrationEvent) {
|
||||||
|
appMasters.put(event.getApplicationAttemptId(), event.getTimestamp());
|
||||||
|
} else if (event instanceof RMAppAttemptUnregistrationEvent) {
|
||||||
|
appMasters.remove(event.getApplicationAttemptId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -243,9 +264,22 @@ public void run() {
|
|||||||
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
|
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void waitForAppMastersToFinish(long timeoutMillis) throws InterruptedException {
|
||||||
|
long started = System.currentTimeMillis();
|
||||||
|
synchronized (appMasters) {
|
||||||
|
while (!appMasters.isEmpty() && System.currentTimeMillis() - started < timeoutMillis) {
|
||||||
|
appMasters.wait(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!appMasters.isEmpty()) {
|
||||||
|
LOG.warn("Stopping RM while some app masters are still alive");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized void serviceStop() throws Exception {
|
protected synchronized void serviceStop() throws Exception {
|
||||||
if (resourceManager != null) {
|
if (resourceManager != null) {
|
||||||
|
waitForAppMastersToFinish(5000);
|
||||||
resourceManager.stop();
|
resourceManager.stop();
|
||||||
}
|
}
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
|
Loading…
Reference in New Issue
Block a user