HADOOP-18546. Followup: ITestReadBufferManager fix (#5198)

This is a followup to the original HADOOP-18546
patch; cherry-picks of that should include this
or follow up with it.

Removes risk of race conditions in assertions
of ITestReadBufferManager on the state of the in-progress
and completed queues by removing assertions brittle
to race conditions in scheduling/network IO

* Waits for all the executor pool shutdown to complete before
  making any assertions
* Assertions that there are no in progress reads MUST be
  cut as there may be some and they won't be cancelled.
* Assertions that the completed list is without buffers
  of a closed stream are brittle because if there was
  an in progress stream which completed after stream.close()
  then it will end up in the list.

Contributed by Steve Loughran
This commit is contained in:
Steve Loughran 2022-12-09 13:47:11 +00:00 committed by GitHub
parent e76616f690
commit 0a7dfcc332
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -25,6 +25,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@ -74,17 +75,14 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception {
}
} finally {
executorService.shutdown();
// wait for all tasks to finish
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
// verify there is no work in progress or the readahead queue.
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
Assertions.assertThat(bufferManager.getFreeListCopy())
.describedAs("After closing all streams free list contents should match with " + freeList)
.hasSize(numBuffers)
.containsExactlyInAnyOrderElementsOf(freeList);
}
private void assertListEmpty(String listName, List<ReadBuffer> list) {
@ -116,22 +114,18 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception {
try {
iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
iStream2.read();
// After closing stream1, none of the buffers associated with stream1 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1);
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1);
// After closing stream1, no queued buffers of stream1 should be present
// assertions can't be made about the state of the other lists as it is
// too prone to race conditions.
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
} finally {
// closing the stream later.
IOUtils.closeStream(iStream2);
}
// After closing stream2, none of the buffers associated with stream2 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
// After closing stream2, no queued buffers of stream2 should be present.
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
// After closing both the streams, all lists should be empty.
assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
// After closing both the streams, read queue should be empty.
assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
}