HADOOP-17195. ABFS: OutOfMemory error while uploading huge files (#3446)

Addresses the problem of processes running out of memory when
there are many ABFS output streams queuing data to upload,
especially when the network upload bandwidth is less than the rate
data is generated.

ABFS Output streams now buffer their blocks of data to
"disk", "bytebuffer" or "array", as set in
"fs.azure.data.blocks.buffer"

When buffering via disk, the location for temporary storage
is set in "fs.azure.buffer.dir"

For safe scaling: use "disk" (default); for performance, when
confident that upload bandwidth will never be a bottleneck,
experiment with the memory options.

The number of blocks a single stream can have queued for uploading
is set in "fs.azure.block.upload.active.blocks".
The default value is 20.

Contributed by Mehakmeet Singh.
This commit is contained in:
Mehakmeet Singh 2021-09-22 15:49:16 +05:30 committed by GitHub
parent dd30db78e7
commit 8e5620cd9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 2230 additions and 280 deletions

View File

@ -461,4 +461,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Default value for IOStatistics logging level. */
public static final String IOSTATISTICS_LOGGING_LEVEL_DEFAULT
= IOSTATISTICS_LOGGING_LEVEL_DEBUG;
/**
* default hadoop temp dir on local system: {@value}.
*/
public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
}

View File

