YARN-1411. HA config shouldn't affect NodeManager RPC addresses (Karthik Kambatla via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1542367 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
620890fcc0
commit
f7efa0b86e
@ -136,6 +136,9 @@ Release 2.3.0 - UNRELEASED
|
|||||||
YARN-1401. With zero sleep-delay-before-sigkill.ms, no signal is ever sent
|
YARN-1401. With zero sleep-delay-before-sigkill.ms, no signal is ever sent
|
||||||
(Gera Shegalov via Sandy Ryza)
|
(Gera Shegalov via Sandy Ryza)
|
||||||
|
|
||||||
|
YARN-1411. HA config shouldn't affect NodeManager RPC addresses (Karthik
|
||||||
|
Kambatla via bikas)
|
||||||
|
|
||||||
Release 2.2.1 - UNRELEASED
|
Release 2.2.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -23,29 +23,14 @@
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class HAUtil {
|
public class HAUtil {
|
||||||
private static Log LOG = LogFactory.getLog(HAUtil.class);
|
private static Log LOG = LogFactory.getLog(HAUtil.class);
|
||||||
|
|
||||||
public static final List<String> RPC_ADDRESS_CONF_KEYS =
|
|
||||||
Collections.unmodifiableList(Arrays.asList(
|
|
||||||
YarnConfiguration.RM_ADDRESS,
|
|
||||||
YarnConfiguration.RM_SCHEDULER_ADDRESS,
|
|
||||||
YarnConfiguration.RM_ADMIN_ADDRESS,
|
|
||||||
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
|
|
||||||
YarnConfiguration.RM_WEBAPP_ADDRESS,
|
|
||||||
YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
|
|
||||||
// TODO Remove after YARN-1318
|
|
||||||
YarnConfiguration.RM_HA_ADMIN_ADDRESS));
|
|
||||||
|
|
||||||
public static final String BAD_CONFIG_MESSAGE_PREFIX =
|
public static final String BAD_CONFIG_MESSAGE_PREFIX =
|
||||||
"Invalid configuration! ";
|
"Invalid configuration! ";
|
||||||
|
|
||||||
@ -139,7 +124,7 @@ private static void verifyAndSetConfValue(String prefix, Configuration conf) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static void verifyAndSetAllRpcAddresses(Configuration conf) {
|
public static void verifyAndSetAllRpcAddresses(Configuration conf) {
|
||||||
for (String confKey : RPC_ADDRESS_CONF_KEYS) {
|
for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
|
||||||
verifyAndSetConfValue(confKey, conf);
|
verifyAndSetConfValue(confKey, conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -188,9 +173,12 @@ static String getRMHAIdsWarningMessage(String ids) {
|
|||||||
ids.toString() + ")";
|
ids.toString() + ")";
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getConfKeyForRMInstance(String prefix,
|
@InterfaceAudience.Private
|
||||||
Configuration conf) {
|
@VisibleForTesting
|
||||||
return addSuffix(prefix, getRMHAId(conf));
|
static String getConfKeyForRMInstance(String prefix, Configuration conf) {
|
||||||
|
return YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS.contains(prefix)
|
||||||
|
? addSuffix(prefix, getRMHAId(conf))
|
||||||
|
: prefix;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String getConfValueForRMInstance(String prefix,
|
public static String getConfValueForRMInstance(String prefix,
|
||||||
|
@ -20,6 +20,8 @@
|
|||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
@ -295,6 +297,17 @@ public class YarnConfiguration extends Configuration {
|
|||||||
public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1;
|
public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1;
|
||||||
// end @Private
|
// end @Private
|
||||||
|
|
||||||
|
public static final List<String> RM_RPC_ADDRESS_CONF_KEYS =
|
||||||
|
Collections.unmodifiableList(Arrays.asList(
|
||||||
|
RM_ADDRESS,
|
||||||
|
RM_SCHEDULER_ADDRESS,
|
||||||
|
RM_ADMIN_ADDRESS,
|
||||||
|
RM_RESOURCE_TRACKER_ADDRESS,
|
||||||
|
RM_WEBAPP_ADDRESS,
|
||||||
|
RM_WEBAPP_HTTPS_ADDRESS,
|
||||||
|
// TODO Remove after YARN-1318
|
||||||
|
RM_HA_ADMIN_ADDRESS));
|
||||||
|
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
// RM state store configs
|
// RM state store configs
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
@ -924,7 +937,7 @@ public YarnConfiguration(Configuration conf) {
|
|||||||
public InetSocketAddress getSocketAddr(
|
public InetSocketAddress getSocketAddr(
|
||||||
String name, String defaultAddress, int defaultPort) {
|
String name, String defaultAddress, int defaultPort) {
|
||||||
String address;
|
String address;
|
||||||
if (HAUtil.isHAEnabled(this)) {
|
if (HAUtil.isHAEnabled(this) && RM_RPC_ADDRESS_CONF_KEYS.contains(name)) {
|
||||||
address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this);
|
address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this);
|
||||||
} else {
|
} else {
|
||||||
address = get(name, defaultAddress);
|
address = get(name, defaultAddress);
|
||||||
|
@ -28,7 +28,9 @@
|
|||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class TestHAUtil {
|
public class TestHAUtil {
|
||||||
@ -51,7 +53,7 @@ public void setUp() {
|
|||||||
conf.set(YarnConfiguration.RM_HA_IDS, RM_NODE_IDS_UNTRIMMED);
|
conf.set(YarnConfiguration.RM_HA_IDS, RM_NODE_IDS_UNTRIMMED);
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
|
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
|
||||||
|
|
||||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
|
||||||
// configuration key itself cannot contains space/tab/return chars.
|
// configuration key itself cannot contains space/tab/return chars.
|
||||||
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
|
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
|
||||||
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
||||||
@ -92,7 +94,7 @@ public void testVerifyAndSetConfiguration() throws Exception {
|
|||||||
StringUtils.getStringCollection(RM_NODE_IDS), HAUtil.getRMHAIds(conf));
|
StringUtils.getStringCollection(RM_NODE_IDS), HAUtil.getRMHAIds(conf));
|
||||||
assertEquals("Should be saved as Trimmed string",
|
assertEquals("Should be saved as Trimmed string",
|
||||||
RM1_NODE_ID, HAUtil.getRMHAId(conf));
|
RM1_NODE_ID, HAUtil.getRMHAId(conf));
|
||||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
|
||||||
assertEquals("RPC address not set for " + confKey,
|
assertEquals("RPC address not set for " + confKey,
|
||||||
RM1_ADDRESS, conf.get(confKey));
|
RM1_ADDRESS, conf.get(confKey));
|
||||||
}
|
}
|
||||||
@ -111,7 +113,7 @@ public void testVerifyAndSetConfiguration() throws Exception {
|
|||||||
conf.clear();
|
conf.clear();
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, RM_INVALID_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_ID, RM_INVALID_NODE_ID);
|
||||||
conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
|
||||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
|
||||||
// simulate xml with invalid node id
|
// simulate xml with invalid node id
|
||||||
conf.set(confKey + RM_INVALID_NODE_ID, RM_INVALID_NODE_ID);
|
conf.set(confKey + RM_INVALID_NODE_ID, RM_INVALID_NODE_ID);
|
||||||
}
|
}
|
||||||
@ -126,7 +128,7 @@ public void testVerifyAndSetConfiguration() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
conf.clear();
|
conf.clear();
|
||||||
// simulate the case HAUtil.RPC_ADDRESS_CONF_KEYS are not set
|
// simulate the case HAUtil.RM_RPC_ADDRESS_CONF_KEYS are not set
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
|
||||||
try {
|
try {
|
||||||
@ -145,7 +147,7 @@ public void testVerifyAndSetConfiguration() throws Exception {
|
|||||||
conf.clear();
|
conf.clear();
|
||||||
conf.set(YarnConfiguration.RM_HA_IDS, RM2_NODE_ID + "," + RM3_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_IDS, RM2_NODE_ID + "," + RM3_NODE_ID);
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
|
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
|
||||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
|
||||||
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
|
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
|
||||||
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
||||||
}
|
}
|
||||||
@ -158,4 +160,14 @@ public void testVerifyAndSetConfiguration() throws Exception {
|
|||||||
e.getMessage());
|
e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetConfKeyForRMInstance() {
|
||||||
|
assertTrue("RM instance id is not suffixed",
|
||||||
|
HAUtil.getConfKeyForRMInstance(YarnConfiguration.RM_ADDRESS, conf)
|
||||||
|
.contains(HAUtil.getRMHAId(conf)));
|
||||||
|
assertFalse("RM instance id is suffixed",
|
||||||
|
HAUtil.getConfKeyForRMInstance(YarnConfiguration.NM_ADDRESS, conf)
|
||||||
|
.contains(HAUtil.getRMHAId(conf)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,12 @@
|
|||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.SocketAddress;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestYarnConfiguration {
|
public class TestYarnConfiguration {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -52,4 +58,21 @@ public void testRMWebUrlSpecified() throws Exception {
|
|||||||
"http://rmtesting:24543", rmWebUrl);
|
"http://rmtesting:24543", rmWebUrl);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetSocketAddressForNMWithHA() {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
|
||||||
|
// Set NM address
|
||||||
|
conf.set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:1234");
|
||||||
|
|
||||||
|
// Set HA
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_HA_ID, "rm1");
|
||||||
|
assertTrue(HAUtil.isHAEnabled(conf));
|
||||||
|
|
||||||
|
InetSocketAddress addr = conf.getSocketAddr(YarnConfiguration.NM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_NM_PORT);
|
||||||
|
assertEquals(1234, addr.getPort());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -51,7 +51,7 @@ public void setUp() throws Exception {
|
|||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
|
||||||
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
|
||||||
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
|
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
|
||||||
}
|
}
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
|
||||||
|
@ -104,7 +104,7 @@ private Configuration createHARMConf(
|
|||||||
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
||||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
|
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
|
||||||
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||||
for (String rpcAddress : HAUtil.RPC_ADDRESS_CONF_KEYS) {
|
for (String rpcAddress : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
|
||||||
conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0");
|
conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0");
|
||||||
}
|
}
|
||||||
conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort);
|
conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort);
|
||||||
|
Loading…
Reference in New Issue
Block a user