HDFS-5171. NFS should create input stream for a file and try to share it with multiple read requests. Contributed by Haohui Mai

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1535586 13f79535-47bb-0310-9956-ffa450edef68
Brandon Li 2013-10-24 23:40:34 +00:00
parent 6772d07fdc
commit 05f35518f1
4 changed files with 141 additions and 31 deletions
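Before this change, the NFS3 READ handler opened a fresh DFSInputStream for every READ request and closed it immediately afterwards (see the removed lines in the read() hunk below). The patch moves stream management into DFSClientCache: streams are opened lazily, shared across requests, expired after idle time, and closed on eviction. The following is a minimal, self-contained sketch of that caching pattern using Guava against local files rather than HDFS; the class and method names here are illustrative and not part of the patch (the real cache stores FSDataInputStream, whose positioned read lets several requests share one stream without seeking).

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class SharedStreamCacheSketch {
  // Streams are created lazily on first access, shared by later reads of the
  // same path, and closed automatically when evicted or idle for 10 minutes.
  private final LoadingCache<String, InputStream> streams = CacheBuilder.newBuilder()
      .maximumSize(1024)
      .expireAfterAccess(10 * 60, TimeUnit.SECONDS)
      .removalListener(new RemovalListener<String, InputStream>() {
        @Override
        public void onRemoval(RemovalNotification<String, InputStream> n) {
          try {
            n.getValue().close();          // release the stream on eviction
          } catch (IOException ignored) {
          }
        }
      })
      .build(new CacheLoader<String, InputStream>() {
        @Override
        public InputStream load(String path) throws IOException {
          return Files.newInputStream(Paths.get(path));  // open once, then share
        }
      });

  public InputStream getStream(String path) throws ExecutionException {
    return streams.get(path);              // multiple read requests share one stream
  }

  public void invalidate(String path) {
    streams.invalidate(path);              // forces a fresh stream on the next read
  }
}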

DFSClientCache.java

@@ -20,15 +20,19 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -41,15 +45,52 @@
 class DFSClientCache {
   private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
   /**
-   * Cache that maps User id to corresponding DFSClient.
+   * Cache that maps User id to the corresponding DFSClient.
    */
   @VisibleForTesting
   final LoadingCache<String, DFSClient> clientCache;
 
   final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
 
+  /**
+   * Cache that maps <DFSClient, inode path> to the corresponding
+   * FSDataInputStream.
+   */
+  final LoadingCache<DFSInputStreamCaheKey, FSDataInputStream> inputstreamCache;
+
+  /**
+   * Time to live for a DFSClient (in seconds)
+   */
+  final static int DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE = 1024;
+  final static int DEFAULT_DFS_INPUTSTREAM_CACHE_TTL = 10 * 60;
+
   private final Configuration config;
 
+  private static class DFSInputStreamCaheKey {
+    final String userId;
+    final String inodePath;
+
+    private DFSInputStreamCaheKey(String userId, String inodePath) {
+      super();
+      this.userId = userId;
+      this.inodePath = inodePath;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof DFSInputStreamCaheKey) {
+        DFSInputStreamCaheKey k = (DFSInputStreamCaheKey) obj;
+        return userId.equals(k.userId) && inodePath.equals(k.inodePath);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(userId, inodePath);
+    }
+  }
+
   DFSClientCache(Configuration config) {
     this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
   }
@@ -60,6 +101,12 @@ class DFSClientCache {
         .maximumSize(clientCache)
         .removalListener(clientRemovealListener())
         .build(clientLoader());
+
+    this.inputstreamCache = CacheBuilder.newBuilder()
+        .maximumSize(DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE)
+        .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
+        .removalListener(inputStreamRemovalListener())
+        .build(inputStreamLoader());
   }
 
   private CacheLoader<String, DFSClient> clientLoader() {
@@ -95,7 +142,33 @@ public void onRemoval(RemovalNotification<String, DFSClient> notification) {
     };
   }
 
-  DFSClient get(String userName) {
+  private RemovalListener<DFSInputStreamCaheKey, FSDataInputStream> inputStreamRemovalListener() {
+    return new RemovalListener<DFSClientCache.DFSInputStreamCaheKey, FSDataInputStream>() {
+      @Override
+      public void onRemoval(
+          RemovalNotification<DFSInputStreamCaheKey, FSDataInputStream> notification) {
+        try {
+          notification.getValue().close();
+        } catch (IOException e) {
+        }
+      }
+    };
+  }
+
+  private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() {
+    return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() {
+      @Override
+      public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
+        DFSClient client = getDfsClient(key.userId);
+        DFSInputStream dis = client.open(key.inodePath);
+        return new FSDataInputStream(dis);
+      }
+    };
+  }
+
+  DFSClient getDfsClient(String userName) {
     DFSClient client = null;
     try {
       client = clientCache.get(userName);
@@ -105,4 +178,21 @@ DFSClient get(String userName) {
     }
     return client;
   }
+
+  FSDataInputStream getDfsInputStream(String userName, String inodePath) {
+    DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
+    FSDataInputStream s = null;
+    try {
+      s = inputstreamCache.get(k);
+    } catch (ExecutionException e) {
+      LOG.warn("Failed to create DFSInputStream for user:" + userName
+          + " Cause:" + e);
+    }
+    return s;
+  }
+
+  public void invalidateDfsInputStream(String userName, String inodePath) {
+    DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
+    inputstreamCache.invalidate(k);
+  }
 }
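For context, here is a hedged sketch of how a caller is expected to use the new DFSClientCache entry points (getDfsClient, getDfsInputStream, invalidateDfsInputStream). It assumes a helper living in the same package as DFSClientCache, since the class and most of its methods are package-private; the helper class, its parameters, and the trailing invalidation step are placeholders for illustration, not code from the patch.

// Assumes same-package access to DFSClientCache (package-private class).
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.DFSClient;

class DFSClientCacheUsageSketch {
  static int readOnce(DFSClientCache cache, String userName, String inodePath,
      long offset, byte[] buf) throws IOException {
    // One DFSClient per user, created on first use and cached; the real handler
    // also uses it for attribute lookups (omitted here).
    DFSClient dfsClient = cache.getDfsClient(userName);
    if (dfsClient == null) {
      throw new IOException("Could not create DFSClient for " + userName);
    }
    // One FSDataInputStream per (user, inode path), shared across READ requests.
    FSDataInputStream fis = cache.getDfsInputStream(userName, inodePath);
    if (fis == null) {
      throw new IOException("Could not open " + inodePath + " for " + userName);
    }
    // Positioned read: does not move the shared stream's file pointer, so
    // concurrent requests can reuse the same cached stream.
    int n = fis.read(offset, buf, 0, buf.length);
    if (n < 0) {
      // Optional: drop a stream we no longer trust; the next request reopens it.
      cache.invalidateDfsInputStream(userName, inodePath);
    }
    return n;
  }
}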

RpcProgramNfs3.java

@@ -39,7 +39,6 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -235,7 +234,7 @@ public GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -310,7 +309,7 @@ private void setattrInternal(DFSClient dfsClient, String fileIdPath,
   public SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -392,7 +391,7 @@ public LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -454,7 +453,7 @@ public ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -502,7 +501,7 @@ public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -563,13 +562,14 @@ public READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
   public READ3Response read(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     READ3Response response = new READ3Response(Nfs3Status.NFS3_OK);
+    final String userName = securityHandler.getUser();
 
     if (!checkAccessPrivilege(client, AccessPrivilege.READ_ONLY)) {
       response.setStatus(Nfs3Status.NFS3ERR_ACCES);
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(userName);
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -628,11 +628,28 @@ public READ3Response read(XDR xdr, SecurityHandler securityHandler,
     int buffSize = Math.min(MAX_READ_TRANSFER_SIZE, count);
     byte[] readbuffer = new byte[buffSize];
 
-    DFSInputStream is = dfsClient.open(Nfs3Utils.getFileIdPath(handle));
-    FSDataInputStream fis = new FSDataInputStream(is);
-
-    int readCount = fis.read(offset, readbuffer, 0, count);
-    fis.close();
+    int readCount = 0;
+    /**
+     * Retry exactly once because the DFSInputStream can be stale.
+     */
+    for (int i = 0; i < 1; ++i) {
+      FSDataInputStream fis = clientCache.getDfsInputStream(userName,
+          Nfs3Utils.getFileIdPath(handle));
+
+      try {
+        readCount = fis.read(offset, readbuffer, 0, count);
+      } catch (IOException e) {
+        // TODO: A cleaner way is to throw a new type of exception
+        // which requires incompatible changes.
+        if (e.getMessage() == "Stream closed") {
+          clientCache.invalidateDfsInputStream(userName,
+              Nfs3Utils.getFileIdPath(handle));
+          continue;
+        } else {
+          throw e;
+        }
+      }
+    }
 
     attrs = Nfs3Utils.getFileAttr(dfsClient, Nfs3Utils.getFileIdPath(handle),
         iug);
@@ -660,7 +677,7 @@ public WRITE3Response write(XDR xdr, Channel channel, int xid,
       SecurityHandler securityHandler, InetAddress client) {
     WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -735,7 +752,7 @@ public WRITE3Response write(XDR xdr, Channel channel, int xid,
   public CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -858,7 +875,7 @@ preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
   public MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -954,7 +971,7 @@ public READDIR3Response mknod(XDR xdr,
   public REMOVE3Response remove(XDR xdr,
       SecurityHandler securityHandler, InetAddress client) {
     REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1029,7 +1046,7 @@ public REMOVE3Response remove(XDR xdr,
   public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1111,7 +1128,7 @@ public RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
   public RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
       InetAddress client) {
     RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1205,7 +1222,7 @@ public SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
      response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1293,7 +1310,7 @@ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1430,7 +1447,7 @@ public READDIRPLUS3Response readdirplus(XDR xdr,
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
     }
@@ -1587,7 +1604,7 @@ public FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1645,7 +1662,7 @@ public FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
      return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1697,7 +1714,7 @@ public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
       return response;
     }
 
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
@@ -1738,7 +1755,7 @@ public PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
   public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
       SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
-    DFSClient dfsClient = clientCache.get(securityHandler.getUser());
+    DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
     if (dfsClient == null) {
       response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
       return response;
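The read() hunk above tolerates a stale cached stream by invalidating the cache entry and retrying once with a freshly opened stream. Below is a generic, hedged sketch of that retry pattern; StreamCache and isStale() are hypothetical stand-ins for DFSClientCache and the patch's "Stream closed" message check (the committed code compares the message with ==, while equals() is used here).

import java.io.IOException;
import java.io.InputStream;

final class RetryOnceReader {
  /** Minimal stand-in for the cache of shared streams (hypothetical interface). */
  interface StreamCache {
    InputStream get(String path) throws IOException;
    void invalidate(String path);
  }

  static int read(StreamCache cache, String path, byte[] buf) throws IOException {
    for (int attempt = 0; ; attempt++) {
      InputStream in = cache.get(path);
      try {
        return in.read(buf);
      } catch (IOException e) {
        if (attempt == 0 && isStale(e)) {
          // The cached stream went stale underneath us: drop it and retry once
          // with a freshly opened stream.
          cache.invalidate(path);
          continue;
        }
        throw e;  // second failure, or an unrelated error: propagate to the caller
      }
    }
  }

  // The patch keys the retry off the "Stream closed" message.
  private static boolean isStale(IOException e) {
    return "Stream closed".equals(e.getMessage());
  }
}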

TestDFSClientCache.java

@@ -39,12 +39,12 @@ public void testEviction() throws IOException {
     DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
 
-    DFSClient c1 = cache.get("test1");
-    assertTrue(cache.get("test1").toString().contains("ugi=test1"));
-    assertEquals(c1, cache.get("test1"));
+    DFSClient c1 = cache.getDfsClient("test1");
+    assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1"));
+    assertEquals(c1, cache.getDfsClient("test1"));
     assertFalse(isDfsClientClose(c1));
 
-    cache.get("test2");
+    cache.getDfsClient("test2");
     assertTrue(isDfsClientClose(c1));
 
     assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
   }

CHANGES.txt

@@ -440,6 +440,9 @@ Release 2.2.1 - UNRELEASED
     HDFS-5403. WebHdfs client cannot communicate with older WebHdfs servers
     post HDFS-5306. (atm)
 
+    HDFS-5171. NFS should create input stream for a file and try to share it
+    with multiple read requests. (Haohui Mai via brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES