HDFS-5649. Unregister NFS and Mount service when NFS gateway is shutting down. Contributed by Brandon Li
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1556405 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
53b27fc60b
commit
1be8ee2424
@ -23,6 +23,7 @@
|
|||||||
import org.apache.hadoop.oncrpc.SimpleTcpServer;
|
import org.apache.hadoop.oncrpc.SimpleTcpServer;
|
||||||
import org.apache.hadoop.oncrpc.SimpleUdpServer;
|
import org.apache.hadoop.oncrpc.SimpleUdpServer;
|
||||||
import org.apache.hadoop.portmap.PortmapMapping;
|
import org.apache.hadoop.portmap.PortmapMapping;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main class for starting mountd daemon. This daemon implements the NFS
|
* Main class for starting mountd daemon. This daemon implements the NFS
|
||||||
@ -71,8 +72,24 @@ public void start(boolean register) {
|
|||||||
startUDPServer();
|
startUDPServer();
|
||||||
startTCPServer();
|
startTCPServer();
|
||||||
if (register) {
|
if (register) {
|
||||||
|
ShutdownHookManager.get().addShutdownHook(new Unregister(),
|
||||||
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
|
rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
|
||||||
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
|
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the mountd shutdown hook.
|
||||||
|
*/
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
|
||||||
|
|
||||||
|
private class Unregister implements Runnable {
|
||||||
|
@Override
|
||||||
|
public synchronized void run() {
|
||||||
|
rpcProgram.unregister(PortmapMapping.TRANSPORT_UDP, udpBoundPort);
|
||||||
|
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, tcpBoundPort);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import org.apache.hadoop.oncrpc.RpcProgram;
|
import org.apache.hadoop.oncrpc.RpcProgram;
|
||||||
import org.apache.hadoop.oncrpc.SimpleTcpServer;
|
import org.apache.hadoop.oncrpc.SimpleTcpServer;
|
||||||
import org.apache.hadoop.portmap.PortmapMapping;
|
import org.apache.hadoop.portmap.PortmapMapping;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Nfs server. Supports NFS v3 using {@link RpcProgram}.
|
* Nfs server. Supports NFS v3 using {@link RpcProgram}.
|
||||||
@ -50,6 +51,8 @@ public void start(boolean register) {
|
|||||||
startTCPServer(); // Start TCP server
|
startTCPServer(); // Start TCP server
|
||||||
|
|
||||||
if (register) {
|
if (register) {
|
||||||
|
ShutdownHookManager.get().addShutdownHook(new Unregister(),
|
||||||
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
|
rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -61,4 +64,16 @@ private void startTCPServer() {
|
|||||||
tcpServer.run();
|
tcpServer.run();
|
||||||
nfsBoundPort = tcpServer.getBoundPort();
|
nfsBoundPort = tcpServer.getBoundPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the nfsd shutdown hook.
|
||||||
|
*/
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
|
||||||
|
|
||||||
|
private class Unregister implements Runnable {
|
||||||
|
@Override
|
||||||
|
public synchronized void run() {
|
||||||
|
rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, nfsBoundPort);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -78,23 +78,41 @@ public void register(int transport, int boundPort) {
|
|||||||
for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
|
for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
|
||||||
PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport,
|
PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport,
|
||||||
port);
|
port);
|
||||||
register(mapEntry);
|
register(mapEntry, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unregister this program with the local portmapper.
|
||||||
|
*/
|
||||||
|
public void unregister(int transport, int boundPort) {
|
||||||
|
if (boundPort != port) {
|
||||||
|
LOG.info("The bound port is " + boundPort
|
||||||
|
+ ", different with configured port " + port);
|
||||||
|
port = boundPort;
|
||||||
|
}
|
||||||
|
// Unregister all the program versions with portmapper for a given transport
|
||||||
|
for (int vers = lowProgVersion; vers <= highProgVersion; vers++) {
|
||||||
|
PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport,
|
||||||
|
port);
|
||||||
|
register(mapEntry, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register the program with Portmap or Rpcbind
|
* Register the program with Portmap or Rpcbind
|
||||||
*/
|
*/
|
||||||
protected void register(PortmapMapping mapEntry) {
|
protected void register(PortmapMapping mapEntry, boolean set) {
|
||||||
XDR mappingRequest = PortmapRequest.create(mapEntry);
|
XDR mappingRequest = PortmapRequest.create(mapEntry, set);
|
||||||
SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
|
SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT,
|
||||||
mappingRequest);
|
mappingRequest);
|
||||||
try {
|
try {
|
||||||
registrationClient.run();
|
registrationClient.run();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Registration failure with " + host + ":" + port
|
String request = set ? "Registration" : "Unregistration";
|
||||||
|
LOG.error(request + " failure with " + host + ":" + port
|
||||||
+ ", portmap entry: " + mapEntry);
|
+ ", portmap entry: " + mapEntry);
|
||||||
throw new RuntimeException("Registration failure");
|
throw new RuntimeException(request + " failure");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,13 +31,14 @@ public static PortmapMapping mapping(XDR xdr) {
|
|||||||
return PortmapMapping.deserialize(xdr);
|
return PortmapMapping.deserialize(xdr);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static XDR create(PortmapMapping mapping) {
|
public static XDR create(PortmapMapping mapping, boolean set) {
|
||||||
XDR request = new XDR();
|
XDR request = new XDR();
|
||||||
|
int procedure = set ? RpcProgramPortmap.PMAPPROC_SET
|
||||||
|
: RpcProgramPortmap.PMAPPROC_UNSET;
|
||||||
RpcCall call = RpcCall.getInstance(
|
RpcCall call = RpcCall.getInstance(
|
||||||
RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)),
|
RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)),
|
||||||
RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION,
|
RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION, procedure,
|
||||||
RpcProgramPortmap.PMAPPROC_SET, new CredentialsNone(),
|
new CredentialsNone(), new VerifierNone());
|
||||||
new VerifierNone());
|
|
||||||
call.write(request);
|
call.write(request);
|
||||||
return mapping.serialize(request);
|
return mapping.serialize(request);
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,10 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@ -29,7 +33,9 @@
|
|||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Objects;
|
import com.google.common.base.Objects;
|
||||||
@ -94,7 +100,7 @@ public int hashCode() {
|
|||||||
DFSClientCache(Configuration config) {
|
DFSClientCache(Configuration config) {
|
||||||
this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
|
this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
DFSClientCache(Configuration config, int clientCache) {
|
DFSClientCache(Configuration config, int clientCache) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.clientCache = CacheBuilder.newBuilder()
|
this.clientCache = CacheBuilder.newBuilder()
|
||||||
@ -107,8 +113,52 @@ public int hashCode() {
|
|||||||
.expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
|
.expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
|
||||||
.removalListener(inputStreamRemovalListener())
|
.removalListener(inputStreamRemovalListener())
|
||||||
.build(inputStreamLoader());
|
.build(inputStreamLoader());
|
||||||
|
|
||||||
|
ShutdownHookManager.get().addShutdownHook(new CacheFinalizer(),
|
||||||
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the FileSystem shutdown hook.
|
||||||
|
*/
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 10;
|
||||||
|
|
||||||
|
private class CacheFinalizer implements Runnable {
|
||||||
|
@Override
|
||||||
|
public synchronized void run() {
|
||||||
|
try {
|
||||||
|
closeAll(true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("DFSClientCache.closeAll() threw an exception:\n", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close all DFSClient instances in the Cache.
|
||||||
|
* @param onlyAutomatic only close those that are marked for automatic closing
|
||||||
|
*/
|
||||||
|
synchronized void closeAll(boolean onlyAutomatic) throws IOException {
|
||||||
|
List<IOException> exceptions = new ArrayList<IOException>();
|
||||||
|
|
||||||
|
ConcurrentMap<String, DFSClient> map = clientCache.asMap();
|
||||||
|
|
||||||
|
for (Entry<String, DFSClient> item : map.entrySet()) {
|
||||||
|
final DFSClient client = item.getValue();
|
||||||
|
if (client != null) {
|
||||||
|
try {
|
||||||
|
client.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
exceptions.add(ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!exceptions.isEmpty()) {
|
||||||
|
throw MultipleIOException.createIOException(exceptions);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private CacheLoader<String, DFSClient> clientLoader() {
|
private CacheLoader<String, DFSClient> clientLoader() {
|
||||||
return new CacheLoader<String, DFSClient>() {
|
return new CacheLoader<String, DFSClient>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -1061,6 +1061,9 @@ Release 2.3.0 - UNRELEASED
|
|||||||
|
|
||||||
HDFS-5671. Fix socket leak in DFSInputStream#getBlockReader. (JamesLi via umamahesh)
|
HDFS-5671. Fix socket leak in DFSInputStream#getBlockReader. (JamesLi via umamahesh)
|
||||||
|
|
||||||
|
HDFS-5649. Unregister NFS and Mount service when NFS gateway is shutting down.
|
||||||
|
(brandonli)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
Loading…
Reference in New Issue
Block a user