HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP server. Contributed by Brandon Li

This commit is contained in:
Brandon Li 2015-04-01 17:04:44 -07:00
parent 6ccf4fbf8a
commit 60ce825a71
5 changed files with 82 additions and 28 deletions

View File

@ -1186,7 +1186,10 @@ Release 2.7.0 - UNRELEASED
HADOOP-11787. OpensslSecureRandom.c pthread_threadid_np usage signature is
wrong on 32-bit Mac. (Kiran Kumar M R via cnauroth)
HADOOP-11757. NFS gateway should shutdown when it can't start UDP or TCP
server (brandonli)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -60,7 +60,17 @@ private void startUDPServer() {
SimpleUdpServer udpServer = new SimpleUdpServer(rpcProgram.getPort(),
rpcProgram, 1);
rpcProgram.startDaemons();
udpServer.run();
try {
udpServer.run();
} catch (Throwable e) {
LOG.fatal("Failed to start the UDP server.", e);
if (udpServer.getBoundPort() > 0) {
rpcProgram.unregister(PortmapMapping.TRANSPORT_UDP,
udpServer.getBoundPort());
}
udpServer.shutdown();
terminate(1, e);
}
udpBoundPort = udpServer.getBoundPort();
}
@ -69,7 +79,17 @@ private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 1);
rpcProgram.startDaemons();
tcpServer.run();
try {
tcpServer.run();
} catch (Throwable e) {
LOG.fatal("Failed to start the TCP server.", e);
if (tcpServer.getBoundPort() > 0) {
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
tcpServer.getBoundPort());
}
tcpServer.shutdown();
terminate(1, e);
}
tcpBoundPort = tcpServer.getBoundPort();
}
@ -83,7 +103,7 @@ public void start(boolean register) {
rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
} catch (Throwable e) {
LOG.fatal("Failed to start the server. Cause:", e);
LOG.fatal("Failed to register the MOUNT service.", e);
terminate(1, e);
}
}

View File

@ -29,7 +29,6 @@
/**
* Nfs server. Supports NFS v3 using {@link RpcProgram}.
* Currently Mountd program is also started inside this class.
* Only TCP server is supported and UDP is not supported.
*/
public abstract class Nfs3Base {
@ -55,7 +54,7 @@ public void start(boolean register) {
try {
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
} catch (Throwable e) {
LOG.fatal("Failed to start the server. Cause:", e);
LOG.fatal("Failed to register the NFSv3 service.", e);
terminate(1, e);
}
}
@ -65,7 +64,17 @@ private void startTCPServer() {
SimpleTcpServer tcpServer = new SimpleTcpServer(rpcProgram.getPort(),
rpcProgram, 0);
rpcProgram.startDaemons();
tcpServer.run();
try {
tcpServer.run();
} catch (Throwable e) {
LOG.fatal("Failed to start the TCP server.", e);
if (tcpServer.getBoundPort() > 0) {
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP,
tcpServer.getBoundPort());
}
tcpServer.shutdown();
terminate(1, e);
}
nfsBoundPort = tcpServer.getBoundPort();
}

View File

@ -39,7 +39,9 @@ public class SimpleTcpServer {
protected final int port;
protected int boundPort = -1; // Will be set after server starts
protected final SimpleChannelUpstreamHandler rpcProgram;
private ServerBootstrap server;
private Channel ch;
/** The maximum number of I/O worker threads */
protected final int workerCount;
@ -53,7 +55,7 @@ public SimpleTcpServer(int port, RpcProgram program, int workercount) {
this.rpcProgram = program;
this.workerCount = workercount;
}
public void run() {
// Configure the Server.
ChannelFactory factory;
@ -66,9 +68,9 @@ public void run() {
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
workerCount);
}
ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
server = new ServerBootstrap(factory);
server.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
@ -77,14 +79,14 @@ public ChannelPipeline getPipeline() throws Exception {
RpcUtil.STAGE_RPC_TCP_RESPONSE);
}
});
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
server.setOption("child.tcpNoDelay", true);
server.setOption("child.keepAlive", true);
// Listen to TCP port
Channel ch = bootstrap.bind(new InetSocketAddress(port));
ch = server.bind(new InetSocketAddress(port));
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
boundPort = socketAddr.getPort();
LOG.info("Started listening to TCP requests at port " + boundPort + " for "
+ rpcProgram + " with workerCount " + workerCount);
}
@ -93,4 +95,13 @@ public ChannelPipeline getPipeline() throws Exception {
public int getBoundPort() {
return this.boundPort;
}
public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
}
if (server != null) {
server.releaseExternalResources();
}
}
}

View File

@ -41,8 +41,11 @@ public class SimpleUdpServer {
protected final SimpleChannelUpstreamHandler rpcProgram;
protected final int workerCount;
protected int boundPort = -1; // Will be set after server starts
private ConnectionlessBootstrap server;
private Channel ch;
public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program, int workerCount) {
public SimpleUdpServer(int port, SimpleChannelUpstreamHandler program,
int workerCount) {
this.port = port;
this.rpcProgram = program;
this.workerCount = workerCount;
@ -53,20 +56,19 @@ public void run() {
DatagramChannelFactory f = new NioDatagramChannelFactory(
Executors.newCachedThreadPool(), workerCount);
ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
b.setPipeline(Channels.pipeline(
RpcUtil.STAGE_RPC_MESSAGE_PARSER, rpcProgram,
RpcUtil.STAGE_RPC_UDP_RESPONSE));
server = new ConnectionlessBootstrap(f);
server.setPipeline(Channels.pipeline(RpcUtil.STAGE_RPC_MESSAGE_PARSER,
rpcProgram, RpcUtil.STAGE_RPC_UDP_RESPONSE));
server.setOption("broadcast", "false");
server.setOption("sendBufferSize", SEND_BUFFER_SIZE);
server.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
b.setOption("broadcast", "false");
b.setOption("sendBufferSize", SEND_BUFFER_SIZE);
b.setOption("receiveBufferSize", RECEIVE_BUFFER_SIZE);
// Listen to the UDP port
Channel ch = b.bind(new InetSocketAddress(port));
ch = server.bind(new InetSocketAddress(port));
InetSocketAddress socketAddr = (InetSocketAddress) ch.getLocalAddress();
boundPort = socketAddr.getPort();
LOG.info("Started listening to UDP requests at port " + boundPort + " for "
+ rpcProgram + " with workerCount " + workerCount);
}
@ -75,4 +77,13 @@ public void run() {
public int getBoundPort() {
return this.boundPort;
}
public void shutdown() {
if (ch != null) {
ch.close().awaitUninterruptibly();
}
if (server != null) {
server.releaseExternalResources();
}
}
}