MAPREDUCE-4079. Allow MR AppMaster to limit ephemeral port range.(bobby via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1329694 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-04-24 13:03:15 +00:00
parent 858887e289
commit f340d6c894
8 changed files with 45 additions and 9 deletions

View File

@ -292,6 +292,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4161. create sockets consistently (Daryn Sharp via bobby)
MAPREDUCE-4079. Allow MR AppMaster to limit ephemeral port range.
(bobby via tgraves)
OPTIMIZATIONS
BUG FIXES

View File

@ -139,7 +139,8 @@ public void start() {
rpc.getServer(MRClientProtocol.class, protocolHandler, address,
conf, secretManager,
conf.getInt(MRJobConfig.MR_AM_JOB_CLIENT_THREAD_COUNT,
MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT));
MRJobConfig.DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT),
MRJobConfig.MR_AM_JOB_CLIENT_PORT_RANGE);
// Enable service authorization?
if (conf.getBoolean(

View File

@ -373,6 +373,13 @@ public interface MRJobConfig {
MR_AM_PREFIX + "job.client.thread-count";
public static final int DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT = 1;
/**
* Range of ports that the MapReduce AM can use when binding. Leave blank
* if you want all possible ports.
*/
public static final String MR_AM_JOB_CLIENT_PORT_RANGE =
MR_AM_PREFIX + "job.client.port-range";
/** Enable blacklisting of nodes in the job.*/
public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE =
MR_AM_PREFIX + "job.node-blacklisting.enable";

View File

@ -1236,6 +1236,14 @@
MR AppMaster from remote tasks</description>
</property>
<property>
<name>yarn.app.mapreduce.am.job.client.port-range</name>
<value></value>
<description>Range of ports that the MapReduce AM can use when binding.
Leave blank if you want all possible ports.
For example 50000-50050,50100-50200</description>
</property>
<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>

View File

@ -31,6 +31,6 @@ public interface RpcServerFactory {
public Server getServer(Class<?> protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers)
int numHandlers, String portRangeConfig)
throws YarnException;
}

View File

@ -61,11 +61,20 @@ public static RpcServerFactoryPBImpl get() {
private RpcServerFactoryPBImpl() {
}
@Override
public Server getServer(Class<?> protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers)
throws YarnException {
return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
null);
}
@Override
public Server getServer(Class<?> protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
String portRangeConfig)
throws YarnException {
Constructor<?> constructor = serviceCache.get(protocol);
if (constructor == null) {
@ -121,7 +130,7 @@ public Server getServer(Class<?> protocol, Object instance,
try {
return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
(BlockingService)method.invoke(null, service));
(BlockingService)method.invoke(null, service), portRangeConfig);
} catch (InvocationTargetException e) {
throw new YarnException(e);
} catch (IllegalAccessException e) {
@ -155,11 +164,11 @@ private String getPackageName(Class<?> clazz) {
private Server createServer(Class<?> pbProtocol, InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager, int numHandlers,
BlockingService blockingService) throws IOException {
BlockingService blockingService, String portRangeConfig) throws IOException {
RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class);
RPC.Server server = RPC.getServer(pbProtocol, blockingService,
addr.getHostName(), addr.getPort(), numHandlers, false, conf,
secretManager);
secretManager, portRangeConfig);
LOG.info("Adding protocol "+pbProtocol.getCanonicalName()+" to the server");
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, pbProtocol, blockingService);
return server;

View File

@ -56,12 +56,12 @@ public void stopProxy(Object proxy, Configuration conf) {
public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers) {
int numHandlers, String portRangeConfig) {
LOG.debug("Creating a HadoopYarnProtoRpc server for protocol " + protocol +
" with " + numHandlers + " handlers");
return RpcFactoryProvider.getServerFactory(conf).getServer(protocol,
instance, addr, conf, secretManager, numHandlers);
instance, addr, conf, secretManager, numHandlers, portRangeConfig);
}

View File

@ -43,8 +43,16 @@ public abstract Object getProxy(Class protocol, InetSocketAddress addr,
public abstract Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers);
int numHandlers, String portRangeConfig);
public Server getServer(Class protocol, Object instance,
InetSocketAddress addr, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager,
int numHandlers) {
return getServer(protocol, instance, addr, conf, secretManager, numHandlers,
null);
}
public static YarnRPC create(Configuration conf) {
LOG.debug("Creating YarnRPC for " +
conf.get(YarnConfiguration.IPC_RPC_IMPL));