HDFS-11575. Supporting HDFS NFS gateway with Federated HDFS. Contributed by Mukul Kumar Singh.
This commit is contained in:
parent
ec8bf9e48a
commit
d6602b5f39
@ -38,17 +38,21 @@ public class FileHandle {
|
||||
private static final int HANDLE_LEN = 32;
|
||||
private byte[] handle; // Opaque handle
|
||||
private long fileId = -1;
|
||||
private int namenodeId = -1;
|
||||
|
||||
public FileHandle() {
|
||||
handle = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle is a 32 bytes number. For HDFS, the last 8 bytes is fileId.
|
||||
* Handle is a 32 bytes number. For HDFS, the last 8 bytes is fileId
|
||||
* For ViewFs, last 8 byte is fileId while 4 bytes before that is namenodeId
|
||||
* @param v file id
|
||||
* @param n namenode id
|
||||
*/
|
||||
public FileHandle(long v) {
|
||||
public FileHandle(long v, int n) {
|
||||
fileId = v;
|
||||
namenodeId = n;
|
||||
handle = new byte[HANDLE_LEN];
|
||||
handle[0] = (byte)(v >>> 56);
|
||||
handle[1] = (byte)(v >>> 48);
|
||||
@ -58,11 +62,20 @@ public FileHandle(long v) {
|
||||
handle[5] = (byte)(v >>> 16);
|
||||
handle[6] = (byte)(v >>> 8);
|
||||
handle[7] = (byte)(v >>> 0);
|
||||
for (int i = 8; i < HANDLE_LEN; i++) {
|
||||
|
||||
handle[8] = (byte) (n >>> 24);
|
||||
handle[9] = (byte) (n >>> 16);
|
||||
handle[10] = (byte) (n >>> 8);
|
||||
handle[11] = (byte) (n >>> 0);
|
||||
for (int i = 12; i < HANDLE_LEN; i++) {
|
||||
handle[i] = (byte) 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public FileHandle(long v) {
|
||||
this(v, 0);
|
||||
}
|
||||
|
||||
public FileHandle(String s) {
|
||||
MessageDigest digest;
|
||||
try {
|
||||
@ -93,22 +106,32 @@ public boolean serialize(XDR out) {
|
||||
return true;
|
||||
}
|
||||
|
||||
private long bytesToLong(byte[] data) {
|
||||
private long bytesToLong(byte[] data, int offset) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(8);
|
||||
for (int i = 0; i < 8; i++) {
|
||||
buffer.put(data[i]);
|
||||
buffer.put(data[i + offset]);
|
||||
}
|
||||
buffer.flip();// need flip
|
||||
buffer.flip(); // need flip
|
||||
return buffer.getLong();
|
||||
}
|
||||
|
||||
|
||||
private int bytesToInt(byte[] data, int offset) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(4);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
buffer.put(data[i + offset]);
|
||||
}
|
||||
buffer.flip(); // need flip
|
||||
return buffer.getInt();
|
||||
}
|
||||
|
||||
public boolean deserialize(XDR xdr) {
|
||||
if (!XDR.verifyLength(xdr, 32)) {
|
||||
return false;
|
||||
}
|
||||
int size = xdr.readInt();
|
||||
handle = xdr.readFixedOpaque(size);
|
||||
fileId = bytesToLong(handle);
|
||||
fileId = bytesToLong(handle, 0);
|
||||
namenodeId = bytesToInt(handle, 8);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -122,11 +145,15 @@ private static String hex(byte b) {
|
||||
public long getFileId() {
|
||||
return fileId;
|
||||
}
|
||||
|
||||
public int getNamenodeId() {
|
||||
return namenodeId;
|
||||
}
|
||||
|
||||
public byte[] getContent() {
|
||||
return handle.clone();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder s = new StringBuilder();
|
||||
@ -154,4 +181,8 @@ public boolean equals(Object o) {
|
||||
public int hashCode() {
|
||||
return Arrays.hashCode(handle);
|
||||
}
|
||||
|
||||
public String dumpFileHandle() {
|
||||
return "fileId: " + fileId + " namenodeId: " + namenodeId;
|
||||
}
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ public void serialize(XDR xdr) {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("fileId: %d offset: %d count: %d stableHow: %s",
|
||||
handle.getFileId(), offset, count, stableHow.name());
|
||||
return String.format("fileHandle: %s offset: %d count: %d stableHow: %s",
|
||||
handle.dumpFileHandle(), offset, count, stableHow.name());
|
||||
}
|
||||
}
|
@ -19,17 +19,20 @@
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
|
||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3Utils;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.mount.MountEntry;
|
||||
import org.apache.hadoop.mount.MountInterface;
|
||||
@ -64,14 +67,12 @@ public class RpcProgramMountd extends RpcProgram implements MountInterface {
|
||||
public static final int VERSION_2 = 2;
|
||||
public static final int VERSION_3 = 3;
|
||||
|
||||
private final DFSClient dfsClient;
|
||||
|
||||
/** Synchronized list */
|
||||
/** Synchronized list. */
|
||||
private final List<MountEntry> mounts;
|
||||
|
||||
/** List that is unmodifiable */
|
||||
private final List<String> exports;
|
||||
|
||||
/** List that is unmodifiable. */
|
||||
private final HashMap<String, URI> exports;
|
||||
private final NfsConfiguration config;
|
||||
private final NfsExports hostsMatcher;
|
||||
|
||||
public RpcProgramMountd(NfsConfiguration config,
|
||||
@ -84,17 +85,29 @@ public RpcProgramMountd(NfsConfiguration config,
|
||||
VERSION_3, registrationSocket, allowInsecurePorts, config.getInt(
|
||||
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_KEY,
|
||||
NfsConfigKeys.NFS_UDP_CLIENT_PORTMAP_TIMEOUT_MILLIS_DEFAULT));
|
||||
exports = new ArrayList<String>();
|
||||
exports.add(config.get(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
|
||||
NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT));
|
||||
this.config = config;
|
||||
exports = new HashMap<>();
|
||||
addExports();
|
||||
this.hostsMatcher = NfsExports.getInstance(config);
|
||||
this.mounts = Collections.synchronizedList(new ArrayList<MountEntry>());
|
||||
UserGroupInformation.setConfiguration(config);
|
||||
SecurityUtil.login(config, NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY,
|
||||
NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY);
|
||||
this.dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config), config);
|
||||
}
|
||||
|
||||
|
||||
private void addExports() throws IOException {
|
||||
FileSystem fs = FileSystem.get(config);
|
||||
String[] exportsPath =
|
||||
config.getStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
|
||||
NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT);
|
||||
for (String exportPath : exportsPath) {
|
||||
URI exportURI = Nfs3Utils.getResolvedURI(fs, exportPath);
|
||||
LOG.info("FS:" + fs.getScheme() + " adding export Path:" + exportPath +
|
||||
" with URI: " + exportURI.toString());
|
||||
exports.put(exportPath, exportURI);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XDR nullOp(XDR out, int xid, InetAddress client) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -125,17 +138,28 @@ public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Got host: " + host + " path: " + path);
|
||||
}
|
||||
if (!exports.contains(path)) {
|
||||
URI exportURI = exports.get(path);
|
||||
if (exportURI == null) {
|
||||
LOG.info("Path " + path + " is not shared.");
|
||||
MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
|
||||
return out;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = null;
|
||||
try {
|
||||
dfsClient = new DFSClient(exportURI, config);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Can't get handle for export:" + path, e);
|
||||
MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
|
||||
return out;
|
||||
}
|
||||
|
||||
FileHandle handle = null;
|
||||
try {
|
||||
HdfsFileStatus exFileStatus = dfsClient.getFileInfo(path);
|
||||
|
||||
handle = new FileHandle(exFileStatus.getFileId());
|
||||
HdfsFileStatus exFileStatus = dfsClient.getFileInfo(exportURI.getPath());
|
||||
|
||||
handle = new FileHandle(exFileStatus.getFileId(),
|
||||
Nfs3Utils.getNamenodeId(config, exportURI));
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't get handle for export:" + path, e);
|
||||
MountResponse.writeMNTResponse(Nfs3Status.NFS3ERR_NOENT, out, xid, null);
|
||||
@ -143,7 +167,8 @@ public XDR mnt(XDR xdr, XDR out, int xid, InetAddress client) {
|
||||
}
|
||||
|
||||
assert (handle != null);
|
||||
LOG.info("Giving handle (fileId:" + handle.getFileId()
|
||||
LOG.info("Giving handle (fileHandle:" + handle.dumpFileHandle()
|
||||
+ " file URI: " + exportURI
|
||||
+ ") to client for export " + path);
|
||||
mounts.add(new MountEntry(host, path));
|
||||
|
||||
@ -195,7 +220,8 @@ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
|
||||
info.data().readBytes(data);
|
||||
XDR xdr = new XDR(data);
|
||||
XDR out = new XDR();
|
||||
InetAddress client = ((InetSocketAddress) info.remoteAddress()).getAddress();
|
||||
InetAddress client =
|
||||
((InetSocketAddress) info.remoteAddress()).getAddress();
|
||||
|
||||
if (mntproc == MNTPROC.NULL) {
|
||||
out = nullOp(out, xid, client);
|
||||
@ -214,16 +240,20 @@ public void handleInternal(ChannelHandlerContext ctx, RpcInfo info) {
|
||||
} else if (mntproc == MNTPROC.UMNTALL) {
|
||||
umntall(out, xid, client);
|
||||
} else if (mntproc == MNTPROC.EXPORT) {
|
||||
// Currently only support one NFS export
|
||||
// Currently only support one NFS export per namenode
|
||||
List<NfsExports> hostsMatchers = new ArrayList<NfsExports>();
|
||||
if (hostsMatcher != null) {
|
||||
hostsMatchers.add(hostsMatcher);
|
||||
out = MountResponse.writeExportList(out, xid, exports, hostsMatchers);
|
||||
List exportsList = getExports();
|
||||
for (int i = 0; i < exportsList.size(); i++) {
|
||||
hostsMatchers.add(hostsMatcher);
|
||||
}
|
||||
out = MountResponse.writeExportList(out, xid,
|
||||
exportsList, hostsMatchers);
|
||||
} else {
|
||||
// This means there are no valid exports provided.
|
||||
RpcAcceptedReply.getInstance(xid,
|
||||
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
|
||||
out);
|
||||
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone())
|
||||
.write(out);
|
||||
}
|
||||
} else {
|
||||
// Invalid procedure
|
||||
@ -231,7 +261,8 @@ RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
|
||||
RpcAcceptedReply.AcceptState.PROC_UNAVAIL, new VerifierNone()).write(
|
||||
out);
|
||||
}
|
||||
ChannelBuffer buf = ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
|
||||
ChannelBuffer buf =
|
||||
ChannelBuffers.wrappedBuffer(out.asReadOnlyWrap().buffer());
|
||||
RpcResponse rsp = new RpcResponse(buf, info.remoteAddress());
|
||||
RpcUtil.sendRpcResponse(ctx, rsp);
|
||||
}
|
||||
@ -244,6 +275,6 @@ protected boolean isIdempotent(RpcCall call) {
|
||||
|
||||
@VisibleForTesting
|
||||
public List<String> getExports() {
|
||||
return this.exports;
|
||||
return new ArrayList<>(this.exports.keySet());
|
||||
}
|
||||
}
|
||||
|
@ -20,8 +20,11 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.FileSystemException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -31,9 +34,10 @@
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSInputStream;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
|
||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
@ -48,63 +52,97 @@
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
|
||||
/**
|
||||
* A cache saves DFSClient objects for different users
|
||||
* A cache saves DFSClient objects for different users.
|
||||
*/
|
||||
class DFSClientCache {
|
||||
private static final Log LOG = LogFactory.getLog(DFSClientCache.class);
|
||||
/**
|
||||
* Cache that maps User id to the corresponding DFSClient.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
final LoadingCache<String, DFSClient> clientCache;
|
||||
private final LoadingCache<DfsClientKey, DFSClient> clientCache;
|
||||
|
||||
final static int DEFAULT_DFS_CLIENT_CACHE_SIZE = 256;
|
||||
|
||||
/**
|
||||
* Cache that maps <DFSClient, inode path> to the corresponding
|
||||
* Cache that maps <DFSClient, inode path, nnid> to the corresponding
|
||||
* FSDataInputStream.
|
||||
*/
|
||||
final LoadingCache<DFSInputStreamCaheKey, FSDataInputStream> inputstreamCache;
|
||||
private final LoadingCache<DFSInputStreamCacheKey,
|
||||
FSDataInputStream> inputstreamCache;
|
||||
|
||||
/**
|
||||
* Time to live for a DFSClient (in seconds)
|
||||
* Time to live for a DFSClient (in seconds).
|
||||
*/
|
||||
final static int DEFAULT_DFS_INPUTSTREAM_CACHE_SIZE = 1024;
|
||||
final static int DEFAULT_DFS_INPUTSTREAM_CACHE_TTL = 10 * 60;
|
||||
|
||||
private final NfsConfiguration config;
|
||||
private final HashMap<Integer, URI> namenodeUriMap;
|
||||
|
||||
private static class DFSInputStreamCaheKey {
|
||||
final String userId;
|
||||
final String inodePath;
|
||||
private static final class DFSInputStreamCacheKey {
|
||||
private final String userId;
|
||||
private final String inodePath;
|
||||
private final int namenodeId;
|
||||
|
||||
private DFSInputStreamCaheKey(String userId, String inodePath) {
|
||||
private DFSInputStreamCacheKey(String userId, String inodePath,
|
||||
int namenodeId) {
|
||||
super();
|
||||
this.userId = userId;
|
||||
this.inodePath = inodePath;
|
||||
this.namenodeId = namenodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof DFSInputStreamCaheKey) {
|
||||
DFSInputStreamCaheKey k = (DFSInputStreamCaheKey) obj;
|
||||
return userId.equals(k.userId) && inodePath.equals(k.inodePath);
|
||||
if (obj instanceof DFSInputStreamCacheKey) {
|
||||
DFSInputStreamCacheKey k = (DFSInputStreamCacheKey) obj;
|
||||
return userId.equals(k.userId) &&
|
||||
inodePath.equals(k.inodePath) &&
|
||||
(namenodeId == k.namenodeId);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(userId, inodePath);
|
||||
return Objects.hashCode(userId, inodePath, namenodeId);
|
||||
}
|
||||
}
|
||||
|
||||
DFSClientCache(NfsConfiguration config) {
|
||||
private static final class DfsClientKey {
|
||||
private final String userName;
|
||||
private final int namenodeId;
|
||||
|
||||
private DfsClientKey(String userName, int namenodeId) {
|
||||
this.userName = userName;
|
||||
this.namenodeId = namenodeId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof DfsClientKey) {
|
||||
DfsClientKey k = (DfsClientKey) obj;
|
||||
return userName.equals(k.userName) &&
|
||||
(namenodeId == k.namenodeId);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(userName, namenodeId);
|
||||
}
|
||||
}
|
||||
|
||||
DFSClientCache(NfsConfiguration config) throws IOException {
|
||||
this(config, DEFAULT_DFS_CLIENT_CACHE_SIZE);
|
||||
}
|
||||
|
||||
DFSClientCache(NfsConfiguration config, int clientCache) {
|
||||
|
||||
DFSClientCache(NfsConfiguration config, int clientCache) throws IOException {
|
||||
this.config = config;
|
||||
namenodeUriMap = new HashMap<>();
|
||||
prepareAddressMap();
|
||||
|
||||
this.clientCache = CacheBuilder.newBuilder()
|
||||
.maximumSize(clientCache)
|
||||
.removalListener(clientRemovalListener())
|
||||
@ -115,11 +153,36 @@ public int hashCode() {
|
||||
.expireAfterAccess(DEFAULT_DFS_INPUTSTREAM_CACHE_TTL, TimeUnit.SECONDS)
|
||||
.removalListener(inputStreamRemovalListener())
|
||||
.build(inputStreamLoader());
|
||||
|
||||
|
||||
ShutdownHookManager.get().addShutdownHook(new CacheFinalizer(),
|
||||
SHUTDOWN_HOOK_PRIORITY);
|
||||
}
|
||||
|
||||
private void prepareAddressMap() throws IOException {
|
||||
FileSystem fs = FileSystem.get(config);
|
||||
String[] exportsPath =
|
||||
config.getStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
|
||||
NfsConfigKeys.DFS_NFS_EXPORT_POINT_DEFAULT);
|
||||
for (String exportPath : exportsPath) {
|
||||
URI exportURI = Nfs3Utils.getResolvedURI(fs, exportPath);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config, exportURI);
|
||||
URI value = namenodeUriMap.get(namenodeId);
|
||||
// if a unique nnid, add it to the map
|
||||
if (value == null) {
|
||||
LOG.info("Added export:" + exportPath + " FileSystem URI:" + exportURI
|
||||
+ " with namenodeId:" + namenodeId);
|
||||
namenodeUriMap.put(namenodeId, exportURI);
|
||||
} else {
|
||||
// if the nnid already exists, it better be the for the same namenode
|
||||
String msg = String.format("FS:%s, Namenode ID collision for path:%s "
|
||||
+ "nnid:%s uri being added:%s existing uri:%s", fs.getScheme(),
|
||||
exportPath, namenodeId, exportURI, value);
|
||||
LOG.error(msg);
|
||||
throw new FileSystemException(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Priority of the FileSystem shutdown hook.
|
||||
*/
|
||||
@ -135,7 +198,12 @@ public synchronized void run() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@VisibleForTesting
|
||||
public LoadingCache<DfsClientKey, DFSClient> getClientCache() {
|
||||
return clientCache;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close all DFSClient instances in the Cache.
|
||||
* @param onlyAutomatic only close those that are marked for automatic closing
|
||||
@ -143,9 +211,9 @@ public synchronized void run() {
|
||||
synchronized void closeAll(boolean onlyAutomatic) throws IOException {
|
||||
List<IOException> exceptions = new ArrayList<IOException>();
|
||||
|
||||
ConcurrentMap<String, DFSClient> map = clientCache.asMap();
|
||||
ConcurrentMap<DfsClientKey, DFSClient> map = clientCache.asMap();
|
||||
|
||||
for (Entry<String, DFSClient> item : map.entrySet()) {
|
||||
for (Entry<DfsClientKey, DFSClient> item : map.entrySet()) {
|
||||
final DFSClient client = item.getValue();
|
||||
if (client != null) {
|
||||
try {
|
||||
@ -160,20 +228,24 @@ synchronized void closeAll(boolean onlyAutomatic) throws IOException {
|
||||
throw MultipleIOException.createIOException(exceptions);
|
||||
}
|
||||
}
|
||||
|
||||
private CacheLoader<String, DFSClient> clientLoader() {
|
||||
return new CacheLoader<String, DFSClient>() {
|
||||
|
||||
private CacheLoader<DfsClientKey, DFSClient> clientLoader() {
|
||||
return new CacheLoader<DfsClientKey, DFSClient>() {
|
||||
@Override
|
||||
public DFSClient load(String userName) throws Exception {
|
||||
public DFSClient load(final DfsClientKey key) throws Exception {
|
||||
UserGroupInformation ugi = getUserGroupInformation(
|
||||
userName,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
key.userName, UserGroupInformation.getCurrentUser());
|
||||
|
||||
// Guava requires CacheLoader never returns null.
|
||||
return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
|
||||
@Override
|
||||
public DFSClient run() throws IOException {
|
||||
return new DFSClient(DFSUtilClient.getNNAddress(config), config);
|
||||
URI namenodeURI = namenodeUriMap.get(key.namenodeId);
|
||||
if (namenodeURI == null) {
|
||||
throw new IOException("No namenode URI found for user:" +
|
||||
key.userName + " namenodeId:" + key.namenodeId);
|
||||
}
|
||||
return new DFSClient(namenodeURI, config);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -181,7 +253,7 @@ public DFSClient run() throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* This method uses the currentUser, and real user to create a proxy
|
||||
* This method uses the currentUser, and real user to create a proxy.
|
||||
* @param effectiveUser The user who is being proxied by the real user
|
||||
* @param realUser The actual user who does the command
|
||||
* @return Proxy UserGroupInformation
|
||||
@ -204,10 +276,11 @@ UserGroupInformation getUserGroupInformation(
|
||||
return ugi;
|
||||
}
|
||||
|
||||
private RemovalListener<String, DFSClient> clientRemovalListener() {
|
||||
return new RemovalListener<String, DFSClient>() {
|
||||
private RemovalListener<DfsClientKey, DFSClient> clientRemovalListener() {
|
||||
return new RemovalListener<DfsClientKey, DFSClient>() {
|
||||
@Override
|
||||
public void onRemoval(RemovalNotification<String, DFSClient> notification) {
|
||||
public void onRemoval(
|
||||
RemovalNotification<DfsClientKey, DFSClient> notification) {
|
||||
DFSClient client = notification.getValue();
|
||||
try {
|
||||
client.close();
|
||||
@ -220,12 +293,15 @@ public void onRemoval(RemovalNotification<String, DFSClient> notification) {
|
||||
};
|
||||
}
|
||||
|
||||
private RemovalListener<DFSInputStreamCaheKey, FSDataInputStream> inputStreamRemovalListener() {
|
||||
return new RemovalListener<DFSClientCache.DFSInputStreamCaheKey, FSDataInputStream>() {
|
||||
private RemovalListener
|
||||
<DFSInputStreamCacheKey, FSDataInputStream> inputStreamRemovalListener() {
|
||||
return new RemovalListener
|
||||
<DFSClientCache.DFSInputStreamCacheKey, FSDataInputStream>() {
|
||||
|
||||
@Override
|
||||
public void onRemoval(
|
||||
RemovalNotification<DFSInputStreamCaheKey, FSDataInputStream> notification) {
|
||||
RemovalNotification<DFSInputStreamCacheKey, FSDataInputStream>
|
||||
notification) {
|
||||
try {
|
||||
notification.getValue().close();
|
||||
} catch (IOException ignored) {
|
||||
@ -234,22 +310,24 @@ public void onRemoval(
|
||||
};
|
||||
}
|
||||
|
||||
private CacheLoader<DFSInputStreamCaheKey, FSDataInputStream> inputStreamLoader() {
|
||||
return new CacheLoader<DFSInputStreamCaheKey, FSDataInputStream>() {
|
||||
private CacheLoader<DFSInputStreamCacheKey, FSDataInputStream>
|
||||
inputStreamLoader() {
|
||||
return new CacheLoader<DFSInputStreamCacheKey, FSDataInputStream>() {
|
||||
|
||||
@Override
|
||||
public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
|
||||
DFSClient client = getDfsClient(key.userId);
|
||||
public FSDataInputStream
|
||||
load(DFSInputStreamCacheKey key) throws Exception {
|
||||
DFSClient client = getDfsClient(key.userId, key.namenodeId);
|
||||
DFSInputStream dis = client.open(key.inodePath);
|
||||
return client.createWrappedInputStream(dis);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
DFSClient getDfsClient(String userName) {
|
||||
DFSClient getDfsClient(String userName, int namenodeId) {
|
||||
DFSClient client = null;
|
||||
try {
|
||||
client = clientCache.get(userName);
|
||||
client = clientCache.get(new DfsClientKey(userName, namenodeId));
|
||||
} catch (ExecutionException e) {
|
||||
LOG.error("Failed to create DFSClient for user:" + userName + " Cause:"
|
||||
+ e);
|
||||
@ -257,8 +335,10 @@ DFSClient getDfsClient(String userName) {
|
||||
return client;
|
||||
}
|
||||
|
||||
FSDataInputStream getDfsInputStream(String userName, String inodePath) {
|
||||
DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
|
||||
FSDataInputStream getDfsInputStream(String userName, String inodePath,
|
||||
int namenodeId) {
|
||||
DFSInputStreamCacheKey k =
|
||||
new DFSInputStreamCacheKey(userName, inodePath, namenodeId);
|
||||
FSDataInputStream s = null;
|
||||
try {
|
||||
s = inputstreamCache.get(k);
|
||||
@ -269,8 +349,10 @@ FSDataInputStream getDfsInputStream(String userName, String inodePath) {
|
||||
return s;
|
||||
}
|
||||
|
||||
public void invalidateDfsInputStream(String userName, String inodePath) {
|
||||
DFSInputStreamCaheKey k = new DFSInputStreamCaheKey(userName, inodePath);
|
||||
public void invalidateDfsInputStream(String userName, String inodePath,
|
||||
int namenodeId) {
|
||||
DFSInputStreamCacheKey k =
|
||||
new DFSInputStreamCacheKey(userName, inodePath, namenodeId);
|
||||
inputstreamCache.invalidate(k);
|
||||
}
|
||||
}
|
||||
|
@ -18,8 +18,17 @@
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.file.FileSystemException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsConstants;
|
||||
import org.apache.hadoop.fs.viewfs.ViewFileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.nfs.NfsFileType;
|
||||
import org.apache.hadoop.nfs.NfsTime;
|
||||
@ -224,4 +233,41 @@ public static byte[] longToByte(long v) {
|
||||
public static long getElapsedTime(long startTimeNano) {
|
||||
return System.nanoTime() - startTimeNano;
|
||||
}
|
||||
|
||||
public static int getNamenodeId(Configuration conf) {
|
||||
URI filesystemURI = FileSystem.getDefaultUri(conf);
|
||||
return getNamenodeId(conf, filesystemURI);
|
||||
}
|
||||
|
||||
public static int getNamenodeId(Configuration conf, URI namenodeURI) {
|
||||
InetSocketAddress address =
|
||||
DFSUtilClient.getNNAddressCheckLogical(conf, namenodeURI);
|
||||
return address.hashCode();
|
||||
}
|
||||
|
||||
public static URI getResolvedURI(FileSystem fs, String exportPath)
|
||||
throws IOException {
|
||||
URI fsURI = fs.getUri();
|
||||
String scheme = fs.getScheme();
|
||||
if (scheme.equalsIgnoreCase(FsConstants.VIEWFS_SCHEME)) {
|
||||
ViewFileSystem viewFs = (ViewFileSystem)fs;
|
||||
ViewFileSystem.MountPoint[] mountPoints = viewFs.getMountPoints();
|
||||
for (ViewFileSystem.MountPoint mount : mountPoints) {
|
||||
String mountedPath = mount.getMountedOnPath().toString();
|
||||
if (exportPath.startsWith(mountedPath)) {
|
||||
String subpath = exportPath.substring(mountedPath.length());
|
||||
fsURI = mount.getTargetFileSystemURIs()[0].resolve(subpath);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else if (scheme.equalsIgnoreCase(HdfsConstants.HDFS_URI_SCHEME)) {
|
||||
fsURI = fsURI.resolve(exportPath);
|
||||
}
|
||||
|
||||
if (!fsURI.getScheme().equalsIgnoreCase(HdfsConstants.HDFS_URI_SCHEME)) {
|
||||
throw new FileSystemException("Only HDFS is supported as underlying"
|
||||
+ "FileSystem, fs scheme:" + scheme + " uri to be added" + fsURI);
|
||||
}
|
||||
return fsURI;
|
||||
}
|
||||
}
|
||||
|
@ -442,7 +442,7 @@ public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
|
||||
|
||||
if (!activeState) {
|
||||
LOG.info("OpenFileCtx is inactive, fileId: "
|
||||
+ request.getHandle().getFileId());
|
||||
+ request.getHandle().dumpFileHandle());
|
||||
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
|
||||
fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
|
||||
@ -981,7 +981,8 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
|
||||
* Check stream status to decide if it should be closed
|
||||
* @return true, remove stream; false, keep stream
|
||||
*/
|
||||
public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
|
||||
public synchronized boolean streamCleanup(FileHandle handle,
|
||||
long streamTimeout) {
|
||||
Preconditions
|
||||
.checkState(streamTimeout >= NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
|
||||
if (!activeState) {
|
||||
@ -992,7 +993,8 @@ public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
|
||||
// Check the stream timeout
|
||||
if (checkStreamTimeout(streamTimeout)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stream can be closed for fileId: " + fileId);
|
||||
LOG.debug("stream can be closed for fileId: "
|
||||
+ handle.dumpFileHandle());
|
||||
}
|
||||
flag = true;
|
||||
}
|
||||
@ -1188,7 +1190,7 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
||||
|
||||
FileHandle handle = writeCtx.getHandle();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
|
||||
LOG.debug("do write, fileHandle " + handle.dumpFileHandle() + " offset: "
|
||||
+ offset + " length: " + count + " stableHow: " + stableHow.name());
|
||||
}
|
||||
|
||||
@ -1213,8 +1215,9 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
||||
writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
|
||||
updateNonSequentialWriteInMemory(-count);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("After writing " + handle.getFileId() + " at offset "
|
||||
+ offset + ", updated the memory count, new value: "
|
||||
LOG.debug("After writing " + handle.dumpFileHandle()
|
||||
+ " at offset " + offset
|
||||
+ ", updated the memory count, new value: "
|
||||
+ nonSequentialWriteInMemory.get());
|
||||
}
|
||||
}
|
||||
@ -1257,8 +1260,8 @@ private void doSingleWrite(final WriteCtx writeCtx) {
|
||||
processCommits(writeCtx.getOffset() + writeCtx.getCount());
|
||||
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
|
||||
+ offset + " and length " + count, e);
|
||||
LOG.error("Error writing to fileHandle " + handle.dumpFileHandle()
|
||||
+ " at offset " + offset + " and length " + count, e);
|
||||
if (!writeCtx.getReplied()) {
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
|
||||
Nfs3Utils.writeChannel(channel, response.serialize(
|
||||
|
@ -156,7 +156,7 @@ void scan(long streamTimeout) {
|
||||
Entry<FileHandle, OpenFileCtx> pairs = it.next();
|
||||
FileHandle handle = pairs.getKey();
|
||||
OpenFileCtx ctx = pairs.getValue();
|
||||
if (!ctx.streamCleanup(handle.getFileId(), streamTimeout)) {
|
||||
if (!ctx.streamCleanup(handle, streamTimeout)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -164,10 +164,10 @@ void scan(long streamTimeout) {
|
||||
synchronized (this) {
|
||||
OpenFileCtx ctx2 = openFileMap.get(handle);
|
||||
if (ctx2 != null) {
|
||||
if (ctx2.streamCleanup(handle.getFileId(), streamTimeout)) {
|
||||
if (ctx2.streamCleanup(handle, streamTimeout)) {
|
||||
openFileMap.remove(handle);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("After remove stream " + handle.getFileId()
|
||||
LOG.debug("After remove stream " + handle.dumpFileHandle()
|
||||
+ ", the stream number:" + size());
|
||||
}
|
||||
ctxToRemove.add(ctx2);
|
||||
|
@ -319,12 +319,6 @@ GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
GETATTR3Request request;
|
||||
try {
|
||||
request = GETATTR3Request.deserialize(xdr);
|
||||
@ -335,9 +329,17 @@ GETATTR3Response getattr(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("GETATTR for fileId: " + handle.getFileId() + " client: "
|
||||
+ remoteAddress);
|
||||
LOG.debug("GETATTR for fileHandle: " + handle.dumpFileHandle()
|
||||
+ " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
Nfs3FileAttributes attrs = null;
|
||||
@ -412,11 +414,6 @@ public SETATTR3Response setattr(XDR xdr, RpcInfo info) {
|
||||
SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
|
||||
SocketAddress remoteAddress) {
|
||||
SETATTR3Response response = new SETATTR3Response(Nfs3Status.NFS3_OK);
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
SETATTR3Request request;
|
||||
try {
|
||||
@ -428,9 +425,17 @@ SETATTR3Response setattr(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS SETATTR fileId: " + handle.getFileId() + " client: "
|
||||
+ remoteAddress);
|
||||
LOG.debug("NFS SETATTR fileHandle: " + handle.dumpFileHandle()
|
||||
+ " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
if (request.getAttr().getUpdateFields().contains(SetAttrField.SIZE)) {
|
||||
@ -498,12 +503,6 @@ LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
LOOKUP3Request request;
|
||||
try {
|
||||
request = LOOKUP3Request.deserialize(xdr);
|
||||
@ -514,15 +513,22 @@ LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
|
||||
|
||||
FileHandle dirHandle = request.getHandle();
|
||||
String fileName = request.getName();
|
||||
int namenodeId = dirHandle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS LOOKUP dir fileId: " + dirHandle.getFileId() + " name: "
|
||||
+ fileName + " client: " + remoteAddress);
|
||||
LOG.debug("NFS LOOKUP dir fileHandle: " + dirHandle.dumpFileHandle()
|
||||
+ " name: " + fileName + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
try {
|
||||
String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
|
||||
Nfs3FileAttributes postOpObjAttr = writeManager.getFileAttr(dfsClient,
|
||||
dirHandle, fileName);
|
||||
dirHandle, fileName, namenodeId);
|
||||
if (postOpObjAttr == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS LOOKUP fileId: " + dirHandle.getFileId() + " name: "
|
||||
@ -540,7 +546,8 @@ LOOKUP3Response lookup(XDR xdr, SecurityHandler securityHandler,
|
||||
LOG.info("Can't get path for dir fileId: " + dirHandle.getFileId());
|
||||
return new LOOKUP3Response(Nfs3Status.NFS3ERR_STALE);
|
||||
}
|
||||
FileHandle fileHandle = new FileHandle(postOpObjAttr.getFileId());
|
||||
FileHandle fileHandle =
|
||||
new FileHandle(postOpObjAttr.getFileId(), namenodeId);
|
||||
return new LOOKUP3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
|
||||
postOpDirAttr);
|
||||
|
||||
@ -566,12 +573,6 @@ ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
ACCESS3Request request;
|
||||
try {
|
||||
request = ACCESS3Request.deserialize(xdr);
|
||||
@ -581,13 +582,21 @@ ACCESS3Response access(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
Nfs3FileAttributes attrs;
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS ACCESS fileId: " + handle.getFileId() + " client: "
|
||||
+ remoteAddress);
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS ACCESS fileHandle: " + handle.dumpFileHandle()
|
||||
+ " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
Nfs3FileAttributes attrs;
|
||||
try {
|
||||
attrs = writeManager.getFileAttr(dfsClient, handle, iug);
|
||||
|
||||
@ -639,12 +648,6 @@ READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
READLINK3Request request;
|
||||
|
||||
try {
|
||||
@ -655,9 +658,17 @@ READLINK3Response readlink(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS READLINK fileId: " + handle.getFileId() + " client: "
|
||||
+ remoteAddress);
|
||||
LOG.debug("NFS READLINK fileHandle: " + handle.dumpFileHandle()
|
||||
+ " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
String fileIdPath = Nfs3Utils.getFileIdPath(handle);
|
||||
@ -715,12 +726,6 @@ READ3Response read(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(userName);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
READ3Request request;
|
||||
|
||||
try {
|
||||
@ -734,9 +739,16 @@ READ3Response read(XDR xdr, SecurityHandler securityHandler,
|
||||
int count = request.getCount();
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS READ fileId: " + handle.getFileId() + " offset: " + offset
|
||||
+ " count: " + count + " client: " + remoteAddress);
|
||||
LOG.debug("NFS READ fileHandle: " + handle.dumpFileHandle()+ " offset: "
|
||||
+ offset + " count: " + count + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(userName, namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
Nfs3FileAttributes attrs;
|
||||
@ -791,7 +803,7 @@ READ3Response read(XDR xdr, SecurityHandler securityHandler,
|
||||
*/
|
||||
for (int i = 0; i < 1; ++i) {
|
||||
FSDataInputStream fis = clientCache.getDfsInputStream(userName,
|
||||
Nfs3Utils.getFileIdPath(handle));
|
||||
Nfs3Utils.getFileIdPath(handle), namenodeId);
|
||||
|
||||
if (fis == null) {
|
||||
return new READ3Response(Nfs3Status.NFS3ERR_ACCES);
|
||||
@ -805,7 +817,7 @@ READ3Response read(XDR xdr, SecurityHandler securityHandler,
|
||||
// which requires incompatible changes.
|
||||
if (e.getMessage().equals("Stream closed")) {
|
||||
clientCache.invalidateDfsInputStream(userName,
|
||||
Nfs3Utils.getFileIdPath(handle));
|
||||
Nfs3Utils.getFileIdPath(handle), namenodeId);
|
||||
continue;
|
||||
} else {
|
||||
throw e;
|
||||
@ -850,11 +862,6 @@ WRITE3Response write(XDR xdr, Channel channel, int xid,
|
||||
SecurityHandler securityHandler, SocketAddress remoteAddress) {
|
||||
WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK);
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
WRITE3Request request;
|
||||
|
||||
@ -875,12 +882,20 @@ WRITE3Response write(XDR xdr, Channel channel, int xid,
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS WRITE fileId: " + handle.getFileId() + " offset: "
|
||||
LOG.debug("NFS WRITE fileHandle: " + handle.dumpFileHandle() + " offset: "
|
||||
+ offset + " length: " + count + " stableHow: " + stableHow.getValue()
|
||||
+ " xid: " + xid + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
Nfs3FileAttributes preOpAttr = null;
|
||||
try {
|
||||
preOpAttr = writeManager.getFileAttr(dfsClient, handle, iug);
|
||||
@ -932,11 +947,6 @@ public CREATE3Response create(XDR xdr, RpcInfo info) {
|
||||
CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
|
||||
SocketAddress remoteAddress) {
|
||||
CREATE3Response response = new CREATE3Response(Nfs3Status.NFS3_OK);
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
CREATE3Request request;
|
||||
|
||||
@ -949,11 +959,19 @@ CREATE3Response create(XDR xdr, SecurityHandler securityHandler,
|
||||
|
||||
FileHandle dirHandle = request.getHandle();
|
||||
String fileName = request.getName();
|
||||
int namenodeId = dirHandle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS CREATE dir fileId: " + dirHandle.getFileId()
|
||||
LOG.debug("NFS CREATE dir fileHandle: " + dirHandle.dumpFileHandle()
|
||||
+ " filename: " + fileName + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
int createMode = request.getMode();
|
||||
if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE)
|
||||
&& request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)
|
||||
@ -1016,7 +1034,7 @@ preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr),
|
||||
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
|
||||
writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
|
||||
aixCompatMode, config);
|
||||
fileHandle = new FileHandle(postOpObjAttr.getFileId());
|
||||
fileHandle = new FileHandle(postOpObjAttr.getFileId(), namenodeId);
|
||||
if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
|
||||
LOG.warn("Can't add more stream, close it."
|
||||
+ " Future write will become append");
|
||||
@ -1066,11 +1084,6 @@ public MKDIR3Response mkdir(XDR xdr, RpcInfo info) {
|
||||
MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
|
||||
SocketAddress remoteAddress) {
|
||||
MKDIR3Response response = new MKDIR3Response(Nfs3Status.NFS3_OK);
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
MKDIR3Request request;
|
||||
|
||||
@ -1082,9 +1095,18 @@ MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
FileHandle dirHandle = request.getHandle();
|
||||
String fileName = request.getName();
|
||||
int namenodeId = dirHandle.getNamenodeId();
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS MKDIR dirId: " + dirHandle.getFileId() + " filename: "
|
||||
+ fileName + " client: " + remoteAddress);
|
||||
LOG.debug("NFS MKDIR dirHandle: " + dirHandle.dumpFileHandle()
|
||||
+ " filename: " + fileName + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
if (request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) {
|
||||
@ -1130,11 +1152,11 @@ MKDIR3Response mkdir(XDR xdr, SecurityHandler securityHandler,
|
||||
setattrInternal(dfsClient, fileIdPath, setAttr3, false);
|
||||
|
||||
postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
|
||||
objFileHandle = new FileHandle(postOpObjAttr.getFileId());
|
||||
objFileHandle = new FileHandle(postOpObjAttr.getFileId(), namenodeId);
|
||||
WccData dirWcc = Nfs3Utils.createWccData(
|
||||
Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug);
|
||||
return new MKDIR3Response(Nfs3Status.NFS3_OK, new FileHandle(
|
||||
postOpObjAttr.getFileId()), postOpObjAttr, dirWcc);
|
||||
postOpObjAttr.getFileId(), namenodeId), postOpObjAttr, dirWcc);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Exception ", e);
|
||||
// Try to return correct WccData
|
||||
@ -1167,11 +1189,6 @@ public REMOVE3Response remove(XDR xdr, RpcInfo info) {
|
||||
REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler,
|
||||
SocketAddress remoteAddress) {
|
||||
REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK);
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
REMOVE3Request request;
|
||||
try {
|
||||
@ -1181,12 +1198,21 @@ REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler,
|
||||
return new REMOVE3Response(Nfs3Status.NFS3ERR_INVAL);
|
||||
}
|
||||
FileHandle dirHandle = request.getHandle();
|
||||
int namenodeId = dirHandle.getNamenodeId();
|
||||
|
||||
String fileName = request.getName();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS REMOVE dir fileId: " + dirHandle.getFileId()
|
||||
LOG.debug("NFS REMOVE dir fileHandle: " + dirHandle.dumpFileHandle()
|
||||
+ " fileName: " + fileName + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
|
||||
Nfs3FileAttributes preOpDirAttr = null;
|
||||
Nfs3FileAttributes postOpDirAttr = null;
|
||||
@ -1247,11 +1273,6 @@ public RMDIR3Response rmdir(XDR xdr, RpcInfo info) {
|
||||
RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
|
||||
SocketAddress remoteAddress) {
|
||||
RMDIR3Response response = new RMDIR3Response(Nfs3Status.NFS3_OK);
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
RMDIR3Request request;
|
||||
try {
|
||||
@ -1262,12 +1283,19 @@ RMDIR3Response rmdir(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
FileHandle dirHandle = request.getHandle();
|
||||
String fileName = request.getName();
|
||||
|
||||
int namenodeId = dirHandle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS RMDIR dir fileId: " + dirHandle.getFileId()
|
||||
LOG.debug("NFS RMDIR dir fileHandle: " + dirHandle.dumpFileHandle()
|
||||
+ " fileName: " + fileName + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
String dirFileIdPath = Nfs3Utils.getFileIdPath(dirHandle);
|
||||
Nfs3FileAttributes preOpDirAttr = null;
|
||||
Nfs3FileAttributes postOpDirAttr = null;
|
||||
@ -1332,11 +1360,6 @@ public RENAME3Response rename(XDR xdr, RpcInfo info) {
|
||||
RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
|
||||
SocketAddress remoteAddress) {
|
||||
RENAME3Response response = new RENAME3Response(Nfs3Status.NFS3_OK);
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
RENAME3Request request = null;
|
||||
try {
|
||||
@ -1347,13 +1370,28 @@ RENAME3Response rename(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
FileHandle fromHandle = request.getFromDirHandle();
|
||||
int fromNamenodeId = fromHandle.getNamenodeId();
|
||||
String fromName = request.getFromName();
|
||||
FileHandle toHandle = request.getToDirHandle();
|
||||
int toNamenodeId = toHandle.getNamenodeId();
|
||||
String toName = request.getToName();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS RENAME from: " + fromHandle.getFileId() + "/" + fromName
|
||||
+ " to: " + toHandle.getFileId() + "/" + toName + " client: "
|
||||
+ remoteAddress);
|
||||
LOG.debug("NFS RENAME from: " + fromHandle.dumpFileHandle()
|
||||
+ "/" + fromName + " to: " + toHandle.dumpFileHandle()
|
||||
+ "/" + toName + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), fromNamenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
if (fromNamenodeId != toNamenodeId) {
|
||||
// renaming file from one namenode to another is not supported
|
||||
response.setStatus(Nfs3Status.NFS3ERR_INVAL);
|
||||
return response;
|
||||
}
|
||||
|
||||
String fromDirFileIdPath = Nfs3Utils.getFileIdPath(fromHandle);
|
||||
@ -1429,12 +1467,6 @@ SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
SYMLINK3Request request;
|
||||
try {
|
||||
request = SYMLINK3Request.deserialize(xdr);
|
||||
@ -1448,11 +1480,20 @@ SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
|
||||
String name = request.getName();
|
||||
String symData = request.getSymData();
|
||||
String linkDirIdPath = Nfs3Utils.getFileIdPath(dirHandle);
|
||||
int namenodeId = dirHandle.getNamenodeId();
|
||||
|
||||
// Don't do any name check to source path, just leave it to HDFS
|
||||
String linkIdPath = linkDirIdPath + "/" + name;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS SYMLINK, target: " + symData + " link: " + linkIdPath
|
||||
+ " client: " + remoteAddress);
|
||||
+ " namenodeId: " + namenodeId + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
try {
|
||||
@ -1471,7 +1512,7 @@ SYMLINK3Response symlink(XDR xdr, SecurityHandler securityHandler,
|
||||
.setPostOpAttr(Nfs3Utils.getFileAttr(dfsClient, linkDirIdPath, iug));
|
||||
|
||||
return new SYMLINK3Response(Nfs3Status.NFS3_OK, new FileHandle(
|
||||
objAttr.getFileId()), objAttr, dirWcc);
|
||||
objAttr.getFileId(), namenodeId), objAttr, dirWcc);
|
||||
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Exception: " + e);
|
||||
@ -1524,12 +1565,6 @@ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
READDIR3Request request;
|
||||
try {
|
||||
request = READDIR3Request.deserialize(xdr);
|
||||
@ -1538,6 +1573,8 @@ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
|
||||
return new READDIR3Response(Nfs3Status.NFS3ERR_INVAL);
|
||||
}
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
|
||||
long cookie = request.getCookie();
|
||||
if (cookie < 0) {
|
||||
LOG.error("Invalid READDIR request, with negative cookie: " + cookie);
|
||||
@ -1550,8 +1587,16 @@ public READDIR3Response readdir(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS READDIR fileId: " + handle.getFileId() + " cookie: "
|
||||
+ cookie + " count: " + count + " client: " + remoteAddress);
|
||||
LOG.debug("NFS READDIR fileHandle: " + handle.dumpFileHandle()
|
||||
+ " cookie: " + cookie + " count: " + count + " client: "
|
||||
+ remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
HdfsFileStatus dirStatus;
|
||||
@ -1684,10 +1729,6 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
|
||||
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_ACCES);
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
}
|
||||
|
||||
READDIRPLUS3Request request = null;
|
||||
try {
|
||||
@ -1698,6 +1739,7 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
long cookie = request.getCookie();
|
||||
if (cookie < 0) {
|
||||
LOG.error("Invalid READDIRPLUS request, with negative cookie: " + cookie);
|
||||
@ -1715,9 +1757,15 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS READDIRPLUS fileId: " + handle.getFileId() + " cookie: "
|
||||
+ cookie + " dirCount: " + dirCount + " maxCount: " + maxCount
|
||||
+ " client: " + remoteAddress);
|
||||
LOG.debug("NFS READDIRPLUS fileHandle: " + handle.dumpFileHandle()
|
||||
+ " cookie: " + cookie + " dirCount: " + dirCount + " maxCount: "
|
||||
+ maxCount + " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
return new READDIRPLUS3Response(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
}
|
||||
|
||||
HdfsFileStatus dirStatus;
|
||||
@ -1805,14 +1853,14 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
|
||||
|
||||
entries[0] = new READDIRPLUS3Response.EntryPlus3(
|
||||
postOpDirAttr.getFileId(), ".", 0, postOpDirAttr, new FileHandle(
|
||||
postOpDirAttr.getFileId()));
|
||||
postOpDirAttr.getFileId(), namenodeId));
|
||||
entries[1] = new READDIRPLUS3Response.EntryPlus3(dotdotFileId, "..",
|
||||
dotdotFileId, Nfs3Utils.getNfs3FileAttrFromFileStatus(dotdotStatus,
|
||||
iug), new FileHandle(dotdotFileId));
|
||||
iug), new FileHandle(dotdotFileId, namenodeId));
|
||||
|
||||
for (int i = 2; i < n + 2; i++) {
|
||||
long fileId = fstatus[i - 2].getFileId();
|
||||
FileHandle childHandle = new FileHandle(fileId);
|
||||
FileHandle childHandle = new FileHandle(fileId, namenodeId);
|
||||
Nfs3FileAttributes attr;
|
||||
try {
|
||||
attr = writeManager.getFileAttr(dfsClient, childHandle, iug);
|
||||
@ -1829,7 +1877,7 @@ READDIRPLUS3Response readdirplus(XDR xdr, SecurityHandler securityHandler,
|
||||
entries = new READDIRPLUS3Response.EntryPlus3[n];
|
||||
for (int i = 0; i < n; i++) {
|
||||
long fileId = fstatus[i].getFileId();
|
||||
FileHandle childHandle = new FileHandle(fileId);
|
||||
FileHandle childHandle = new FileHandle(fileId, namenodeId);
|
||||
Nfs3FileAttributes attr;
|
||||
try {
|
||||
attr = writeManager.getFileAttr(dfsClient, childHandle, iug);
|
||||
@ -1863,11 +1911,6 @@ FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
FSSTAT3Request request;
|
||||
try {
|
||||
@ -1878,9 +1921,17 @@ FSSTAT3Response fsstat(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS FSSTAT fileId: " + handle.getFileId() + " client: "
|
||||
+ remoteAddress);
|
||||
LOG.debug("NFS FSSTAT fileHandle: " + handle.dumpFileHandle()
|
||||
+ " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
try {
|
||||
@ -1938,12 +1989,6 @@ FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
FSINFO3Request request;
|
||||
try {
|
||||
request = FSINFO3Request.deserialize(xdr);
|
||||
@ -1953,9 +1998,17 @@ FSINFO3Response fsinfo(XDR xdr, SecurityHandler securityHandler,
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS FSINFO fileId: " + handle.getFileId() + " client: "
|
||||
+ remoteAddress);
|
||||
LOG.debug("NFS FSINFO fileHandle: " + handle.dumpFileHandle()
|
||||
+" client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
try {
|
||||
@ -2003,12 +2056,6 @@ PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
|
||||
return response;
|
||||
}
|
||||
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
PATHCONF3Request request;
|
||||
try {
|
||||
request = PATHCONF3Request.deserialize(xdr);
|
||||
@ -2019,10 +2066,18 @@ PATHCONF3Response pathconf(XDR xdr, SecurityHandler securityHandler,
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
Nfs3FileAttributes attrs;
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS PATHCONF fileId: " + handle.getFileId() + " client: "
|
||||
+ remoteAddress);
|
||||
LOG.debug("NFS PATHCONF fileHandle: " + handle.dumpFileHandle()
|
||||
+ " client: " + remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
try {
|
||||
@ -2055,11 +2110,6 @@ public COMMIT3Response commit(XDR xdr, RpcInfo info) {
|
||||
COMMIT3Response commit(XDR xdr, Channel channel, int xid,
|
||||
SecurityHandler securityHandler, SocketAddress remoteAddress) {
|
||||
COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
|
||||
DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser());
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
COMMIT3Request request;
|
||||
try {
|
||||
@ -2071,12 +2121,20 @@ COMMIT3Response commit(XDR xdr, Channel channel, int xid,
|
||||
}
|
||||
|
||||
FileHandle handle = request.getHandle();
|
||||
int namenodeId = handle.getNamenodeId();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("NFS COMMIT fileId: " + handle.getFileId() + " offset="
|
||||
LOG.debug("NFS COMMIT fileHandle: " + handle.dumpFileHandle() + " offset="
|
||||
+ request.getOffset() + " count=" + request.getCount() + " client: "
|
||||
+ remoteAddress);
|
||||
}
|
||||
|
||||
DFSClient dfsClient =
|
||||
clientCache.getDfsClient(securityHandler.getUser(), namenodeId);
|
||||
if (dfsClient == null) {
|
||||
response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT);
|
||||
return response;
|
||||
}
|
||||
|
||||
String fileIdPath = Nfs3Utils.getFileIdPath(handle);
|
||||
Nfs3FileAttributes preOpAttr = null;
|
||||
try {
|
||||
@ -2097,7 +2155,7 @@ COMMIT3Response commit(XDR xdr, Channel channel, int xid,
|
||||
|
||||
// Insert commit as an async request
|
||||
writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
|
||||
preOpAttr);
|
||||
preOpAttr, namenodeId);
|
||||
return null;
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Exception ", e);
|
||||
|
@ -318,9 +318,9 @@ void setReplied(boolean replied) {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Id:" + handle.getFileId() + " offset:" + getPlainOffset() + " " +
|
||||
"count:" + count + " originalCount:" + getOriginalCount() +
|
||||
" stableHow:" + stableHow + " replied:" + replied + " dataState:" +
|
||||
dataState + " xid:" + xid;
|
||||
return "FileHandle:" + handle.dumpFileHandle() + " offset:"
|
||||
+ getPlainOffset() + " " + "count:" + count + " originalCount:"
|
||||
+ getOriginalCount() + " stableHow:" + stableHow + " replied:"
|
||||
+ replied + " dataState:" + dataState + " xid:" + xid;
|
||||
}
|
||||
}
|
@ -139,7 +139,8 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
|
||||
FileHandle fileHandle = request.getHandle();
|
||||
OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
|
||||
if (openFileCtx == null) {
|
||||
LOG.info("No opened stream for fileId: " + fileHandle.getFileId());
|
||||
LOG.info("No opened stream for fileHandle: "
|
||||
+ fileHandle.dumpFileHandle());
|
||||
|
||||
String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId());
|
||||
HdfsDataOutputStream fos = null;
|
||||
@ -188,7 +189,8 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
|
||||
try {
|
||||
fos.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Can't close stream for fileId: " + handle.getFileId(), e);
|
||||
LOG.error("Can't close stream for fileHandle: "
|
||||
+ handle.dumpFileHandle(), e);
|
||||
}
|
||||
// Notify client to retry
|
||||
WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
|
||||
@ -201,7 +203,8 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Opened stream for appending file: " + fileHandle.getFileId());
|
||||
LOG.debug("Opened stream for appending file: "
|
||||
+ fileHandle.dumpFileHandle());
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,7 +223,7 @@ int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle,
|
||||
|
||||
if (openFileCtx == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No opened stream for fileId: " + fileHandle.getFileId()
|
||||
LOG.debug("No opened stream for fileId: " + fileHandle.dumpFileHandle()
|
||||
+ " commitOffset=" + commitOffset
|
||||
+ ". Return success in this case.");
|
||||
}
|
||||
@ -263,13 +266,14 @@ int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle,
|
||||
}
|
||||
|
||||
void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
|
||||
long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
|
||||
long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr,
|
||||
int namenodeId) {
|
||||
long startTime = System.nanoTime();
|
||||
int status;
|
||||
OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
|
||||
|
||||
if (openFileCtx == null) {
|
||||
LOG.info("No opened stream for fileId: " + fileHandle.getFileId()
|
||||
LOG.info("No opened stream for fileId: " + fileHandle.dumpFileHandle()
|
||||
+ " commitOffset=" + commitOffset + ". Return success in this case.");
|
||||
status = Nfs3Status.NFS3_OK;
|
||||
|
||||
@ -304,7 +308,9 @@ void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
|
||||
// Send out the response
|
||||
Nfs3FileAttributes postOpAttr = null;
|
||||
try {
|
||||
postOpAttr = getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId()), iug);
|
||||
postOpAttr =
|
||||
getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId(),
|
||||
namenodeId), iug);
|
||||
} catch (IOException e1) {
|
||||
LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileId(), e1);
|
||||
}
|
||||
@ -334,13 +340,13 @@ Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle fileHandle,
|
||||
}
|
||||
|
||||
Nfs3FileAttributes getFileAttr(DFSClient client, FileHandle dirHandle,
|
||||
String fileName) throws IOException {
|
||||
String fileName, int namenodeId) throws IOException {
|
||||
String fileIdPath = Nfs3Utils.getFileIdPath(dirHandle) + "/" + fileName;
|
||||
Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
|
||||
|
||||
if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
|
||||
OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr
|
||||
.getFileId()));
|
||||
.getFileId(), namenodeId));
|
||||
|
||||
if (openFileCtx != null) {
|
||||
attr.setSize(openFileCtx.getNextOffset());
|
||||
|
@ -101,9 +101,10 @@ public void testClientAccessPrivilegeForRemove() throws Exception {
|
||||
// Create a remove request
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
handle.serialize(xdr_req);
|
||||
xdr_req.writeString("f1");
|
||||
|
||||
|
@ -43,15 +43,17 @@ public void testEviction() throws IOException {
|
||||
|
||||
DFSClientCache cache = new DFSClientCache(conf, MAX_CACHE_SIZE);
|
||||
|
||||
DFSClient c1 = cache.getDfsClient("test1");
|
||||
assertTrue(cache.getDfsClient("test1").toString().contains("ugi=test1"));
|
||||
assertEquals(c1, cache.getDfsClient("test1"));
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(conf);
|
||||
DFSClient c1 = cache.getDfsClient("test1", namenodeId);
|
||||
assertTrue(cache.getDfsClient("test1", namenodeId)
|
||||
.toString().contains("ugi=test1"));
|
||||
assertEquals(c1, cache.getDfsClient("test1", namenodeId));
|
||||
assertFalse(isDfsClientClose(c1));
|
||||
|
||||
cache.getDfsClient("test2");
|
||||
cache.getDfsClient("test2", namenodeId);
|
||||
assertTrue(isDfsClientClose(c1));
|
||||
assertTrue("cache size should be the max size or less",
|
||||
cache.clientCache.size() <= MAX_CACHE_SIZE);
|
||||
cache.getClientCache().size() <= MAX_CACHE_SIZE);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -61,6 +63,7 @@ public void testGetUserGroupInformationSecure() throws IOException {
|
||||
|
||||
|
||||
NfsConfiguration conf = new NfsConfiguration();
|
||||
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
|
||||
UserGroupInformation currentUserUgi
|
||||
= UserGroupInformation.createRemoteUser(currentUser);
|
||||
currentUserUgi.setAuthenticationMethod(KERBEROS);
|
||||
|
@ -21,7 +21,14 @@
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FsConstants;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.viewfs.ConfigUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
|
||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.nfs.mount.Mountd;
|
||||
@ -31,7 +38,152 @@
|
||||
public class TestExportsTable {
|
||||
|
||||
@Test
|
||||
public void testExportPoint() throws IOException {
|
||||
public void testHdfsExportPoint() throws IOException {
|
||||
NfsConfiguration config = new NfsConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
// Use emphral port in case tests are running in parallel
|
||||
config.setInt("nfs3.mountd.port", 0);
|
||||
config.setInt("nfs3.server.port", 0);
|
||||
config.set("nfs.http.address", "0.0.0.0:0");
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
|
||||
// Start nfs
|
||||
final Nfs3 nfsServer = new Nfs3(config);
|
||||
nfsServer.startServiceInternal(false);
|
||||
|
||||
Mountd mountd = nfsServer.getMountd();
|
||||
RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram();
|
||||
assertTrue(rpcMount.getExports().size() == 1);
|
||||
|
||||
String exportInMountd = rpcMount.getExports().get(0);
|
||||
assertTrue(exportInMountd.equals("/"));
|
||||
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testViewFsExportPoint() throws IOException {
|
||||
NfsConfiguration config = new NfsConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
String clusterName = RandomStringUtils.randomAlphabetic(10);
|
||||
|
||||
String exportPoint = "/hdfs1,/hdfs2";
|
||||
config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint);
|
||||
config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
||||
FsConstants.VIEWFS_SCHEME + "://" + clusterName);
|
||||
// Use emphral port in case tests are running in parallel
|
||||
config.setInt("nfs3.mountd.port", 0);
|
||||
config.setInt("nfs3.server.port", 0);
|
||||
config.set("nfs.http.address", "0.0.0.0:0");
|
||||
|
||||
try {
|
||||
cluster =
|
||||
new MiniDFSCluster.Builder(config).nnTopology(
|
||||
MiniDFSNNTopology.simpleFederatedTopology(2))
|
||||
.numDataNodes(2)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem hdfs1 = cluster.getFileSystem(0);
|
||||
DistributedFileSystem hdfs2 = cluster.getFileSystem(1);
|
||||
cluster.waitActive();
|
||||
Path base1 = new Path("/user1");
|
||||
Path base2 = new Path("/user2");
|
||||
hdfs1.delete(base1, true);
|
||||
hdfs2.delete(base2, true);
|
||||
hdfs1.mkdirs(base1);
|
||||
hdfs2.mkdirs(base2);
|
||||
ConfigUtil.addLink(config, clusterName, "/hdfs1",
|
||||
hdfs1.makeQualified(base1).toUri());
|
||||
ConfigUtil.addLink(config, clusterName, "/hdfs2",
|
||||
hdfs2.makeQualified(base2).toUri());
|
||||
|
||||
// Start nfs
|
||||
final Nfs3 nfsServer = new Nfs3(config);
|
||||
nfsServer.startServiceInternal(false);
|
||||
|
||||
Mountd mountd = nfsServer.getMountd();
|
||||
RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram();
|
||||
assertTrue(rpcMount.getExports().size() == 2);
|
||||
|
||||
String exportInMountd1 = rpcMount.getExports().get(0);
|
||||
assertTrue(exportInMountd1.equals("/hdfs1"));
|
||||
|
||||
String exportInMountd2 = rpcMount.getExports().get(1);
|
||||
assertTrue(exportInMountd2.equals("/hdfs2"));
|
||||
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testViewFsInternalExportPoint() throws IOException {
|
||||
NfsConfiguration config = new NfsConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
String clusterName = RandomStringUtils.randomAlphabetic(10);
|
||||
|
||||
String exportPoint = "/hdfs1/subpath";
|
||||
config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY, exportPoint);
|
||||
config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
||||
FsConstants.VIEWFS_SCHEME + "://" + clusterName);
|
||||
// Use emphral port in case tests are running in parallel
|
||||
config.setInt("nfs3.mountd.port", 0);
|
||||
config.setInt("nfs3.server.port", 0);
|
||||
config.set("nfs.http.address", "0.0.0.0:0");
|
||||
|
||||
try {
|
||||
cluster =
|
||||
new MiniDFSCluster.Builder(config).nnTopology(
|
||||
MiniDFSNNTopology.simpleFederatedTopology(2))
|
||||
.numDataNodes(2)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem hdfs1 = cluster.getFileSystem(0);
|
||||
DistributedFileSystem hdfs2 = cluster.getFileSystem(1);
|
||||
cluster.waitActive();
|
||||
Path base1 = new Path("/user1");
|
||||
Path base2 = new Path("/user2");
|
||||
hdfs1.delete(base1, true);
|
||||
hdfs2.delete(base2, true);
|
||||
hdfs1.mkdirs(base1);
|
||||
hdfs2.mkdirs(base2);
|
||||
ConfigUtil.addLink(config, clusterName, "/hdfs1",
|
||||
hdfs1.makeQualified(base1).toUri());
|
||||
ConfigUtil.addLink(config, clusterName, "/hdfs2",
|
||||
hdfs2.makeQualified(base2).toUri());
|
||||
Path subPath = new Path(base1, "subpath");
|
||||
hdfs1.delete(subPath, true);
|
||||
hdfs1.mkdirs(subPath);
|
||||
|
||||
// Start nfs
|
||||
final Nfs3 nfsServer = new Nfs3(config);
|
||||
nfsServer.startServiceInternal(false);
|
||||
|
||||
Mountd mountd = nfsServer.getMountd();
|
||||
RpcProgramMountd rpcMount = (RpcProgramMountd) mountd.getRpcProgram();
|
||||
assertTrue(rpcMount.getExports().size() == 1);
|
||||
|
||||
String exportInMountd = rpcMount.getExports().get(0);
|
||||
assertTrue(exportInMountd.equals(exportPoint));
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHdfsInternalExportPoint() throws IOException {
|
||||
NfsConfiguration config = new NfsConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
|
||||
@ -40,10 +192,15 @@ public void testExportPoint() throws IOException {
|
||||
// Use emphral port in case tests are running in parallel
|
||||
config.setInt("nfs3.mountd.port", 0);
|
||||
config.setInt("nfs3.server.port", 0);
|
||||
|
||||
config.set("nfs.http.address", "0.0.0.0:0");
|
||||
Path base = new Path(exportPoint);
|
||||
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem hdfs = cluster.getFileSystem(0);
|
||||
hdfs.delete(base, true);
|
||||
hdfs.mkdirs(base);
|
||||
|
||||
// Start nfs
|
||||
final Nfs3 nfsServer = new Nfs3(config);
|
||||
|
@ -21,9 +21,7 @@
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -31,8 +29,6 @@
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.Nfs3;
|
||||
import org.apache.hadoop.hdfs.nfs.nfs3.RpcProgramNfs3;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
@ -40,15 +36,10 @@
|
||||
import org.apache.hadoop.nfs.nfs3.response.READDIR3Response.Entry3;
|
||||
import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response;
|
||||
import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response.EntryPlus3;
|
||||
import org.apache.hadoop.oncrpc.RpcInfo;
|
||||
import org.apache.hadoop.oncrpc.RpcMessage;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.apache.hadoop.oncrpc.security.SecurityHandler;
|
||||
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
@ -119,10 +110,11 @@ public void testReaddirBasic() throws IOException {
|
||||
// Get inodeId of /tmp
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
|
||||
// Create related part of the XDR request
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
handle.serialize(xdr_req);
|
||||
xdr_req.writeLongAsHyper(0); // cookie
|
||||
xdr_req.writeLongAsHyper(0); // verifier
|
||||
@ -139,7 +131,7 @@ public void testReaddirBasic() throws IOException {
|
||||
|
||||
// Create related part of the XDR request
|
||||
xdr_req = new XDR();
|
||||
handle = new FileHandle(dirId);
|
||||
handle = new FileHandle(dirId, namenodeId);
|
||||
handle.serialize(xdr_req);
|
||||
xdr_req.writeLongAsHyper(f2Id); // cookie
|
||||
xdr_req.writeLongAsHyper(0); // verifier
|
||||
@ -167,10 +159,11 @@ public void testReaddirPlus() throws IOException {
|
||||
// Get inodeId of /tmp
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
|
||||
// Create related part of the XDR request
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
handle.serialize(xdr_req);
|
||||
xdr_req.writeLongAsHyper(0); // cookie
|
||||
xdr_req.writeLongAsHyper(0); // verifier
|
||||
@ -189,7 +182,7 @@ public void testReaddirPlus() throws IOException {
|
||||
|
||||
// Create related part of the XDR request
|
||||
xdr_req = new XDR();
|
||||
handle = new FileHandle(dirId);
|
||||
handle = new FileHandle(dirId, namenodeId);
|
||||
handle.serialize(xdr_req);
|
||||
xdr_req.writeLongAsHyper(f2Id); // cookie
|
||||
xdr_req.writeLongAsHyper(0); // verifier
|
||||
|
@ -186,7 +186,8 @@ public void createFiles() throws IllegalArgumentException, IOException {
|
||||
public void testGetattr() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
XDR xdr_req = new XDR();
|
||||
GETATTR3Request req = new GETATTR3Request(handle);
|
||||
req.serialize(xdr_req);
|
||||
@ -209,8 +210,9 @@ public void testGetattr() throws Exception {
|
||||
public void testSetattr() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
SetAttr3 symAttr = new SetAttr3(0, 1, 0, 0, null, null,
|
||||
EnumSet.of(SetAttrField.UID));
|
||||
SETATTR3Request req = new SETATTR3Request(handle, symAttr, false, null);
|
||||
@ -234,7 +236,8 @@ public void testSetattr() throws Exception {
|
||||
public void testLookup() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
LOOKUP3Request lookupReq = new LOOKUP3Request(handle, "bar");
|
||||
XDR xdr_req = new XDR();
|
||||
lookupReq.serialize(xdr_req);
|
||||
@ -257,7 +260,8 @@ public void testLookup() throws Exception {
|
||||
public void testAccess() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
XDR xdr_req = new XDR();
|
||||
ACCESS3Request req = new ACCESS3Request(handle);
|
||||
req.serialize(xdr_req);
|
||||
@ -281,8 +285,9 @@ public void testReadlink() throws Exception {
|
||||
// Create a symlink first.
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
SYMLINK3Request req = new SYMLINK3Request(handle, "fubar", new SetAttr3(),
|
||||
"bar");
|
||||
req.serialize(xdr_req);
|
||||
@ -316,7 +321,8 @@ public void testReadlink() throws Exception {
|
||||
public void testRead() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
|
||||
READ3Request readReq = new READ3Request(handle, 0, 5);
|
||||
XDR xdr_req = new XDR();
|
||||
@ -373,7 +379,8 @@ private void createFileUsingNfs(String fileName, byte[] buffer)
|
||||
|
||||
final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName);
|
||||
final long dirId = status.getFileId();
|
||||
final FileHandle handle = new FileHandle(dirId);
|
||||
final int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
final FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
|
||||
final WRITE3Request writeReq = new WRITE3Request(handle, 0,
|
||||
buffer.length, WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
|
||||
@ -390,7 +397,8 @@ private byte[] getFileContentsUsingNfs(String fileName, int len)
|
||||
throws Exception {
|
||||
final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName);
|
||||
final long dirId = status.getFileId();
|
||||
final FileHandle handle = new FileHandle(dirId);
|
||||
final int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
final FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
|
||||
final READ3Request readReq = new READ3Request(handle, 0, len);
|
||||
final XDR xdr_req = new XDR();
|
||||
@ -422,7 +430,8 @@ private byte[] getFileContentsUsingDfs(String fileName, int len)
|
||||
private void commit(String fileName, int len) throws Exception {
|
||||
final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName);
|
||||
final long dirId = status.getFileId();
|
||||
final FileHandle handle = new FileHandle(dirId);
|
||||
final int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
final FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
final XDR xdr_req = new XDR();
|
||||
final COMMIT3Request req = new COMMIT3Request(handle, 0, len);
|
||||
req.serialize(xdr_req);
|
||||
@ -439,7 +448,8 @@ private void commit(String fileName, int len) throws Exception {
|
||||
public void testWrite() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
|
||||
byte[] buffer = new byte[10];
|
||||
for (int i = 0; i < 10; i++) {
|
||||
@ -469,8 +479,9 @@ public void testWrite() throws Exception {
|
||||
public void testCreate() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
CREATE3Request req = new CREATE3Request(handle, "fubar",
|
||||
Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
|
||||
req.serialize(xdr_req);
|
||||
@ -493,8 +504,9 @@ public void testCreate() throws Exception {
|
||||
public void testMkdir() throws Exception {//FixME
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
MKDIR3Request req = new MKDIR3Request(handle, "fubar1", new SetAttr3());
|
||||
req.serialize(xdr_req);
|
||||
|
||||
@ -520,8 +532,9 @@ public void testMkdir() throws Exception {//FixME
|
||||
public void testSymlink() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
SYMLINK3Request req = new SYMLINK3Request(handle, "fubar", new SetAttr3(),
|
||||
"bar");
|
||||
req.serialize(xdr_req);
|
||||
@ -544,8 +557,9 @@ public void testSymlink() throws Exception {
|
||||
public void testRemove() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
REMOVE3Request req = new REMOVE3Request(handle, "bar");
|
||||
req.serialize(xdr_req);
|
||||
|
||||
@ -567,8 +581,9 @@ public void testRemove() throws Exception {
|
||||
public void testRmdir() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
RMDIR3Request req = new RMDIR3Request(handle, "foo");
|
||||
req.serialize(xdr_req);
|
||||
|
||||
@ -590,8 +605,9 @@ public void testRmdir() throws Exception {
|
||||
public void testRename() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
XDR xdr_req = new XDR();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
RENAME3Request req = new RENAME3Request(handle, "bar", handle, "fubar");
|
||||
req.serialize(xdr_req);
|
||||
|
||||
@ -613,7 +629,8 @@ public void testRename() throws Exception {
|
||||
public void testReaddir() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
XDR xdr_req = new XDR();
|
||||
READDIR3Request req = new READDIR3Request(handle, 0, 0, 100);
|
||||
req.serialize(xdr_req);
|
||||
@ -636,7 +653,8 @@ public void testReaddir() throws Exception {
|
||||
public void testReaddirplus() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir);
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
XDR xdr_req = new XDR();
|
||||
READDIRPLUS3Request req = new READDIRPLUS3Request(handle, 0, 0, 3, 2);
|
||||
req.serialize(xdr_req);
|
||||
@ -659,7 +677,8 @@ public void testReaddirplus() throws Exception {
|
||||
public void testFsstat() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
XDR xdr_req = new XDR();
|
||||
FSSTAT3Request req = new FSSTAT3Request(handle);
|
||||
req.serialize(xdr_req);
|
||||
@ -682,7 +701,8 @@ public void testFsstat() throws Exception {
|
||||
public void testFsinfo() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
XDR xdr_req = new XDR();
|
||||
FSINFO3Request req = new FSINFO3Request(handle);
|
||||
req.serialize(xdr_req);
|
||||
@ -705,7 +725,8 @@ public void testFsinfo() throws Exception {
|
||||
public void testPathconf() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
XDR xdr_req = new XDR();
|
||||
PATHCONF3Request req = new PATHCONF3Request(handle);
|
||||
req.serialize(xdr_req);
|
||||
@ -728,7 +749,8 @@ public void testPathconf() throws Exception {
|
||||
public void testCommit() throws Exception {
|
||||
HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");
|
||||
long dirId = status.getFileId();
|
||||
FileHandle handle = new FileHandle(dirId);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
XDR xdr_req = new XDR();
|
||||
COMMIT3Request req = new COMMIT3Request(handle, 0, 5);
|
||||
req.serialize(xdr_req);
|
||||
|
@ -0,0 +1,330 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.nfs.nfs3;
|
||||
|
||||
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemTestHelper;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FsConstants;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.viewfs.ConfigUtil;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.client.HdfsAdmin;
|
||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
|
||||
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.nfs.mount.RpcProgramMountd;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.nfs.nfs3.FileHandle;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
|
||||
import org.apache.hadoop.nfs.nfs3.Nfs3Status;
|
||||
import org.apache.hadoop.nfs.nfs3.request.GETATTR3Request;
|
||||
import org.apache.hadoop.nfs.nfs3.request.RENAME3Request;
|
||||
import org.apache.hadoop.nfs.nfs3.response.GETATTR3Response;
|
||||
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
|
||||
import org.apache.hadoop.nfs.nfs3.response.RENAME3Response;
|
||||
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
|
||||
import org.apache.hadoop.oncrpc.XDR;
|
||||
import org.apache.hadoop.oncrpc.security.SecurityHandler;
|
||||
import org.apache.hadoop.security.authorize.DefaultImpersonationProvider;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.Assert;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
||||
/**
|
||||
* Tests for {@link RpcProgramNfs3} with
|
||||
* {@link org.apache.hadoop.fs.viewfs.ViewFileSystem}.
|
||||
*/
|
||||
public class TestViewfsWithNfs3 {
|
||||
private static DistributedFileSystem hdfs1;
|
||||
private static DistributedFileSystem hdfs2;
|
||||
private static MiniDFSCluster cluster = null;
|
||||
private static NfsConfiguration config = new NfsConfiguration();
|
||||
private static HdfsAdmin dfsAdmin1;
|
||||
private static HdfsAdmin dfsAdmin2;
|
||||
private static FileSystem viewFs;
|
||||
|
||||
private static NameNode nn1;
|
||||
private static NameNode nn2;
|
||||
private static Nfs3 nfs;
|
||||
private static RpcProgramNfs3 nfsd;
|
||||
private static RpcProgramMountd mountd;
|
||||
private static SecurityHandler securityHandler;
|
||||
private static FileSystemTestHelper fsHelper;
|
||||
private static File testRootDir;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
String currentUser = System.getProperty("user.name");
|
||||
|
||||
config.set("fs.permissions.umask-mode", "u=rwx,g=,o=");
|
||||
config.set(DefaultImpersonationProvider.getTestProvider()
|
||||
.getProxySuperuserGroupConfKey(currentUser), "*");
|
||||
config.set(DefaultImpersonationProvider.getTestProvider()
|
||||
.getProxySuperuserIpConfKey(currentUser), "*");
|
||||
fsHelper = new FileSystemTestHelper();
|
||||
// Set up java key store
|
||||
String testRoot = fsHelper.getTestRootDir();
|
||||
testRootDir = new File(testRoot).getAbsoluteFile();
|
||||
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
|
||||
config.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
|
||||
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
|
||||
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
|
||||
|
||||
cluster =
|
||||
new MiniDFSCluster.Builder(config).nnTopology(
|
||||
MiniDFSNNTopology.simpleFederatedTopology(2))
|
||||
.numDataNodes(2)
|
||||
.build();
|
||||
cluster.waitActive();
|
||||
hdfs1 = cluster.getFileSystem(0);
|
||||
hdfs2 = cluster.getFileSystem(1);
|
||||
|
||||
nn1 = cluster.getNameNode(0);
|
||||
nn2 = cluster.getNameNode(1);
|
||||
nn2.getServiceRpcAddress();
|
||||
dfsAdmin1 = new HdfsAdmin(cluster.getURI(0), config);
|
||||
dfsAdmin2 = new HdfsAdmin(cluster.getURI(1), config);
|
||||
|
||||
// Use ephemeral ports in case tests are running in parallel
|
||||
config.setInt("nfs3.mountd.port", 0);
|
||||
config.setInt("nfs3.server.port", 0);
|
||||
config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
||||
FsConstants.VIEWFS_URI.toString());
|
||||
// Start NFS with allowed.hosts set to "* rw"
|
||||
config.set("dfs.nfs.exports.allowed.hosts", "* rw");
|
||||
|
||||
Path base1 = new Path("/user1");
|
||||
Path base2 = new Path("/user2");
|
||||
hdfs1.delete(base1, true);
|
||||
hdfs2.delete(base2, true);
|
||||
hdfs1.mkdirs(base1);
|
||||
hdfs2.mkdirs(base2);
|
||||
ConfigUtil.addLink(config, "/hdfs1", hdfs1.makeQualified(base1).toUri());
|
||||
ConfigUtil.addLink(config, "/hdfs2", hdfs2.makeQualified(base2).toUri());
|
||||
|
||||
|
||||
viewFs = FileSystem.get(config);
|
||||
config.setStrings(NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY,
|
||||
"/hdfs1", "/hdfs2");
|
||||
|
||||
nfs = new Nfs3(config);
|
||||
nfs.startServiceInternal(false);
|
||||
nfsd = (RpcProgramNfs3) nfs.getRpcProgram();
|
||||
mountd = (RpcProgramMountd) nfs.getMountd().getRpcProgram();
|
||||
|
||||
// Mock SecurityHandler which returns system user.name
|
||||
securityHandler = Mockito.mock(SecurityHandler.class);
|
||||
Mockito.when(securityHandler.getUser()).thenReturn(currentUser);
|
||||
viewFs.delete(new Path("/hdfs2/dir2"), true);
|
||||
viewFs.mkdirs(new Path("/hdfs2/dir2"));
|
||||
DFSTestUtil.createFile(viewFs, new Path("/hdfs1/file1"), 0, (short) 1, 0);
|
||||
DFSTestUtil.createFile(viewFs, new Path("/hdfs1/file2"), 0, (short) 1, 0);
|
||||
DFSTestUtil.createFile(viewFs, new Path("/hdfs1/write1"), 0, (short) 1, 0);
|
||||
DFSTestUtil.createFile(viewFs, new Path("/hdfs2/write2"), 0, (short) 1, 0);
|
||||
DFSTestUtil.createFile(viewFs, new Path("/hdfs1/renameMultiNN"),
|
||||
0, (short) 1, 0);
|
||||
DFSTestUtil.createFile(viewFs, new Path("/hdfs1/renameSingleNN"),
|
||||
0, (short) 1, 0);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() throws Exception {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNumExports() throws Exception {
|
||||
Assert.assertEquals(mountd.getExports().size(),
|
||||
viewFs.getChildFileSystems().length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPaths() throws Exception {
|
||||
Assert.assertEquals(hdfs1.resolvePath(new Path("/user1/file1")),
|
||||
viewFs.resolvePath(new Path("/hdfs1/file1")));
|
||||
Assert.assertEquals(hdfs1.resolvePath(new Path("/user1/file2")),
|
||||
viewFs.resolvePath(new Path("/hdfs1/file2")));
|
||||
Assert.assertEquals(hdfs2.resolvePath(new Path("/user2/dir2")),
|
||||
viewFs.resolvePath(new Path("/hdfs2/dir2")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileStatus() throws Exception {
|
||||
HdfsFileStatus status = nn1.getRpcServer().getFileInfo("/user1/file1");
|
||||
FileStatus st = viewFs.getFileStatus(new Path("/hdfs1/file1"));
|
||||
Assert.assertEquals(st.isDirectory(), status.isDirectory());
|
||||
|
||||
HdfsFileStatus status2 = nn2.getRpcServer().getFileInfo("/user2/dir2");
|
||||
FileStatus st2 = viewFs.getFileStatus(new Path("/hdfs2/dir2"));
|
||||
Assert.assertEquals(st2.isDirectory(), status2.isDirectory());
|
||||
}
|
||||
|
||||
// Test for getattr
|
||||
private void testNfsGetAttrResponse(long fileId, int namenodeId,
|
||||
int expectedStatus) {
|
||||
FileHandle handle = new FileHandle(fileId, namenodeId);
|
||||
XDR xdrReq = new XDR();
|
||||
GETATTR3Request req = new GETATTR3Request(handle);
|
||||
req.serialize(xdrReq);
|
||||
GETATTR3Response response = nfsd.getattr(xdrReq.asReadOnlyWrap(),
|
||||
securityHandler, new InetSocketAddress("localhost", 1234));
|
||||
Assert.assertEquals("Incorrect return code",
|
||||
expectedStatus, response.getStatus());
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testNfsAccessNN1() throws Exception {
|
||||
HdfsFileStatus status = nn1.getRpcServer().getFileInfo("/user1/file1");
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs1.getUri());
|
||||
testNfsGetAttrResponse(status.getFileId(), namenodeId, Nfs3Status.NFS3_OK);
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testNfsAccessNN2() throws Exception {
|
||||
HdfsFileStatus status = nn2.getRpcServer().getFileInfo("/user2/dir2");
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs2.getUri());
|
||||
testNfsGetAttrResponse(status.getFileId(), namenodeId, Nfs3Status.NFS3_OK);
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testWrongNfsAccess() throws Exception {
|
||||
DFSTestUtil.createFile(viewFs, new Path("/hdfs1/file3"), 0, (short) 1, 0);
|
||||
HdfsFileStatus status = nn1.getRpcServer().getFileInfo("/user1/file3");
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs2.getUri());
|
||||
testNfsGetAttrResponse(status.getFileId(), namenodeId,
|
||||
Nfs3Status.NFS3ERR_IO);
|
||||
}
|
||||
|
||||
// Test for write
|
||||
private void testNfsWriteResponse(long dirId, int namenodeId)
|
||||
throws Exception {
|
||||
FileHandle handle = new FileHandle(dirId, namenodeId);
|
||||
|
||||
byte[] buffer = new byte[10];
|
||||
for (int i = 0; i < 10; i++) {
|
||||
buffer[i] = (byte) i;
|
||||
}
|
||||
|
||||
WRITE3Request writeReq = new WRITE3Request(handle, 0, 10,
|
||||
Nfs3Constant.WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
|
||||
XDR xdrReq = new XDR();
|
||||
writeReq.serialize(xdrReq);
|
||||
|
||||
// Attempt by a priviledged user should pass.
|
||||
WRITE3Response response = nfsd.write(xdrReq.asReadOnlyWrap(),
|
||||
null, 1, securityHandler,
|
||||
new InetSocketAddress("localhost", 1234));
|
||||
Assert.assertEquals("Incorrect response:", null, response);
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testNfsWriteNN1() throws Exception {
|
||||
HdfsFileStatus status = nn1.getRpcServer().getFileInfo("/user1/write1");
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs1.getUri());
|
||||
testNfsWriteResponse(status.getFileId(), namenodeId);
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testNfsWriteNN2() throws Exception {
|
||||
HdfsFileStatus status = nn2.getRpcServer().getFileInfo("/user2/write2");
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config, hdfs2.getUri());
|
||||
testNfsWriteResponse(status.getFileId(), namenodeId);
|
||||
}
|
||||
|
||||
// Test for rename
|
||||
private void testNfsRename(FileHandle fromDirHandle, String fromFileName,
|
||||
FileHandle toDirHandle, String toFileName,
|
||||
int expectedStatus) throws Exception {
|
||||
XDR xdrReq = new XDR();
|
||||
RENAME3Request req = new RENAME3Request(fromDirHandle, fromFileName,
|
||||
toDirHandle, toFileName);
|
||||
req.serialize(xdrReq);
|
||||
|
||||
// Attempt by a privileged user should pass.
|
||||
RENAME3Response response = nfsd.rename(xdrReq.asReadOnlyWrap(),
|
||||
securityHandler, new InetSocketAddress("localhost", 1234));
|
||||
assertEquals(expectedStatus, response.getStatus());
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testNfsRenameMultiNN() throws Exception {
|
||||
HdfsFileStatus fromFileStatus = nn1.getRpcServer().getFileInfo("/user1");
|
||||
int fromNNId = Nfs3Utils.getNamenodeId(config, hdfs1.getUri());
|
||||
FileHandle fromHandle =
|
||||
new FileHandle(fromFileStatus.getFileId(), fromNNId);
|
||||
|
||||
HdfsFileStatus toFileStatus = nn2.getRpcServer().getFileInfo("/user2");
|
||||
int toNNId = Nfs3Utils.getNamenodeId(config, hdfs2.getUri());
|
||||
FileHandle toHandle = new FileHandle(toFileStatus.getFileId(), toNNId);
|
||||
|
||||
HdfsFileStatus statusBeforeRename =
|
||||
nn1.getRpcServer().getFileInfo("/user1/renameMultiNN");
|
||||
Assert.assertEquals(statusBeforeRename.isDirectory(), false);
|
||||
|
||||
testNfsRename(fromHandle, "renameMultiNN",
|
||||
toHandle, "renameMultiNNFail", Nfs3Status.NFS3ERR_INVAL);
|
||||
|
||||
HdfsFileStatus statusAfterRename =
|
||||
nn2.getRpcServer().getFileInfo("/user2/renameMultiNNFail");
|
||||
Assert.assertEquals(statusAfterRename, null);
|
||||
|
||||
statusAfterRename = nn1.getRpcServer().getFileInfo("/user1/renameMultiNN");
|
||||
Assert.assertEquals(statusAfterRename.isDirectory(), false);
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testNfsRenameSingleNN() throws Exception {
|
||||
HdfsFileStatus fromFileStatus = nn1.getRpcServer().getFileInfo("/user1");
|
||||
int fromNNId = Nfs3Utils.getNamenodeId(config, hdfs1.getUri());
|
||||
FileHandle fromHandle =
|
||||
new FileHandle(fromFileStatus.getFileId(), fromNNId);
|
||||
|
||||
HdfsFileStatus statusBeforeRename =
|
||||
nn1.getRpcServer().getFileInfo("/user1/renameSingleNN");
|
||||
Assert.assertEquals(statusBeforeRename.isDirectory(), false);
|
||||
|
||||
testNfsRename(fromHandle, "renameSingleNN",
|
||||
fromHandle, "renameSingleNNSucess", Nfs3Status.NFS3_OK);
|
||||
|
||||
HdfsFileStatus statusAfterRename =
|
||||
nn1.getRpcServer().getFileInfo("/user1/renameSingleNNSucess");
|
||||
Assert.assertEquals(statusAfterRename.isDirectory(), false);
|
||||
|
||||
statusAfterRename =
|
||||
nn1.getRpcServer().getFileInfo("/user1/renameSingleNN");
|
||||
Assert.assertEquals(statusAfterRename, null);
|
||||
}
|
||||
}
|
@ -481,6 +481,7 @@ public void testWriteStableHow() throws IOException, InterruptedException {
|
||||
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
client = new DFSClient(DFSUtilClient.getNNAddress(config), config);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
|
||||
// Use emphral port in case tests are running in parallel
|
||||
config.setInt("nfs3.mountd.port", 0);
|
||||
@ -492,7 +493,7 @@ public void testWriteStableHow() throws IOException, InterruptedException {
|
||||
nfsd = (RpcProgramNfs3) nfs3.getRpcProgram();
|
||||
|
||||
HdfsFileStatus status = client.getFileInfo("/");
|
||||
FileHandle rootHandle = new FileHandle(status.getFileId());
|
||||
FileHandle rootHandle = new FileHandle(status.getFileId(), namenodeId);
|
||||
// Create file1
|
||||
CREATE3Request createReq = new CREATE3Request(rootHandle, "file1",
|
||||
Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0);
|
||||
@ -598,8 +599,9 @@ public void testOOOWrites() throws IOException, InterruptedException {
|
||||
|
||||
DFSClient dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config),
|
||||
config);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
HdfsFileStatus status = dfsClient.getFileInfo("/");
|
||||
FileHandle rootHandle = new FileHandle(status.getFileId());
|
||||
FileHandle rootHandle = new FileHandle(status.getFileId(), namenodeId);
|
||||
|
||||
CREATE3Request createReq = new CREATE3Request(rootHandle,
|
||||
"out-of-order-write" + System.currentTimeMillis(),
|
||||
@ -674,8 +676,9 @@ public void testOverlappingWrites() throws IOException, InterruptedException {
|
||||
|
||||
DFSClient dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config),
|
||||
config);
|
||||
int namenodeId = Nfs3Utils.getNamenodeId(config);
|
||||
HdfsFileStatus status = dfsClient.getFileInfo("/");
|
||||
FileHandle rootHandle = new FileHandle(status.getFileId());
|
||||
FileHandle rootHandle = new FileHandle(status.getFileId(), namenodeId);
|
||||
|
||||
CREATE3Request createReq = new CREATE3Request(rootHandle,
|
||||
"overlapping-writes" + System.currentTimeMillis(),
|
||||
|
Loading…
Reference in New Issue
Block a user