HDFS-7387. NFS may only do partial commit due to a race between COMMIT and write. Contributed by Brandon Li
This commit is contained in:
parent
571e9c6232
commit
99d9d0c2d1
@ -816,6 +816,42 @@ public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if the to-commit range is sequential
|
||||||
|
@VisibleForTesting
|
||||||
|
synchronized boolean checkSequential(final long commitOffset,
|
||||||
|
final long nextOffset) {
|
||||||
|
Preconditions.checkState(commitOffset >= nextOffset, "commitOffset "
|
||||||
|
+ commitOffset + " less than nextOffset " + nextOffset);
|
||||||
|
long offset = nextOffset;
|
||||||
|
Iterator<OffsetRange> it = pendingWrites.descendingKeySet().iterator();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
OffsetRange range = it.next();
|
||||||
|
if (range.getMin() != offset) {
|
||||||
|
// got a hole
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
offset = range.getMax();
|
||||||
|
if (offset > commitOffset) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// there is gap between the last pending write and commitOffset
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private COMMIT_STATUS handleSpecialWait(boolean fromRead, long commitOffset,
|
||||||
|
Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
|
||||||
|
if (!fromRead) {
|
||||||
|
// let client retry the same request, add pending commit to sync later
|
||||||
|
CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, preOpAttr);
|
||||||
|
pendingCommits.put(commitOffset, commitCtx);
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("return COMMIT_SPECIAL_WAIT");
|
||||||
|
}
|
||||||
|
return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
|
synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
|
||||||
Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
|
Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
|
||||||
@ -827,17 +863,34 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
|
|||||||
return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
|
return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pendingWrites.isEmpty()) {
|
|
||||||
// Note that, there is no guarantee data is synced. Caller should still
|
|
||||||
// do a sync here though the output stream might be closed.
|
|
||||||
return COMMIT_STATUS.COMMIT_FINISHED;
|
|
||||||
}
|
|
||||||
|
|
||||||
long flushed = getFlushedOffset();
|
long flushed = getFlushedOffset();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
|
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset
|
||||||
|
+ "nextOffset=" + nextOffset.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pendingWrites.isEmpty()) {
|
||||||
|
if (aixCompatMode) {
|
||||||
|
// Note that, there is no guarantee data is synced. Caller should still
|
||||||
|
// do a sync here though the output stream might be closed.
|
||||||
|
return COMMIT_STATUS.COMMIT_FINISHED;
|
||||||
|
} else {
|
||||||
|
if (flushed < nextOffset.get()) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("get commit while still writing to the requested offset,"
|
||||||
|
+ " with empty queue");
|
||||||
|
}
|
||||||
|
return handleSpecialWait(fromRead, nextOffset.get(), channel, xid,
|
||||||
|
preOpAttr);
|
||||||
|
} else {
|
||||||
|
return COMMIT_STATUS.COMMIT_FINISHED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Preconditions.checkState(flushed <= nextOffset.get(), "flushed " + flushed
|
||||||
|
+ " is larger than nextOffset " + nextOffset.get());
|
||||||
// Handle large file upload
|
// Handle large file upload
|
||||||
if (uploadLargeFile && !aixCompatMode) {
|
if (uploadLargeFile && !aixCompatMode) {
|
||||||
long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry()
|
long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry()
|
||||||
@ -846,16 +899,14 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
|
|||||||
if (co <= flushed) {
|
if (co <= flushed) {
|
||||||
return COMMIT_STATUS.COMMIT_DO_SYNC;
|
return COMMIT_STATUS.COMMIT_DO_SYNC;
|
||||||
} else if (co < nextOffset.get()) {
|
} else if (co < nextOffset.get()) {
|
||||||
if (!fromRead) {
|
|
||||||
// let client retry the same request, add pending commit to sync later
|
|
||||||
CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
|
|
||||||
preOpAttr);
|
|
||||||
pendingCommits.put(commitOffset, commitCtx);
|
|
||||||
}
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("return COMMIT_SPECIAL_WAIT");
|
LOG.debug("get commit while still writing to the requested offset");
|
||||||
}
|
}
|
||||||
return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
|
return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
|
||||||
|
} else {
|
||||||
|
// co >= nextOffset
|
||||||
|
if (checkSequential(co, nextOffset.get())) {
|
||||||
|
return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("return COMMIT_SPECIAL_SUCCESS");
|
LOG.debug("return COMMIT_SPECIAL_SUCCESS");
|
||||||
@ -863,6 +914,7 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
|
|||||||
return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
|
return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (commitOffset > 0) {
|
if (commitOffset > 0) {
|
||||||
if (aixCompatMode) {
|
if (aixCompatMode) {
|
||||||
|
@ -217,14 +217,14 @@ public void testCheckCommitLargeFileUpload() throws IOException {
|
|||||||
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
||||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
|
||||||
|
|
||||||
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
|
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
|
||||||
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
||||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
|
||||||
|
|
||||||
// Test request with non zero commit offset
|
// Test request with non zero commit offset
|
||||||
ctx.setActiveStatusForTest(true);
|
ctx.setActiveStatusForTest(true);
|
||||||
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
Mockito.when(fos.getPos()).thenReturn((long) 8);
|
||||||
ctx.setNextOffsetForTest(10);
|
ctx.setNextOffsetForTest(10);
|
||||||
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
|
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
|
||||||
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
||||||
@ -232,35 +232,40 @@ public void testCheckCommitLargeFileUpload() throws IOException {
|
|||||||
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
|
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
|
||||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
||||||
|
|
||||||
|
// Test commit sequential writes
|
||||||
status = ctx.checkCommitInternal(10, ch, 1, attr, false);
|
status = ctx.checkCommitInternal(10, ch, 1, attr, false);
|
||||||
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
|
||||||
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
|
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
|
||||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
|
||||||
|
|
||||||
|
// Test commit non-sequential writes
|
||||||
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
|
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
|
||||||
.getPendingCommitsForTest();
|
.getPendingCommitsForTest();
|
||||||
Assert.assertTrue(commits.size() == 0);
|
Assert.assertTrue(commits.size() == 1);
|
||||||
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
|
ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
|
||||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
|
||||||
Assert.assertTrue(commits.size() == 0);
|
Assert.assertTrue(commits.size() == 1);
|
||||||
|
|
||||||
// Test request with zero commit offset
|
// Test request with zero commit offset
|
||||||
commits.remove(new Long(11));
|
commits.remove(new Long(10));
|
||||||
// There is one pending write [5,10]
|
// There is one pending write [10,15]
|
||||||
ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
|
ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
|
||||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
|
|
||||||
|
|
||||||
Mockito.when(fos.getPos()).thenReturn((long) 6);
|
|
||||||
ret = ctx.checkCommitInternal(8, ch, 1, attr, false);
|
|
||||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
|
||||||
Assert.assertTrue(commits.size() == 1);
|
|
||||||
long key = commits.firstKey();
|
ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
|
||||||
Assert.assertTrue(key == 8);
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
|
||||||
|
Assert.assertTrue(commits.size() == 2);
|
||||||
|
|
||||||
|
// Empty pending writes. nextOffset=10, flushed pos=8
|
||||||
|
ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
|
||||||
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
||||||
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
|
||||||
|
|
||||||
// Empty pending writes
|
// Empty pending writes
|
||||||
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
|
ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
|
||||||
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
|
||||||
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -286,6 +291,7 @@ public void testCheckCommitAixCompatMode() throws IOException {
|
|||||||
ctx.getPendingWritesForTest().put(new OffsetRange(0, 10),
|
ctx.getPendingWritesForTest().put(new OffsetRange(0, 10),
|
||||||
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
||||||
|
ctx.setNextOffsetForTest((long)10);
|
||||||
status = ctx.checkCommitInternal(5, null, 1, attr, false);
|
status = ctx.checkCommitInternal(5, null, 1, attr, false);
|
||||||
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
||||||
}
|
}
|
||||||
@ -317,7 +323,7 @@ public void testCheckCommitFromRead() throws IOException {
|
|||||||
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
|
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
|
||||||
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
|
|
||||||
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
|
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
|
||||||
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
||||||
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
|
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
|
||||||
@ -326,6 +332,7 @@ public void testCheckCommitFromRead() throws IOException {
|
|||||||
// Test request with non zero commit offset
|
// Test request with non zero commit offset
|
||||||
ctx.setActiveStatusForTest(true);
|
ctx.setActiveStatusForTest(true);
|
||||||
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
||||||
|
ctx.setNextOffsetForTest((long)10);
|
||||||
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
|
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
|
||||||
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
|
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
|
||||||
// Do_SYNC state will be updated to FINISHED after data sync
|
// Do_SYNC state will be updated to FINISHED after data sync
|
||||||
@ -355,7 +362,7 @@ public void testCheckCommitFromRead() throws IOException {
|
|||||||
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
|
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
|
|
||||||
// Empty pending writes
|
// Empty pending writes
|
||||||
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
|
ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
|
||||||
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
||||||
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
||||||
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
@ -386,7 +393,7 @@ public void testCheckCommitFromReadLargeFileUpload() throws IOException {
|
|||||||
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
|
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
|
||||||
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
|
|
||||||
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
|
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
|
||||||
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
||||||
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
|
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
|
||||||
@ -394,7 +401,8 @@ public void testCheckCommitFromReadLargeFileUpload() throws IOException {
|
|||||||
|
|
||||||
// Test request with non zero commit offset
|
// Test request with non zero commit offset
|
||||||
ctx.setActiveStatusForTest(true);
|
ctx.setActiveStatusForTest(true);
|
||||||
Mockito.when(fos.getPos()).thenReturn((long) 10);
|
Mockito.when(fos.getPos()).thenReturn((long) 6);
|
||||||
|
ctx.setNextOffsetForTest((long)10);
|
||||||
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
|
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
|
||||||
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
|
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
|
||||||
// Do_SYNC state will be updated to FINISHED after data sync
|
// Do_SYNC state will be updated to FINISHED after data sync
|
||||||
@ -402,32 +410,34 @@ public void testCheckCommitFromReadLargeFileUpload() throws IOException {
|
|||||||
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
||||||
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));
|
||||||
|
|
||||||
status = ctx.checkCommitInternal(10, ch, 1, attr, true);
|
// Test request with sequential writes
|
||||||
assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
|
status = ctx.checkCommitInternal(9, ch, 1, attr, true);
|
||||||
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
|
assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
|
||||||
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
|
||||||
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));
|
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
|
||||||
|
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));
|
||||||
|
|
||||||
|
// Test request with non-sequential writes
|
||||||
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
|
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
|
||||||
.getPendingCommitsForTest();
|
.getPendingCommitsForTest();
|
||||||
assertTrue(commits.size() == 0);
|
assertTrue(commits.size() == 0);
|
||||||
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
|
ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
|
||||||
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
|
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
|
||||||
assertEquals(0, commits.size()); // commit triggered by read doesn't wait
|
assertEquals(0, commits.size()); // commit triggered by read doesn't wait
|
||||||
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 11));
|
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));
|
||||||
|
|
||||||
// Test request with zero commit offset
|
// Test request with zero commit offset
|
||||||
// There is one pending write [5,10]
|
// There is one pending write [10,15]
|
||||||
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
||||||
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
|
||||||
assertEquals(0, commits.size());
|
assertEquals(0, commits.size());
|
||||||
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
|
|
||||||
// Empty pending writes
|
// Empty pending writes
|
||||||
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
|
ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
|
||||||
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
|
||||||
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
|
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
|
||||||
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
|
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
|
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
|
||||||
@ -629,4 +639,33 @@ securityHandler, new InetSocketAddress("localhost", config.getInt(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCheckSequential() throws IOException {
|
||||||
|
DFSClient dfsClient = Mockito.mock(DFSClient.class);
|
||||||
|
Nfs3FileAttributes attr = new Nfs3FileAttributes();
|
||||||
|
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
|
||||||
|
Mockito.when(fos.getPos()).thenReturn((long) 0);
|
||||||
|
NfsConfiguration config = new NfsConfiguration();
|
||||||
|
|
||||||
|
config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
|
||||||
|
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
|
||||||
|
new ShellBasedIdMapping(config), false, config);
|
||||||
|
|
||||||
|
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
|
||||||
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
|
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
|
||||||
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
|
ctx.getPendingWritesForTest().put(new OffsetRange(20, 25),
|
||||||
|
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
|
||||||
|
|
||||||
|
assertTrue(!ctx.checkSequential(5, 4));
|
||||||
|
assertTrue(ctx.checkSequential(9, 5));
|
||||||
|
assertTrue(ctx.checkSequential(10, 5));
|
||||||
|
assertTrue(ctx.checkSequential(14, 5));
|
||||||
|
assertTrue(!ctx.checkSequential(15, 5));
|
||||||
|
assertTrue(!ctx.checkSequential(20, 5));
|
||||||
|
assertTrue(!ctx.checkSequential(25, 5));
|
||||||
|
assertTrue(!ctx.checkSequential(999, 5));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -410,6 +410,9 @@ Release 2.7.0 - UNRELEASED
|
|||||||
HDFS-7366. BlockInfo should take replication as an short in the constructor.
|
HDFS-7366. BlockInfo should take replication as an short in the constructor.
|
||||||
(Li Lu via wheat9)
|
(Li Lu via wheat9)
|
||||||
|
|
||||||
|
HDFS-7387. NFS may only do partial commit due to a race between COMMIT and write
|
||||||
|
(brandonli)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-15
|
Release 2.6.0 - 2014-11-15
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
Loading…
Reference in New Issue
Block a user