@ -358,6 +358,18 @@ public final class StreamStatisticNames {
public static final String REMOTE_BYTES_READ
= "remote_bytes_read";
/**
* Total number of Data blocks allocated by an outputStream.
*/
public static final String BLOCKS_ALLOCATED
= "blocks_allocated";
/**
* Total number of Data blocks released by an outputStream.
*/
public static final String BLOCKS_RELEASED
= "blocks_released";
private StreamStatisticNames() {
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.store;
public interface BlockUploadStatistics {
/**
* A block has been allocated.
*/
void blockAllocated();
/**
* A block has been released.
*/
void blockReleased();
}

View File

@ -2291,6 +2291,13 @@
</description>
</property>
<property>
<name>fs.azure.buffer.dir</name>
<value>${hadoop.tmp.dir}/abfs</value>
<description>Directory path for buffer files needed to upload data blocks
in AbfsOutputStream.</description>
</property>
<property>
<name>fs.AbstractFileSystem.gs.impl</name>
<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.store;
import java.io.IOException;
import java.util.Random;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* UTs to test {@link DataBlocks} functionalities.
*/
public class TestDataBlocks {
private final Configuration configuration = new Configuration();
private static final int ONE_KB = 1024;
private static final Logger LOG =
LoggerFactory.getLogger(TestDataBlocks.class);
/**
* Test to verify different DataBlocks factories, different operations.
*/
@Test
public void testDataBlocksFactory() throws Exception {
testCreateFactory(DATA_BLOCKS_BUFFER_DISK);
testCreateFactory(DATA_BLOCKS_BUFFER_ARRAY);
testCreateFactory(DATA_BLOCKS_BYTEBUFFER);
}
/**
* Verify creation of a data block factory and its operations.
*
* @param nameOfFactory Name of the DataBlock factory to be created.
* @throws IOException Throw IOE in case of failure while creating a block.
*/
public void testCreateFactory(String nameOfFactory) throws Exception {
LOG.info("Testing: {}", nameOfFactory);
DataBlocks.BlockFactory blockFactory =
DataBlocks.createFactory("Dir", configuration, nameOfFactory);
DataBlocks.DataBlock dataBlock = blockFactory.create(0, ONE_KB, null);
assertWriteBlock(dataBlock);
assertToByteArray(dataBlock);
assertCloseBlock(dataBlock);
}
/**
* Verify Writing of a dataBlock.
*
* @param dataBlock DataBlock to be tested.
* @throws IOException Throw Exception in case of failures.
*/
private void assertWriteBlock(DataBlocks.DataBlock dataBlock)
throws IOException {
byte[] oneKbBuff = new byte[ONE_KB];
new Random().nextBytes(oneKbBuff);
dataBlock.write(oneKbBuff, 0, ONE_KB);
// Verify DataBlock state is at Writing.
dataBlock.verifyState(DataBlocks.DataBlock.DestState.Writing);
// Verify that the DataBlock has data written.
assertTrue("Expected Data block to have data", dataBlock.hasData());
// Verify the size of data.
assertEquals("Mismatch in data size in block", ONE_KB,
dataBlock.dataSize());
// Verify that no capacity is left in the data block to write more.
assertFalse("Expected the data block to have no capacity to write 1 byte "
+ "of data", dataBlock.hasCapacity(1));
}
/**
* Verify the Conversion of Data blocks into byte[].
*
* @param dataBlock data block to be tested.
* @throws Exception Throw Exception in case of failures.
*/
private void assertToByteArray(DataBlocks.DataBlock dataBlock)
throws Exception {
DataBlocks.BlockUploadData blockUploadData = dataBlock.startUpload();
// Verify that the current state is in upload.
dataBlock.verifyState(DataBlocks.DataBlock.DestState.Upload);
// Convert the DataBlock upload to byteArray.
byte[] bytesWritten = blockUploadData.toByteArray();
// Verify that we can call toByteArray() more than once and gives the
// same byte[].
assertEquals("Mismatch in byteArray provided by toByteArray() the second "
+ "time", bytesWritten, blockUploadData.toByteArray());
IOUtils.close(blockUploadData);
// Verify that after closing blockUploadData, we can't call toByteArray().
LambdaTestUtils.intercept(IllegalStateException.class,
"Block is closed",
"Expected to throw IllegalStateException.java after closing "
+ "blockUploadData and trying to call toByteArray()",
() -> {
blockUploadData.toByteArray();
});
}
/**
* Verify the close() of data blocks.
*
* @param dataBlock data block to be tested.
* @throws IOException Throw Exception in case of failures.
*/
private void assertCloseBlock(DataBlocks.DataBlock dataBlock)
throws IOException {
dataBlock.close();
// Verify that the current state is in Closed.
dataBlock.verifyState(DataBlocks.DataBlock.DestState.Closed);
}
}

View File

@ -90,6 +90,7 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
@ -101,6 +102,11 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
@ -125,6 +131,13 @@ public class AzureBlobFileSystem extends FileSystem
private TracingHeaderFormat tracingHeaderFormat;
private Listener listener;
/** Name of blockFactory to be used by AbfsOutputStream. */
private String blockOutputBuffer;
/** BlockFactory instance to be used. */
private DataBlocks.BlockFactory blockFactory;
/** Maximum Active blocks per OutputStream. */
private int blockOutputActiveBlocks;
@Override
public void initialize(URI uri, Configuration configuration)
throws IOException {
@ -136,8 +149,33 @@ public void initialize(URI uri, Configuration configuration)
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
abfsCounters = new AbfsCountersImpl(uri);
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(),
configuration, abfsCounters);
// name of the blockFactory to be used.
this.blockOutputBuffer = configuration.getTrimmed(DATA_BLOCKS_BUFFER,
DATA_BLOCKS_BUFFER_DEFAULT);
// blockFactory used for this FS instance.
this.blockFactory =
DataBlocks.createFactory(FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR,
configuration, blockOutputBuffer);
this.blockOutputActiveBlocks =
configuration.getInt(FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS,
BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT);
if (blockOutputActiveBlocks < 1) {
blockOutputActiveBlocks = 1;
}
// AzureBlobFileSystemStore with params in builder.
AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder
systemStoreBuilder =
new AzureBlobFileSystemStore.AzureBlobFileSystemStoreBuilder()
.withUri(uri)
.withSecureScheme(this.isSecureScheme())
.withConfiguration(configuration)
.withAbfsCounters(abfsCounters)
.withBlockFactory(blockFactory)
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.build();
this.abfsStore = new AzureBlobFileSystemStore(systemStoreBuilder);
LOG.trace("AzureBlobFileSystemStore init complete");
final AbfsConfiguration abfsConfiguration = abfsStore

View File

@ -51,6 +51,8 @@
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
@ -119,8 +121,12 @@
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.http.client.utils.URIBuilder;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
@ -169,10 +175,23 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
*/
private Set<String> appendBlobDirSet;
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
Configuration configuration,
AbfsCounters abfsCounters) throws IOException {
this.uri = uri;
/** BlockFactory being used by this instance.*/
private DataBlocks.BlockFactory blockFactory;
/** Number of active data blocks per AbfsOutputStream */
private int blockOutputActiveBlocks;
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;
/**
* FileSystem Store for {@link AzureBlobFileSystem} for Abfs operations.
* Built using the {@link AzureBlobFileSystemStoreBuilder} with parameters
* required.
* @param abfsStoreBuilder Builder for AzureBlobFileSystemStore.
* @throws IOException Throw IOE in case of failure during constructing.
*/
public AzureBlobFileSystemStore(
AzureBlobFileSystemStoreBuilder abfsStoreBuilder) throws IOException {
this.uri = abfsStoreBuilder.uri;
String[] authorityParts = authorityParts(uri);
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];
@ -180,7 +199,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
try {
this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, accountName);
} catch (IllegalAccessException exception) {
throw new FileSystemOperationUnhandledException(exception);
}
@ -210,16 +229,16 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
updateInfiniteLeaseDirs();
this.authType = abfsConfiguration.getAuthType(accountName);
boolean usingOauth = (authType == AuthType.OAuth);
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : abfsStoreBuilder.isSecureScheme;
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
this.abfsCounters = abfsCounters;
this.abfsCounters = abfsStoreBuilder.abfsCounters;
initializeClient(uri, fileSystemName, accountName, useHttps);
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
abfsStoreBuilder.configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
IdentityTransformerInterface.class);
try {
this.identityTransformer =
identityTransformerClass.getConstructor(Configuration.class).newInstance(configuration);
identityTransformerClass.getConstructor(Configuration.class).newInstance(abfsStoreBuilder.configuration);
} catch (IllegalAccessException | InstantiationException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException e) {
throw new IOException(e);
}
@ -233,6 +252,13 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
this.appendBlobDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA)));
}
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
abfsConfiguration.getWriteMaxConcurrentRequestCount(),
abfsConfiguration.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-bounded");
}
/**
@ -269,6 +295,10 @@ public void close() throws IOException {
}
try {
Futures.allAsList(futures).get();
// shutdown the threadPool and set it to null.
HadoopExecutors.shutdown(boundedThreadPool, LOG,
30, TimeUnit.SECONDS);
boundedThreadPool = null;
} catch (InterruptedException e) {
LOG.error("Interrupted freeing leases", e);
Thread.currentThread().interrupt();
@ -495,7 +525,7 @@ public void deleteFilesystem(TracingContext tracingContext)
public OutputStream createFile(final Path path,
final FileSystem.Statistics statistics, final boolean overwrite,
final FsPermission permission, final FsPermission umask,
TracingContext tracingContext) throws AzureBlobFileSystemException {
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
@ -546,12 +576,14 @@ public OutputStream createFile(final Path path,
AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
return new AbfsOutputStream(
client,
statistics,
relativePath,
0,
populateAbfsOutputStreamContext(isAppendBlob, lease),
tracingContext);
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
client,
statistics,
relativePath,
0,
tracingContext));
}
}
@ -625,8 +657,29 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
return op;
}
private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
AbfsLease lease) {
/**
* Method to populate AbfsOutputStreamContext with different parameters to
* be used to construct {@link AbfsOutputStream}.
*
* @param isAppendBlob is Append blob support enabled?
* @param lease instance of AbfsLease for this AbfsOutputStream.
* @param client AbfsClient.
* @param statistics FileSystem statistics.
* @param path Path for AbfsOutputStream.
* @param position Position or offset of the file being opened, set to 0
* when creating a new file, but needs to be set for APPEND
* calls on the same file.
* @param tracingContext instance of TracingContext for this AbfsOutputStream.
* @return AbfsOutputStreamContext instance with the desired parameters.
*/
private AbfsOutputStreamContext populateAbfsOutputStreamContext(
boolean isAppendBlob,
AbfsLease lease,
AbfsClient client,
FileSystem.Statistics statistics,
String path,
long position,
TracingContext tracingContext) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
@ -641,6 +694,15 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppend
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.withBlockFactory(blockFactory)
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClient(client)
.withPosition(position)
.withFsStatistics(statistics)
.withPath(path)
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withTracingContext(tracingContext)
.build();
}
@ -732,7 +794,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
public OutputStream openFileForWrite(final Path path,
final FileSystem.Statistics statistics, final boolean overwrite,
TracingContext tracingContext) throws AzureBlobFileSystemException {
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
LOG.debug("openFileForWrite filesystem: {} path: {} overwrite: {}",
client.getFileSystem(),
@ -768,12 +830,14 @@ public OutputStream openFileForWrite(final Path path,
AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
return new AbfsOutputStream(
client,
statistics,
relativePath,
offset,
populateAbfsOutputStreamContext(isAppendBlob, lease),
tracingContext);
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
client,
statistics,
relativePath,
offset,
tracingContext));
}
}
@ -1721,6 +1785,57 @@ public String toString() {
}
}
/**
* A builder class for AzureBlobFileSystemStore.
*/
public static final class AzureBlobFileSystemStoreBuilder {
private URI uri;
private boolean isSecureScheme;
private Configuration configuration;
private AbfsCounters abfsCounters;
private DataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
public AzureBlobFileSystemStoreBuilder withUri(URI value) {
this.uri = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withSecureScheme(boolean value) {
this.isSecureScheme = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withConfiguration(
Configuration value) {
this.configuration = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withAbfsCounters(
AbfsCounters value) {
this.abfsCounters = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withBlockFactory(
DataBlocks.BlockFactory value) {
this.blockFactory = value;
return this;
}
public AzureBlobFileSystemStoreBuilder withBlockOutputActiveBlocks(
int value) {
this.blockOutputActiveBlocks = value;
return this;
}
public AzureBlobFileSystemStoreBuilder build() {
return this;
}
}
@VisibleForTesting
AbfsClient getClient() {
return this.client;

View File

@ -56,6 +56,37 @@ public final class ConfigurationKeys {
public static final String AZURE_WRITE_MAX_CONCURRENT_REQUESTS = "fs.azure.write.max.concurrent.requests";
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
/**
* Maximum Number of blocks a single output stream can have
* active (uploading, or queued to the central FileSystem
* instance's pool of queued operations.
* This stops a single stream overloading the shared thread pool.
* {@value}
* <p>
* Default is {@link FileSystemConfigurations#BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT}
*/
public static final String FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS =
"fs.azure.block.upload.active.blocks";
/**
* Buffer directory path for uploading AbfsOutputStream data blocks.
* Value: {@value}
*/
public static final String FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR =
"fs.azure.buffer.dir";
/**
* What data block buffer to use.
* <br>
* Options include: "disk"(Default), "array", and "bytebuffer".
* <br>
* Default is {@link FileSystemConfigurations#DATA_BLOCKS_BUFFER_DEFAULT}.
* Value: {@value}
*/
public static final String DATA_BLOCKS_BUFFER =
"fs.azure.data.blocks.buffer";
/** If the data size written by Hadoop app is small, i.e. data size :
* (a) before any of HFlush/HSync call is made or
* (b) between 2 HFlush/Hsync API calls

View File

@ -115,5 +115,23 @@ public final class FileSystemConfigurations {
public static final int STREAM_ID_LEN = 12;
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
/**
* Limit of queued block upload operations before writes
* block for an OutputStream. Value: {@value}
*/
public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;
/**
* Buffer blocks to disk.
* Capacity is limited to available disk space.
*/
public static final String DATA_BLOCKS_BUFFER_DISK = "disk";
/**
* Default buffer option: {@value}.
*/
public static final String DATA_BLOCKS_BUFFER_DEFAULT =
DATA_BLOCKS_BUFFER_DISK;
private FileSystemConfigurations() {}
}

View File

@ -20,24 +20,20 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.UUID;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
@ -47,10 +43,9 @@
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.azurebfs.utils.Listener;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
@ -63,6 +58,7 @@
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_CLOSE_MODE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkState;
/**
* The BlobFsOutputStream for Rest AbfsClient.
@ -72,6 +68,12 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private final AbfsClient client;
private final String path;
/** The position in the file being uploaded, where the next block would be
* uploaded.
* This is used in constructing the AbfsClient requests to ensure that,
* even if blocks are uploaded out of order, they are reassembled in
* correct order.
* */
private long position;
private boolean closed;
private boolean supportFlush;
@ -91,8 +93,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private final int maxRequestsThatCanBeQueued;
private ConcurrentLinkedDeque<WriteOperation> writeOperations;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
@ -103,15 +103,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private AbfsLease lease;
private String leaseId;
/**
* Queue storing buffers with the size of the Azure block ready for
* reuse. The pool allows reusing the blocks instead of allocating new
* blocks. After the data is sent to the service, the buffer is returned
* back to the queue
*/
private ElasticByteBufferPool byteBufferPool
= new ElasticByteBufferPool();
private final Statistics statistics;
private final AbfsOutputStreamStatistics outputStreamStatistics;
private IOStatistics ioStatistics;
@ -119,17 +110,27 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
private static final Logger LOG =
LoggerFactory.getLogger(AbfsOutputStream.class);
public AbfsOutputStream(
final AbfsClient client,
final Statistics statistics,
final String path,
final long position,
AbfsOutputStreamContext abfsOutputStreamContext,
TracingContext tracingContext) {
this.client = client;
this.statistics = statistics;
this.path = path;
this.position = position;
/** Factory for blocks. */
private final DataBlocks.BlockFactory blockFactory;
/** Current data block. Null means none currently active. */
private DataBlocks.DataBlock activeBlock;
/** Count of blocks uploaded. */
private long blockCount = 0;
/** The size of a single block. */
private final int blockSize;
/** Executor service to carry out the parallel upload requests. */
private final ListeningExecutorService executorService;
public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
throws IOException {
this.client = abfsOutputStreamContext.getClient();
this.statistics = abfsOutputStreamContext.getStatistics();
this.path = abfsOutputStreamContext.getPath();
this.position = abfsOutputStreamContext.getPosition();
this.closed = false;
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
this.disableOutputStreamFlush = abfsOutputStreamContext
@ -140,7 +141,6 @@ public AbfsOutputStream(
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0;
this.numOfAppendsToServerSinceLastFlush = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
@ -157,23 +157,20 @@ public AbfsOutputStream(
this.lease = abfsOutputStreamContext.getLease();
this.leaseId = abfsOutputStreamContext.getLeaseId();
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
10L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.executorService =
MoreExecutors.listeningDecorator(abfsOutputStreamContext.getExecutorService());
this.cachedSasToken = new CachedSASToken(
abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
if (outputStreamStatistics != null) {
this.ioStatistics = outputStreamStatistics.getIOStatistics();
}
this.outputStreamId = createOutputStreamId();
this.tracingContext = new TracingContext(tracingContext);
this.tracingContext = new TracingContext(abfsOutputStreamContext.getTracingContext());
this.tracingContext.setStreamID(outputStreamId);
this.tracingContext.setOperation(FSOperationType.WRITE);
this.ioStatistics = outputStreamStatistics.getIOStatistics();
this.blockFactory = abfsOutputStreamContext.getBlockFactory();
this.blockSize = bufferSize;
// create that first block. This guarantees that an open + close sequence
// writes a 0-byte entry.
createBlockIfNeeded();
}
private String createOutputStreamId() {
@ -219,10 +216,10 @@ public void write(final int byteVal) throws IOException {
@Override
public synchronized void write(final byte[] data, final int off, final int length)
throws IOException {
// validate if data is not null and index out of bounds.
DataBlocks.validateWriteArgs(data, off, length);
maybeThrowLastError();
Preconditions.checkArgument(data != null, "null data");
if (off < 0 || length < 0 || length > data.length - off) {
throw new IndexOutOfBoundsException();
}
@ -230,29 +227,184 @@ public synchronized void write(final byte[] data, final int off, final int lengt
if (hasLease() && isLeaseFreed()) {
throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
}
DataBlocks.DataBlock block = createBlockIfNeeded();
int written = block.write(data, off, length);
int remainingCapacity = block.remainingCapacity();
int currentOffset = off;
int writableBytes = bufferSize - bufferIndex;
int numberOfBytesToWrite = length;
while (numberOfBytesToWrite > 0) {
if (writableBytes <= numberOfBytesToWrite) {
System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
bufferIndex += writableBytes;
writeCurrentBufferToService();
currentOffset += writableBytes;
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
} else {
System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
bufferIndex += numberOfBytesToWrite;
numberOfBytesToWrite = 0;
if (written < length) {
// Number of bytes to write is more than the data block capacity,
// trigger an upload and then write on the next block.
LOG.debug("writing more data than block capacity -triggering upload");
uploadCurrentBlock();
// tail recursion is mildly expensive, but given buffer sizes must be MB.
// it's unlikely to recurse very deeply.
this.write(data, off + written, length - written);
} else {
if (remainingCapacity == 0) {
// the whole buffer is done, trigger an upload
uploadCurrentBlock();
}
writableBytes = bufferSize - bufferIndex;
}
incrementWriteOps();
}
/**
* Demand create a destination block.
*
* @return the active block; null if there isn't one.
* @throws IOException on any failure to create
*/
private synchronized DataBlocks.DataBlock createBlockIfNeeded()
throws IOException {
if (activeBlock == null) {
blockCount++;
activeBlock = blockFactory
.create(blockCount, this.blockSize, outputStreamStatistics);
}
return activeBlock;
}
/**
* Start an asynchronous upload of the current block.
*
* @throws IOException Problems opening the destination for upload,
* initializing the upload, or if a previous operation has failed.
*/
private synchronized void uploadCurrentBlock() throws IOException {
checkState(hasActiveBlock(), "No active block");
LOG.debug("Writing block # {}", blockCount);
try {
uploadBlockAsync(getActiveBlock(), false, false);
} finally {
// set the block to null, so the next write will create a new block.
clearActiveBlock();
}
}
/**
* Upload a block of data.
* This will take the block.
*
* @param blockToUpload block to upload.
* @throws IOException upload failure
*/
private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
boolean isFlush, boolean isClose)
throws IOException {
if (this.isAppendBlob) {
writeAppendBlobCurrentBufferToService();
return;
}
if (!blockToUpload.hasData()) {
return;
}
numOfAppendsToServerSinceLastFlush++;
final int bytesLength = blockToUpload.dataSize();
final long offset = position;
position += bytesLength;
outputStreamStatistics.bytesToUpload(bytesLength);
outputStreamStatistics.writeCurrentBuffer();
DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload();
final Future<Void> job =
executorService.submit(() -> {
AbfsPerfTracker tracker =
client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
AppendRequestParameters.Mode
mode = APPEND_MODE;
if (isFlush & isClose) {
mode = FLUSH_CLOSE_MODE;
} else if (isFlush) {
mode = FLUSH_MODE;
}
/*
* Parameters Required for an APPEND call.
* offset(here) - refers to the position in the file.
* bytesLength - Data to be uploaded from the block.
* mode - If it's append, flush or flush_close.
* leaseId - The AbfsLeaseId for this request.
*/
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, mode, false, leaseId);
AbfsRestOperation op =
client.append(path, blockUploadData.toByteArray(), reqParams,
cachedSasToken.get(), new TracingContext(tracingContext));
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult());
perfInfo.registerSuccess(true);
outputStreamStatistics.uploadSuccessful(bytesLength);
return null;
} finally {
IOUtils.close(blockUploadData);
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
}
/**
* A method to set the lastError if an exception is caught.
* @param ex Exception caught.
* @throws IOException Throws the lastError.
*/
private void failureWhileSubmit(Exception ex) throws IOException {
if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode()
== HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
}
}
if (ex instanceof IOException) {
lastError = (IOException) ex;
} else {
lastError = new IOException(ex);
}
throw lastError;
}
/**
* Synchronized accessor to the active block.
*
* @return the active block; null if there isn't one.
*/
private synchronized DataBlocks.DataBlock getActiveBlock() {
return activeBlock;
}
/**
* Predicate to query whether or not there is an active block.
*
* @return true if there is an active block.
*/
private synchronized boolean hasActiveBlock() {
return activeBlock != null;
}
/**
* Is there an active block and is there any data in it to upload?
*
* @return true if there is some data to upload in an active block else false.
*/
private boolean hasActiveBlockDataToUpload() {
return hasActiveBlock() && getActiveBlock().hasData();
}
/**
* Clear the active block.
*/
private void clearActiveBlock() {
if (activeBlock != null) {
LOG.debug("Clearing active block");
}
synchronized (this) {
activeBlock = null;
}
}
/**
* Increment Write Operations.
*/
@ -335,7 +487,6 @@ public synchronized void close() throws IOException {
try {
flushInternal(true);
threadExecutor.shutdown();
} catch (IOException e) {
// Problems surface in try-with-resources clauses if
// the exception thrown in a close == the one already thrown
@ -352,9 +503,8 @@ public synchronized void close() throws IOException {
bufferIndex = 0;
closed = true;
writeOperations.clear();
byteBufferPool = null;
if (!threadExecutor.isShutdown()) {
threadExecutor.shutdownNow();
if (hasActiveBlock()) {
clearActiveBlock();
}
}
LOG.debug("Closing AbfsOutputStream : {}", this);
@ -368,19 +518,22 @@ private synchronized void flushInternal(boolean isClose) throws IOException {
&& enableSmallWriteOptimization
&& (numOfAppendsToServerSinceLastFlush == 0) // there are no ongoing store writes
&& (writeOperations.size() == 0) // double checking no appends in progress
&& (bufferIndex > 0)) { // there is some data that is pending to be written
&& hasActiveBlockDataToUpload()) { // there is
// some data that is pending to be written
smallWriteOptimizedflushInternal(isClose);
return;
}
writeCurrentBufferToService();
if (hasActiveBlockDataToUpload()) {
uploadCurrentBlock();
}
flushWrittenBytesToService(isClose);
numOfAppendsToServerSinceLastFlush = 0;
}
private synchronized void smallWriteOptimizedflushInternal(boolean isClose) throws IOException {
// writeCurrentBufferToService will increment numOfAppendsToServerSinceLastFlush
writeCurrentBufferToService(true, isClose);
uploadBlockAsync(getActiveBlock(), true, isClose);
waitForAppendsToComplete();
shrinkWriteOperationQueue();
maybeThrowLastError();
@ -389,131 +542,60 @@ private synchronized void smallWriteOptimizedflushInternal(boolean isClose) thro
private synchronized void flushInternalAsync() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
if (hasActiveBlockDataToUpload()) {
uploadCurrentBlock();
}
waitForAppendsToComplete();
flushWrittenBytesToServiceAsync();
}
/**
* Appending the current active data block to service. Clearing the active
* data block and releasing all buffered data.
* @throws IOException if there is any failure while starting an upload for
* the dataBlock or while closing the BlockUploadData.
*/
private void writeAppendBlobCurrentBufferToService() throws IOException {
if (bufferIndex == 0) {
DataBlocks.DataBlock activeBlock = getActiveBlock();
// No data, return.
if (!hasActiveBlockDataToUpload()) {
return;
}
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
if (outputStreamStatistics != null) {
outputStreamStatistics.writeCurrentBuffer();
outputStreamStatistics.bytesToUpload(bytesLength);
}
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final int bytesLength = activeBlock.dataSize();
DataBlocks.BlockUploadData uploadData = activeBlock.startUpload();
clearActiveBlock();
outputStreamStatistics.writeCurrentBuffer();
outputStreamStatistics.bytesToUpload(bytesLength);
final long offset = position;
position += bytesLength;
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
"writeCurrentBufferToService", "append")) {
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
bytesLength, APPEND_MODE, true, leaseId);
AbfsRestOperation op = client
.append(path, bytes, reqParams, cachedSasToken.get(),
new TracingContext(tracingContext));
AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams,
cachedSasToken.get(), new TracingContext(tracingContext));
cachedSasToken.update(op.getSasToken());
if (outputStreamStatistics != null) {
outputStreamStatistics.uploadSuccessful(bytesLength);
}
outputStreamStatistics.uploadSuccessful(bytesLength);
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
return;
} catch (Exception ex) {
if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
}
}
if (ex instanceof AzureBlobFileSystemException) {
ex = (AzureBlobFileSystemException) ex;
}
lastError = new IOException(ex);
throw lastError;
outputStreamStatistics.uploadFailed(bytesLength);
failureWhileSubmit(ex);
} finally {
IOUtils.close(uploadData);
}
}
private synchronized void writeCurrentBufferToService() throws IOException {
writeCurrentBufferToService(false, false);
}
private synchronized void writeCurrentBufferToService(boolean isFlush, boolean isClose) throws IOException {
if (this.isAppendBlob) {
writeAppendBlobCurrentBufferToService();
return;
}
if (bufferIndex == 0) {
return;
}
numOfAppendsToServerSinceLastFlush++;
final byte[] bytes = buffer;
final int bytesLength = bufferIndex;
if (outputStreamStatistics != null) {
outputStreamStatistics.writeCurrentBuffer();
outputStreamStatistics.bytesToUpload(bytesLength);
}
buffer = byteBufferPool.getBuffer(false, bufferSize).array();
bufferIndex = 0;
final long offset = position;
position += bytesLength;
if (threadExecutor.getQueue().size() >= maxRequestsThatCanBeQueued) {
//Tracking time spent on waiting for task to complete.
if (outputStreamStatistics != null) {
try (DurationTracker ignored = outputStreamStatistics.timeSpentTaskWait()) {
waitForTaskToComplete();
}
} else {
waitForTaskToComplete();
}
}
final Future<Void> job = completionService.submit(() -> {
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
AppendRequestParameters.Mode
mode = APPEND_MODE;
if (isFlush & isClose) {
mode = FLUSH_CLOSE_MODE;
} else if (isFlush) {
mode = FLUSH_MODE;
}
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, mode, false, leaseId);
AbfsRestOperation op = client.append(path, bytes, reqParams,
cachedSasToken.get(), new TracingContext(tracingContext));
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult());
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
perfInfo.registerSuccess(true);
return null;
}
});
if (outputStreamStatistics != null) {
if (job.isCancelled()) {
outputStreamStatistics.uploadFailed(bytesLength);
} else {
outputStreamStatistics.uploadSuccessful(bytesLength);
}
}
writeOperations.add(new WriteOperation(job, offset, bytesLength));
// Try to shrink the queue
shrinkWriteOperationQueue();
}
private synchronized void waitForAppendsToComplete() throws IOException {
for (WriteOperation writeOperation : writeOperations) {
try {
writeOperation.task.get();
} catch (Exception ex) {
outputStreamStatistics.uploadFailed(writeOperation.length);
if (ex.getCause() instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(ex.getMessage());
@ -563,7 +645,8 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset,
throw new FileNotFoundException(ex.getMessage());
}
}
throw new IOException(ex);
lastError = new IOException(ex);
throw lastError;
}
this.lastFlushOffset = offset;
}
@ -574,14 +657,14 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset,
*/
private synchronized void shrinkWriteOperationQueue() throws IOException {
try {
while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
writeOperations.peek().task.get();
lastTotalAppendOffset += writeOperations.peek().length;
WriteOperation peek = writeOperations.peek();
while (peek != null && peek.task.isDone()) {
peek.task.get();
lastTotalAppendOffset += peek.length;
writeOperations.remove();
peek = writeOperations.peek();
// Incrementing statistics to indicate queue has been shrunk.
if (outputStreamStatistics != null) {
outputStreamStatistics.queueShrunk();
}
outputStreamStatistics.queueShrunk();
}
} catch (Exception e) {
if (e.getCause() instanceof AzureBlobFileSystemException) {
@ -593,26 +676,6 @@ private synchronized void shrinkWriteOperationQueue() throws IOException {
}
}
private void waitForTaskToComplete() throws IOException {
boolean completed;
for (completed = false; completionService.poll() != null; completed = true) {
// keep polling until there is no data
}
// for AppendBLob, jobs are not submitted to completion service
if (isAppendBlob) {
completed = true;
}
if (!completed) {
try {
completionService.take();
} catch (InterruptedException e) {
lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
throw lastError;
}
}
}
private static class WriteOperation {
private final Future<Void> task;
private final long startOffset;
@ -631,7 +694,7 @@ private static class WriteOperation {
@VisibleForTesting
public synchronized void waitForPendingUploads() throws IOException {
waitForTaskToComplete();
waitForAppendsToComplete();
}
/**
@ -695,12 +758,10 @@ public boolean hasLease() {
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
if (outputStreamStatistics != null) {
sb.append("AbfsOutputStream@").append(this.hashCode());
sb.append("){");
sb.append(outputStreamStatistics.toString());
sb.append("}");
}
sb.append("AbfsOutputStream@").append(this.hashCode());
sb.append("){");
sb.append(outputStreamStatistics.toString());
sb.append("}");
return sb.toString();
}
}

View File

@ -18,6 +18,12 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;
/**
* Class to hold extra output stream configs.
*/
@ -41,6 +47,22 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private AbfsLease lease;
private DataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
private AbfsClient client;
private long position;
private FileSystem.Statistics statistics;
private String path;
private ExecutorService executorService;
private TracingContext tracingContext;
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@ -79,11 +101,64 @@ public AbfsOutputStreamContext withAppendBlob(
return this;
}
public AbfsOutputStreamContext build() {
// Validation of parameters to be done here.
public AbfsOutputStreamContext withBlockFactory(
final DataBlocks.BlockFactory blockFactory) {
this.blockFactory = blockFactory;
return this;
}
public AbfsOutputStreamContext withBlockOutputActiveBlocks(
final int blockOutputActiveBlocks) {
this.blockOutputActiveBlocks = blockOutputActiveBlocks;
return this;
}
public AbfsOutputStreamContext withClient(
final AbfsClient client) {
this.client = client;
return this;
}
public AbfsOutputStreamContext withPosition(
final long position) {
this.position = position;
return this;
}
public AbfsOutputStreamContext withFsStatistics(
final FileSystem.Statistics statistics) {
this.statistics = statistics;
return this;
}
public AbfsOutputStreamContext withPath(
final String path) {
this.path = path;
return this;
}
public AbfsOutputStreamContext withExecutorService(
final ExecutorService executorService) {
this.executorService = executorService;
return this;
}
public AbfsOutputStreamContext withTracingContext(
final TracingContext tracingContext) {
this.tracingContext = tracingContext;
return this;
}
public AbfsOutputStreamContext build() {
// Validation of parameters to be done here.
if (streamStatistics == null) {
streamStatistics = new AbfsOutputStreamStatisticsImpl();
}
return this;
}
public AbfsOutputStreamContext withWriteMaxConcurrentRequestCount(
final int writeMaxConcurrentRequestCount) {
this.writeMaxConcurrentRequestCount = writeMaxConcurrentRequestCount;
@ -143,4 +218,36 @@ public String getLeaseId() {
}
return this.lease.getLeaseID();
}
public DataBlocks.BlockFactory getBlockFactory() {
return blockFactory;
}
public int getBlockOutputActiveBlocks() {
return blockOutputActiveBlocks;
}
public AbfsClient getClient() {
return client;
}
public FileSystem.Statistics getStatistics() {
return statistics;
}
public String getPath() {
return path;
}
public long getPosition() {
return position;
}
public ExecutorService getExecutorService() {
return executorService;
}
public TracingContext getTracingContext() {
return tracingContext;
}
}

View File

@ -22,12 +22,14 @@
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.store.BlockUploadStatistics;
/**
* Interface for {@link AbfsOutputStream} statistics.
*/
@InterfaceStability.Unstable
public interface AbfsOutputStreamStatistics extends IOStatisticsSource {
public interface AbfsOutputStreamStatistics extends IOStatisticsSource,
BlockUploadStatistics {
/**
* Number of bytes to be uploaded.

View File

@ -42,7 +42,9 @@ public class AbfsOutputStreamStatisticsImpl
StreamStatisticNames.BYTES_UPLOAD_SUCCESSFUL,
StreamStatisticNames.BYTES_UPLOAD_FAILED,
StreamStatisticNames.QUEUE_SHRUNK_OPS,
StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS
StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS,
StreamStatisticNames.BLOCKS_ALLOCATED,
StreamStatisticNames.BLOCKS_RELEASED
)
.withDurationTracking(
StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
@ -60,6 +62,11 @@ public class AbfsOutputStreamStatisticsImpl
private final AtomicLong writeCurrentBufferOps =
ioStatisticsStore.getCounterReference(StreamStatisticNames.WRITE_CURRENT_BUFFER_OPERATIONS);
private final AtomicLong blocksAllocated =
ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_ALLOCATED);
private final AtomicLong blocksReleased =
ioStatisticsStore.getCounterReference(StreamStatisticNames.BLOCKS_RELEASED);
/**
* Records the need to upload bytes and increments the total bytes that
* needs to be uploaded.
@ -133,6 +140,22 @@ public void writeCurrentBuffer() {
writeCurrentBufferOps.incrementAndGet();
}
/**
* Increment the counter to indicate a block has been allocated.
*/
@Override
public void blockAllocated() {
blocksAllocated.incrementAndGet();
}
/**
* Increment the counter to indicate a block has been released.
*/
@Override
public void blockReleased() {
blocksReleased.incrementAndGet();
}
/**
* {@inheritDoc}
*

View File

@ -20,6 +20,8 @@
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
/**
* Constants for the Azure tests.
*/
@ -175,4 +177,15 @@ public interface AzureTestConstants {
* Base directory for page blobs.
*/
Path PAGE_BLOB_DIR = new Path("/" + DEFAULT_PAGE_BLOB_DIRECTORY);
/**
* Huge file for testing AbfsOutputStream uploads: {@value}
*/
String AZURE_SCALE_HUGE_FILE_UPLOAD = AZURE_SCALE_TEST + "huge.upload";
/**
* Default value for Huge file to be tested for AbfsOutputStream uploads:
* {@value}
*/
int AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT = 2 * DEFAULT_WRITE_BUFFER_SIZE;
}

View File

@ -463,7 +463,7 @@ protected AbfsDelegationTokenManager getDelegationTokenManager()
*/
protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled(
AzureBlobFileSystem fs,
Path path) throws AzureBlobFileSystemException {
Path path) throws IOException {
AzureBlobFileSystemStore abfss = fs.getAbfsStore();
abfss.getAbfsConfiguration().setDisableOutputStreamFlush(false);

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.store.DataBlocks;
import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD;
import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT;
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assume;
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.getTestPropertyInt;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
/**
* Testing Huge file for AbfsOutputStream.
*/
@RunWith(Parameterized.class)
public class ITestAbfsHugeFiles extends AbstractAbfsScaleTest {
private static final int ONE_MB = 1024 * 1024;
private static final int EIGHT_MB = 8 * ONE_MB;
// Configurable huge file upload: "fs.azure.scale.test.huge.upload",
// default is 2 * DEFAULT_WRITE_BUFFER_SIZE(8M).
private static final int HUGE_FILE;
// Set the HUGE_FILE.
static {
HUGE_FILE = getTestPropertyInt(new Configuration(),
AZURE_SCALE_HUGE_FILE_UPLOAD, AZURE_SCALE_HUGE_FILE_UPLOAD_DEFAULT);
}
// Writing block size to be used in this test.
private int size;
// Block Factory to be used in this test.
private String blockFactoryName;
@Parameterized.Parameters(name = "size [{0}] ; blockFactoryName "
+ "[{1}]")
public static Collection<Object[]> sizes() {
return Arrays.asList(new Object[][] {
{ DEFAULT_WRITE_BUFFER_SIZE, DataBlocks.DATA_BLOCKS_BUFFER_DISK },
{ HUGE_FILE, DataBlocks.DATA_BLOCKS_BUFFER_DISK },
{ DEFAULT_WRITE_BUFFER_SIZE, DataBlocks.DATA_BLOCKS_BUFFER_ARRAY },
{ HUGE_FILE, DataBlocks.DATA_BLOCKS_BUFFER_ARRAY },
{ DEFAULT_WRITE_BUFFER_SIZE, DataBlocks.DATA_BLOCKS_BYTEBUFFER },
{ HUGE_FILE, DataBlocks.DATA_BLOCKS_BYTEBUFFER },
});
}
public ITestAbfsHugeFiles(int size, String blockFactoryName)
throws Exception {
this.size = size;
this.blockFactoryName = blockFactoryName;
}
@Before
public void setUp() throws Exception {
Configuration configuration = getRawConfiguration();
configuration.unset(DATA_BLOCKS_BUFFER);
configuration.set(DATA_BLOCKS_BUFFER, blockFactoryName);
super.setup();
}
/**
* Testing Huge files written at once on AbfsOutputStream.
*/
@Test
public void testHugeFileWrite() throws IOException {
AzureBlobFileSystem fs = getFileSystem();
Path filePath = path(getMethodName());
final byte[] b = new byte[size];
new Random().nextBytes(b);
try (FSDataOutputStream out = fs.create(filePath)) {
out.write(b);
}
// Verify correct length was uploaded. Don't want to verify contents
// here, as this would increase the test time significantly.
assertEquals("Mismatch in content length of file uploaded", size,
fs.getFileStatus(filePath).getLen());
}
/**
* Testing Huge files written in chunks of 8M in lots of writes.
*/
@Test
public void testLotsOfWrites() throws IOException {
assume("If the size isn't a multiple of 8M this test would not pass, so "
+ "skip",
size % EIGHT_MB == 0);
AzureBlobFileSystem fs = getFileSystem();
Path filePath = path(getMethodName());
final byte[] b = new byte[size];
new Random().nextBytes(b);
try (FSDataOutputStream out = fs.create(filePath)) {
int offset = 0;
for (int i = 0; i < size / EIGHT_MB; i++) {
out.write(b, offset, EIGHT_MB);
offset += EIGHT_MB;
}
}
// Verify correct length was uploaded. Don't want to verify contents
// here, as this would increase the test time significantly.
assertEquals("Mismatch in content length of file uploaded", size,
fs.getFileStatus(filePath).getLen());
}
}

View File

@ -48,7 +48,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_NOT_PRESENT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED;
@ -292,18 +291,14 @@ public void testFileSystemClose() throws Exception {
final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
fs.mkdirs(testFilePath.getParent());
FSDataOutputStream out = fs.create(testFilePath);
out.write(0);
Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed());
try (FSDataOutputStream out = fs.create(testFilePath)) {
out.write(0);
Assert.assertFalse("Store leases should exist",
fs.getAbfsStore().areLeasesFreed());
}
fs.close();
Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_LEASE_NOT_PRESENT
: ERR_LEASE_EXPIRED, () -> {
out.close();
return "Expected exception on close after closed FS but got " + out;
});
LambdaTestUtils.intercept(RejectedExecutionException.class, () -> {
try (FSDataOutputStream out2 = fs.append(testFilePath)) {
}

View File

@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
@ -32,7 +34,14 @@
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.fs.store.DataBlocks;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.refEq;
@ -58,12 +67,26 @@ public final class TestAbfsOutputStream {
private final String accountKey1 = globalKey + "." + accountName1;
private final String accountValue1 = "one";
private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferSize,
boolean isFlushEnabled,
boolean disableOutputStreamFlush,
boolean isAppendBlob) throws IOException, IllegalAccessException {
private AbfsOutputStreamContext populateAbfsOutputStreamContext(
int writeBufferSize,
boolean isFlushEnabled,
boolean disableOutputStreamFlush,
boolean isAppendBlob,
AbfsClient client,
String path,
TracingContext tracingContext,
ExecutorService executorService) throws IOException,
IllegalAccessException {
AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
accountName1);
String blockFactoryName =
abfsConf.getRawConfiguration().getTrimmed(DATA_BLOCKS_BUFFER,
DATA_BLOCKS_BUFFER_DEFAULT);
DataBlocks.BlockFactory blockFactory =
DataBlocks.createFactory(FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR,
abfsConf.getRawConfiguration(),
blockFactoryName);
return new AbfsOutputStreamContext(2)
.withWriteBufferSize(writeBufferSize)
.enableFlush(isFlushEnabled)
@ -72,6 +95,11 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(int writeBufferS
.withAppendBlob(isAppendBlob)
.withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
.withClient(client)
.withPath(path)
.withTracingContext(tracingContext)
.withExecutorService(executorService)
.withBlockFactory(blockFactory)
.build();
}
@ -95,11 +123,18 @@ public void verifyShortWriteRequest() throws Exception {
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
isNull(), any(TracingContext.class))).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
null));
AbfsOutputStream out = new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE,
true,
false,
false,
client,
PATH,
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
null),
createExecutorService(abfsConf)));
final byte[] b = new byte[WRITE_SIZE];
new Random().nextBytes(b);
out.write(b);
@ -149,9 +184,16 @@ public void verifyWriteRequest() throws Exception {
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(TracingContext.class))).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(TracingContext.class))).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
tracingContext);
AbfsOutputStream out = new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE,
true,
false,
false,
client,
PATH,
tracingContext,
createExecutorService(abfsConf)));
final byte[] b = new byte[WRITE_SIZE];
new Random().nextBytes(b);
@ -216,9 +258,16 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
tracingContext);
AbfsOutputStream out = new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE,
true,
false,
false,
client,
PATH,
tracingContext,
createExecutorService(abfsConf)));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
@ -280,11 +329,18 @@ public void verifyWriteRequestOfBufferSize() throws Exception {
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
null));
AbfsOutputStream out = new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE,
true,
false,
false,
client,
PATH,
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
null),
createExecutorService(abfsConf)));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
@ -328,11 +384,18 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
isNull(), any(TracingContext.class))).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true),
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
null));
AbfsOutputStream out = new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE,
true,
false,
true,
client,
PATH,
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
null),
createExecutorService(abfsConf)));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
@ -380,10 +443,18 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
isNull(), any(TracingContext.class))).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false), new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
null));
AbfsOutputStream out = new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE,
true,
false,
false,
client,
PATH,
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(),
null),
createExecutorService(abfsConf)));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
@ -441,11 +512,18 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(),
isNull(), any(TracingContext.class))).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false),
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
null));
AbfsOutputStream out = new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE,
true,
false,
false,
client,
PATH,
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
null),
createExecutorService(abfsConf)));
final byte[] b = new byte[BUFFER_SIZE];
new Random().nextBytes(b);
@ -469,4 +547,21 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
verify(client, times(2)).append(
eq(PATH), any(byte[].class), any(), any(), any(TracingContext.class));
}
/**
* Method to create an executor Service for AbfsOutputStream.
* @param abfsConf Configuration.
* @return ExecutorService.
*/
private ExecutorService createExecutorService(
AbfsConfiguration abfsConf) {
ExecutorService executorService =
new SemaphoredDelegatingExecutor(BlockingThreadPoolExecutorService.newInstance(
abfsConf.getWriteMaxConcurrentRequestCount(),
abfsConf.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-test-bounded"),
BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT, true);
return executorService;
}
}