YARN-1994. Expose YARN/MR endpoints on multiple interfaces. Contributed by Craig Welch, Milan Potocnik,and Arpit Agarwal

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1614981 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Xuan Gong 2014-07-31 20:06:02 +00:00
parent 1d6e178144
commit e52f67e389
30 changed files with 429 additions and 35 deletions

View File

@ -1843,6 +1843,38 @@ protected char[] getPasswordFromConfig(String name) {
return pass;
}
/**
* Get the socket address for <code>hostProperty</code> as a
* <code>InetSocketAddress</code>. If <code>hostProperty</code> is
* <code>null</code>, <code>addressProperty</code> will be used. This
* is useful for cases where we want to differentiate between host
* bind address and address clients should use to establish connection.
*
* @param hostProperty bind host property name.
* @param addressProperty address property name.
* @param defaultAddressValue the default value
* @param defaultPort the default port
* @return InetSocketAddress
*/
public InetSocketAddress getSocketAddr(
String hostProperty,
String addressProperty,
String defaultAddressValue,
int defaultPort) {
InetSocketAddress bindAddr = getSocketAddr(
addressProperty, defaultAddressValue, defaultPort);
final String host = get(hostProperty);
if (host == null || host.isEmpty()) {
return bindAddr;
}
return NetUtils.createSocketAddr(
host, bindAddr.getPort(), hostProperty);
}
/**
* Get the socket address for <code>name</code> property as a
* <code>InetSocketAddress</code>.
@ -1864,6 +1896,40 @@ public InetSocketAddress getSocketAddr(
public void setSocketAddr(String name, InetSocketAddress addr) {
set(name, NetUtils.getHostPortString(addr));
}
/**
* Set the socket address a client can use to connect for the
* <code>name</code> property as a <code>host:port</code>. The wildcard
* address is replaced with the local host's address. If the host and address
* properties are configured the host component of the address will be combined
* with the port component of the addr to generate the address. This is to allow
* optional control over which host name is used in multi-home bind-host
* cases where a host can have multiple names
* @param hostProperty the bind-host configuration name
* @param addressProperty the service address configuration name
* @param defaultAddressValue the service default address configuration value
* @param addr InetSocketAddress of the service listener
* @return InetSocketAddress for clients to connect
*/
public InetSocketAddress updateConnectAddr(
String hostProperty,
String addressProperty,
String defaultAddressValue,
InetSocketAddress addr) {
final String host = get(hostProperty);
final String connectHostPort = getTrimmed(addressProperty, defaultAddressValue);
if (host == null || host.isEmpty() || connectHostPort == null || connectHostPort.isEmpty()) {
//not our case, fall back to original logic
return updateConnectAddr(addressProperty, addr);
}
final String connectHost = connectHostPort.split(":")[0];
// Create connect address using client address hostname and server port.
return updateConnectAddr(addressProperty, NetUtils.createSocketAddrForHost(
connectHost, addr.getPort()));
}
/**
* Set the socket address a client can use to connect for the

View File

@ -83,6 +83,9 @@ Trunk (Unreleased)
MAPREDUCE-5912. Task.calculateOutputSize does not handle Windows files after
MAPREDUCE-5196. (Remus Rusanu via cnauroth)
MAPREDUCE-6019. MapReduce changes for exposing YARN/MR endpoints on multiple
interfaces. (Craig Welch, Milan Potocnik, Arpit Agarwal via xgong)
BUG FIXES
MAPREDUCE-5714. Removed forceful JVM exit in shutDownJob.

View File

@ -141,7 +141,9 @@ protected void startRpcServer() {
}
server.start();
this.address = NetUtils.getConnectAddress(server);
this.address = NetUtils.createSocketAddrForHost(
context.getNMHostname(),
server.getListenerAddress().getPort());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}

View File

@ -66,4 +66,5 @@ public interface AppContext {
boolean hasSuccessfullyUnregistered();
String getNMHostname();
}

View File

@ -1018,6 +1018,11 @@ public void markSuccessfulUnregistration() {
public void resetIsLastAMRetry() {
isLastAMRetry = false;
}
@Override
public String getNMHostname() {
return nmHost;
}
}
@SuppressWarnings("unchecked")

View File

@ -131,7 +131,8 @@ protected void serviceStart() throws Exception {
}
server.start();
this.bindAddress = NetUtils.getConnectAddress(server);
this.bindAddress = NetUtils.createSocketAddrForHost(appContext.getNMHostname(),
server.getListenerAddress().getPort());
LOG.info("Instantiated MRClientService at " + this.bindAddress);
try {
// Explicitly disabling SSL for map reduce task as we can't allow MR users

View File

@ -61,6 +61,13 @@ public class TestTaskAttemptListenerImpl {
public static class MockTaskAttemptListenerImpl
extends TaskAttemptListenerImpl {
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler, AMPreemptionPolicy policy) {
super(context, jobTokenSecretManager, rmHeartbeatHandler, policy);
}
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
@ -210,7 +217,7 @@ public void testGetMapCompletionEvents() throws IOException {
when(appCtx.getEventHandler()).thenReturn(ea);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(appCtx);
TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
@ -271,7 +278,7 @@ public void testCommitWindow() throws IOException {
when(appCtx.getEventHandler()).thenReturn(ea);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(appCtx);
TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
@ -326,7 +333,7 @@ public void testCheckpointIDTracking()
when(appCtx.getEventHandler()).thenReturn(ea);
CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
policy.init(appCtx);
TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(
appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {

View File

@ -143,4 +143,9 @@ public boolean hasSuccessfullyUnregistered() {
return true;
}
@Override
public String getNMHostname() {
// bogus - Not Required
return null;
}
}

View File

@ -879,5 +879,10 @@ public boolean hasSuccessfullyUnregistered() {
return true;
}
@Override
public String getNMHostname() {
// bogus - Not Required
return null;
}
}
}

View File

@ -38,7 +38,9 @@ public class JHAdminConfig {
public static final int DEFAULT_MR_HISTORY_PORT = 10020;
public static final String DEFAULT_MR_HISTORY_ADDRESS = "0.0.0.0:" +
DEFAULT_MR_HISTORY_PORT;
public static final String MR_HISTORY_BIND_HOST = MR_HISTORY_PREFIX
+ "bind-host";
/** The address of the History server admin interface. */
public static final String JHS_ADMIN_ADDRESS = MR_HISTORY_PREFIX
+ "admin.address";

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@ -105,11 +106,15 @@ public static String getJHSWebappURLWithScheme(Configuration conf) {
public static InetSocketAddress getJHSWebBindAddress(Configuration conf) {
if (httpPolicyInJHS == Policy.HTTPS_ONLY) {
return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
return conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT);
} else {
return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
return conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT);
}

