HADOOP-18873. ABFS: AbfsOutputStream doesn't close DataBlocks object. (#6010)

AbfsOutputStream to close the dataBlock object created for the upload.

Contributed By: Pranav Saxena
This commit is contained in:
Pranav Saxena 2023-09-20 01:54:36 -07:00 committed by GitHub
parent 39f36d9071
commit f24b73e5f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 69 additions and 5 deletions

View File

@ -329,7 +329,7 @@ public String getKeyToBufferDir() {
*/
public static abstract class DataBlock implements Closeable {
enum DestState {Writing, Upload, Closed}
public enum DestState {Writing, Upload, Closed}
private volatile DestState state = Writing;
private final long index;
@ -375,7 +375,7 @@ protected final void verifyState(DestState expected)
*
* @return the current state.
*/
final DestState getState() {
public final DestState getState() {
return state;
}

View File

@ -330,7 +330,7 @@ public FSDataOutputStream create(final Path f,
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener);
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite,
permission == null ? FsPermission.getFileDefault() : permission,
FsPermission.getUMask(getConf()), tracingContext);
statIncrement(FILES_CREATED);

View File

@ -707,7 +707,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.withBlockFactory(blockFactory)
.withBlockFactory(getBlockFactory())
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClient(client)
.withPosition(position)
@ -1940,6 +1940,11 @@ void setClient(AbfsClient client) {
this.client = client;
}
@VisibleForTesting
// Test-only accessor so unit tests can spy on / substitute the block factory
// used by populateAbfsOutputStreamContext (see withBlockFactory(getBlockFactory())).
DataBlocks.BlockFactory getBlockFactory() {
return blockFactory;
}
@VisibleForTesting
void setNamespaceEnabled(Trilean isNamespaceEnabled){
this.isNamespaceEnabled = isNamespaceEnabled;

View File

@ -344,7 +344,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
outputStreamStatistics.uploadSuccessful(bytesLength);
return null;
} finally {
IOUtils.close(blockUploadData);
IOUtils.close(blockUploadData, blockToUpload);
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));

View File

@ -20,15 +20,31 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.store.BlockUploadStatistics;
import org.apache.hadoop.fs.store.DataBlocks;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
/**
* Test append operations.
@ -90,4 +106,47 @@ public void testTracingForAppend() throws IOException {
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
fs.append(testPath, 10);
}
@Test
// Verifies the HADOOP-18873 fix: the DataBlock created for an upload must be
// closed (state transitions Writing -> Closed) once the output stream is closed,
// for every configured block-buffer implementation (disk, bytebuffer, array).
public void testCloseOfDataBlockOnAppendComplete() throws Exception {
// Exercise all three DataBlocks buffer types, since each has its own
// close()/release semantics (temp file vs. pooled buffer vs. heap array).
Set<String> blockBufferTypes = new HashSet<>();
blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
for (String blockBufferType : blockBufferTypes) {
Configuration configuration = new Configuration(getRawConfiguration());
configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
// Spy the filesystem and its store so we can intercept block-factory creation.
AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(configuration));
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(store).when(fs).getAbfsStore();
// One-element array used as a mutable holder so the inner answer can
// expose the spied DataBlock to the assertions below.
DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
// Wrap the real BlockFactory in a spy whose create() returns a spied
// DataBlock, letting us verify close() is invoked exactly once.
Mockito.doAnswer(getBlobFactoryInvocation -> {
DataBlocks.BlockFactory factory = Mockito.spy(
(DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod());
Mockito.doAnswer(factoryCreateInvocation -> {
dataBlock[0] = Mockito.spy(
(DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
return dataBlock[0];
})
.when(factory)
.create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
BlockUploadStatistics.class));
return factory;
}).when(store).getBlockFactory();
try (OutputStream os = fs.create(
new Path(getMethodName() + "_" + blockBufferType))) {
os.write(new byte[1]);
// Writing data must have allocated a block in the Writing state.
Assertions.assertThat(dataBlock[0].getState())
.describedAs(
"On write of data in outputStream, state should become Writing")
.isEqualTo(Writing);
// Explicit close inside try-with-resources: triggers the upload path
// (uploadBlockAsync) which must now close the block in its finally.
os.close();
Mockito.verify(dataBlock[0], Mockito.times(1)).close();
Assertions.assertThat(dataBlock[0].getState())
.describedAs("On close of outputStream, state should become Closed")
.isEqualTo(Closed);
}
}
}
}