YARN-1305. RMHAProtocolService#serviceInit should handle HAUtil's IllegalArgumentException (Tsuyoshi Ozawa via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1534884 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-10-23 01:10:17 +00:00
parent 3baff29b8c
commit 116b459d22
4 changed files with 228 additions and 41 deletions

View File

@ -85,6 +85,9 @@ Release 2.3.0 - UNRELEASED
YARN-1183. MiniYARNCluster shutdown takes several minutes intermittently YARN-1183. MiniYARNCluster shutdown takes several minutes intermittently
(Andrey Klochkov via jeagles) (Andrey Klochkov via jeagles)
YARN-1305. RMHAProtocolService#serviceInit should handle HAUtil's
IllegalArgumentException (Tsuyoshi Ozawa via bikas)
Release 2.2.1 - UNRELEASED Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -23,6 +23,7 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.util.Arrays; import java.util.Arrays;
@ -42,10 +43,13 @@ public class HAUtil {
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.RM_WEBAPP_ADDRESS)); YarnConfiguration.RM_WEBAPP_ADDRESS));
public static final String BAD_CONFIG_MESSAGE_PREFIX =
"Invalid configuration! ";
private HAUtil() { /* Hidden constructor */ } private HAUtil() { /* Hidden constructor */ }
private static void throwBadConfigurationException(String msg) { private static void throwBadConfigurationException(String msg) {
throw new YarnRuntimeException("Invalid configuration! " + msg); throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
} }
/** /**
@ -59,29 +63,137 @@ public static boolean isHAEnabled(Configuration conf) {
YarnConfiguration.DEFAULT_RM_HA_ENABLED); YarnConfiguration.DEFAULT_RM_HA_ENABLED);
} }
public static Collection<String> getRMHAIds(Configuration conf) { /**
return conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS); * Verify configuration for Resource Manager HA.
* @param conf Configuration
* @throws YarnRuntimeException
*/
public static void verifyAndSetConfiguration(Configuration conf)
throws YarnRuntimeException {
verifyAndSetRMHAIds(conf);
verifyAndSetRMHAId(conf);
verifyAndSetAllRpcAddresses(conf);
}
private static void verifyAndSetRMHAIds(Configuration conf) {
Collection<String> ids =
conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
if (ids.size() <= 0) {
throwBadConfigurationException(
getInvalidValueMessage(YarnConfiguration.RM_HA_IDS,
conf.get(YarnConfiguration.RM_HA_IDS)));
} else if (ids.size() == 1) {
LOG.warn(getRMHAIdsWarningMessage(ids.toString()));
}
StringBuilder setValue = new StringBuilder();
for (String id: ids) {
setValue.append(id);
setValue.append(",");
}
conf.set(YarnConfiguration.RM_HA_IDS,
setValue.substring(0, setValue.length() - 1));
}
private static void verifyAndSetRMHAId(Configuration conf) {
String rmId = conf.getTrimmed(YarnConfiguration.RM_HA_ID);
if (rmId == null) {
throwBadConfigurationException(
getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID));
} else {
Collection<String> ids = getRMHAIds(conf);
if (!ids.contains(rmId)) {
throwBadConfigurationException(
getRMHAIdNeedToBeIncludedMessage(ids.toString(), rmId));
}
}
conf.set(YarnConfiguration.RM_HA_ID, rmId);
}
private static void verifyAndSetConfValue(String prefix, Configuration conf) {
String confKey = null;
String confValue = null;
try {
confKey = getConfKeyForRMInstance(prefix, conf);
confValue = getConfValueForRMInstance(prefix, conf);
conf.set(prefix, confValue);
} catch (YarnRuntimeException yre) {
// Error at getRMHAId()
throw yre;
} catch (IllegalArgumentException iae) {
String errmsg;
if (confKey == null) {
// Error at addSuffix
errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
getRMHAId(conf));
} else {
// Error at Configuration#set.
errmsg = getNeedToSetValueMessage(confKey);
}
throwBadConfigurationException(errmsg);
}
}
public static void verifyAndSetAllRpcAddresses(Configuration conf) {
for (String confKey : RPC_ADDRESS_CONF_KEYS) {
verifyAndSetConfValue(confKey, conf);
}
} }
/** /**
* @param conf Configuration * @param conf Configuration. Please use getRMHAIds to check.
* @return RM Ids on success
*/
public static Collection<String> getRMHAIds(Configuration conf) {
return conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
}
/**
* @param conf Configuration. Please use verifyAndSetRMHAId to check.
* @return RM Id on success * @return RM Id on success
* @throws YarnRuntimeException for configurations without a node id
*/ */
@VisibleForTesting @VisibleForTesting
public static String getRMHAId(Configuration conf) { static String getRMHAId(Configuration conf) {
String rmId = conf.get(YarnConfiguration.RM_HA_ID); return conf.get(YarnConfiguration.RM_HA_ID);
if (rmId == null) { }
throwBadConfigurationException(YarnConfiguration.RM_HA_ID +
" needs to be set in a HA configuration"); @VisibleForTesting
} static String getNeedToSetValueMessage(String confKey) {
return rmId; return confKey + " needs to be set in a HA configuration.";
}
@VisibleForTesting
static String getInvalidValueMessage(String confKey,
String invalidValue){
return "Invalid value of " + confKey +". "
+ "Current value is " + invalidValue;
}
@VisibleForTesting
static String getRMHAIdNeedToBeIncludedMessage(String ids,
String rmId) {
return YarnConfiguration.RM_HA_IDS + "("
+ ids + ") need to contain " + YarnConfiguration.RM_HA_ID + "("
+ rmId + ") in a HA configuration.";
}
@VisibleForTesting
static String getRMHAIdsWarningMessage(String ids) {
return "Resource Manager HA is enabled, but " +
YarnConfiguration.RM_HA_IDS + " has only one id(" +
ids.toString() + ")";
}
private static String getConfKeyForRMInstance(String prefix,
Configuration conf) {
return addSuffix(prefix, getRMHAId(conf));
} }
private static String getConfValueForRMInstance(String prefix, private static String getConfValueForRMInstance(String prefix,
Configuration conf) { Configuration conf) {
String confKey = addSuffix(prefix, getRMHAId(conf)); String confKey = getConfKeyForRMInstance(prefix, conf);
String retVal = conf.get(confKey); String retVal = conf.getTrimmed(confKey);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("getConfValueForRMInstance: prefix = " + prefix + LOG.trace("getConfValueForRMInstance: prefix = " + prefix +
"; confKey being looked up = " + confKey + "; confKey being looked up = " + confKey +
@ -96,16 +208,6 @@ static String getConfValueForRMInstance(String prefix, String defaultValue,
return (value == null) ? defaultValue : value; return (value == null) ? defaultValue : value;
} }
private static void setConfValue(String prefix, Configuration conf) {
conf.set(prefix, getConfValueForRMInstance(prefix, conf));
}
public static void setAllRpcAddresses(Configuration conf) {
for (String confKey : RPC_ADDRESS_CONF_KEYS) {
setConfValue(confKey, conf);
}
}
/** Add non empty and non null suffix to a key */ /** Add non empty and non null suffix to a key */
@VisibleForTesting @VisibleForTesting
public static String addSuffix(String key, String suffix) { public static String addSuffix(String key, String suffix) {

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -27,53 +28,134 @@
import java.util.Collection; import java.util.Collection;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
public class TestHAUtil { public class TestHAUtil {
private Configuration conf; private Configuration conf;
private static final String RM1_ADDRESS = "1.2.3.4:8021"; private static final String RM1_ADDRESS_UNTRIMMED = " \t\t\n 1.2.3.4:8021 \n\t ";
private static final String RM1_ADDRESS = RM1_ADDRESS_UNTRIMMED.trim();
private static final String RM2_ADDRESS = "localhost:8022"; private static final String RM2_ADDRESS = "localhost:8022";
private static final String RM1_NODE_ID = "rm1"; private static final String RM1_NODE_ID_UNTRIMMED = "rm1 ";
private static final String RM1_NODE_ID = RM1_NODE_ID_UNTRIMMED.trim();
private static final String RM2_NODE_ID = "rm2"; private static final String RM2_NODE_ID = "rm2";
private static final String RM3_NODE_ID = "rm3";
private static final String RM_INVALID_NODE_ID = ".rm";
private static final String RM_NODE_IDS_UNTRIMMED = RM1_NODE_ID_UNTRIMMED + "," + RM2_NODE_ID;
private static final String RM_NODE_IDS = RM1_NODE_ID + "," + RM2_NODE_ID;
@Before @Before
public void setUp() { public void setUp() {
conf = new Configuration(); conf = new Configuration();
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); conf.set(YarnConfiguration.RM_HA_IDS, RM_NODE_IDS_UNTRIMMED);
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID); conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) { for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS); // configuration key itself cannot contains space/tab/return chars.
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS); conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
} }
} }
@Test @Test
public void testGetRMServiceId() throws Exception { public void testGetRMServiceId() throws Exception {
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
Collection<String> rmhaIds = HAUtil.getRMHAIds(conf); Collection<String> rmhaIds = HAUtil.getRMHAIds(conf);
assertEquals(2, rmhaIds.size()); assertEquals(2, rmhaIds.size());
String[] ids = rmhaIds.toArray(new String[0]);
assertEquals(RM1_NODE_ID, ids[0]);
assertEquals(RM2_NODE_ID, ids[1]);
} }
@Test @Test
public void testGetRMId() throws Exception { public void testGetRMId() throws Exception {
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID, assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID,
RM1_NODE_ID, HAUtil.getRMHAId(conf)); RM1_NODE_ID, HAUtil.getRMHAId(conf));
conf = new YarnConfiguration();
try { conf.clear();
HAUtil.getRMHAId(conf); assertNull("Return null when " + YarnConfiguration.RM_HA_ID
fail("getRMHAId() fails to throw an exception when RM_HA_ID is not set"); + " is not set", HAUtil.getRMHAId(conf));
} catch (YarnRuntimeException yre) {
// do nothing
}
} }
@Test @Test
public void testSetGetRpcAddresses() throws Exception { public void testVerifyAndSetConfiguration() throws Exception {
HAUtil.setAllRpcAddresses(conf); try {
HAUtil.verifyAndSetConfiguration(conf);
} catch (YarnRuntimeException e) {
fail("Should not throw any exceptions.");
}
assertEquals("Should be saved as Trimmed collection",
StringUtils.getStringCollection(RM_NODE_IDS), HAUtil.getRMHAIds(conf));
assertEquals("Should be saved as Trimmed string",
RM1_NODE_ID, HAUtil.getRMHAId(conf));
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) { for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
assertEquals("RPC address not set for " + confKey, assertEquals("RPC address not set for " + confKey,
RM1_ADDRESS, conf.get(confKey)); RM1_ADDRESS, conf.get(confKey));
}
conf.clear();
conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
try {
HAUtil.verifyAndSetConfiguration(conf);
} catch (YarnRuntimeException e) {
assertEquals("YarnRuntimeException by getRMId()",
HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
HAUtil.getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID),
e.getMessage());
}
conf.clear();
conf.set(YarnConfiguration.RM_HA_ID, RM_INVALID_NODE_ID);
conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
// simulate xml with invalid node id
conf.set(confKey + RM_INVALID_NODE_ID, RM_INVALID_NODE_ID);
}
try {
HAUtil.verifyAndSetConfiguration(conf);
} catch (YarnRuntimeException e) {
assertEquals("YarnRuntimeException by addSuffix()",
HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
HAUtil.getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
RM_INVALID_NODE_ID),
e.getMessage());
}
conf.clear();
// simulate the case HAUtil.RPC_ADDRESS_CONF_KEYS are not set
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
try {
HAUtil.verifyAndSetConfiguration(conf);
fail("Should throw YarnRuntimeException. by Configuration#set()");
} catch (YarnRuntimeException e) {
String confKey =
HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID);
assertEquals("YarnRuntimeException by Configuration#set()",
HAUtil.BAD_CONFIG_MESSAGE_PREFIX + HAUtil.getNeedToSetValueMessage(confKey),
e.getMessage());
}
// simulate the case YarnConfiguration.RM_HA_IDS doesn't contain
// the value of YarnConfiguration.RM_HA_ID
conf.clear();
conf.set(YarnConfiguration.RM_HA_IDS, RM2_NODE_ID + "," + RM3_NODE_ID);
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
}
try {
HAUtil.verifyAndSetConfiguration(conf);
} catch (YarnRuntimeException e) {
assertEquals("YarnRuntimeException by getRMId()'s validation",
HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
HAUtil.getRMHAIdNeedToBeIncludedMessage("[rm2, rm3]", RM1_NODE_ID),
e.getMessage());
} }
} }
} }

View File

@ -57,7 +57,7 @@ protected synchronized void serviceInit(Configuration conf) throws
this.conf = conf; this.conf = conf;
haEnabled = HAUtil.isHAEnabled(this.conf); haEnabled = HAUtil.isHAEnabled(this.conf);
if (haEnabled) { if (haEnabled) {
HAUtil.setAllRpcAddresses(this.conf); HAUtil.verifyAndSetConfiguration(conf);
rm.setConf(this.conf); rm.setConf(this.conf);
} }
rm.createAndInitActiveServices(); rm.createAndInitActiveServices();