diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index cc773ab777..1896e15d27 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -555,6 +555,7 @@
**/azurebfs/ITestAzureBlobFileSystemListStatus.java
**/azurebfs/extensions/ITestAbfsDelegationTokens.java
**/azurebfs/ITestSmallWriteOptimization.java
+ **/azurebfs/services/ITestReadBufferManager.java
@@ -595,6 +596,7 @@
**/azurebfs/ITestAzureBlobFileSystemListStatus.java
**/azurebfs/extensions/ITestAbfsDelegationTokens.java
**/azurebfs/ITestSmallWriteOptimization.java
+ **/azurebfs/services/ITestReadBufferManager.java
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index c98819cadc..7033ae9a4a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -693,9 +693,10 @@ public boolean seekToNewSource(long l) throws IOException {
@Override
public synchronized void close() throws IOException {
+ LOG.debug("Closing {}", this);
closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
- LOG.debug("Closing {}", this);
+ ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
}
/**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index db4ec941c5..456ca077ca 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -24,7 +24,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
@@ -456,18 +458,23 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i
buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead);
}
synchronized (this) {
- inProgressList.remove(buffer);
- if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
- buffer.setStatus(ReadBufferStatus.AVAILABLE);
- buffer.setLength(bytesActuallyRead);
- } else {
- freeList.push(buffer.getBufferindex());
- // buffer will be deleted as per the eviction policy.
+ // If this buffer has already been purged during
+ // close of InputStream then we don't update the lists.
+ if (inProgressList.contains(buffer)) {
+ inProgressList.remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setLength(bytesActuallyRead);
+ } else {
+ freeList.push(buffer.getBufferindex());
+ // buffer will be deleted as per the eviction policy.
+ }
+ // completed list also contains FAILED read buffers
+ // for sending exception message to clients.
+ buffer.setStatus(result);
+ buffer.setTimeStamp(currentTimeMillis());
+ completedReadList.add(buffer);
}
-
- buffer.setStatus(result);
- buffer.setTimeStamp(currentTimeMillis());
- completedReadList.add(buffer);
}
//outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
@@ -502,11 +509,67 @@ int getCompletedReadListSize() {
return completedReadList.size();
}
+ @VisibleForTesting
+ public synchronized List getCompletedReadListCopy() {
+ return new ArrayList<>(completedReadList);
+ }
+
+ @VisibleForTesting
+ public synchronized List getFreeListCopy() {
+ return new ArrayList<>(freeList);
+ }
+
+ @VisibleForTesting
+ public synchronized List getReadAheadQueueCopy() {
+ return new ArrayList<>(readAheadQueue);
+ }
+
+ @VisibleForTesting
+ public synchronized List getInProgressCopiedList() {
+ return new ArrayList<>(inProgressList);
+ }
+
@VisibleForTesting
void callTryEvict() {
tryEvict();
}
+
+ /**
+ * Purging the buffers associated with an {@link AbfsInputStream}
+ * from {@link ReadBufferManager} when stream is closed.
+ * @param stream input stream.
+ */
+ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+ LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+ readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
+ purgeList(stream, completedReadList);
+ purgeList(stream, inProgressList);
+ }
+
+ /**
+ * Method to remove buffers associated with a {@link AbfsInputStream}
+ * when its close method is called.
+ * NOTE: This method is not threadsafe and must be called inside a
+ * synchronised block. See caller.
+ * @param stream associated input stream.
+ * @param list list of buffers like {@link this#completedReadList}
+ * or {@link this#inProgressList}.
+ */
+ private void purgeList(AbfsInputStream stream, LinkedList list) {
+ for (Iterator it = list.iterator(); it.hasNext();) {
+ ReadBuffer readBuffer = it.next();
+ if (readBuffer.getStream() == stream) {
+ it.remove();
+ // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
+ // list in doneReading method, we will skip adding those here again.
+ if (readBuffer.getBufferindex() != -1) {
+ freeList.push(readBuffer.getBufferindex());
+ }
+ }
+ }
+ }
+
/**
* Test method that can clean up the current state of readAhead buffers and
* the lists. Will also trigger a fresh init.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
new file mode 100644
index 0000000000..705cc2530d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.io.IOUtils;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_BLOCK_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
+
+ public ITestReadBufferManager() throws Exception {
+ }
+
+ @Test
+ public void testPurgeBufferManagerForParallelStreams() throws Exception {
+ describe("Testing purging of buffers from ReadBufferManager for "
+ + "parallel input streams");
+ final int numBuffers = 16;
+ final LinkedList freeList = new LinkedList<>();
+ for (int i=0; i < numBuffers; i++) {
+ freeList.add(i);
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ try {
+ for (int i = 0; i < 4; i++) {
+ final String fileName = methodName.getMethodName() + i;
+ executorService.submit((Callable) () -> {
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ iStream.read();
+ }
+ return null;
+ });
+ }
+ } finally {
+ executorService.shutdown();
+ }
+
+ ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+ assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
+ assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
+ assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+ Assertions.assertThat(bufferManager.getFreeListCopy())
+ .describedAs("After closing all streams free list contents should match with " + freeList)
+ .hasSize(numBuffers)
+ .containsExactlyInAnyOrderElementsOf(freeList);
+
+ }
+
+ private void assertListEmpty(String listName, List list) {
+ Assertions.assertThat(list)
+ .describedAs("After closing all streams %s should be empty", listName)
+ .hasSize(0);
+ }
+
+ @Test
+ public void testPurgeBufferManagerForSequentialStream() throws Exception {
+ describe("Testing purging of buffers in ReadBufferManager for "
+ + "sequential input streams");
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ final String fileName = methodName.getMethodName();
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+ AbfsInputStream iStream1 = null;
+ // stream1 will be closed right away.
+ try {
+ iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+ // Just reading one byte will trigger all read ahead calls.
+ iStream1.read();
+ } finally {
+ IOUtils.closeStream(iStream1);
+ }
+ ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+ AbfsInputStream iStream2 = null;
+ try {
+ iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+ iStream2.read();
+ // After closing stream1, none of the buffers associated with stream1 should be present.
+ assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1);
+ assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1);
+ assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
+ } finally {
+ // closing the stream later.
+ IOUtils.closeStream(iStream2);
+ }
+ // After closing stream2, none of the buffers associated with stream2 should be present.
+ assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
+ assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
+ assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
+
+ // After closing both the streams, all lists should be empty.
+ assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
+ assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
+ assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+
+ }
+
+
+ private void assertListDoesnotContainBuffersForIstream(List list,
+ AbfsInputStream inputStream) {
+ for (ReadBuffer buffer : list) {
+ Assertions.assertThat(buffer.getStream())
+ .describedAs("Buffers associated with closed input streams shouldn't be present")
+ .isNotEqualTo(inputStream);
+ }
+ }
+
+ private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
+ Configuration conf = getRawConfiguration();
+ conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
+ conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
+ conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+
+ protected byte[] getRandomBytesArray(int length) {
+ final byte[] b = new byte[length];
+ new Random().nextBytes(b);
+ return b;
+ }
+
+ protected Path createFileWithContent(FileSystem fs, String fileName,
+ byte[] fileContent) throws IOException {
+ Path testFilePath = path(fileName);
+ try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+ oStream.write(fileContent);
+ oStream.flush();
+ }
+ return testFilePath;
+ }
+}