View File

@ -83,6 +83,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.webapp.WebApp;
@ -119,6 +120,7 @@ protected void serviceStart() throws Exception {
YarnRPC rpc = YarnRPC.create(conf);
initializeWebApp(conf);
InetSocketAddress address = conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_PORT);
@ -137,9 +139,11 @@ protected void serviceStart() throws Exception {
}
server.start();
this.bindAddress = conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_ADDRESS,
this.bindAddress = conf.updateConnectAddr(JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.MR_HISTORY_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS,
server.getListenerAddress());
LOG.info("Instantiated MRClientService at " + this.bindAddress);
LOG.info("Instantiated HistoryClientService at " + this.bindAddress);
super.serviceStart();
}
@ -158,8 +162,9 @@ protected void initializeWebApp(Configuration conf) {
JHAdminConfig.MR_WEBAPP_SPNEGO_USER_NAME_KEY)
.at(NetUtils.getHostPortString(bindAddress)).start(webApp);
String connectHost = MRWebAppUtil.getJHSWebappURLWithoutScheme(conf).split(":")[0];
MRWebAppUtil.setJHSWebappURLWithoutScheme(conf,
NetUtils.getHostPortString(webApp.getListenerAddress()));
connectHost + ":" + webApp.getListenerAddress().getPort());
}
@Override

