From b6f9d5538cf2b425652687e99503f3d566b2056a Mon Sep 17 00:00:00 2001 From: Brandon Li Date: Tue, 21 Oct 2014 10:20:29 -0700 Subject: [PATCH] HDFS-7259. Unresponseive NFS mount point due to deferred COMMIT response. Contributed by Brandon Li --- .../apache/hadoop/nfs/nfs3/IdUserGroup.java | 4 +- .../hadoop/hdfs/nfs/conf/NfsConfigKeys.java | 3 + .../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java | 83 ++++++++-- .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java | 2 +- .../apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java | 2 +- .../hadoop/hdfs/nfs/nfs3/WriteManager.java | 16 +- .../hadoop/hdfs/nfs/nfs3/TestWrites.java | 153 +++++++++++++++++- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + 8 files changed, 238 insertions(+), 28 deletions(-) diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/IdUserGroup.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/IdUserGroup.java index b0374130e1..c174533e8b 100644 --- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/IdUserGroup.java +++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/IdUserGroup.java @@ -372,7 +372,7 @@ public int getUidAllowingUnknown(String user) { uid = getUid(user); } catch (IOException e) { uid = user.hashCode(); - LOG.info("Can't map user " + user + ". Use its string hashcode:" + uid, e); + LOG.info("Can't map user " + user + ". Use its string hashcode:" + uid); } return uid; } @@ -385,7 +385,7 @@ public int getGidAllowingUnknown(String group) { gid = getGid(group); } catch (IOException e) { gid = group.hashCode(); - LOG.info("Can't map group " + group + ". Use its string hashcode:" + gid, e); + LOG.info("Can't map group " + group + ". Use its string hashcode:" + gid); } return gid; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java index 2f65ce4cba..178d855cb6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfigKeys.java @@ -57,4 +57,7 @@ public class NfsConfigKeys { public static final String AIX_COMPAT_MODE_KEY = "nfs.aix.compatibility.mode.enabled"; public static final boolean AIX_COMPAT_MODE_DEFAULT = false; + + public final static String LARGE_FILE_UPLOAD = "nfs.large.file.upload"; + public final static boolean LARGE_FILE_UPLOAD_DEFAULT = true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 4bf8d27d58..c2866ddb8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; +import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx.DataState; import org.apache.hadoop.io.BytesWritable.Comparator; import org.apache.hadoop.io.IOUtils; @@ -77,7 +78,26 @@ static enum COMMIT_STATUS { COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, - COMMIT_DO_SYNC; + COMMIT_DO_SYNC, + /** + * Deferred COMMIT response could fail file uploading. The following two + * status are introduced as a solution. 1. if client asks to commit + * non-sequential trunk of data, NFS gateway return success with the hope + * that client will send the prerequisite writes. 2. if client asks to + * commit a sequential trunk(means it can be flushed to HDFS), NFS gateway + * return a special error NFS3ERR_JUKEBOX indicating the client needs to + * retry. Meanwhile, NFS gateway keeps flush data to HDFS and do sync + * eventually. + * + * The reason to let client wait is that, we want the client to wait for the + * last commit. Otherwise, client thinks file upload finished (e.g., cp + * command returns success) but NFS could be still flushing staged data to + * HDFS. However, we don't know which one is the last commit. We make the + * assumption that a commit after sequential writes may be the last. + * Referring HDFS-7259 for more details. + * */ + COMMIT_SPECIAL_WAIT, // scoped pending writes is sequential + COMMIT_SPECIAL_SUCCESS;// scoped pending writes is not sequential } private final DFSClient client; @@ -159,6 +179,7 @@ public String toString() { private RandomAccessFile raf; private final String dumpFilePath; private Daemon dumpThread; + private final boolean uploadLargeFile; private void updateLastAccessTime() { lastAccessTime = Time.monotonicNow(); @@ -200,12 +221,13 @@ private long updateNonSequentialWriteInMemory(long count) { OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdUserGroup iug) { - this(fos, latestAttr, dumpFilePath, client, iug, false); + this(fos, latestAttr, dumpFilePath, client, iug, false, + new NfsConfiguration()); } OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdUserGroup iug, - boolean aixCompatMode) { + boolean aixCompatMode, NfsConfiguration config) { this.fos = fos; this.latestAttr = latestAttr; this.aixCompatMode = aixCompatMode; @@ -233,6 +255,8 @@ private long updateNonSequentialWriteInMemory(long count) { dumpThread = null; this.client = client; this.iug = iug; + this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, + NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT); } public Nfs3FileAttributes getLatestAttr() { @@ -781,12 +805,43 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset, return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE; } } - + if (pendingWrites.isEmpty()) { + // Note that, there is no guarantee data is synced. Caller should still + // do a sync here though the output stream might be closed. + return COMMIT_STATUS.COMMIT_FINISHED; + } + long flushed = getFlushedOffset(); if (LOG.isDebugEnabled()) { LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset); } + // Handle large file upload + if (uploadLargeFile && !aixCompatMode) { + long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry() + .getKey().getMax() - 1; + + if (co <= flushed) { + return COMMIT_STATUS.COMMIT_DO_SYNC; + } else if (co < nextOffset.get()) { + if (!fromRead) { + // let client retry the same request, add pending commit to sync later + CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, + preOpAttr); + pendingCommits.put(commitOffset, commitCtx); + } + if (LOG.isDebugEnabled()) { + LOG.debug("return COMMIT_SPECIAL_WAIT"); + } + return COMMIT_STATUS.COMMIT_SPECIAL_WAIT; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("return COMMIT_SPECIAL_SUCCESS"); + } + return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS; + } + } + if (commitOffset > 0) { if (aixCompatMode) { // The AIX NFS client misinterprets RFC-1813 and will always send 4096 @@ -817,20 +872,14 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset, Entry key = pendingWrites.firstEntry(); // Commit whole file, commitOffset == 0 - if (pendingWrites.isEmpty()) { - // Note that, there is no guarantee data is synced. TODO: We could still - // do a sync here though the output stream might be closed. - return COMMIT_STATUS.COMMIT_FINISHED; - } else { - if (!fromRead) { - // Insert commit - long maxOffset = key.getKey().getMax() - 1; - Preconditions.checkState(maxOffset > 0); - CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr); - pendingCommits.put(maxOffset, commitCtx); - } - return COMMIT_STATUS.COMMIT_WAIT; + if (!fromRead) { + // Insert commit + long maxOffset = key.getKey().getMax() - 1; + Preconditions.checkState(maxOffset > 0); + CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr); + pendingCommits.put(maxOffset, commitCtx); } + return COMMIT_STATUS.COMMIT_WAIT; } private void addWrite(WriteCtx writeCtx) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 6012b9ba9f..40a9043e7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -944,7 +944,7 @@ preOpDirAttr, new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), // Add open stream OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug, - aixCompatMode); + aixCompatMode, config); fileHandle = new FileHandle(postOpObjAttr.getFileId()); if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) { LOG.warn("Can't add more stream, close it." diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java index 05e0fb7c2c..3b5885ed89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java @@ -41,7 +41,7 @@ class WriteCtx { /** * In memory write data has 3 states. ALLOW_DUMP: not sequential write, still - * wait for prerequisit writes. NO_DUMP: sequential write, no need to dump + * wait for prerequisite writes. NO_DUMP: sequential write, no need to dump * since it will be written to HDFS soon. DUMPED: already dumped to a file. */ public static enum DataState { diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java index 7bddc44610..9bded49ad4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java @@ -178,7 +178,7 @@ void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, String writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY, NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT); openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/" - + fileHandle.getFileId(), dfsClient, iug, aixCompatMode); + + fileHandle.getFileId(), dfsClient, iug, aixCompatMode, config); if (!addOpenFileStream(fileHandle, openFileCtx)) { LOG.info("Can't add new stream. Close it. Tell client to retry."); @@ -236,6 +236,7 @@ int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle, status = Nfs3Status.NFS3ERR_IO; break; case COMMIT_WAIT: + case COMMIT_SPECIAL_WAIT: /** * This should happen rarely in some possible cases, such as read * request arrives before DFSClient is able to quickly flush data to DN, @@ -244,6 +245,10 @@ int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle, */ status = Nfs3Status.NFS3ERR_JUKEBOX; break; + case COMMIT_SPECIAL_SUCCESS: + // Read beyond eof could result in partial read + status = Nfs3Status.NFS3_OK; + break; default: LOG.error("Should not get commit return code:" + ret.name()); throw new RuntimeException("Should not get commit return code:" @@ -278,6 +283,12 @@ void handleCommit(DFSClient dfsClient, FileHandle fileHandle, case COMMIT_WAIT: // Do nothing. Commit is async now. return; + case COMMIT_SPECIAL_WAIT: + status = Nfs3Status.NFS3ERR_JUKEBOX; + break; + case COMMIT_SPECIAL_SUCCESS: + status = Nfs3Status.NFS3_OK; + break; default: LOG.error("Should not get commit return code:" + ret.name()); throw new RuntimeException("Should not get commit return code:" @@ -288,8 +299,7 @@ void handleCommit(DFSClient dfsClient, FileHandle fileHandle, // Send out the response Nfs3FileAttributes postOpAttr = null; try { - String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileId()); - postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); + postOpAttr = getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId()), iug); } catch (IOException e1) { LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileId(), e1); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index 363bc1205e..c156fc0e25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -138,8 +138,10 @@ public void testCheckCommit() throws IOException { HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); Mockito.when(fos.getPos()).thenReturn((long) 0); + NfsConfiguration conf = new NfsConfiguration(); + conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, - new IdUserGroup(new NfsConfiguration())); + new IdUserGroup(conf), false, conf); COMMIT_STATUS ret; @@ -157,6 +159,7 @@ public void testCheckCommit() throws IOException { // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); + ctx.setNextOffsetForTest(10); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); // Do_SYNC state will be updated to FINISHED after data sync @@ -192,15 +195,85 @@ public void testCheckCommit() throws IOException { Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); } + @Test + // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with + // large file upload option. + public void testCheckCommitLargeFileUpload() throws IOException { + DFSClient dfsClient = Mockito.mock(DFSClient.class); + Nfs3FileAttributes attr = new Nfs3FileAttributes(); + HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); + Mockito.when(fos.getPos()).thenReturn((long) 0); + + NfsConfiguration conf = new NfsConfiguration(); + conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true); + OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, + new IdUserGroup(conf), false, conf); + + COMMIT_STATUS ret; + + // Test inactive open file context + ctx.setActiveStatusForTest(false); + Channel ch = Mockito.mock(Channel.class); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX); + + ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), + new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE); + + // Test request with non zero commit offset + ctx.setActiveStatusForTest(true); + Mockito.when(fos.getPos()).thenReturn((long) 10); + ctx.setNextOffsetForTest(10); + COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); + Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); + // Do_SYNC state will be updated to FINISHED after data sync + ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); + + status = ctx.checkCommitInternal(10, ch, 1, attr, false); + Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); + ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); + + ConcurrentNavigableMap commits = ctx + .getPendingCommitsForTest(); + Assert.assertTrue(commits.size() == 0); + ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS); + Assert.assertTrue(commits.size() == 0); + + // Test request with zero commit offset + commits.remove(new Long(11)); + // There is one pending write [5,10] + ret = ctx.checkCommitInternal(0, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC); + + Mockito.when(fos.getPos()).thenReturn((long) 6); + ret = ctx.checkCommitInternal(8, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); + Assert.assertTrue(commits.size() == 1); + long key = commits.firstKey(); + Assert.assertTrue(key == 8); + + // Empty pending writes + ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); + } + @Test public void testCheckCommitAixCompatMode() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); Nfs3FileAttributes attr = new Nfs3FileAttributes(); HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); - // Last argument "true" here to enable AIX compatibility mode. + NfsConfiguration conf = new NfsConfiguration(); + conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); + // Enable AIX compatibility mode. OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, - new IdUserGroup(new NfsConfiguration()), true); + new IdUserGroup(new NfsConfiguration()), true, conf); // Test fall-through to pendingWrites check in the event that commitOffset // is greater than the number of bytes we've so far flushed. @@ -210,6 +283,8 @@ public void testCheckCommitAixCompatMode() throws IOException { // Test the case when we actually have received more bytes than we're trying // to commit. + ctx.getPendingWritesForTest().put(new OffsetRange(0, 10), + new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); Mockito.when(fos.getPos()).thenReturn((long) 10); status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); @@ -226,8 +301,9 @@ public void testCheckCommitFromRead() throws IOException { Mockito.when(fos.getPos()).thenReturn((long) 0); NfsConfiguration config = new NfsConfiguration(); + config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, - new IdUserGroup(config)); + new IdUserGroup(config), false, config); FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath" COMMIT_STATUS ret; @@ -285,6 +361,75 @@ public void testCheckCommitFromRead() throws IOException { assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); } + @Test + // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option + public void testCheckCommitFromReadLargeFileUpload() throws IOException { + DFSClient dfsClient = Mockito.mock(DFSClient.class); + Nfs3FileAttributes attr = new Nfs3FileAttributes(); + HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); + Mockito.when(fos.getPos()).thenReturn((long) 0); + NfsConfiguration config = new NfsConfiguration(); + + config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true); + OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, + new IdUserGroup(config), false, config); + + FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath" + COMMIT_STATUS ret; + WriteManager wm = new WriteManager(new IdUserGroup(config), config, false); + assertTrue(wm.addOpenFileStream(h, ctx)); + + // Test inactive open file context + ctx.setActiveStatusForTest(false); + Channel ch = Mockito.mock(Channel.class); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); + assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); + + ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), + new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); + assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0)); + + // Test request with non zero commit offset + ctx.setActiveStatusForTest(true); + Mockito.when(fos.getPos()).thenReturn((long) 10); + COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); + assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); + // Do_SYNC state will be updated to FINISHED after data sync + ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5)); + + status = ctx.checkCommitInternal(10, ch, 1, attr, true); + assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); + ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10)); + + ConcurrentNavigableMap commits = ctx + .getPendingCommitsForTest(); + assertTrue(commits.size() == 0); + ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret); + assertEquals(0, commits.size()); // commit triggered by read doesn't wait + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 11)); + + // Test request with zero commit offset + // There is one pending write [5,10] + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); + assertEquals(0, commits.size()); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); + + // Empty pending writes + ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); + } + private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime) throws InterruptedException { int waitedTime = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ac5010f3ea..dd11cf526e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -976,6 +976,9 @@ Release 2.6.0 - UNRELEASED HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable. (szetszwo) + HDFS-7259. Unresponseive NFS mount point due to deferred COMMIT response. + (brandonli) + BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS HDFS-6387. HDFS CLI admin tool for creating & deleting an