From d6602b5f39833611b4afa4581552f6c4c37e23a8 Mon Sep 17 00:00:00 2001 From: Jitendra Pandey Date: Tue, 10 Oct 2017 09:49:46 -0700 Subject: [PATCH] HDFS-11575. Supporting HDFS NFS gateway with Federated HDFS. Contributed by Mukul Kumar Singh. --- .../apache/hadoop/nfs/nfs3/FileHandle.java | 51 ++- .../nfs/nfs3/request/WRITE3Request.java | 4 +- .../hdfs/nfs/mount/RpcProgramMountd.java | 81 ++-- .../hadoop/hdfs/nfs/nfs3/DFSClientCache.java | 174 ++++++--- .../hadoop/hdfs/nfs/nfs3/Nfs3Utils.java | 46 +++ .../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java | 19 +- .../hdfs/nfs/nfs3/OpenFileCtxCache.java | 6 +- .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java | 364 ++++++++++-------- .../apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java | 8 +- .../hadoop/hdfs/nfs/nfs3/WriteManager.java | 24 +- .../nfs/nfs3/TestClientAccessPrivilege.java | 3 +- .../hdfs/nfs/nfs3/TestDFSClientCache.java | 13 +- .../hdfs/nfs/nfs3/TestExportsTable.java | 161 +++++++- .../hadoop/hdfs/nfs/nfs3/TestReaddir.java | 19 +- .../hdfs/nfs/nfs3/TestRpcProgramNfs3.java | 66 ++-- .../hdfs/nfs/nfs3/TestViewfsWithNfs3.java | 330 ++++++++++++++++ .../hadoop/hdfs/nfs/nfs3/TestWrites.java | 9 +- 17 files changed, 1072 insertions(+), 306 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestViewfsWithNfs3.java diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/FileHandle.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/FileHandle.java index 5b327986f1..910b8f2665 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/FileHandle.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/FileHandle.java @@ -38,17 +38,21 @@ public class FileHandle { private static final int HANDLE_LEN = 32; private byte[] handle; // Opaque handle private long fileId = -1; + private int namenodeId = -1; public FileHandle() { handle = null; } /** - * Handle is a 32 bytes number. For HDFS, the last 8 bytes is fileId. + * Handle is a 32 bytes number. For HDFS, the last 8 bytes is fileId + * For ViewFs, last 8 byte is fileId while 4 bytes before that is namenodeId * @param v file id + * @param n namenode id */ - public FileHandle(long v) { + public FileHandle(long v, int n) { fileId = v; + namenodeId = n; handle = new byte[HANDLE_LEN]; handle[0] = (byte)(v >>> 56); handle[1] = (byte)(v >>> 48); @@ -58,11 +62,20 @@ public FileHandle(long v) { handle[5] = (byte)(v >>> 16); handle[6] = (byte)(v >>> 8); handle[7] = (byte)(v >>> 0); - for (int i = 8; i < HANDLE_LEN; i++) { + + handle[8] = (byte) (n >>> 24); + handle[9] = (byte) (n >>> 16); + handle[10] = (byte) (n >>> 8); + handle[11] = (byte) (n >>> 0); + for (int i = 12; i < HANDLE_LEN; i++) { handle[i] = (byte) 0; } } - + + public FileHandle(long v) { + this(v, 0); + } + public FileHandle(String s) { MessageDigest digest; try { @@ -93,22 +106,32 @@ public boolean serialize(XDR out) { return true; } - private long bytesToLong(byte[] data) { + private long bytesToLong(byte[] data, int offset) { ByteBuffer buffer = ByteBuffer.allocate(8); for (int i = 0; i < 8; i++) { - buffer.put(data[i]); + buffer.put(data[i + offset]); } - buffer.flip();// need flip + buffer.flip(); // need flip return buffer.getLong(); } - + + private int bytesToInt(byte[] data, int offset) { + ByteBuffer buffer = ByteBuffer.allocate(4); + for (int i = 0; i < 4; i++) { + buffer.put(data[i + offset]); + } + buffer.flip(); // need flip + return buffer.getInt(); + } + public boolean deserialize(XDR xdr) { if (!XDR.verifyLength(xdr, 32)) { return false; } int size = xdr.readInt(); handle = xdr.readFixedOpaque(size); - fileId = bytesToLong(handle); + fileId = bytesToLong(handle, 0); + namenodeId = bytesToInt(handle, 8); return true; } @@ -122,11 +145,15 @@ private static String hex(byte b) { public long getFileId() { return fileId; } + + public int getNamenodeId() { + return namenodeId; + } public byte[] getContent() { return handle.clone(); } - + @Override public String toString() { StringBuilder s = new StringBuilder(); @@ -154,4 +181,8 @@ public boolean equals(Object o) { public int hashCode() { return Arrays.hashCode(handle); } + + public String dumpFileHandle() { + return "fileId: " + fileId + " namenodeId: " + namenodeId; + } } diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java index d85dcbbd78..86a40c797a 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java @@ -87,7 +87,7 @@ public void serialize(XDR xdr) { @Override public String toString() { - return String.format("fileId: %d offset: %d count: %d stableHow: %s", - handle.getFileId(), offset, count, stableHow.name()); + return String.format("fileHandle: %s offset: %d count: %d stableHow: %s", + handle.dumpFileHandle(), offset, count, stableHow.name()); } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java index e31bc711ad..4ae51c62a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/RpcProgramMountd.java @@ -19,17 +19,20 @@ import java.io.IOException; import java.net.DatagramSocket; import java.net.InetAddress; +import java.net.URI; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.HashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; +import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.mount.MountEntry; import org.apache.hadoop.mount.MountInterface; @@ -64,14 +67,12 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface { public static final int VERSION_2 = 2; public static final int VERSION_3 = 3; - private final DFSClient dfsClient; - - /** Synchronized list */ + /** Synchronized list. */ private final List mounts; - /** List that is unmodifiable */ - private final List exports; - + /** List that is unmodifiable. */ + private final HashMap exports; + private final NfsConfiguration config; private final NfsExports hostsMatcher; public RpcProgramMountd(NfsConfiguration config, @@ -84,17 +85,29 @@ public RpcProgramMountd(NfsConfiguration config, VERSION_3, registrationSocket, allowInsecurePorts, config.getInt( NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY, NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT)); - exports = new ArrayList(); - exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, - NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT)); + this.config = config; + exports = new HashMap<>(); + addExports(); this.hostsMatcher = NfsExports.getInstance(config); this.mounts = Collections.synchronizedList(new ArrayList()); UserGroupInformation.setConfiguration(config); SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY, NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY); - this.dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config), config); } - + + private void addExports() throws IOException { + FileSystem fs = FileSystem.get(config); + String[] exportsPath = + config.getStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, + NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT); + for (String exportPath : exportsPath) { + URI exportURI = Nfs3Utils.getResolvedURI(fs, exportPath); + LOG.info("FS:" + fs.getScheme() + " adding export Path:" + exportPath + + " with URI: " + exportURI.toString()); + exports.put(exportPath, exportURI); + } + } + @Override public XDR nullOp(XDR out, int xid, InetAddress client) { if (LOG.isDebugEnabled()) { @@ -125,17 +138,28 @@ public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) { if (LOG.isDebugEnabled()) { LOG.debug("Got host: " + host + " path: " + path); } - if (!exports.contains(path)) { + URI exportURI = exports.get(path); + if (exportURI == null) { LOG.info("Path " + path + " is not shared."); MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null); return out; } + DFSClient dfsClient = null; + try { + dfsClient = new DFSClient(exportURI, config); + } catch (Exception e) { + LOG.error("Can't get handle for export:" + path, e); + MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null); + return out; + } + FileHandle handle = null; try { - HdfsFileStatus exFileStatus = dfsClient.getFileInfo(path); - - handle = new FileHandle(exFileStatus.getFileId()); + HdfsFileStatus exFileStatus = dfsClient.getFileInfo(exportURI.getPath()); + + handle = new FileHandle(exFileStatus.getFileId(), + Nfs3Utils.getNamenodeId(config, exportURI)); } catch (IOException e) { LOG.error("Can't get handle for export:" + path, e); MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null); @@ -143,7 +167,8 @@ public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) { } assert (handle != null); - LOG.info("Giving handle (fileId:" + handle.getFileId() + LOG.info("Giving handle (fileHandle:" + handle.dumpFileHandle() + + " file URI: " + exportURI + ") to client for export " + path); mounts.add(new MountEntry(host, path)); @@ -195,7 +220,8 @@ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { info.data().readBytes(data); XDR xdr = new XDR(data); XDR out = new XDR(); - InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress(); + InetAddress client = + ((InetSocketAddress) info.remoteAddress()).getAddress(); if (mntproc == MNTPROC.NULL) { out = nullOp(out, xid, client); @@ -214,16 +240,20 @@ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) { } else if (mntproc == MNTPROC.UMNTALL) { umntall(out, xid, client); } else if (mntproc == MNTPROC.EXPORT) { - // Currently only support one NFS export + // Currently only support one NFS export per namenode List hostsMatchers = new ArrayList(); if (hostsMatcher != null) { - hostsMatchers.add(hostsMatcher); - out = MountResponse.writeExportList(out, xid, exports, hostsMatchers); + List exportsList = getExports(); + for (int i = 0; i < exportsList.size(); i++) { + hostsMatchers.add(hostsMatcher); + } + out = MountResponse.writeExportList(out, xid, + exportsList, hostsMatchers); } else { // This means there are no valid exports provided. RpcAcceptedReply.getInstance(xid, - RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( - out); + RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()) + .write(out); } } else { // Invalid procedure @@ -231,7 +261,8 @@ RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write( out); } - ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); + ChannelBuffer buf = + ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer()); RpcResponse rsp = new RpcResponse(buf, info.remoteAddress()); RpcUtil.sendRpcResponse(ctx, rsp); } @@ -244,6 +275,6 @@ protected boolean isIdempotent(RpcCall call) { @VisibleForTesting public List getExports() { - return this.exports; + return new ArrayList<>(this.exports.keySet()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java index b946bcef06..9a9366fd43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java @@ -20,8 +20,11 @@ import org.apache.commons.logging.LogFactory; import java.io.IOException; +import java.net.URI; +import java.nio.file.FileSystemException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; @@ -31,9 +34,10 @@ import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSInputStream; -import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.security.UserGroupInformation; @@ -48,63 +52,97 @@ import com.google.common.cache.RemovalNotification; /** - * A cache saves DFSClient objects for different users + * A cache saves DFSClient objects for different users. */ class DFSClientCache { private static final Log LOG = LogFactory.getLog(DFSClientCache.class); /** * Cache that maps User id to the corresponding DFSClient. */ - @VisibleForTesting - final LoadingCache clientCache; + private final LoadingCache clientCache; final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256; /** - * Cache that maps to the corresponding + * Cache that maps to the corresponding * FSDataInputStream. */ - final LoadingCache inputstreamCache; + private final LoadingCache inputstreamCache; /** - * Time to live for a DFSClient (in seconds) + * Time to live for a DFSClient (in seconds). */ final static int DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE = 1024; final static int DEFAULT_DFS_INPUTSTREAM_CACHE_TTL = 10 * 60; private final NfsConfiguration config; + private final HashMap namenodeUriMap; - private static class DFSInputStreamCaheKey { - final String userId; - final String inodePath; + private static final class DFSInputStreamCacheKey { + private final String userId; + private final String inodePath; + private final int namenodeId; - private DFSInputStreamCaheKey(String userId, String inodePath) { + private DFSInputStreamCacheKey(String userId, String inodePath, + int namenodeId) { super(); this.userId = userId; this.inodePath = inodePath; + this.namenodeId = namenodeId; } @Override public boolean equals(Object obj) { - if (obj instanceof DFSInputStreamCaheKey) { - DFSInputStreamCaheKey k = (DFSInputStreamCaheKey) obj; - return userId.equals(k.userId) && inodePath.equals(k.inodePath); + if (obj instanceof DFSInputStreamCacheKey) { + DFSInputStreamCacheKey k = (DFSInputStreamCacheKey) obj; + return userId.equals(k.userId) && + inodePath.equals(k.inodePath) && + (namenodeId == k.namenodeId); } return false; } @Override public int hashCode() { - return Objects.hashCode(userId, inodePath); + return Objects.hashCode(userId, inodePath, namenodeId); } } - DFSClientCache(NfsConfiguration config) { + private static final class DfsClientKey { + private final String userName; + private final int namenodeId; + + private DfsClientKey(String userName, int namenodeId) { + this.userName = userName; + this.namenodeId = namenodeId; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof DfsClientKey) { + DfsClientKey k = (DfsClientKey) obj; + return userName.equals(k.userName) && + (namenodeId == k.namenodeId); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(userName, namenodeId); + } + } + + DFSClientCache(NfsConfiguration config) throws IOException { this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE); } - - DFSClientCache(NfsConfiguration config, int clientCache) { + + DFSClientCache(NfsConfiguration config, int clientCache) throws IOException { this.config = config; + namenodeUriMap = new HashMap<>(); + prepareAddressMap(); + this.clientCache = CacheBuilder.newBuilder() .maximumSize(clientCache) .removalListener(clientRemovalListener()) @@ -115,11 +153,36 @@ public int hashCode() { .expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS) .removalListener(inputStreamRemovalListener()) .build(inputStreamLoader()); - + ShutdownHookManager.get().addShutdownHook(new CacheFinalizer(), SHUTDOWN_HOOK_PRIORITY); } + private void prepareAddressMap() throws IOException { + FileSystem fs = FileSystem.get(config); + String[] exportsPath = + config.getStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, + NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT); + for (String exportPath : exportsPath) { + URI exportURI = Nfs3Utils.getResolvedURI(fs, exportPath); + int namenodeId = Nfs3Utils.getNamenodeId(config, exportURI); + URI value = namenodeUriMap.get(namenodeId); + // if a unique nnid, add it to the map + if (value == null) { + LOG.info("Added export:" + exportPath + " FileSystem URI:" + exportURI + + " with namenodeId:" + namenodeId); + namenodeUriMap.put(namenodeId, exportURI); + } else { + // if the nnid already exists, it better be the for the same namenode + String msg = String.format("FS:%s, Namenode ID collision for path:%s " + + "nnid:%s uri being added:%s existing uri:%s", fs.getScheme(), + exportPath, namenodeId, exportURI, value); + LOG.error(msg); + throw new FileSystemException(msg); + } + } + } + /** * Priority of the FileSystem shutdown hook. */ @@ -135,7 +198,12 @@ public synchronized void run() { } } } - + + @VisibleForTesting + public LoadingCache getClientCache() { + return clientCache; + } + /** * Close all DFSClient instances in the Cache. * @param onlyAutomatic only close those that are marked for automatic closing @@ -143,9 +211,9 @@ public synchronized void run() { synchronized void closeAll(boolean onlyAutomatic) throws IOException { List exceptions = new ArrayList(); - ConcurrentMap map = clientCache.asMap(); + ConcurrentMap map = clientCache.asMap(); - for (Entry item : map.entrySet()) { + for (Entry item : map.entrySet()) { final DFSClient client = item.getValue(); if (client != null) { try { @@ -160,20 +228,24 @@ synchronized void closeAll(boolean onlyAutomatic) throws IOException { throw MultipleIOException.createIOException(exceptions); } } - - private CacheLoader clientLoader() { - return new CacheLoader() { + + private CacheLoader clientLoader() { + return new CacheLoader() { @Override - public DFSClient load(String userName) throws Exception { + public DFSClient load(final DfsClientKey key) throws Exception { UserGroupInformation ugi = getUserGroupInformation( - userName, - UserGroupInformation.getCurrentUser()); + key.userName, UserGroupInformation.getCurrentUser()); // Guava requires CacheLoader never returns null. return ugi.doAs(new PrivilegedExceptionAction() { @Override public DFSClient run() throws IOException { - return new DFSClient(DFSUtilClient.getNNAddress(config), config); + URI namenodeURI = namenodeUriMap.get(key.namenodeId); + if (namenodeURI == null) { + throw new IOException("No namenode URI found for user:" + + key.userName + " namenodeId:" + key.namenodeId); + } + return new DFSClient(namenodeURI, config); } }); } @@ -181,7 +253,7 @@ public DFSClient run() throws IOException { } /** - * This method uses the currentUser, and real user to create a proxy + * This method uses the currentUser, and real user to create a proxy. * @param effectiveUser The user who is being proxied by the real user * @param realUser The actual user who does the command * @return Proxy UserGroupInformation @@ -204,10 +276,11 @@ UserGroupInformation getUserGroupInformation( return ugi; } - private RemovalListener clientRemovalListener() { - return new RemovalListener() { + private RemovalListener clientRemovalListener() { + return new RemovalListener() { @Override - public void onRemoval(RemovalNotification notification) { + public void onRemoval( + RemovalNotification notification) { DFSClient client = notification.getValue(); try { client.close(); @@ -220,12 +293,15 @@ public void onRemoval(RemovalNotification notification) { }; } - private RemovalListener inputStreamRemovalListener() { - return new RemovalListener() { + private RemovalListener + inputStreamRemovalListener() { + return new RemovalListener + () { @Override public void onRemoval( - RemovalNotification notification) { + RemovalNotification + notification) { try { notification.getValue().close(); } catch (IOException ignored) { @@ -234,22 +310,24 @@ public void onRemoval( }; } - private CacheLoader inputStreamLoader() { - return new CacheLoader() { + private CacheLoader + inputStreamLoader() { + return new CacheLoader() { @Override - public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception { - DFSClient client = getDfsClient(key.userId); + public FSDataInputStream + load(DFSInputStreamCacheKey key) throws Exception { + DFSClient client = getDfsClient(key.userId, key.namenodeId); DFSInputStream dis = client.open(key.inodePath); return client.createWrappedInputStream(dis); } }; } - DFSClient getDfsClient(String userName) { + DFSClient getDfsClient(String userName, int namenodeId) { DFSClient client = null; try { - client = clientCache.get(userName); + client = clientCache.get(new DfsClientKey(userName, namenodeId)); } catch (ExecutionException e) { LOG.error("Failed to create DFSClient for user:" + userName + " Cause:" + e); @@ -257,8 +335,10 @@ DFSClient getDfsClient(String userName) { return client; } - FSDataInputStream getDfsInputStream(String userName, String inodePath) { - DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath); + FSDataInputStream getDfsInputStream(String userName, String inodePath, + int namenodeId) { + DFSInputStreamCacheKey k = + new DFSInputStreamCacheKey(userName, inodePath, namenodeId); FSDataInputStream s = null; try { s = inputstreamCache.get(k); @@ -269,8 +349,10 @@ FSDataInputStream getDfsInputStream(String userName, String inodePath) { return s; } - public void invalidateDfsInputStream(String userName, String inodePath) { - DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath); + public void invalidateDfsInputStream(String userName, String inodePath, + int namenodeId) { + DFSInputStreamCacheKey k = + new DFSInputStreamCacheKey(userName, inodePath, namenodeId); inputstreamCache.invalidate(k); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java index e376ebd2e7..c6da1981f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java @@ -18,8 +18,17 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.file.FileSystemException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsConstants; +import org.apache.hadoop.fs.viewfs.ViewFileSystem; import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.nfs.NfsFileType; import org.apache.hadoop.nfs.NfsTime; @@ -224,4 +233,41 @@ public static byte[] longToByte(long v) { public static long getElapsedTime(long startTimeNano) { return System.nanoTime() - startTimeNano; } + + public static int getNamenodeId(Configuration conf) { + URI filesystemURI = FileSystem.getDefaultUri(conf); + return getNamenodeId(conf, filesystemURI); + } + + public static int getNamenodeId(Configuration conf, URI namenodeURI) { + InetSocketAddress address = + DFSUtilClient.getNNAddressCheckLogical(conf, namenodeURI); + return address.hashCode(); + } + + public static URI getResolvedURI(FileSystem fs, String exportPath) + throws IOException { + URI fsURI = fs.getUri(); + String scheme = fs.getScheme(); + if (scheme.equalsIgnoreCase(FsConstants.VIEWFS_SCHEME)) { + ViewFileSystem viewFs = (ViewFileSystem)fs; + ViewFileSystem.MountPoint[] mountPoints = viewFs.getMountPoints(); + for (ViewFileSystem.MountPoint mount : mountPoints) { + String mountedPath = mount.getMountedOnPath().toString(); + if (exportPath.startsWith(mountedPath)) { + String subpath = exportPath.substring(mountedPath.length()); + fsURI = mount.getTargetFileSystemURIs()[0].resolve(subpath); + break; + } + } + } else if (scheme.equalsIgnoreCase(HdfsConstants.HDFS_URI_SCHEME)) { + fsURI = fsURI.resolve(exportPath); + } + + if (!fsURI.getScheme().equalsIgnoreCase(HdfsConstants.HDFS_URI_SCHEME)) { + throw new FileSystemException("Only HDFS is supported as underlying" + + "FileSystem, fs scheme:" + scheme + " uri to be added" + fsURI); + } + return fsURI; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 261701973f..5b7dc14a1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -442,7 +442,7 @@ public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request, if (!activeState) { LOG.info("OpenFileCtx is inactive, fileId: " - + request.getHandle().getFileId()); + + request.getHandle().dumpFileHandle()); WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); @@ -981,7 +981,8 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset, * Check stream status to decide if it should be closed * @return true, remove stream; false, keep stream */ - public synchronized boolean streamCleanup(long fileId, long streamTimeout) { + public synchronized boolean streamCleanup(FileHandle handle, + long streamTimeout) { Preconditions .checkState(streamTimeout >= NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT); if (!activeState) { @@ -992,7 +993,8 @@ public synchronized boolean streamCleanup(long fileId, long streamTimeout) { // Check the stream timeout if (checkStreamTimeout(streamTimeout)) { if (LOG.isDebugEnabled()) { - LOG.debug("stream can be closed for fileId: " + fileId); + LOG.debug("stream can be closed for fileId: " + + handle.dumpFileHandle()); } flag = true; } @@ -1188,7 +1190,7 @@ private void doSingleWrite(final WriteCtx writeCtx) { FileHandle handle = writeCtx.getHandle(); if (LOG.isDebugEnabled()) { - LOG.debug("do write, fileId: " + handle.getFileId() + " offset: " + LOG.debug("do write, fileHandle " + handle.dumpFileHandle() + " offset: " + offset + " length: " + count + " stableHow: " + stableHow.name()); } @@ -1213,8 +1215,9 @@ private void doSingleWrite(final WriteCtx writeCtx) { writeCtx.setDataState(WriteCtx.DataState.NO_DUMP); updateNonSequentialWriteInMemory(-count); if (LOG.isDebugEnabled()) { - LOG.debug("After writing " + handle.getFileId() + " at offset " - + offset + ", updated the memory count, new value: " + LOG.debug("After writing " + handle.dumpFileHandle() + + " at offset " + offset + + ", updated the memory count, new value: " + nonSequentialWriteInMemory.get()); } } @@ -1257,8 +1260,8 @@ private void doSingleWrite(final WriteCtx writeCtx) { processCommits(writeCtx.getOffset() + writeCtx.getCount()); } catch (IOException e) { - LOG.error("Error writing to fileId " + handle.getFileId() + " at offset " - + offset + " and length " + count, e); + LOG.error("Error writing to fileHandle " + handle.dumpFileHandle() + + " at offset " + offset + " and length " + count, e); if (!writeCtx.getReplied()) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO); Nfs3Utils.writeChannel(channel, response.serialize( diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java index e26fac5ab8..e23e4905d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java @@ -156,7 +156,7 @@ void scan(long streamTimeout) { Entry pairs = it.next(); FileHandle handle = pairs.getKey(); OpenFileCtx ctx = pairs.getValue(); - if (!ctx.streamCleanup(handle.getFileId(), streamTimeout)) { + if (!ctx.streamCleanup(handle, streamTimeout)) { continue; } @@ -164,10 +164,10 @@ void scan(long streamTimeout) { synchronized (this) { OpenFileCtx ctx2 = openFileMap.get(handle); if (ctx2 != null) { - if (ctx2.streamCleanup(handle.getFileId(), streamTimeout)) { + if (ctx2.streamCleanup(handle, streamTimeout)) { openFileMap.remove(handle); if (LOG.isDebugEnabled()) { - LOG.debug("After remove stream " + handle.getFileId() + LOG.debug("After remove stream " + handle.dumpFileHandle() + ", the stream number:" + size()); } ctxToRemove.add(ctx2); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 7a6aa89fde..0db633f3ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -319,12 +319,6 @@ GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - GETATTR3Request request; try { request = GETATTR3Request.deserialize(xdr); @@ -335,9 +329,17 @@ GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler, } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("GETATTR for fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("GETATTR for fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } Nfs3FileAttributes attrs = null; @@ -412,11 +414,6 @@ public SETATTR3Response setattr(XDR xdr, RpcInfo info) { SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } SETATTR3Request request; try { @@ -428,9 +425,17 @@ SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler, } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS SETATTR fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS SETATTR fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } if (request.getAttr().getUpdateFields().contains(SetAttrField.SIZE)) { @@ -498,12 +503,6 @@ LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - LOOKUP3Request request; try { request = LOOKUP3Request.deserialize(xdr); @@ -514,15 +513,22 @@ LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler, FileHandle dirHandle = request.getHandle(); String fileName = request.getName(); + int namenodeId = dirHandle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS LOOKUP dir fileId: " + dirHandle.getFileId() + " name: " - + fileName + " client: " + remoteAddress); + LOG.debug("NFS LOOKUP dir fileHandle: " + dirHandle.dumpFileHandle() + + " name: " + fileName + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } try { String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle); Nfs3FileAttributes postOpObjAttr = writeManager.getFileAttr(dfsClient, - dirHandle, fileName); + dirHandle, fileName, namenodeId); if (postOpObjAttr == null) { if (LOG.isDebugEnabled()) { LOG.debug("NFS LOOKUP fileId: " + dirHandle.getFileId() + " name: " @@ -540,7 +546,8 @@ LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler, LOG.info("Can't get path for dir fileId: " + dirHandle.getFileId()); return new LOOKUP3Response(Nfs3Status.NFS3ERR_STALE); } - FileHandle fileHandle = new FileHandle(postOpObjAttr.getFileId()); + FileHandle fileHandle = + new FileHandle(postOpObjAttr.getFileId(), namenodeId); return new LOOKUP3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr, postOpDirAttr); @@ -566,12 +573,6 @@ ACCESS3Response access(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - ACCESS3Request request; try { request = ACCESS3Request.deserialize(xdr); @@ -581,13 +582,21 @@ ACCESS3Response access(XDR xdr, SecurityHandler securityHandler, } FileHandle handle = request.getHandle(); - Nfs3FileAttributes attrs; + int namenodeId = handle.getNamenodeId(); - if (LOG.isDebugEnabled()) { - LOG.debug("NFS ACCESS fileId: " + handle.getFileId() + " client: " - + remoteAddress); + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } + if (LOG.isDebugEnabled()) { + LOG.debug("NFS ACCESS fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + Nfs3FileAttributes attrs; try { attrs = writeManager.getFileAttr(dfsClient, handle, iug); @@ -639,12 +648,6 @@ READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - READLINK3Request request; try { @@ -655,9 +658,17 @@ READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler, } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS READLINK fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS READLINK fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } String fileIdPath = Nfs3Utils.getFileIdPath(handle); @@ -715,12 +726,6 @@ READ3Response read(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(userName); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - READ3Request request; try { @@ -734,9 +739,16 @@ READ3Response read(XDR xdr, SecurityHandler securityHandler, int count = request.getCount(); FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset - + " count: " + count + " client: " + remoteAddress); + LOG.debug("NFS READ fileHandle: " + handle.dumpFileHandle()+ " offset: " + + offset + " count: " + count + " client: " + remoteAddress); + } + + DFSClient dfsClient = clientCache.getDfsClient(userName, namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } Nfs3FileAttributes attrs; @@ -791,7 +803,7 @@ READ3Response read(XDR xdr, SecurityHandler securityHandler, */ for (int i = 0; i < 1; ++i) { FSDataInputStream fis = clientCache.getDfsInputStream(userName, - Nfs3Utils.getFileIdPath(handle)); + Nfs3Utils.getFileIdPath(handle), namenodeId); if (fis == null) { return new READ3Response(Nfs3Status.NFS3ERR_ACCES); @@ -805,7 +817,7 @@ READ3Response read(XDR xdr, SecurityHandler securityHandler, // which requires incompatible changes. if (e.getMessage().equals("Stream closed")) { clientCache.invalidateDfsInputStream(userName, - Nfs3Utils.getFileIdPath(handle)); + Nfs3Utils.getFileIdPath(handle), namenodeId); continue; } else { throw e; @@ -850,11 +862,6 @@ WRITE3Response write(XDR xdr, Channel channel, int xid, SecurityHandler securityHandler, SocketAddress remoteAddress) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } WRITE3Request request; @@ -875,12 +882,20 @@ WRITE3Response write(XDR xdr, Channel channel, int xid, } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS WRITE fileId: " + handle.getFileId() + " offset: " + LOG.debug("NFS WRITE fileHandle: " + handle.dumpFileHandle() + " offset: " + offset + " length: " + count + " stableHow: " + stableHow.getValue() + " xid: " + xid + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + Nfs3FileAttributes preOpAttr = null; try { preOpAttr = writeManager.getFileAttr(dfsClient, handle, iug); @@ -932,11 +947,6 @@ public CREATE3Response create(XDR xdr, RpcInfo info) { CREATE3Response create(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } CREATE3Request request; @@ -949,11 +959,19 @@ CREATE3Response create(XDR xdr, SecurityHandler securityHandler, FileHandle dirHandle = request.getHandle(); String fileName = request.getName(); + int namenodeId = dirHandle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS CREATE dir fileId: " + dirHandle.getFileId() + LOG.debug("NFS CREATE dir fileHandle: " + dirHandle.dumpFileHandle() + " filename: " + fileName + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + int createMode = request.getMode(); if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE) && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE) @@ -1016,7 +1034,7 @@ preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug, aixCompatMode, config); - fileHandle = new FileHandle(postOpObjAttr.getFileId()); + fileHandle = new FileHandle(postOpObjAttr.getFileId(), namenodeId); if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) { LOG.warn("Can't add more stream, close it." + " Future write will become append"); @@ -1066,11 +1084,6 @@ public MKDIR3Response mkdir(XDR xdr, RpcInfo info) { MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } MKDIR3Request request; @@ -1082,9 +1095,18 @@ MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler, } FileHandle dirHandle = request.getHandle(); String fileName = request.getName(); + int namenodeId = dirHandle.getNamenodeId(); + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + if (LOG.isDebugEnabled()) { - LOG.debug("NFS MKDIR dirId: " + dirHandle.getFileId() + " filename: " - + fileName + " client: " + remoteAddress); + LOG.debug("NFS MKDIR dirHandle: " + dirHandle.dumpFileHandle() + + " filename: " + fileName + " client: " + remoteAddress); } if (request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) { @@ -1130,11 +1152,11 @@ MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler, setattrInternal(dfsClient, fileIdPath, setAttr3, false); postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); - objFileHandle = new FileHandle(postOpObjAttr.getFileId()); + objFileHandle = new FileHandle(postOpObjAttr.getFileId(), namenodeId); WccData dirWcc = Nfs3Utils.createWccData( Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug); return new MKDIR3Response(Nfs3Status.NFS3_OK, new FileHandle( - postOpObjAttr.getFileId()), postOpObjAttr, dirWcc); + postOpObjAttr.getFileId(), namenodeId), postOpObjAttr, dirWcc); } catch (IOException e) { LOG.warn("Exception ", e); // Try to return correct WccData @@ -1167,11 +1189,6 @@ public REMOVE3Response remove(XDR xdr, RpcInfo info) { REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } REMOVE3Request request; try { @@ -1181,12 +1198,21 @@ REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, return new REMOVE3Response(Nfs3Status.NFS3ERR_INVAL); } FileHandle dirHandle = request.getHandle(); + int namenodeId = dirHandle.getNamenodeId(); + String fileName = request.getName(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS REMOVE dir fileId: " + dirHandle.getFileId() + LOG.debug("NFS REMOVE dir fileHandle: " + dirHandle.dumpFileHandle() + " fileName: " + fileName + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle); Nfs3FileAttributes preOpDirAttr = null; Nfs3FileAttributes postOpDirAttr = null; @@ -1247,11 +1273,6 @@ public RMDIR3Response rmdir(XDR xdr, RpcInfo info) { RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } RMDIR3Request request; try { @@ -1262,12 +1283,19 @@ RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler, } FileHandle dirHandle = request.getHandle(); String fileName = request.getName(); - + int namenodeId = dirHandle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS RMDIR dir fileId: " + dirHandle.getFileId() + LOG.debug("NFS RMDIR dir fileHandle: " + dirHandle.dumpFileHandle() + " fileName: " + fileName + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle); Nfs3FileAttributes preOpDirAttr = null; Nfs3FileAttributes postOpDirAttr = null; @@ -1332,11 +1360,6 @@ public RENAME3Response rename(XDR xdr, RpcInfo info) { RENAME3Response rename(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } RENAME3Request request = null; try { @@ -1347,13 +1370,28 @@ RENAME3Response rename(XDR xdr, SecurityHandler securityHandler, } FileHandle fromHandle = request.getFromDirHandle(); + int fromNamenodeId = fromHandle.getNamenodeId(); String fromName = request.getFromName(); FileHandle toHandle = request.getToDirHandle(); + int toNamenodeId = toHandle.getNamenodeId(); String toName = request.getToName(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS RENAME from: " + fromHandle.getFileId() + "/" + fromName - + " to: " + toHandle.getFileId() + "/" + toName + " client: " - + remoteAddress); + LOG.debug("NFS RENAME from: " + fromHandle.dumpFileHandle() + + "/" + fromName + " to: " + toHandle.dumpFileHandle() + + "/" + toName + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), fromNamenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + + if (fromNamenodeId != toNamenodeId) { + // renaming file from one namenode to another is not supported + response.setStatus(Nfs3Status.NFS3ERR_INVAL); + return response; } String fromDirFileIdPath = Nfs3Utils.getFileIdPath(fromHandle); @@ -1429,12 +1467,6 @@ SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - SYMLINK3Request request; try { request = SYMLINK3Request.deserialize(xdr); @@ -1448,11 +1480,20 @@ SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler, String name = request.getName(); String symData = request.getSymData(); String linkDirIdPath = Nfs3Utils.getFileIdPath(dirHandle); + int namenodeId = dirHandle.getNamenodeId(); + // Don't do any name check to source path, just leave it to HDFS String linkIdPath = linkDirIdPath + "/" + name; if (LOG.isDebugEnabled()) { LOG.debug("NFS SYMLINK, target: " + symData + " link: " + linkIdPath - + " client: " + remoteAddress); + + " namenodeId: " + namenodeId + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } try { @@ -1471,7 +1512,7 @@ SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler, .setPostOpAttr(Nfs3Utils.getFileAttr(dfsClient, linkDirIdPath, iug)); return new SYMLINK3Response(Nfs3Status.NFS3_OK, new FileHandle( - objAttr.getFileId()), objAttr, dirWcc); + objAttr.getFileId(), namenodeId), objAttr, dirWcc); } catch (IOException e) { LOG.warn("Exception: " + e); @@ -1524,12 +1565,6 @@ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - READDIR3Request request; try { request = READDIR3Request.deserialize(xdr); @@ -1538,6 +1573,8 @@ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler, return new READDIR3Response(Nfs3Status.NFS3ERR_INVAL); } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); + long cookie = request.getCookie(); if (cookie < 0) { LOG.error("Invalid READDIR request, with negative cookie: " + cookie); @@ -1550,8 +1587,16 @@ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler, } if (LOG.isDebugEnabled()) { - LOG.debug("NFS READDIR fileId: " + handle.getFileId() + " cookie: " - + cookie + " count: " + count + " client: " + remoteAddress); + LOG.debug("NFS READDIR fileHandle: " + handle.dumpFileHandle() + + " cookie: " + cookie + " count: " + count + " client: " + + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } HdfsFileStatus dirStatus; @@ -1684,10 +1729,6 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler, return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES); } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT); - } READDIRPLUS3Request request = null; try { @@ -1698,6 +1739,7 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler, } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); long cookie = request.getCookie(); if (cookie < 0) { LOG.error("Invalid READDIRPLUS request, with negative cookie: " + cookie); @@ -1715,9 +1757,15 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler, } if (LOG.isDebugEnabled()) { - LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: " - + cookie + " dirCount: " + dirCount + " maxCount: " + maxCount - + " client: " + remoteAddress); + LOG.debug("NFS READDIRPLUS fileHandle: " + handle.dumpFileHandle() + + " cookie: " + cookie + " dirCount: " + dirCount + " maxCount: " + + maxCount + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT); } HdfsFileStatus dirStatus; @@ -1805,14 +1853,14 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler, entries[0] = new READDIRPLUS3Response.EntryPlus3( postOpDirAttr.getFileId(), ".", 0, postOpDirAttr, new FileHandle( - postOpDirAttr.getFileId())); + postOpDirAttr.getFileId(), namenodeId)); entries[1] = new READDIRPLUS3Response.EntryPlus3(dotdotFileId, "..", dotdotFileId, Nfs3Utils.getNfs3FileAttrFromFileStatus(dotdotStatus, - iug), new FileHandle(dotdotFileId)); + iug), new FileHandle(dotdotFileId, namenodeId)); for (int i = 2; i < n + 2; i++) { long fileId = fstatus[i - 2].getFileId(); - FileHandle childHandle = new FileHandle(fileId); + FileHandle childHandle = new FileHandle(fileId, namenodeId); Nfs3FileAttributes attr; try { attr = writeManager.getFileAttr(dfsClient, childHandle, iug); @@ -1829,7 +1877,7 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler, entries = new READDIRPLUS3Response.EntryPlus3[n]; for (int i = 0; i < n; i++) { long fileId = fstatus[i].getFileId(); - FileHandle childHandle = new FileHandle(fileId); + FileHandle childHandle = new FileHandle(fileId, namenodeId); Nfs3FileAttributes attr; try { attr = writeManager.getFileAttr(dfsClient, childHandle, iug); @@ -1863,11 +1911,6 @@ FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } FSSTAT3Request request; try { @@ -1878,9 +1921,17 @@ FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler, } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS FSSTAT fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS FSSTAT fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } try { @@ -1938,12 +1989,6 @@ FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - FSINFO3Request request; try { request = FSINFO3Request.deserialize(xdr); @@ -1953,9 +1998,17 @@ FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler, } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS FSINFO fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS FSINFO fileHandle: " + handle.dumpFileHandle() + +" client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } try { @@ -2003,12 +2056,6 @@ PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler, return response; } - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } - PATHCONF3Request request; try { request = PATHCONF3Request.deserialize(xdr); @@ -2019,10 +2066,18 @@ PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler, FileHandle handle = request.getHandle(); Nfs3FileAttributes attrs; + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS PATHCONF fileId: " + handle.getFileId() + " client: " - + remoteAddress); + LOG.debug("NFS PATHCONF fileHandle: " + handle.dumpFileHandle() + + " client: " + remoteAddress); + } + + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; } try { @@ -2055,11 +2110,6 @@ public COMMIT3Response commit(XDR xdr, RpcInfo info) { COMMIT3Response commit(XDR xdr, Channel channel, int xid, SecurityHandler securityHandler, SocketAddress remoteAddress) { COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK); - DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); - if (dfsClient == null) { - response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); - return response; - } COMMIT3Request request; try { @@ -2071,12 +2121,20 @@ COMMIT3Response commit(XDR xdr, Channel channel, int xid, } FileHandle handle = request.getHandle(); + int namenodeId = handle.getNamenodeId(); if (LOG.isDebugEnabled()) { - LOG.debug("NFS COMMIT fileId: " + handle.getFileId() + " offset=" + LOG.debug("NFS COMMIT fileHandle: " + handle.dumpFileHandle() + " offset=" + request.getOffset() + " count=" + request.getCount() + " client: " + remoteAddress); } + DFSClient dfsClient = + clientCache.getDfsClient(securityHandler.getUser(), namenodeId); + if (dfsClient == null) { + response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); + return response; + } + String fileIdPath = Nfs3Utils.getFileIdPath(handle); Nfs3FileAttributes preOpAttr = null; try { @@ -2097,7 +2155,7 @@ COMMIT3Response commit(XDR xdr, Channel channel, int xid, // Insert commit as an async request writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid, - preOpAttr); + preOpAttr, namenodeId); return null; } catch (IOException e) { LOG.warn("Exception ", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java index f89679f073..5d667515a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java @@ -318,9 +318,9 @@ void setReplied(boolean replied) { @Override public String toString() { - return "Id:" + handle.getFileId() + " offset:" + getPlainOffset() + " " + - "count:" + count + " originalCount:" + getOriginalCount() + - " stableHow:" + stableHow + " replied:" + replied + " dataState:" + - dataState + " xid:" + xid; + return "FileHandle:" + handle.dumpFileHandle() + " offset:" + + getPlainOffset() + " " + "count:" + count + " originalCount:" + + getOriginalCount() + " stableHow:" + stableHow + " replied:" + + replied + " dataState:" + dataState + " xid:" + xid; } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java index 7810ce2d1a..0a3450d66e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java @@ -139,7 +139,8 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, FileHandle fileHandle = request.getHandle(); OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx == null) { - LOG.info("No opened stream for fileId: " + fileHandle.getFileId()); + LOG.info("No opened stream for fileHandle: " + + fileHandle.dumpFileHandle()); String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId()); HdfsDataOutputStream fos = null; @@ -188,7 +189,8 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, try { fos.close(); } catch (IOException e) { - LOG.error("Can't close stream for fileId: " + handle.getFileId(), e); + LOG.error("Can't close stream for fileHandle: " + + handle.dumpFileHandle(), e); } // Notify client to retry WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); @@ -201,7 +203,8 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, } if (LOG.isDebugEnabled()) { - LOG.debug("Opened stream for appending file: " + fileHandle.getFileId()); + LOG.debug("Opened stream for appending file: " + + fileHandle.dumpFileHandle()); } } @@ -220,7 +223,7 @@ int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle, if (openFileCtx == null) { if (LOG.isDebugEnabled()) { - LOG.debug("No opened stream for fileId: " + fileHandle.getFileId() + LOG.debug("No opened stream for fileId: " + fileHandle.dumpFileHandle() + " commitOffset=" + commitOffset + ". Return success in this case."); } @@ -263,13 +266,14 @@ int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle, } void handleCommit(DFSClient dfsClient, FileHandle fileHandle, - long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) { + long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr, + int namenodeId) { long startTime = System.nanoTime(); int status; OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx == null) { - LOG.info("No opened stream for fileId: " + fileHandle.getFileId() + LOG.info("No opened stream for fileId: " + fileHandle.dumpFileHandle() + " commitOffset=" + commitOffset + ". Return success in this case."); status = Nfs3Status.NFS3_OK; @@ -304,7 +308,9 @@ void handleCommit(DFSClient dfsClient, FileHandle fileHandle, // Send out the response Nfs3FileAttributes postOpAttr = null; try { - postOpAttr = getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId()), iug); + postOpAttr = + getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId(), + namenodeId), iug); } catch (IOException e1) { LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileId(), e1); } @@ -334,13 +340,13 @@ Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle fileHandle, } Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle, - String fileName) throws IOException { + String fileName, int namenodeId) throws IOException { String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName; Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug); if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) { OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr - .getFileId())); + .getFileId(), namenodeId)); if (openFileCtx != null) { attr.setSize(openFileCtx.getNextOffset()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java index b68fdb8510..007803d903 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java @@ -101,9 +101,10 @@ public void testClientAccessPrivilegeForRemove() throws Exception { // Create a remove request HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeString("f1"); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java index 54b7ee7fd1..ba9d46e07d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestDFSClientCache.java @@ -43,15 +43,17 @@ public void testEviction() throws IOException { DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE); - DFSClient c1 = cache.getDfsClient("test1"); - assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1")); - assertEquals(c1, cache.getDfsClient("test1")); + int namenodeId = Nfs3Utils.getNamenodeId(conf); + DFSClient c1 = cache.getDfsClient("test1", namenodeId); + assertTrue(cache.getDfsClient("test1", namenodeId) + .toString().contains("ugi=test1")); + assertEquals(c1, cache.getDfsClient("test1", namenodeId)); assertFalse(isDfsClientClose(c1)); - cache.getDfsClient("test2"); + cache.getDfsClient("test2", namenodeId); assertTrue(isDfsClientClose(c1)); assertTrue("cache size should be the max size or less", - cache.clientCache.size() <= MAX_CACHE_SIZE); + cache.getClientCache().size() <= MAX_CACHE_SIZE); } @Test @@ -61,6 +63,7 @@ public void testGetUserGroupInformationSecure() throws IOException { NfsConfiguration conf = new NfsConfiguration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost"); UserGroupInformation currentUserUgi = UserGroupInformation.createRemoteUser(currentUser); currentUserUgi.setAuthenticationMethod(KERBEROS); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java index c802be072d..211a166cf0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestExportsTable.java @@ -21,7 +21,14 @@ import java.io.IOException; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FsConstants; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.viewfs.ConfigUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; import org.apache.hadoop.hdfs.nfs.mount.Mountd; @@ -31,7 +38,152 @@ public class TestExportsTable { @Test - public void testExportPoint() throws IOException { + public void testHdfsExportPoint() throws IOException { + NfsConfiguration config = new NfsConfiguration(); + MiniDFSCluster cluster = null; + + // Use emphral port in case tests are running in parallel + config.setInt("nfs3.mountd.port", 0); + config.setInt("nfs3.server.port", 0); + config.set("nfs.http.address", "0.0.0.0:0"); + + try { + cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); + cluster.waitActive(); + + // Start nfs + final Nfs3 nfsServer = new Nfs3(config); + nfsServer.startServiceInternal(false); + + Mountd mountd = nfsServer.getMountd(); + RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram(); + assertTrue(rpcMount.getExports().size() == 1); + + String exportInMountd = rpcMount.getExports().get(0); + assertTrue(exportInMountd.equals("/")); + + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testViewFsExportPoint() throws IOException { + NfsConfiguration config = new NfsConfiguration(); + MiniDFSCluster cluster = null; + String clusterName = RandomStringUtils.randomAlphabetic(10); + + String exportPoint = "/hdfs1,/hdfs2"; + config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint); + config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + FsConstants.VIEWFS_SCHEME + "://" + clusterName); + // Use emphral port in case tests are running in parallel + config.setInt("nfs3.mountd.port", 0); + config.setInt("nfs3.server.port", 0); + config.set("nfs.http.address", "0.0.0.0:0"); + + try { + cluster = + new MiniDFSCluster.Builder(config).nnTopology( + MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(2) + .build(); + cluster.waitActive(); + DistributedFileSystem hdfs1 = cluster.getFileSystem(0); + DistributedFileSystem hdfs2 = cluster.getFileSystem(1); + cluster.waitActive(); + Path base1 = new Path("/user1"); + Path base2 = new Path("/user2"); + hdfs1.delete(base1, true); + hdfs2.delete(base2, true); + hdfs1.mkdirs(base1); + hdfs2.mkdirs(base2); + ConfigUtil.addLink(config, clusterName, "/hdfs1", + hdfs1.makeQualified(base1).toUri()); + ConfigUtil.addLink(config, clusterName, "/hdfs2", + hdfs2.makeQualified(base2).toUri()); + + // Start nfs + final Nfs3 nfsServer = new Nfs3(config); + nfsServer.startServiceInternal(false); + + Mountd mountd = nfsServer.getMountd(); + RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram(); + assertTrue(rpcMount.getExports().size() == 2); + + String exportInMountd1 = rpcMount.getExports().get(0); + assertTrue(exportInMountd1.equals("/hdfs1")); + + String exportInMountd2 = rpcMount.getExports().get(1); + assertTrue(exportInMountd2.equals("/hdfs2")); + + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testViewFsInternalExportPoint() throws IOException { + NfsConfiguration config = new NfsConfiguration(); + MiniDFSCluster cluster = null; + String clusterName = RandomStringUtils.randomAlphabetic(10); + + String exportPoint = "/hdfs1/subpath"; + config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint); + config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + FsConstants.VIEWFS_SCHEME + "://" + clusterName); + // Use emphral port in case tests are running in parallel + config.setInt("nfs3.mountd.port", 0); + config.setInt("nfs3.server.port", 0); + config.set("nfs.http.address", "0.0.0.0:0"); + + try { + cluster = + new MiniDFSCluster.Builder(config).nnTopology( + MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(2) + .build(); + cluster.waitActive(); + DistributedFileSystem hdfs1 = cluster.getFileSystem(0); + DistributedFileSystem hdfs2 = cluster.getFileSystem(1); + cluster.waitActive(); + Path base1 = new Path("/user1"); + Path base2 = new Path("/user2"); + hdfs1.delete(base1, true); + hdfs2.delete(base2, true); + hdfs1.mkdirs(base1); + hdfs2.mkdirs(base2); + ConfigUtil.addLink(config, clusterName, "/hdfs1", + hdfs1.makeQualified(base1).toUri()); + ConfigUtil.addLink(config, clusterName, "/hdfs2", + hdfs2.makeQualified(base2).toUri()); + Path subPath = new Path(base1, "subpath"); + hdfs1.delete(subPath, true); + hdfs1.mkdirs(subPath); + + // Start nfs + final Nfs3 nfsServer = new Nfs3(config); + nfsServer.startServiceInternal(false); + + Mountd mountd = nfsServer.getMountd(); + RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram(); + assertTrue(rpcMount.getExports().size() == 1); + + String exportInMountd = rpcMount.getExports().get(0); + assertTrue(exportInMountd.equals(exportPoint)); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testHdfsInternalExportPoint() throws IOException { NfsConfiguration config = new NfsConfiguration(); MiniDFSCluster cluster = null; @@ -40,10 +192,15 @@ public void testExportPoint() throws IOException { // Use emphral port in case tests are running in parallel config.setInt("nfs3.mountd.port", 0); config.setInt("nfs3.server.port", 0); - + config.set("nfs.http.address", "0.0.0.0:0"); + Path base = new Path(exportPoint); + try { cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); cluster.waitActive(); + DistributedFileSystem hdfs = cluster.getFileSystem(0); + hdfs.delete(base, true); + hdfs.mkdirs(base); // Start nfs final Nfs3 nfsServer = new Nfs3(config); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java index 05ba2db05a..0af7cedefb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestReaddir.java @@ -21,9 +21,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.List; import org.apache.hadoop.fs.Path; @@ -31,8 +29,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; -import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3; -import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.nfs.nfs3.FileHandle; @@ -40,15 +36,10 @@ import org.apache.hadoop.nfs.nfs3.response.READDIR3Response.Entry3; import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response; import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response.EntryPlus3; -import org.apache.hadoop.oncrpc.RpcInfo; -import org.apache.hadoop.oncrpc.RpcMessage; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.SecurityHandler; import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; import org.apache.hadoop.security.authorize.ProxyUsers; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -119,10 +110,11 @@ public void testReaddirBasic() throws IOException { // Get inodeId of /tmp HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); // Create related part of the XDR request XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeLongAsHyper(0); // cookie xdr_req.writeLongAsHyper(0); // verifier @@ -139,7 +131,7 @@ public void testReaddirBasic() throws IOException { // Create related part of the XDR request xdr_req = new XDR(); - handle = new FileHandle(dirId); + handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeLongAsHyper(f2Id); // cookie xdr_req.writeLongAsHyper(0); // verifier @@ -167,10 +159,11 @@ public void testReaddirPlus() throws IOException { // Get inodeId of /tmp HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); // Create related part of the XDR request XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeLongAsHyper(0); // cookie xdr_req.writeLongAsHyper(0); // verifier @@ -189,7 +182,7 @@ public void testReaddirPlus() throws IOException { // Create related part of the XDR request xdr_req = new XDR(); - handle = new FileHandle(dirId); + handle = new FileHandle(dirId, namenodeId); handle.serialize(xdr_req); xdr_req.writeLongAsHyper(f2Id); // cookie xdr_req.writeLongAsHyper(0); // verifier diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java index f308763b24..30ecc0b824 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java @@ -186,7 +186,8 @@ public void createFiles() throws IllegalArgumentException, IOException { public void testGetattr() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); XDR xdr_req = new XDR(); GETATTR3Request req = new GETATTR3Request(handle); req.serialize(xdr_req); @@ -209,8 +210,9 @@ public void testGetattr() throws Exception { public void testSetattr() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); SetAttr3 symAttr = new SetAttr3(0, 1, 0, 0, null, null, EnumSet.of(SetAttrField.UID)); SETATTR3Request req = new SETATTR3Request(handle, symAttr, false, null); @@ -234,7 +236,8 @@ public void testSetattr() throws Exception { public void testLookup() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); LOOKUP3Request lookupReq = new LOOKUP3Request(handle, "bar"); XDR xdr_req = new XDR(); lookupReq.serialize(xdr_req); @@ -257,7 +260,8 @@ public void testLookup() throws Exception { public void testAccess() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); XDR xdr_req = new XDR(); ACCESS3Request req = new ACCESS3Request(handle); req.serialize(xdr_req); @@ -281,8 +285,9 @@ public void testReadlink() throws Exception { // Create a symlink first. HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); SYMLINK3Request req = new SYMLINK3Request(handle, "fubar", new SetAttr3(), "bar"); req.serialize(xdr_req); @@ -316,7 +321,8 @@ public void testReadlink() throws Exception { public void testRead() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); READ3Request readReq = new READ3Request(handle, 0, 5); XDR xdr_req = new XDR(); @@ -373,7 +379,8 @@ private void createFileUsingNfs(String fileName, byte[] buffer) final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName); final long dirId = status.getFileId(); - final FileHandle handle = new FileHandle(dirId); + final int namenodeId = Nfs3Utils.getNamenodeId(config); + final FileHandle handle = new FileHandle(dirId, namenodeId); final WRITE3Request writeReq = new WRITE3Request(handle, 0, buffer.length, WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer)); @@ -390,7 +397,8 @@ private byte[] getFileContentsUsingNfs(String fileName, int len) throws Exception { final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName); final long dirId = status.getFileId(); - final FileHandle handle = new FileHandle(dirId); + final int namenodeId = Nfs3Utils.getNamenodeId(config); + final FileHandle handle = new FileHandle(dirId, namenodeId); final READ3Request readReq = new READ3Request(handle, 0, len); final XDR xdr_req = new XDR(); @@ -422,7 +430,8 @@ private byte[] getFileContentsUsingDfs(String fileName, int len) private void commit(String fileName, int len) throws Exception { final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName); final long dirId = status.getFileId(); - final FileHandle handle = new FileHandle(dirId); + final int namenodeId = Nfs3Utils.getNamenodeId(config); + final FileHandle handle = new FileHandle(dirId, namenodeId); final XDR xdr_req = new XDR(); final COMMIT3Request req = new COMMIT3Request(handle, 0, len); req.serialize(xdr_req); @@ -439,7 +448,8 @@ private void commit(String fileName, int len) throws Exception { public void testWrite() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); byte[] buffer = new byte[10]; for (int i = 0; i < 10; i++) { @@ -469,8 +479,9 @@ public void testWrite() throws Exception { public void testCreate() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); CREATE3Request req = new CREATE3Request(handle, "fubar", Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0); req.serialize(xdr_req); @@ -493,8 +504,9 @@ public void testCreate() throws Exception { public void testMkdir() throws Exception {//FixME HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); MKDIR3Request req = new MKDIR3Request(handle, "fubar1", new SetAttr3()); req.serialize(xdr_req); @@ -520,8 +532,9 @@ public void testMkdir() throws Exception {//FixME public void testSymlink() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); SYMLINK3Request req = new SYMLINK3Request(handle, "fubar", new SetAttr3(), "bar"); req.serialize(xdr_req); @@ -544,8 +557,9 @@ public void testSymlink() throws Exception { public void testRemove() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); REMOVE3Request req = new REMOVE3Request(handle, "bar"); req.serialize(xdr_req); @@ -567,8 +581,9 @@ public void testRemove() throws Exception { public void testRmdir() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); RMDIR3Request req = new RMDIR3Request(handle, "foo"); req.serialize(xdr_req); @@ -590,8 +605,9 @@ public void testRmdir() throws Exception { public void testRename() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); + int namenodeId = Nfs3Utils.getNamenodeId(config); XDR xdr_req = new XDR(); - FileHandle handle = new FileHandle(dirId); + FileHandle handle = new FileHandle(dirId, namenodeId); RENAME3Request req = new RENAME3Request(handle, "bar", handle, "fubar"); req.serialize(xdr_req); @@ -613,7 +629,8 @@ public void testRename() throws Exception { public void testReaddir() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); XDR xdr_req = new XDR(); READDIR3Request req = new READDIR3Request(handle, 0, 0, 100); req.serialize(xdr_req); @@ -636,7 +653,8 @@ public void testReaddir() throws Exception { public void testReaddirplus() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); XDR xdr_req = new XDR(); READDIRPLUS3Request req = new READDIRPLUS3Request(handle, 0, 0, 3, 2); req.serialize(xdr_req); @@ -659,7 +677,8 @@ public void testReaddirplus() throws Exception { public void testFsstat() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); XDR xdr_req = new XDR(); FSSTAT3Request req = new FSSTAT3Request(handle); req.serialize(xdr_req); @@ -682,7 +701,8 @@ public void testFsstat() throws Exception { public void testFsinfo() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); XDR xdr_req = new XDR(); FSINFO3Request req = new FSINFO3Request(handle); req.serialize(xdr_req); @@ -705,7 +725,8 @@ public void testFsinfo() throws Exception { public void testPathconf() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); XDR xdr_req = new XDR(); PATHCONF3Request req = new PATHCONF3Request(handle); req.serialize(xdr_req); @@ -728,7 +749,8 @@ public void testPathconf() throws Exception { public void testCommit() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); long dirId = status.getFileId(); - FileHandle handle = new FileHandle(dirId); + int namenodeId = Nfs3Utils.getNamenodeId(config); + FileHandle handle = new FileHandle(dirId, namenodeId); XDR xdr_req = new XDR(); COMMIT3Request req = new COMMIT3Request(handle, 0, 5); req.serialize(xdr_req); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestViewfsWithNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestViewfsWithNfs3.java new file mode 100644 index 0000000000..a5997b46a9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestViewfsWithNfs3.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.nfs.nfs3; + +import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FsConstants; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.viewfs.ConfigUtil; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.client.HdfsAdmin; +import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; +import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; +import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.nfs.nfs3.FileHandle; +import org.apache.hadoop.nfs.nfs3.Nfs3Constant; +import org.apache.hadoop.nfs.nfs3.Nfs3Status; +import org.apache.hadoop.nfs.nfs3.request.GETATTR3Request; +import org.apache.hadoop.nfs.nfs3.request.RENAME3Request; +import org.apache.hadoop.nfs.nfs3.response.GETATTR3Response; +import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; +import org.apache.hadoop.nfs.nfs3.response.RENAME3Response; +import org.apache.hadoop.nfs.nfs3.response.WRITE3Response; +import org.apache.hadoop.oncrpc.XDR; +import org.apache.hadoop.oncrpc.security.SecurityHandler; +import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.Assert; +import org.mockito.Mockito; + +import java.io.File; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + + +/** + * Tests for {@link RpcProgramNfs3} with + * {@link org.apache.hadoop.fs.viewfs.ViewFileSystem}. + */ +public class TestViewfsWithNfs3 { + private static DistributedFileSystem hdfs1; + private static DistributedFileSystem hdfs2; + private static MiniDFSCluster cluster = null; + private static NfsConfiguration config = new NfsConfiguration(); + private static HdfsAdmin dfsAdmin1; + private static HdfsAdmin dfsAdmin2; + private static FileSystem viewFs; + + private static NameNode nn1; + private static NameNode nn2; + private static Nfs3 nfs; + private static RpcProgramNfs3 nfsd; + private static RpcProgramMountd mountd; + private static SecurityHandler securityHandler; + private static FileSystemTestHelper fsHelper; + private static File testRootDir; + + @BeforeClass + public static void setup() throws Exception { + String currentUser = System.getProperty("user.name"); + + config.set("fs.permissions.umask-mode", "u=rwx,g=,o="); + config.set(DefaultImpersonationProvider.getTestProvider() + .getProxySuperuserGroupConfKey(currentUser), "*"); + config.set(DefaultImpersonationProvider.getTestProvider() + .getProxySuperuserIpConfKey(currentUser), "*"); + fsHelper = new FileSystemTestHelper(); + // Set up java key store + String testRoot = fsHelper.getTestRootDir(); + testRootDir = new File(testRoot).getAbsoluteFile(); + final Path jksPath = new Path(testRootDir.toString(), "test.jks"); + config.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()); + ProxyUsers.refreshSuperUserGroupsConfiguration(config); + + cluster = + new MiniDFSCluster.Builder(config).nnTopology( + MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(2) + .build(); + cluster.waitActive(); + hdfs1 = cluster.getFileSystem(0); + hdfs2 = cluster.getFileSystem(1); + + nn1 = cluster.getNameNode(0); + nn2 = cluster.getNameNode(1); + nn2.getServiceRpcAddress(); + dfsAdmin1 = new HdfsAdmin(cluster.getURI(0), config); + dfsAdmin2 = new HdfsAdmin(cluster.getURI(1), config); + + // Use ephemeral ports in case tests are running in parallel + config.setInt("nfs3.mountd.port", 0); + config.setInt("nfs3.server.port", 0); + config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + FsConstants.VIEWFS_URI.toString()); + // Start NFS with allowed.hosts set to "* rw" + config.set("dfs.nfs.exports.allowed.hosts", "* rw"); + + Path base1 = new Path("/user1"); + Path base2 = new Path("/user2"); + hdfs1.delete(base1, true); + hdfs2.delete(base2, true); + hdfs1.mkdirs(base1); + hdfs2.mkdirs(base2); + ConfigUtil.addLink(config, "/hdfs1", hdfs1.makeQualified(base1).toUri()); + ConfigUtil.addLink(config, "/hdfs2", hdfs2.makeQualified(base2).toUri()); + + + viewFs = FileSystem.get(config); + config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, + "/hdfs1", "/hdfs2"); + + nfs = new Nfs3(config); + nfs.startServiceInternal(false); + nfsd = (RpcProgramNfs3) nfs.getRpcProgram(); + mountd = (RpcProgramMountd) nfs.getMountd().getRpcProgram(); + + // Mock SecurityHandler which returns system user.name + securityHandler = Mockito.mock(SecurityHandler.class); + Mockito.when(securityHandler.getUser()).thenReturn(currentUser); + viewFs.delete(new Path("/hdfs2/dir2"), true); + viewFs.mkdirs(new Path("/hdfs2/dir2")); + DFSTestUtil.createFile(viewFs, new Path("/hdfs1/file1"), 0, (short) 1, 0); + DFSTestUtil.createFile(viewFs, new Path("/hdfs1/file2"), 0, (short) 1, 0); + DFSTestUtil.createFile(viewFs, new Path("/hdfs1/write1"), 0, (short) 1, 0); + DFSTestUtil.createFile(viewFs, new Path("/hdfs2/write2"), 0, (short) 1, 0); + DFSTestUtil.createFile(viewFs, new Path("/hdfs1/renameMultiNN"), + 0, (short) 1, 0); + DFSTestUtil.createFile(viewFs, new Path("/hdfs1/renameSingleNN"), + 0, (short) 1, 0); + } + + @AfterClass + public static void shutdown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testNumExports() throws Exception { + Assert.assertEquals(mountd.getExports().size(), + viewFs.getChildFileSystems().length); + } + + @Test + public void testPaths() throws Exception { + Assert.assertEquals(hdfs1.resolvePath(new Path("/user1/file1")), + viewFs.resolvePath(new Path("/hdfs1/file1"))); + Assert.assertEquals(hdfs1.resolvePath(new Path("/user1/file2")), + viewFs.resolvePath(new Path("/hdfs1/file2"))); + Assert.assertEquals(hdfs2.resolvePath(new Path("/user2/dir2")), + viewFs.resolvePath(new Path("/hdfs2/dir2"))); + } + + @Test + public void testFileStatus() throws Exception { + HdfsFileStatus status = nn1.getRpcServer().getFileInfo("/user1/file1"); + FileStatus st = viewFs.getFileStatus(new Path("/hdfs1/file1")); + Assert.assertEquals(st.isDirectory(), status.isDirectory()); + + HdfsFileStatus status2 = nn2.getRpcServer().getFileInfo("/user2/dir2"); + FileStatus st2 = viewFs.getFileStatus(new Path("/hdfs2/dir2")); + Assert.assertEquals(st2.isDirectory(), status2.isDirectory()); + } + + // Test for getattr + private void testNfsGetAttrResponse(long fileId, int namenodeId, + int expectedStatus) { + FileHandle handle = new FileHandle(fileId, namenodeId); + XDR xdrReq = new XDR(); + GETATTR3Request req = new GETATTR3Request(handle); + req.serialize(xdrReq); + GETATTR3Response response = nfsd.getattr(xdrReq.asReadOnlyWrap(), + securityHandler, new InetSocketAddress("localhost", 1234)); + Assert.assertEquals("Incorrect return code", + expectedStatus, response.getStatus()); + } + + @Test (timeout = 60000) + public void testNfsAccessNN1() throws Exception { + HdfsFileStatus status = nn1.getRpcServer().getFileInfo("/user1/file1"); + int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs1.getUri()); + testNfsGetAttrResponse(status.getFileId(), namenodeId, Nfs3Status.NFS3_OK); + } + + @Test (timeout = 60000) + public void testNfsAccessNN2() throws Exception { + HdfsFileStatus status = nn2.getRpcServer().getFileInfo("/user2/dir2"); + int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs2.getUri()); + testNfsGetAttrResponse(status.getFileId(), namenodeId, Nfs3Status.NFS3_OK); + } + + @Test (timeout = 60000) + public void testWrongNfsAccess() throws Exception { + DFSTestUtil.createFile(viewFs, new Path("/hdfs1/file3"), 0, (short) 1, 0); + HdfsFileStatus status = nn1.getRpcServer().getFileInfo("/user1/file3"); + int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs2.getUri()); + testNfsGetAttrResponse(status.getFileId(), namenodeId, + Nfs3Status.NFS3ERR_IO); + } + + // Test for write + private void testNfsWriteResponse(long dirId, int namenodeId) + throws Exception { + FileHandle handle = new FileHandle(dirId, namenodeId); + + byte[] buffer = new byte[10]; + for (int i = 0; i < 10; i++) { + buffer[i] = (byte) i; + } + + WRITE3Request writeReq = new WRITE3Request(handle, 0, 10, + Nfs3Constant.WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer)); + XDR xdrReq = new XDR(); + writeReq.serialize(xdrReq); + + // Attempt by a priviledged user should pass. + WRITE3Response response = nfsd.write(xdrReq.asReadOnlyWrap(), + null, 1, securityHandler, + new InetSocketAddress("localhost", 1234)); + Assert.assertEquals("Incorrect response:", null, response); + } + + @Test (timeout = 60000) + public void testNfsWriteNN1() throws Exception { + HdfsFileStatus status = nn1.getRpcServer().getFileInfo("/user1/write1"); + int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs1.getUri()); + testNfsWriteResponse(status.getFileId(), namenodeId); + } + + @Test (timeout = 60000) + public void testNfsWriteNN2() throws Exception { + HdfsFileStatus status = nn2.getRpcServer().getFileInfo("/user2/write2"); + int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs2.getUri()); + testNfsWriteResponse(status.getFileId(), namenodeId); + } + + // Test for rename + private void testNfsRename(FileHandle fromDirHandle, String fromFileName, + FileHandle toDirHandle, String toFileName, + int expectedStatus) throws Exception { + XDR xdrReq = new XDR(); + RENAME3Request req = new RENAME3Request(fromDirHandle, fromFileName, + toDirHandle, toFileName); + req.serialize(xdrReq); + + // Attempt by a privileged user should pass. + RENAME3Response response = nfsd.rename(xdrReq.asReadOnlyWrap(), + securityHandler, new InetSocketAddress("localhost", 1234)); + assertEquals(expectedStatus, response.getStatus()); + } + + @Test (timeout = 60000) + public void testNfsRenameMultiNN() throws Exception { + HdfsFileStatus fromFileStatus = nn1.getRpcServer().getFileInfo("/user1"); + int fromNNId = Nfs3Utils.getNamenodeId(config, hdfs1.getUri()); + FileHandle fromHandle = + new FileHandle(fromFileStatus.getFileId(), fromNNId); + + HdfsFileStatus toFileStatus = nn2.getRpcServer().getFileInfo("/user2"); + int toNNId = Nfs3Utils.getNamenodeId(config, hdfs2.getUri()); + FileHandle toHandle = new FileHandle(toFileStatus.getFileId(), toNNId); + + HdfsFileStatus statusBeforeRename = + nn1.getRpcServer().getFileInfo("/user1/renameMultiNN"); + Assert.assertEquals(statusBeforeRename.isDirectory(), false); + + testNfsRename(fromHandle, "renameMultiNN", + toHandle, "renameMultiNNFail", Nfs3Status.NFS3ERR_INVAL); + + HdfsFileStatus statusAfterRename = + nn2.getRpcServer().getFileInfo("/user2/renameMultiNNFail"); + Assert.assertEquals(statusAfterRename, null); + + statusAfterRename = nn1.getRpcServer().getFileInfo("/user1/renameMultiNN"); + Assert.assertEquals(statusAfterRename.isDirectory(), false); + } + + @Test (timeout = 60000) + public void testNfsRenameSingleNN() throws Exception { + HdfsFileStatus fromFileStatus = nn1.getRpcServer().getFileInfo("/user1"); + int fromNNId = Nfs3Utils.getNamenodeId(config, hdfs1.getUri()); + FileHandle fromHandle = + new FileHandle(fromFileStatus.getFileId(), fromNNId); + + HdfsFileStatus statusBeforeRename = + nn1.getRpcServer().getFileInfo("/user1/renameSingleNN"); + Assert.assertEquals(statusBeforeRename.isDirectory(), false); + + testNfsRename(fromHandle, "renameSingleNN", + fromHandle, "renameSingleNNSucess", Nfs3Status.NFS3_OK); + + HdfsFileStatus statusAfterRename = + nn1.getRpcServer().getFileInfo("/user1/renameSingleNNSucess"); + Assert.assertEquals(statusAfterRename.isDirectory(), false); + + statusAfterRename = + nn1.getRpcServer().getFileInfo("/user1/renameSingleNN"); + Assert.assertEquals(statusAfterRename, null); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index 9c327c425e..f7a92fac53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -481,6 +481,7 @@ public void testWriteStableHow() throws IOException, InterruptedException { cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); cluster.waitActive(); client = new DFSClient(DFSUtilClient.getNNAddress(config), config); + int namenodeId = Nfs3Utils.getNamenodeId(config); // Use emphral port in case tests are running in parallel config.setInt("nfs3.mountd.port", 0); @@ -492,7 +493,7 @@ public void testWriteStableHow() throws IOException, InterruptedException { nfsd = (RpcProgramNfs3) nfs3.getRpcProgram(); HdfsFileStatus status = client.getFileInfo("/"); - FileHandle rootHandle = new FileHandle(status.getFileId()); + FileHandle rootHandle = new FileHandle(status.getFileId(), namenodeId); // Create file1 CREATE3Request createReq = new CREATE3Request(rootHandle, "file1", Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0); @@ -598,8 +599,9 @@ public void testOOOWrites() throws IOException, InterruptedException { DFSClient dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config), config); + int namenodeId = Nfs3Utils.getNamenodeId(config); HdfsFileStatus status = dfsClient.getFileInfo("/"); - FileHandle rootHandle = new FileHandle(status.getFileId()); + FileHandle rootHandle = new FileHandle(status.getFileId(), namenodeId); CREATE3Request createReq = new CREATE3Request(rootHandle, "out-of-order-write" + System.currentTimeMillis(), @@ -674,8 +676,9 @@ public void testOverlappingWrites() throws IOException, InterruptedException { DFSClient dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config), config); + int namenodeId = Nfs3Utils.getNamenodeId(config); HdfsFileStatus status = dfsClient.getFileInfo("/"); - FileHandle rootHandle = new FileHandle(status.getFileId()); + FileHandle rootHandle = new FileHandle(status.getFileId(), namenodeId); CREATE3Request createReq = new CREATE3Request(rootHandle, "overlapping-writes" + System.currentTimeMillis(),