View File

@ -394,4 +394,9 @@ public boolean hasSuccessfullyUnregistered() {
return true;
}
@Override
public String getNMHostname() {
// bogus - Not Required
return null;
}
}

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserMappingsProtocolService;
import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
@ -94,7 +95,9 @@ public void serviceInit(Configuration conf) throws Exception {
WritableRpcEngine.ensureInitialized();
clientRpcAddress = conf.getSocketAddr(JHAdminConfig.JHS_ADMIN_ADDRESS,
clientRpcAddress = conf.getSocketAddr(
JHAdminConfig.MR_HISTORY_BIND_HOST,
JHAdminConfig.JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_ADDRESS,
JHAdminConfig.DEFAULT_JHS_ADMIN_PORT);
clientRpcServer = new RPC.Builder(conf)

View File

@ -77,6 +77,9 @@ Release 2.6.0 - UNRELEASED
YARN-2347. Consolidated RMStateVersion and NMDBSchemaVersion into Version in
yarn-server-common. (Junping Du via zjshen)
YARN-1994. Expose YARN/MR endpoints on multiple interfaces. (Craig Welch,
Milan Potocnik, Arpit Agarwal via xgong)
OPTIMIZATIONS
BUG FIXES

View File

@ -126,6 +126,10 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_RM_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_PORT;
/** The actual bind address for the RM.*/
public static final String RM_BIND_HOST =
RM_PREFIX + "bind-host";
/** The number of threads used to handle applications manager requests.*/
public static final String RM_CLIENT_THREAD_COUNT =
RM_PREFIX + "client.thread-count";
@ -545,6 +549,10 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NM_ADDRESS = "0.0.0.0:"
+ DEFAULT_NM_PORT;
/** The actual bind address or the NM.*/
public static final String NM_BIND_HOST =
NM_PREFIX + "bind-host";
/** who will execute(launch) the containers.*/
public static final String NM_CONTAINER_EXECUTOR =
NM_PREFIX + "container-executor.class";
@ -1132,6 +1140,10 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_TIMELINE_SERVICE_ADDRESS = "0.0.0.0:"
+ DEFAULT_TIMELINE_SERVICE_PORT;
/** The listening endpoint for the timeline service application.*/
public static final String TIMELINE_SERVICE_BIND_HOST =
TIMELINE_SERVICE_PREFIX + "bind-host";
/** The number of threads to handle client RPC API requests. */
public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT =
TIMELINE_SERVICE_PREFIX + "handler-thread-count";

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.RMHAUtils;
@Private
@ -170,6 +171,37 @@ private static String getResolvedAddress(InetSocketAddress address) {
return sb.toString();
}
/**
* Get the URL to use for binding where bind hostname can be specified
* to override the hostname in the webAppURLWithoutScheme. Port specified in the
* webAppURLWithoutScheme will be used.
*
* @param conf the configuration
* @param hostProperty bind host property name
* @param webAppURLWithoutScheme web app URL without scheme String
* @return String representing bind URL
*/
public static String getWebAppBindURL(
Configuration conf,
String hostProperty,
String webAppURLWithoutScheme) {
// If the bind-host setting exists then it overrides the hostname
// portion of the corresponding webAppURLWithoutScheme
String host = conf.getTrimmed(hostProperty);
if (host != null && !host.isEmpty()) {
if (webAppURLWithoutScheme.contains(":")) {
webAppURLWithoutScheme = host + ":" + webAppURLWithoutScheme.split(":")[1];
}
else {
throw new YarnRuntimeException("webAppURLWithoutScheme must include port specification but doesn't: " +
webAppURLWithoutScheme);
}
}
return webAppURLWithoutScheme;
}
public static String getNMWebAppURLWithoutScheme(Configuration conf) {
if (YarnConfiguration.useHttps(conf)) {
return conf.get(YarnConfiguration.NM_WEBAPP_HTTPS_ADDRESS,

View File

@ -70,6 +70,17 @@
<value>${yarn.resourcemanager.hostname}:8032</value>
</property>
<property>
<description>
The actual address the server will bind to. If this optional address is
set, the RPC and webapp servers will bind to this address and the port specified in
yarn.resourcemanager.address and yarn.resourcemanager.webapp.address, respectively. This
is most useful for making RM listen to all interfaces by setting to 0.0.0.0.
</description>
<name>yarn.resourcemanager.bind-host</name>
<value></value>
</property>
<property>
<description>The number of threads used to handle applications manager requests.</description>
<name>yarn.resourcemanager.client.thread-count</name>
@ -635,6 +646,17 @@
<value>${yarn.nodemanager.hostname}:0</value>
</property>
<property>
<description>
The actual address the server will bind to. If this optional address is
set, the RPC and webapp servers will bind to this address and the port specified in
yarn.nodemanager.address and yarn.nodemanager.webapp.address, respectively. This is
most useful for making NM listen to all interfaces by setting to 0.0.0.0.
</description>
<name>yarn.nodemanager.bind-host</name>
<value></value>
</property>
<property>
<description>Environment variables that should be forwarded from the NodeManager's environment to the container's.</description>
<name>yarn.nodemanager.admin-env</name>
@ -1172,6 +1194,18 @@
<value>${yarn.timeline-service.hostname}:8190</value>
</property>
<property>
<description>
The actual address the server will bind to. If this optional address is
set, the RPC and webapp servers will bind to this address and the port specified in
yarn.timeline-service.address and yarn.timeline-service.webapp.address, respectively.
This is most useful for making the service listen to all interfaces by setting to
0.0.0.0.
</description>
<name>yarn.timeline-service.bind-host</name>
<value></value>
</property>
<property>
<description>Store class name for timeline store.</description>
<name>yarn.timeline-service.store-class</name>

View File

@ -28,6 +28,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
public class TestYarnConfiguration {
@ -75,4 +76,131 @@ public void testGetSocketAddressForNMWithHA() {
YarnConfiguration.DEFAULT_NM_PORT);
assertEquals(1234, addr.getPort());
}
@Test
public void testGetSocketAddr() throws Exception {
YarnConfiguration conf;
InetSocketAddress resourceTrackerAddress;
//all default
conf = new YarnConfiguration();
resourceTrackerAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
assertEquals(
new InetSocketAddress(
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0],
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
resourceTrackerAddress);
//with address
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.1");
resourceTrackerAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
assertEquals(
new InetSocketAddress(
"10.0.0.1",
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
resourceTrackerAddress);
//address and socket
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2:5001");
resourceTrackerAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
assertEquals(
new InetSocketAddress(
"10.0.0.2",
5001),
resourceTrackerAddress);
//bind host only
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_BIND_HOST, "10.0.0.3");
resourceTrackerAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
assertEquals(
new InetSocketAddress(
"10.0.0.3",
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
resourceTrackerAddress);
//bind host and address no port
conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0");
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2");
resourceTrackerAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
assertEquals(
new InetSocketAddress(
"0.0.0.0",
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT),
resourceTrackerAddress);
//bind host and address with port
conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0");
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "10.0.0.2:5003");
resourceTrackerAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
assertEquals(
new InetSocketAddress(
"0.0.0.0",
5003),
resourceTrackerAddress);
}
@Test
public void testUpdateConnectAddr() throws Exception {
YarnConfiguration conf;
InetSocketAddress resourceTrackerConnectAddress;
InetSocketAddress serverAddress;
//no override, old behavior. Won't work on a host named "yo.yo.yo"
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "yo.yo.yo");
serverAddress = new InetSocketAddress(
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0],
Integer.valueOf(YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[1]));
resourceTrackerConnectAddress = conf.updateConnectAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
serverAddress);
assertFalse(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo"));
//cause override with address
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "yo.yo.yo");
conf.set(YarnConfiguration.RM_BIND_HOST, "0.0.0.0");
serverAddress = new InetSocketAddress(
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[0],
Integer.valueOf(YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS.split(":")[1]));
resourceTrackerConnectAddress = conf.updateConnectAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
serverAddress);
assertTrue(resourceTrackerConnectAddress.toString().startsWith("yo.yo.yo"));
}
}

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
public class ApplicationHistoryClientService extends AbstractService {
@ -75,10 +76,11 @@ public ApplicationHistoryClientService(ApplicationHistoryManager history) {
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress address =
conf.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
InetSocketAddress address = conf.getSocketAddr(
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT);
server =
rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler,
@ -88,8 +90,10 @@ protected void serviceStart() throws Exception {
server.start();
this.bindAddress =
conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
server.getListenerAddress());
conf.updateConnectAddr(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
server.getListenerAddress());
LOG.info("Instantiated ApplicationHistoryClientService at "
+ this.bindAddress);

View File

@ -192,7 +192,9 @@ protected void startWebApp() {
TimelineAuthenticationFilterInitializer.class.getName()
+ initializers);
}
String bindAddress = WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
LOG.info("Instantiating AHSWebApp at " + bindAddress);
try {
AHSWebApp ahsWebApp = AHSWebApp.getInstance();

View File

@ -275,6 +275,7 @@ protected void serviceStart() throws Exception {
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress initialAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_ADDRESS,
YarnConfiguration.DEFAULT_NM_ADDRESS,
YarnConfiguration.DEFAULT_NM_PORT);
@ -296,7 +297,22 @@ protected void serviceStart() throws Exception {
" server is still starting.");
this.setBlockNewContainerRequests(true);
server.start();
InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
InetSocketAddress connectAddress;
String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
if (bindHost == null || bindHost.isEmpty() ||
nmAddress == null || nmAddress.isEmpty()) {
connectAddress = NetUtils.getConnectAddress(server);
} else {
//a bind-host case with an address, to support overriding the first hostname
//found when querying for our hostname with the specified address, combine
//the specified address with the actual port listened on by the server
connectAddress = NetUtils.getConnectAddress(
new InetSocketAddress(nmAddress.split(":")[0],
server.getListenerAddress().getPort()));
}
NodeId nodeId = NodeId.newInstance(
connectAddress.getAddress().getCanonicalHostName(),
connectAddress.getPort());
@ -304,6 +320,7 @@ protected void serviceStart() throws Exception {
this.context.getNMTokenSecretManager().setNodeId(nodeId);
this.context.getContainerTokenSecretManager().setNodeId(nodeId);
LOG.info("ContainerManager started at " + connectAddress);
LOG.info("ContainerManager bound to " + initialAddress);
super.serviceStart();
}

View File

@ -81,6 +81,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
@ -251,6 +252,7 @@ public void serviceInit(Configuration conf) throws Exception {
cacheCleanupPeriod =
conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
localizationServerAddress = conf.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
@ -341,7 +343,9 @@ public void serviceStart() throws Exception {
server = createServer();
server.start();
localizationServerAddress =
getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
getConfig().updateConnectAddr(YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
server.getListenerAddress());
LOG.info("Localizer started on port " + server.getPort());
super.serviceStart();

View File

@ -55,7 +55,9 @@ public WebServer(Context nmContext, ResourceView resourceView,
@Override
protected void serviceStart() throws Exception {
String bindAddress = WebAppUtils.getNMWebAppURLWithoutScheme(getConfig());
String bindAddress = WebAppUtils.getWebAppBindURL(getConfig(),
YarnConfiguration.NM_BIND_HOST,
WebAppUtils.getNMWebAppURLWithoutScheme(getConfig()));
LOG.info("Instantiating NMWebApp at " + bindAddress);
try {

View File

@ -90,7 +90,9 @@ public class AdminService extends CompositeService implements
private EmbeddedElectorService embeddedElector;
private Server server;
private InetSocketAddress masterServiceAddress;
// Address to use for binding. May be a wildcard address.
private InetSocketAddress masterServiceBindAddress;
private AccessControlList adminAcl;
private final RecordFactory recordFactory =
@ -114,10 +116,12 @@ public void serviceInit(Configuration conf) throws Exception {
}
}
masterServiceAddress = conf.getSocketAddr(
masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
@ -141,7 +145,7 @@ protected void startServer() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server = (Server) rpc.getServer(
ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
ResourceManagerAdministrationProtocol.class, this, masterServiceBindAddress,
conf, null,
conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
@ -170,8 +174,10 @@ protected void startServer() throws Exception {
}
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
server.getListenerAddress());
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
server.getListenerAddress());
}
protected void stopServer() throws Exception {

View File

@ -127,6 +127,7 @@ protected void serviceStart() throws Exception {
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
@ -159,7 +160,9 @@ protected void serviceStart() throws Exception {
this.server.start();
this.bindAddress =
conf.updateConnectAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS,
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
server.getListenerAddress());
super.serviceStart();
}

View File

@ -199,7 +199,9 @@ protected void serviceStart() throws Exception {
}
this.server.start();
clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
server.getListenerAddress());
super.serviceStart();
}
@ -213,7 +215,9 @@ protected void serviceStop() throws Exception {
}
InetSocketAddress getBindAddress(Configuration conf) {
return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
return conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
}

View File

@ -155,7 +155,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
private String webAppAddress;
@VisibleForTesting
protected String webAppAddress;
private ConfigurationProvider configurationProvider = null;
/** End of Active services */
@ -230,7 +231,9 @@ protected void serviceInit(Configuration conf) throws Exception {
}
createAndInitActiveServices();
webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(this.conf);
webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
YarnConfiguration.RM_BIND_HOST,
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));
this.rmLoginUGI = UserGroupInformation.getCurrentUser();

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -121,6 +122,7 @@ public ResourceTrackerService(RMContext rmContext,
@Override
protected void serviceInit(Configuration conf) throws Exception {
resourceTrackerAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT);
@ -175,9 +177,11 @@ protected void serviceStart() throws Exception {
}
refreshServiceAcls(conf, RMPolicyProvider.getInstance());
}
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
server.getListenerAddress());
}

View File

@ -380,7 +380,19 @@ public void testHAIDLookup() {
}
@Test
public void testHAWithRMHostName() {
public void testHAWithRMHostName() throws Exception {
innerTestHAWithRMHostName(false);
configuration.clear();
setUp();
innerTestHAWithRMHostName(true);
}
public void innerTestHAWithRMHostName(boolean includeBindHost) {
//this is run two times, with and without a bind host configured
if (includeBindHost) {
configuration.set(YarnConfiguration.RM_BIND_HOST, "9.9.9.9");
}
//test if both RM_HOSTBANE_{rm_id} and RM_RPCADDRESS_{rm_id} are set
//We should only read rpc addresses from RM_RPCADDRESS_{rm_id} configuration
configuration.set(HAUtil.addSuffix(YarnConfiguration.RM_HOSTNAME,
@ -400,6 +412,15 @@ public void testHAWithRMHostName() {
RM2_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM2_NODE_ID)));
assertEquals("RPC address not set for " + confKey,
RM3_ADDRESS, conf.get(HAUtil.addSuffix(confKey, RM3_NODE_ID)));
if (includeBindHost) {
assertEquals("Web address misconfigured WITH bind-host",
rm.webAppAddress.substring(0, 7), "9.9.9.9");
} else {
//YarnConfiguration tries to figure out which rm host it's on by binding to it,
//which doesn't happen for any of these fake addresses, so we end up with 0.0.0.0
assertEquals("Web address misconfigured WITHOUT bind-host",
rm.webAppAddress.substring(0, 7), "0.0.0.0");
}
}
} catch (YarnRuntimeException e) {
fail("Should not throw any exceptions.");