From 30b8ef91a32ddf1fe3756bae6d7dc538a150bdc4 Mon Sep 17 00:00:00 2001
From: Brandon Li
Date: Wed, 28 Aug 2013 17:23:55 +0000
Subject: [PATCH] HDFS-5078 Support file append in NFSv3 gateway to enable
 data streaming to HDFS. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1518292 13f79535-47bb-0310-9956-ffa450edef68
---
 .../apache/hadoop/nfs/nfs3/Nfs3Constant.java  |  4 ++
 .../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java     | 13 ++++--
 .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java  | 14 ++++--
 .../hadoop/hdfs/nfs/nfs3/WriteManager.java    | 45 ++++++++++++++++---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 ++
 5 files changed, 65 insertions(+), 14 deletions(-)

diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Constant.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Constant.java
index 1701cc12dd..8e9a8f1076 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Constant.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Constant.java
@@ -201,4 +201,8 @@ public static WriteStableHow fromValue(int id) {
   public static final String EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY = "hdfs.nfs.exports.cache.expirytime.millis";
   public static final long EXPORTS_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 15 * 60 * 1000; // 15 min
 
+  public static final String FILE_DUMP_DIR_KEY = "dfs.nfs3.dump.dir";
+  public static final String FILE_DUMP_DIR_DEFAULT = "/tmp/.hdfs-nfs";
+  public static final String ENABLE_FILE_DUMP_KEY = "dfs.nfs3.enableDump";
+  public static final boolean ENABLE_FILE_DUMP_DEFAULT = true;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index e13bebcc6f..12de05e058 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -126,6 +126,9 @@ private long updateNonSequentialWriteInMemory(long count) {
     nonSequentialWriteInMemory = 0;
     this.dumpFilePath = dumpFilePath;
     enabledDump = dumpFilePath == null ? 
        false: true;
+    nextOffset = latestAttr.getSize();
+    assert(nextOffset == this.fos.getPos());
+    ctxLock = new ReentrantLock(true);
   }
 
@@ -685,12 +688,14 @@ private void doSingleWrite(final WriteCtx writeCtx) {
 
     try {
       fos.write(data, 0, count);
-
-      if (fos.getPos() != (offset + count)) {
+
+      long flushedOffset = getFlushedOffset();
+      if (flushedOffset != (offset + count)) {
         throw new IOException("output stream is out of sync, pos="
-            + fos.getPos() + " and nextOffset should be" + (offset + count));
+            + flushedOffset + " and nextOffset should be "
+            + (offset + count));
       }
-      nextOffset = fos.getPos();
+      nextOffset = flushedOffset;
 
       // Reduce memory occupation size if request was allowed to be dumped
       if (writeCtx.getDataState() == DataState.ALLOW_DUMP) {
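The new Nfs3Constant keys above replace hard-coded dump settings, and the OpenFileCtx constructor now starts nextOffset at the current file size so that writes continue where an appended file ends. A minimal sketch of how a deployment might override the dump settings; the class name and directory value here are illustrative, not part of this patch:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;

// Hypothetical example class, not included in this change.
public class DumpConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Move the out-of-order write dump directory off /tmp (example path).
    conf.set(Nfs3Constant.FILE_DUMP_DIR_KEY, "/var/hdfs-nfs/dump");
    // Dumping stays enabled by default (ENABLE_FILE_DUMP_DEFAULT is true).
    conf.setBoolean(Nfs3Constant.ENABLE_FILE_DUMP_KEY, true);
    System.out.println("dump dir = " + conf.get(Nfs3Constant.FILE_DUMP_DIR_KEY));
  }
}
```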
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
index d8694198b9..1f39ace973 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.Options;
@@ -123,7 +124,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   private final Configuration config = new Configuration();
   private final WriteManager writeManager;
-  private final IdUserGroup iug;// = new IdUserGroup();
+  private final IdUserGroup iug;
   private final DFSClientCache clientCache;
 
   private final NfsExports exports;
@@ -161,10 +162,14 @@ public RpcProgramNfs3(Configuration config)
         DFSConfigKeys.DFS_REPLICATION_DEFAULT);
     blockSize = config.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
-    bufferSize = config.getInt("io.file.buffer.size", 4096);
+    bufferSize = config.getInt(
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
 
-    writeDumpDir = config.get("dfs.nfs3.dump.dir", "/tmp/.hdfs-nfs");
-    boolean enableDump = config.getBoolean("dfs.nfs3.enableDump", true);
+    writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
+        Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
+    boolean enableDump = config.getBoolean(Nfs3Constant.ENABLE_FILE_DUMP_KEY,
+        Nfs3Constant.ENABLE_FILE_DUMP_DEFAULT);
     if (!enableDump) {
       writeDumpDir = null;
     } else {
@@ -1112,6 +1117,7 @@ public RENAME3Response rename(XDR xdr, RpcAuthSys authSys, InetAddress client) {
     }
   }
 
+  @Override
   public SYMLINK3Response symlink(XDR xdr, RpcAuthSys authSys,
       InetAddress client) {
     return new SYMLINK3Response(Nfs3Status.NFS3ERR_NOTSUPP);
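The constructor change above swaps the literal "io.file.buffer.size"/4096 pair for the shared constants in CommonConfigurationKeysPublic, so the gateway reads the same setting as the rest of Hadoop. A standalone sketch of the same lookup (the class name is made up for illustration):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

// Hypothetical example class showing the constant-based lookup.
public class BufferSizeExample {
  public static void main(String[] args) {
    Configuration config = new Configuration();
    int bufferSize = config.getInt(
        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
    System.out.println("io.file.buffer.size = " + bufferSize);
  }
}
```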
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
index 0e96c506d2..70e9bc396c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
@@ -25,7 +25,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -48,6 +50,7 @@ public class WriteManager {
 
   public static final Log LOG = LogFactory.getLog(WriteManager.class);
 
+  private final Configuration config;
   private final IdUserGroup iug;
   private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
       .newConcurrentMap();
@@ -76,6 +79,7 @@ void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
 
   WriteManager(IdUserGroup iug, final Configuration config) {
     this.iug = iug;
+    this.config = config;
     streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
         DEFAULT_STREAM_TIMEOUT);
@@ -129,12 +133,41 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
     OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
-      WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), preOpAttr);
-      WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
-          fileWcc, count, request.getStableHow(),
-          Nfs3Constant.WRITE_COMMIT_VERF);
-      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
-      return;
+
+      String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle.getFileId());
+      HdfsDataOutputStream fos = null;
+      Nfs3FileAttributes latestAttr = null;
+      try {
+        int bufferSize = config.getInt(
+            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+
+        fos = dfsClient.append(fileIdPath, bufferSize, null, null);
+
+        latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+      } catch (IOException e) {
+        LOG.error("Can't append to file:" + fileIdPath + ", error:" + e);
+        if (fos != null) {
+          fos.close();
+        }
+        WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
+            preOpAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
+            fileWcc, count, request.getStableHow(),
+            Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        return;
+      }
+
+      // Add open stream
+      String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
+          Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
+      openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
+          + fileHandle.getFileId());
+      addOpenFileStream(fileHandle, openFileCtx);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("opened stream for file:" + fileHandle.getFileId());
+      }
     }
 
     // Add write into the async job queue
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 48ebda3437..90ba9465e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -307,6 +307,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-4947 Add NFS server export table to control export by hostname or
     IP range (Jing Zhao via brandonli)
 
+    HDFS-5078 Support file append in NFSv3 gateway to enable data streaming
+    to HDFS (brandonli)
+
   IMPROVEMENTS
 
     HDFS-4513. Clarify in the WebHDFS REST API that all JSON responses may
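Taken together, the WriteManager change is what enables append: a WRITE request for a file with no open stream now reopens the file with DFSClient.append and registers a fresh OpenFileCtx instead of replying NFS3ERR_IO. A rough client-side analogue of that reopen-and-continue pattern, using the public FileSystem API; the path and payload are invented and this is not the gateway code itself:

```java
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical example class, for illustration only.
public class AppendExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/user/example/stream.log");
    // append() positions the stream at the current end of the file,
    // which is why OpenFileCtx can assert nextOffset == fos.getPos().
    OutputStream out = fs.append(file);
    try {
      out.write("appended bytes\n".getBytes("UTF-8"));
    } finally {
      out.close();
    }
  }
}
```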