From 05f35518f19d48890770128727289582cca3457b Mon Sep 17 00:00:00 2001
From: Brandon Li
Date: Thu, 24 Oct 2013 23:40:34 +0000
Subject: [PATCH] HDFS-5171. NFS should create input stream for a file and try
 to share it with multiple read requests. Contributed by Haohui Mai

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1535586 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hdfs/nfs/nfs3/DFSClientCache.java  | 94 ++++++++++++++++++-
 .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java  | 67 ++++++++-----
 .../hdfs/nfs/nfs3/TestDFSClientCache.java     |  8 +-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +
 4 files changed, 141 insertions(+), 31 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
index c50e9a3cc0..c7265ea2dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java
@@ -20,15 +20,19 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -41,15 +45,52 @@ class DFSClientCache {
   private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
 
   /**
-   * Cache that maps User id to corresponding DFSClient.
+   * Cache that maps User id to the corresponding DFSClient.
    */
   @VisibleForTesting
   final LoadingCache<String, DFSClient> clientCache;
 
   final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
 
+  /**
+   * Cache that maps <DFSClient, inode path> to the corresponding
+   * FSDataInputStream.
+   */
+  final LoadingCache<DFSInputStreamCaheKey, FSDataInputStream> inputstreamCache;
+
+  /**
+   * Time to live for a DFSClient (in seconds)
+   */
+  final static int DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE = 1024;
+  final static int DEFAULT_DFS_INPUTSTREAM_CACHE_TTL = 10 * 60;
+
   private final Configuration config;
 
+  private static class DFSInputStreamCaheKey {
+    final String userId;
+    final String inodePath;
+
+    private DFSInputStreamCaheKey(String userId, String inodePath) {
+      super();
+      this.userId = userId;
+      this.inodePath = inodePath;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof DFSInputStreamCaheKey) {
+        DFSInputStreamCaheKey k = (DFSInputStreamCaheKey) obj;
+        return userId.equals(k.userId) && inodePath.equals(k.inodePath);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(userId, inodePath);
+    }
+  }
+
   DFSClientCache(Configuration config) {
     this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
   }
@@ -60,6 +101,12 @@ class DFSClientCache {
         .maximumSize(clientCache)
         .removalListener(clientRemovealListener())
         .build(clientLoader());
+
+    this.inputstreamCache = CacheBuilder.newBuilder()
+        .maximumSize(DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE)
+        .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
+        .removalListener(inputStreamRemovalListener())
+        .build(inputStreamLoader());
   }
 
   private CacheLoader<String, DFSClient> clientLoader() {
@@ -95,7 +142,33 @@ public void onRemoval(RemovalNotification<String, DFSClient> notification) {
     };
   }
 
-  DFSClient get(String userName) {
+  private RemovalListener<DFSInputStreamCaheKey, FSDataInputStream> inputStreamRemovalListener() {
+    return new RemovalListener<DFSInputStreamCaheKey, FSDataInputStream>() {
+
+      @Override
+      public void onRemoval(
+          RemovalNotification<DFSInputStreamCaheKey, FSDataInputStream> notification) {
+        try {
+          notification.getValue().close();
+        } catch (IOException e) {
+        }
+      }
+    };
+  }
+
+  private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() {
+    return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() {
+
+      @Override
+      public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
+        DFSClient client = getDfsClient(key.userId);
+        DFSInputStream dis = client.open(key.inodePath);
+        return new FSDataInputStream(dis);
+      }
+    };
+  }
+
+  DFSClient getDfsClient(String userName) {
     DFSClient client = null;
     try {
       client = clientCache.get(userName);
@@ -105,4 +178,21 @@ DFSClient get(String userName) {
     }
     return client;
   }
+
+  FSDataInputStream getDfsInputStream(String userName, String inodePath) {
+    DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
+    FSDataInputStream s = null;
+    try {
+      s = inputstreamCache.get(k);
+    } catch (ExecutionException e) {
+      LOG.warn("Failed to create DFSInputStream for user:" + userName
+          + " Cause:" + e);
+    }
+    return s;
+  }
+
+  public void invalidateDfsInputStream(String userName, String inodePath) {
+    DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
+    inputstreamCache.invalidate(k);
+  }
 }
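The DFSClientCache change above adds a second Guava LoadingCache next to the existing per-user client cache: inputstreamCache maps a (user id, inode path) key to an open FSDataInputStream, is bounded at 1024 entries, expires entries after ten minutes without access, and closes the stream from the removal listener so that eviction or explicit invalidation cannot leak open streams. Below is a minimal, self-contained sketch of that same pattern; SharedStreamCache, StreamKey and openStream() are illustrative stand-ins for DFSClientCache, DFSInputStreamCaheKey and DFSClient#open, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

class SharedStreamCache {

  static final class StreamKey {
    final String user;
    final String path;

    StreamKey(String user, String path) {
      this.user = user;
      this.path = path;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof StreamKey)) {
        return false;
      }
      StreamKey k = (StreamKey) o;
      return user.equals(k.user) && path.equals(k.path);
    }

    @Override
    public int hashCode() {
      return 31 * user.hashCode() + path.hashCode();
    }
  }

  private final LoadingCache<StreamKey, InputStream> streams = CacheBuilder.newBuilder()
      .maximumSize(1024)                        // bound the number of open streams
      .expireAfterAccess(10, TimeUnit.MINUTES)  // drop streams idle for ten minutes
      .removalListener(new RemovalListener<StreamKey, InputStream>() {
        @Override
        public void onRemoval(RemovalNotification<StreamKey, InputStream> n) {
          try {
            n.getValue().close();               // eviction and invalidation close the stream
          } catch (IOException ignored) {
          }
        }
      })
      .build(new CacheLoader<StreamKey, InputStream>() {
        @Override
        public InputStream load(StreamKey key) throws IOException {
          return openStream(key.user, key.path); // opened lazily on the first request
        }
      });

  InputStream get(String user, String path) throws Exception {
    return streams.get(new StreamKey(user, path));
  }

  void invalidate(String user, String path) {
    streams.invalidate(new StreamKey(user, path)); // triggers the removal listener
  }

  private InputStream openStream(String user, String path) throws IOException {
    // Stand-in for DFSClient#open; returns an empty stream so the sketch is runnable.
    return new ByteArrayInputStream(new byte[0]);
  }
}

Note that callers never close a cached stream directly; they invalidate the cache entry, so the removal listener stays the single place where streams are closed.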
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
index 554f702dfd..612171e084 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
@@ -39,7 +39,6 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -235,7 +234,7 @@ public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -310,7 +309,7 @@ private void setattrInternal(DFSClient dfsClient, String fileIdPath,
   public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -392,7 +391,7 @@ public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -454,7 +453,7 @@ public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -502,7 +501,7 @@ public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -563,13 +562,14 @@ public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
   public READ3Response read(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
+    final String userName = securityHandler.getUser();
 
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(userName);
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -628,11 +628,28 @@ public READ3Response read(XDR xdr, SecurityHandler securityHandler,
 
       int buffSize = Math.min(MAX_READ_TRANSFER_SIZE, count);
       byte[] readbuffer = new byte[buffSize];
-      DFSInputStream is = dfsClient.open(Nfs3Utils.getFileIdPath(handle));
-      FSDataInputStream fis = new FSDataInputStream(is);
-
-      int readCount = fis.read(offset, readbuffer, 0, count);
-      fis.close();
+      int readCount = 0;
+      /**
+       * Retry exactly once because the DFSInputStream can be stale.
+       */
+      for (int i = 0; i < 1; ++i) {
+        FSDataInputStream fis = clientCache.getDfsInputStream(userName,
+            Nfs3Utils.getFileIdPath(handle));
+
+        try {
+          readCount = fis.read(offset, readbuffer, 0, count);
+        } catch (IOException e) {
+          // TODO: A cleaner way is to throw a new type of exception
+          // which requires incompatible changes.
+          if (e.getMessage() == "Stream closed") {
+            clientCache.invalidateDfsInputStream(userName,
+                Nfs3Utils.getFileIdPath(handle));
+            continue;
+          } else {
+            throw e;
+          }
+        }
+      }
 
       attrs = Nfs3Utils.getFileAttr(dfsClient, Nfs3Utils.getFileIdPath(handle),
           iug);
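In the READ path above, the gateway now fetches the shared stream from the cache instead of opening and closing a DFSInputStream per request. Two details in the hunk are worth noting. First, e.getMessage() == "Stream closed" compares string references, so it only succeeds when both sides are the same interned literal; "Stream closed".equals(e.getMessage()) is the robust form. Second, with a loop bound of 1 the invalidate-and-continue branch ends the loop rather than re-reading, so the freshly opened stream is only picked up by a later READ. The sketch below shows what a same-request retry would look like under those two adjustments; it is illustrative and sits conceptually next to read() in RpcProgramNfs3, not in the committed patch.

  /**
   * Sketch only, not the committed code: a same-request retry with a loop
   * bound of 2 and an equals()-based message check.
   */
  private int readOnceWithRetry(DFSClientCache clientCache, String userName,
      String fileIdPath, long offset, byte[] buf, int count) throws IOException {
    for (int attempt = 0; attempt < 2; attempt++) {
      FSDataInputStream fis = clientCache.getDfsInputStream(userName, fileIdPath);
      try {
        return fis.read(offset, buf, 0, count);  // positional read; leaves the shared offset alone
      } catch (IOException e) {
        if (attempt == 0 && "Stream closed".equals(e.getMessage())) {
          // The cached stream went stale; drop it so the loader reopens the file.
          clientCache.invalidateDfsInputStream(userName, fileIdPath);
          continue;
        }
        throw e;
      }
    }
    return -1; // not reached: the second attempt either returns or throws
  }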
@@ -660,7 +677,7 @@ public WRITE3Response write(XDR xdr, Channel channel, int xid,
       SecurityHandler securityHandler, InetAddress client) {
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -735,7 +752,7 @@ public WRITE3Response write(XDR xdr, Channel channel, int xid,
   public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -858,7 +875,7 @@ preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
   public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -954,7 +971,7 @@ public READDIR3Response mknod(XDR xdr,
   public REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1029,7 +1046,7 @@ public REMOVE3Response remove(XDR xdr,
   public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1111,7 +1128,7 @@ public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
   public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1205,7 +1222,7 @@ public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1293,7 +1310,7 @@ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1430,7 +1447,7 @@ public READDIRPLUS3Response readdirplus(XDR xdr,
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
     }
@@ -1587,7 +1604,7 @@ public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1645,7 +1662,7 @@ public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1697,7 +1714,7 @@ public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1738,7 +1755,7 @@ public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
   public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
       SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
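The remaining hunks in RpcProgramNfs3.java are mechanical: every handler switches from clientCache.get(...) to the renamed clientCache.getDfsClient(...), since the cache now vends both per-user clients and per-file input streams. Sharing one FSDataInputStream across READ requests works because the gateway uses the positional read(position, buffer, offset, length), which neither consults nor advances the stream's seek pointer. A small sketch of that property against the local file system follows; the path is illustrative and the class is not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PreadSharingExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // The path is illustrative; any readable file works.
    FSDataInputStream in = fs.open(new Path("/tmp/example.dat"));

    byte[] a = new byte[128];
    byte[] b = new byte[128];
    in.read(0L, a, 0, a.length);     // positional read at offset 0
    in.read(4096L, b, 0, b.length);  // positional read at offset 4096
    // Positional reads leave the seek pointer untouched, so requests at
    // different offsets do not disturb each other through a shared stream.
    System.out.println("current position is still " + in.getPos());

    in.close();
  }
}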
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
index c550c1e02b..360bb145fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java
@@ -39,12 +39,12 @@ public void testEviction() throws IOException {
 
     DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
 
-    DFSClient c1 = cache.get("test1");
-    assertTrue(cache.get("test1").toString().contains("ugi=test1"));
-    assertEquals(c1, cache.get("test1"));
+    DFSClient c1 = cache.getDfsClient("test1");
+    assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1"));
+    assertEquals(c1, cache.getDfsClient("test1"));
     assertFalse(isDfsClientClose(c1));
 
-    cache.get("test2");
+    cache.getDfsClient("test2");
     assertTrue(isDfsClientClose(c1));
     assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
   }
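The test change only tracks the rename of the client accessor; the new input-stream cache itself is not exercised. A hypothetical follow-up test, sketched below as an extra method for TestDFSClientCache (same package, so the package-private constructor and accessor are visible), could check that repeated lookups share one stream and that invalidation causes a reopen. It assumes a MiniDFSCluster, DFSTestUtil, HdfsConfiguration and JUnit's assertSame/assertNotSame, and is not part of this patch.

  // Hypothetical addition to TestDFSClientCache, not part of this patch.
  // Assumed imports: org.apache.hadoop.hdfs.HdfsConfiguration, MiniDFSCluster,
  // DFSTestUtil, org.apache.hadoop.fs.Path, org.apache.hadoop.fs.FSDataInputStream,
  // and static org.junit.Assert.assertSame / assertNotSame.
  @Test
  public void testInputStreamSharingAndInvalidation() throws IOException {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
    try {
      cluster.waitActive();
      // The cache treats the path as an opaque string handed to DFSClient#open,
      // so a plain HDFS path works here even though the gateway passes inode-id paths.
      DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/f"), 1024, (short) 1, 0L);
      DFSClientCache cache = new DFSClientCache(conf);

      FSDataInputStream first = cache.getDfsInputStream("testuser", "/f");
      assertSame(first, cache.getDfsInputStream("testuser", "/f"));    // shared instance

      cache.invalidateDfsInputStream("testuser", "/f");
      assertNotSame(first, cache.getDfsInputStream("testuser", "/f")); // reopened
    } finally {
      cluster.shutdown();
    }
  }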
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 43c8dca5ad..0e8d274f9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -440,6 +440,9 @@ Release 2.2.1 - UNRELEASED
     HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers
     post HDFS-5306. (atm)
 
+    HDFS-5171. NFS should create input stream for a file and try to share it
+    with multiple read requests. (Haohui Mai via brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES