diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java index 1203e893ef..d389347161 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/mount/MountdBase.java @@ -23,6 +23,7 @@ import org.apache.hadoop.oncrpc.SimpleTcpServer; import org.apache.hadoop.oncrpc.SimpleUdpServer; import org.apache.hadoop.portmap.PortmapMapping; +import org.apache.hadoop.util.ShutdownHookManager; /** * Main class for starting mountd daemon. This daemon implements the NFS @@ -71,8 +72,24 @@ public void start(boolean register) { startUDPServer(); startTCPServer(); if (register) { + ShutdownHookManager.get().addShutdownHook(new Unregister(), + SHUTDOWN_HOOK_PRIORITY); rpcProgram.register(PortmapMapping.TRANSPORT_UDP, udpBoundPort); rpcProgram.register(PortmapMapping.TRANSPORT_TCP, tcpBoundPort); } } + + /** + * Priority of the mountd shutdown hook. + */ + public static final int SHUTDOWN_HOOK_PRIORITY = 10; + + private class Unregister implements Runnable { + @Override + public synchronized void run() { + rpcProgram.unregister(PortmapMapping.TRANSPORT_UDP, udpBoundPort); + rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, tcpBoundPort); + } + } + } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java index 2b6943aada..6bfaf79a81 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Base.java @@ -23,6 +23,7 @@ import org.apache.hadoop.oncrpc.RpcProgram; import org.apache.hadoop.oncrpc.SimpleTcpServer; import org.apache.hadoop.portmap.PortmapMapping; +import org.apache.hadoop.util.ShutdownHookManager; /** * Nfs server. Supports NFS v3 using {@link RpcProgram}. @@ -50,6 +51,8 @@ public void start(boolean register) { startTCPServer(); // Start TCP server if (register) { + ShutdownHookManager.get().addShutdownHook(new Unregister(), + SHUTDOWN_HOOK_PRIORITY); rpcProgram.register(PortmapMapping.TRANSPORT_TCP, nfsBoundPort); } } @@ -61,4 +64,16 @@ private void startTCPServer() { tcpServer.run(); nfsBoundPort = tcpServer.getBoundPort(); } + + /** + * Priority of the nfsd shutdown hook. + */ + public static final int SHUTDOWN_HOOK_PRIORITY = 10; + + private class Unregister implements Runnable { + @Override + public synchronized void run() { + rpcProgram.unregister(PortmapMapping.TRANSPORT_TCP, nfsBoundPort); + } + } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java index 69adcedc7b..628a6e62b3 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/RpcProgram.java @@ -78,23 +78,41 @@ public void register(int transport, int boundPort) { for (int vers = lowProgVersion; vers <= highProgVersion; vers++) { PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport, port); - register(mapEntry); + register(mapEntry, true); + } + } + + /** + * Unregister this program with the local portmapper. + */ + public void unregister(int transport, int boundPort) { + if (boundPort != port) { + LOG.info("The bound port is " + boundPort + + ", different with configured port " + port); + port = boundPort; + } + // Unregister all the program versions with portmapper for a given transport + for (int vers = lowProgVersion; vers <= highProgVersion; vers++) { + PortmapMapping mapEntry = new PortmapMapping(progNumber, vers, transport, + port); + register(mapEntry, false); } } /** * Register the program with Portmap or Rpcbind */ - protected void register(PortmapMapping mapEntry) { - XDR mappingRequest = PortmapRequest.create(mapEntry); + protected void register(PortmapMapping mapEntry, boolean set) { + XDR mappingRequest = PortmapRequest.create(mapEntry, set); SimpleUdpClient registrationClient = new SimpleUdpClient(host, RPCB_PORT, mappingRequest); try { registrationClient.run(); } catch (IOException e) { - LOG.error("Registration failure with " + host + ":" + port + String request = set ? "Registration" : "Unregistration"; + LOG.error(request + " failure with " + host + ":" + port + ", portmap entry: " + mapEntry); - throw new RuntimeException("Registration failure"); + throw new RuntimeException(request + " failure"); } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java index 2932c78237..b1c9138da4 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/portmap/PortmapRequest.java @@ -31,13 +31,14 @@ public static PortmapMapping mapping(XDR xdr) { return PortmapMapping.deserialize(xdr); } - public static XDR create(PortmapMapping mapping) { + public static XDR create(PortmapMapping mapping, boolean set) { XDR request = new XDR(); + int procedure = set ? RpcProgramPortmap.PMAPPROC_SET + : RpcProgramPortmap.PMAPPROC_UNSET; RpcCall call = RpcCall.getInstance( RpcUtil.getNewXid(String.valueOf(RpcProgramPortmap.PROGRAM)), - RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION, - RpcProgramPortmap.PMAPPROC_SET, new CredentialsNone(), - new VerifierNone()); + RpcProgramPortmap.PROGRAM, RpcProgramPortmap.VERSION, procedure, + new CredentialsNone(), new VerifierNone()); call.write(request); return mapping.serialize(request); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java index fcd22c8ed6..174496557e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java @@ -19,6 +19,10 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -29,7 +33,9 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ShutdownHookManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; @@ -94,7 +100,7 @@ public int hashCode() { DFSClientCache(Configuration config) { this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE); } - + DFSClientCache(Configuration config, int clientCache) { this.config = config; this.clientCache = CacheBuilder.newBuilder() @@ -107,8 +113,52 @@ public int hashCode() { .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS) .removalListener(inputStreamRemovalListener()) .build(inputStreamLoader()); + + ShutdownHookManager.get().addShutdownHook(new CacheFinalizer(), + SHUTDOWN_HOOK_PRIORITY); } + /** + * Priority of the FileSystem shutdown hook. + */ + public static final int SHUTDOWN_HOOK_PRIORITY = 10; + + private class CacheFinalizer implements Runnable { + @Override + public synchronized void run() { + try { + closeAll(true); + } catch (IOException e) { + LOG.info("DFSClientCache.closeAll() threw an exception:\n", e); + } + } + } + + /** + * Close all DFSClient instances in the Cache. + * @param onlyAutomatic only close those that are marked for automatic closing + */ + synchronized void closeAll(boolean onlyAutomatic) throws IOException { + List exceptions = new ArrayList(); + + ConcurrentMap map = clientCache.asMap(); + + for (Entry item : map.entrySet()) { + final DFSClient client = item.getValue(); + if (client != null) { + try { + client.close(); + } catch (IOException ioe) { + exceptions.add(ioe); + } + } + } + + if (!exceptions.isEmpty()) { + throw MultipleIOException.createIOException(exceptions); + } + } + private CacheLoader clientLoader() { return new CacheLoader() { @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c63b0faf16..ca1e7b2078 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1061,6 +1061,9 @@ Release 2.3.0 - UNRELEASED HDFS-5671. Fix socket leak in DFSInputStream#getBlockReader. (JamesLi via umamahesh) + HDFS-5649. Unregister NFS and Mount service when NFS gateway is shutting down. + (brandonli) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES