YARN-1639. Modified RM HA configuration handling to have a way of not requiring separate configuration files for each RM. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1564032 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-02-03 19:53:46 +00:00
parent cb5e0787a6
commit 3e7d56678c
4 changed files with 91 additions and 11 deletions

View File

@ -103,6 +103,9 @@ Release 2.4.0 - UNRELEASED
YARN-1617. Remove ancient comment and surround LOG.debug in YARN-1617. Remove ancient comment and surround LOG.debug in
AppSchedulingInfo.allocate (Sandy Ryza) AppSchedulingInfo.allocate (Sandy Ryza)
YARN-1639. Modified RM HA configuration handling to have a way of not
requiring separate configuration files for each RM. (Xuan Gong via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -21,10 +21,13 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.net.InetSocketAddress;
import java.util.Collection; import java.util.Collection;
@InterfaceAudience.Private @InterfaceAudience.Private
@ -108,8 +111,7 @@ private static void verifyAndSetRMHAIdsList(Configuration conf) {
String errmsg = iae.getMessage(); String errmsg = iae.getMessage();
if (confKey == null) { if (confKey == null) {
// Error at addSuffix // Error at addSuffix
errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID, errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID, id);
getRMHAId(conf));
} }
throwBadConfigurationException(errmsg); throwBadConfigurationException(errmsg);
} }
@ -122,10 +124,18 @@ private static void verifyAndSetRMHAIdsList(Configuration conf) {
} }
private static void verifyAndSetCurrentRMHAId(Configuration conf) { private static void verifyAndSetCurrentRMHAId(Configuration conf) {
String rmId = conf.getTrimmed(YarnConfiguration.RM_HA_ID); String rmId = getRMHAId(conf);
if (rmId == null) { if (rmId == null) {
throwBadConfigurationException( StringBuilder msg = new StringBuilder();
getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID)); msg.append("Can not find valid RM_HA_ID. None of ");
for (String id : conf
.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS)) {
msg.append(addSuffix(YarnConfiguration.RM_ADDRESS, id) + " ");
}
msg.append(" are matching" +
" the local address OR " + YarnConfiguration.RM_HA_ID + " is not" +
" specified in HA Configuration");
throwBadConfigurationException(msg.toString());
} else { } else {
Collection<String> ids = getRMHAIds(conf); Collection<String> ids = getRMHAIds(conf);
if (!ids.contains(rmId)) { if (!ids.contains(rmId)) {
@ -179,7 +189,34 @@ public static Collection<String> getRMHAIds(Configuration conf) {
* @return RM Id on success * @return RM Id on success
*/ */
public static String getRMHAId(Configuration conf) { public static String getRMHAId(Configuration conf) {
return conf.get(YarnConfiguration.RM_HA_ID); int found = 0;
String currentRMId = conf.getTrimmed(YarnConfiguration.RM_HA_ID);
if(currentRMId == null) {
for(String rmId : getRMHAIds(conf)) {
String key = addSuffix(YarnConfiguration.RM_ADDRESS, rmId);
String addr = conf.get(key);
if (addr == null) {
continue;
}
InetSocketAddress s;
try {
s = NetUtils.createSocketAddr(addr);
} catch (Exception e) {
LOG.warn("Exception in creating socket address " + addr, e);
continue;
}
if (!s.isUnresolved() && NetUtils.isLocalAddress(s.getAddress())) {
currentRMId = rmId.trim();
found++;
}
}
}
if (found > 1) { // Only one address must match the local address
String msg = "The HA Configuration has multiple addresses that match "
+ "local node's address.";
throw new HadoopIllegalArgumentException(msg);
}
return currentRMId;
} }
@VisibleForTesting @VisibleForTesting

View File

@ -395,7 +395,9 @@
the Active mode when prompted to. the Active mode when prompted to.
(2) The nodes in the RM ensemble are listed in (2) The nodes in the RM ensemble are listed in
yarn.resourcemanager.ha.rm-ids yarn.resourcemanager.ha.rm-ids
(3) The id of each RM comes from yarn.resourcemanager.ha.id (3) The id of each RM either comes from yarn.resourcemanager.ha.id
if yarn.resourcemanager.ha.id is explicitly specified or can be
figured out by matching yarn.resourcemanager.address.{id} with local address
(4) The actual physical addresses come from the configs of the pattern (4) The actual physical addresses come from the configs of the pattern
- {rpc-config}.{id}</description> - {rpc-config}.{id}</description>
<name>yarn.resourcemanager.ha.enabled</name> <name>yarn.resourcemanager.ha.enabled</name>
@ -442,7 +444,10 @@
<property> <property>
<description>The id (string) of the current RM. When HA is enabled, this <description>The id (string) of the current RM. When HA is enabled, this
is a required config. See description of yarn.resourcemanager.ha.enabled is an optional config. The id of current RM can be set by explicitly
specifying yarn.resourcemanager.ha.id or figured out by matching
yarn.resourcemanager.address.{id} with local address
See description of yarn.resourcemanager.ha.enabled
for full details on how this is used.</description> for full details on how this is used.</description>
<name>yarn.resourcemanager.ha.id</name> <name>yarn.resourcemanager.ha.id</name>
<!--value>rm1</value--> <!--value>rm1</value-->

View File

@ -36,6 +36,8 @@
import java.io.IOException; import java.io.IOException;
import junit.framework.Assert;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -48,12 +50,15 @@ public class TestRMHA {
private static final String STATE_ERR = private static final String STATE_ERR =
"ResourceManager is in wrong HA state"; "ResourceManager is in wrong HA state";
private static final String RM1_ADDRESS = "0.0.0.0:0"; private static final String RM1_ADDRESS = "1.1.1.1:1";
private static final String RM1_NODE_ID = "rm1"; private static final String RM1_NODE_ID = "rm1";
private static final String RM2_ADDRESS = "1.1.1.1:1"; private static final String RM2_ADDRESS = "0.0.0.0:0";
private static final String RM2_NODE_ID = "rm2"; private static final String RM2_NODE_ID = "rm2";
private static final String RM3_ADDRESS = "2.2.2.2:2";
private static final String RM3_NODE_ID = "rm3";
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
@ -61,8 +66,8 @@ public void setUp() throws Exception {
for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) { for (String confKey : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
configuration.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS); configuration.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
configuration.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS); configuration.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
configuration.set(HAUtil.addSuffix(confKey, RM3_NODE_ID), RM3_ADDRESS);
} }
configuration.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
} }
private void checkMonitorHealth() throws IOException { private void checkMonitorHealth() throws IOException {
@ -278,6 +283,36 @@ protected Dispatcher createDispatcher() {
rm.stop(); rm.stop();
} }
@Test
public void testHAIDLookup() {
//test implicitly lookup HA-ID
Configuration conf = new YarnConfiguration(configuration);
rm = new MockRM(conf);
rm.init(conf);
assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM2_NODE_ID);
//test explicitly lookup HA-ID
configuration.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
conf = new YarnConfiguration(configuration);
rm = new MockRM(conf);
rm.init(conf);
assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM1_NODE_ID);
//test if RM_HA_ID can not be found
configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID+ "," + RM3_NODE_ID);
configuration.unset(YarnConfiguration.RM_HA_ID);
conf = new YarnConfiguration(configuration);
try {
rm = new MockRM(conf);
rm.init(conf);
fail("Should get an exception here.");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains(
"Invalid configuration! Can not find valid RM_HA_ID."));
}
}
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
class MyCountingDispatcher extends AbstractService implements Dispatcher { class MyCountingDispatcher extends AbstractService implements Dispatcher {