From f24b73e5f3ac640f491231f02d9d8afaf1855b5c Mon Sep 17 00:00:00 2001 From: Pranav Saxena <108325433+saxenapranav@users.noreply.github.com> Date: Wed, 20 Sep 2023 01:54:36 -0700 Subject: [PATCH] HADOOP-18873. ABFS: AbfsOutputStream doesn't close DataBlocks object. (#6010) AbfsOutputStream to close the dataBlock object created for the upload. Contributed By: Pranav Saxena --- .../apache/hadoop/fs/store/DataBlocks.java | 4 +- .../fs/azurebfs/AzureBlobFileSystem.java | 2 +- .../fs/azurebfs/AzureBlobFileSystemStore.java | 7 ++- .../azurebfs/services/AbfsOutputStream.java | 2 +- .../ITestAzureBlobFileSystemAppend.java | 59 +++++++++++++++++++ 5 files changed, 69 insertions(+), 5 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java index c70d0ee91e..0ae9ee6378 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java @@ -329,7 +329,7 @@ public String getKeyToBufferDir() { */ public static abstract class DataBlock implements Closeable { - enum DestState {Writing, Upload, Closed} + public enum DestState {Writing, Upload, Closed} private volatile DestState state = Writing; private final long index; @@ -375,7 +375,7 @@ protected final void verifyState(DestState expected) * * @return the current state. 
*/ - final DestState getState() { + public final DestState getState() { return state; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 426ad8ca1e..8f7fbc5702 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -330,7 +330,7 @@ public FSDataOutputStream create(final Path f, try { TracingContext tracingContext = new TracingContext(clientCorrelationId, fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener); - OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite, + OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite, permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()), tracingContext); statIncrement(FILES_CREATED); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 49dff33608..ff37ee5ee9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -707,7 +707,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount()) .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue()) .withLease(lease) - .withBlockFactory(blockFactory) + .withBlockFactory(getBlockFactory()) .withBlockOutputActiveBlocks(blockOutputActiveBlocks) .withClient(client) .withPosition(position) @@ 
-1940,6 +1940,11 @@ void setClient(AbfsClient client) { this.client = client; } + @VisibleForTesting + DataBlocks.BlockFactory getBlockFactory() { + return blockFactory; + } + @VisibleForTesting void setNamespaceEnabled(Trilean isNamespaceEnabled){ this.isNamespaceEnabled = isNamespaceEnabled; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 4268dc3f91..89b0fe2040 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -344,7 +344,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload, outputStreamStatistics.uploadSuccessful(bytesLength); return null; } finally { - IOUtils.close(blockUploadData); + IOUtils.close(blockUploadData, blockToUpload); } }); writeOperations.add(new WriteOperation(job, offset, bytesLength)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index dbe4b42a67..7d182f936b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -20,15 +20,31 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; +import java.util.HashSet; import java.util.Random; +import java.util.Set; +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.mockito.Mockito; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.store.BlockUploadStatistics; +import org.apache.hadoop.fs.store.DataBlocks; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER; +import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY; +import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK; +import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER; +import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed; +import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing; /** * Test append operations. @@ -90,4 +106,47 @@ public void testTracingForAppend() throws IOException { fs.getFileSystemId(), FSOperationType.APPEND, false, 0)); fs.append(testPath, 10); } + + @Test + public void testCloseOfDataBlockOnAppendComplete() throws Exception { + Set blockBufferTypes = new HashSet<>(); + blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK); + blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER); + blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY); + for (String blockBufferType : blockBufferTypes) { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.set(DATA_BLOCKS_BUFFER, blockBufferType); + AzureBlobFileSystem fs = Mockito.spy( + (AzureBlobFileSystem) FileSystem.newInstance(configuration)); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1]; + Mockito.doAnswer(getBlobFactoryInvocation -> { + DataBlocks.BlockFactory factory = Mockito.spy( + (DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod()); + Mockito.doAnswer(factoryCreateInvocation -> { + dataBlock[0] = Mockito.spy( + 
(DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod()); + return dataBlock[0]; + }) + .when(factory) + .create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any( + BlockUploadStatistics.class)); + return factory; + }).when(store).getBlockFactory(); + try (OutputStream os = fs.create( + new Path(getMethodName() + "_" + blockBufferType))) { + os.write(new byte[1]); + Assertions.assertThat(dataBlock[0].getState()) + .describedAs( + "On write of data in outputStream, state should become Writing") + .isEqualTo(Writing); + os.close(); + Mockito.verify(dataBlock[0], Mockito.times(1)).close(); + Assertions.assertThat(dataBlock[0].getState()) + .describedAs("On close of outputStream, state should become Closed") + .isEqualTo(Closed); + } + } + } }