HADOOP-18457. ABFS: Support account level throttling (#5034)
This allows abfs request throttling to be shared across all abfs connections talking to containers belonging to the same abfs storage account -as that is the level at which IO throttling is applied. The option is enabled/disabled in the configuration option "fs.azure.account.throttling.enabled"; The default is "true" Contributed by Anmol Asrani
This commit is contained in:
parent
8c7f2ddc10
commit
1cc8cb68f2
@ -117,6 +117,10 @@ public class AbfsConfiguration{
|
||||
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
|
||||
private boolean optimizeFooterRead;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
|
||||
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
|
||||
private boolean accountThrottlingEnabled;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
|
||||
MinValue = MIN_BUFFER_SIZE,
|
||||
MaxValue = MAX_BUFFER_SIZE,
|
||||
@ -260,6 +264,14 @@ public class AbfsConfiguration{
|
||||
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
|
||||
private boolean enableAutoThrottling;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
|
||||
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
|
||||
private int accountOperationIdleTimeout;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ANALYSIS_PERIOD,
|
||||
DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
|
||||
private int analysisPeriod;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
|
||||
MinValue = 0,
|
||||
DefaultValue = RATE_LIMIT_DEFAULT)
|
||||
@ -694,6 +706,10 @@ public String getAppendBlobDirs() {
|
||||
return this.azureAppendBlobDirs;
|
||||
}
|
||||
|
||||
public boolean accountThrottlingEnabled() {
|
||||
return accountThrottlingEnabled;
|
||||
}
|
||||
|
||||
public String getAzureInfiniteLeaseDirs() {
|
||||
return this.azureInfiniteLeaseDirs;
|
||||
}
|
||||
@ -736,6 +752,14 @@ public boolean isAutoThrottlingEnabled() {
|
||||
return this.enableAutoThrottling;
|
||||
}
|
||||
|
||||
public int getAccountOperationIdleTimeout() {
|
||||
return accountOperationIdleTimeout;
|
||||
}
|
||||
|
||||
public int getAnalysisPeriod() {
|
||||
return analysisPeriod;
|
||||
}
|
||||
|
||||
public int getRateLimit() {
|
||||
return rateLimit;
|
||||
}
|
||||
|
@ -55,7 +55,6 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
@ -225,7 +224,6 @@ public void initialize(URI uri, Configuration configuration)
|
||||
}
|
||||
}
|
||||
|
||||
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
|
||||
rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
|
||||
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
|
||||
}
|
||||
|
@ -38,6 +38,7 @@ public final class ConfigurationKeys {
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
|
||||
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
|
||||
public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
|
||||
|
||||
// Retry strategy defined by the user
|
||||
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
|
||||
@ -116,6 +117,8 @@ public final class ConfigurationKeys {
|
||||
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
|
||||
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
|
||||
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
|
||||
public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
|
||||
public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
|
||||
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
|
||||
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
|
||||
/** This config ensures that during create overwrite an existing file will be
|
||||
|
@ -94,6 +94,9 @@ public final class FileSystemConfigurations {
|
||||
public static final boolean DEFAULT_ENABLE_FLUSH = true;
|
||||
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
|
||||
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
|
||||
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
|
||||
public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
|
||||
public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;
|
||||
|
||||
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
|
||||
= DelegatingSSLSocketFactory.SSLChannelMode.Default;
|
||||
|
@ -101,6 +101,7 @@ public class AbfsClient implements Closeable {
|
||||
private AccessTokenProvider tokenProvider;
|
||||
private SASTokenProvider sasTokenProvider;
|
||||
private final AbfsCounters abfsCounters;
|
||||
private final AbfsThrottlingIntercept intercept;
|
||||
|
||||
private final ListeningScheduledExecutorService executorService;
|
||||
|
||||
@ -120,6 +121,7 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
|
||||
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
|
||||
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
|
||||
this.authType = abfsConfiguration.getAuthType(accountName);
|
||||
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
|
||||
|
||||
String encryptionKey = this.abfsConfiguration
|
||||
.getClientProvidedEncryptionKey();
|
||||
@ -216,6 +218,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
|
||||
return sharedKeyCredentials;
|
||||
}
|
||||
|
||||
AbfsThrottlingIntercept getIntercept() {
|
||||
return intercept;
|
||||
}
|
||||
|
||||
List<AbfsHttpHeader> createDefaultHeaders() {
|
||||
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
|
||||
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
|
||||
@ -1277,6 +1283,14 @@ protected AbfsCounters getAbfsCounters() {
|
||||
return abfsCounters;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter for abfsConfiguration from AbfsClient.
|
||||
* @return AbfsConfiguration instance
|
||||
*/
|
||||
protected AbfsConfiguration getAbfsConfiguration() {
|
||||
return abfsConfiguration;
|
||||
}
|
||||
|
||||
public int getNumLeaseThreads() {
|
||||
return abfsConfiguration.getNumLeaseThreads();
|
||||
}
|
||||
|
@ -20,20 +20,23 @@
|
||||
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.util.Preconditions;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
class AbfsClientThrottlingAnalyzer {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
AbfsClientThrottlingAnalyzer.class);
|
||||
private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
|
||||
private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
|
||||
private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
|
||||
private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
|
||||
@ -50,42 +53,38 @@ class AbfsClientThrottlingAnalyzer {
|
||||
private String name = null;
|
||||
private Timer timer = null;
|
||||
private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
|
||||
private AtomicLong lastExecutionTime = null;
|
||||
private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false);
|
||||
private AbfsConfiguration abfsConfiguration = null;
|
||||
private boolean accountLevelThrottlingEnabled = true;
|
||||
|
||||
private AbfsClientThrottlingAnalyzer() {
|
||||
// hide default constructor
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
|
||||
* the specified name.
|
||||
*
|
||||
* @param name a name used to identify this instance.
|
||||
* @throws IllegalArgumentException if name is null or empty.
|
||||
*/
|
||||
AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
|
||||
this(name, DEFAULT_ANALYSIS_PERIOD_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
|
||||
* the specified name and period.
|
||||
*
|
||||
* @param name A name used to identify this instance.
|
||||
* @param period The frequency, in milliseconds, at which metrics are
|
||||
* analyzed.
|
||||
* @param abfsConfiguration The configuration set.
|
||||
* @throws IllegalArgumentException If name is null or empty.
|
||||
* If period is less than 1000 or greater than 30000 milliseconds.
|
||||
*/
|
||||
AbfsClientThrottlingAnalyzer(String name, int period)
|
||||
AbfsClientThrottlingAnalyzer(String name, AbfsConfiguration abfsConfiguration)
|
||||
throws IllegalArgumentException {
|
||||
Preconditions.checkArgument(
|
||||
StringUtils.isNotEmpty(name),
|
||||
"The argument 'name' cannot be null or empty.");
|
||||
int period = abfsConfiguration.getAnalysisPeriod();
|
||||
Preconditions.checkArgument(
|
||||
period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
|
||||
"The argument 'period' must be between 1000 and 30000.");
|
||||
this.name = name;
|
||||
this.analysisPeriodMs = period;
|
||||
this.abfsConfiguration = abfsConfiguration;
|
||||
this.accountLevelThrottlingEnabled = abfsConfiguration.accountThrottlingEnabled();
|
||||
this.analysisPeriodMs = abfsConfiguration.getAnalysisPeriod();
|
||||
this.lastExecutionTime = new AtomicLong(now());
|
||||
this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
|
||||
new AbfsOperationMetrics(System.currentTimeMillis()));
|
||||
this.timer = new Timer(
|
||||
@ -95,6 +94,47 @@ private AbfsClientThrottlingAnalyzer() {
|
||||
analysisPeriodMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resumes the timer if it was stopped.
|
||||
*/
|
||||
private void resumeTimer() {
|
||||
blobMetrics = new AtomicReference<AbfsOperationMetrics>(
|
||||
new AbfsOperationMetrics(System.currentTimeMillis()));
|
||||
timer.schedule(new TimerTaskImpl(),
|
||||
analysisPeriodMs,
|
||||
analysisPeriodMs);
|
||||
isOperationOnAccountIdle.set(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronized method to suspend or resume timer.
|
||||
* @param timerFunctionality resume or suspend.
|
||||
* @param timerTask The timertask object.
|
||||
* @return true or false.
|
||||
*/
|
||||
private synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
|
||||
TimerTask timerTask) {
|
||||
switch (timerFunctionality) {
|
||||
case RESUME:
|
||||
if (isOperationOnAccountIdle.get()) {
|
||||
resumeTimer();
|
||||
}
|
||||
break;
|
||||
case SUSPEND:
|
||||
if (accountLevelThrottlingEnabled && (System.currentTimeMillis()
|
||||
- lastExecutionTime.get() >= getOperationIdleTimeout())) {
|
||||
isOperationOnAccountIdle.set(true);
|
||||
timerTask.cancel();
|
||||
timer.purge();
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates metrics with results from the current storage operation.
|
||||
*
|
||||
@ -104,12 +144,13 @@ private AbfsClientThrottlingAnalyzer() {
|
||||
public void addBytesTransferred(long count, boolean isFailedOperation) {
|
||||
AbfsOperationMetrics metrics = blobMetrics.get();
|
||||
if (isFailedOperation) {
|
||||
metrics.bytesFailed.addAndGet(count);
|
||||
metrics.operationsFailed.incrementAndGet();
|
||||
metrics.addBytesFailed(count);
|
||||
metrics.incrementOperationsFailed();
|
||||
} else {
|
||||
metrics.bytesSuccessful.addAndGet(count);
|
||||
metrics.operationsSuccessful.incrementAndGet();
|
||||
metrics.addBytesSuccessful(count);
|
||||
metrics.incrementOperationsSuccessful();
|
||||
}
|
||||
blobMetrics.set(metrics);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -117,6 +158,8 @@ public void addBytesTransferred(long count, boolean isFailedOperation) {
|
||||
* @return true if Thread sleeps(Throttling occurs) else false.
|
||||
*/
|
||||
public boolean suspendIfNecessary() {
|
||||
lastExecutionTime.set(now());
|
||||
timerOrchestrator(TimerFunctionality.RESUME, null);
|
||||
int duration = sleepDuration;
|
||||
if (duration > 0) {
|
||||
try {
|
||||
@ -134,19 +177,27 @@ int getSleepDuration() {
|
||||
return sleepDuration;
|
||||
}
|
||||
|
||||
int getOperationIdleTimeout() {
|
||||
return abfsConfiguration.getAccountOperationIdleTimeout();
|
||||
}
|
||||
|
||||
AtomicBoolean getIsOperationOnAccountIdle() {
|
||||
return isOperationOnAccountIdle;
|
||||
}
|
||||
|
||||
private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
|
||||
int sleepDuration) {
|
||||
final double percentageConversionFactor = 100;
|
||||
double bytesFailed = metrics.bytesFailed.get();
|
||||
double bytesSuccessful = metrics.bytesSuccessful.get();
|
||||
double operationsFailed = metrics.operationsFailed.get();
|
||||
double operationsSuccessful = metrics.operationsSuccessful.get();
|
||||
double bytesFailed = metrics.getBytesFailed().get();
|
||||
double bytesSuccessful = metrics.getBytesSuccessful().get();
|
||||
double operationsFailed = metrics.getOperationsFailed().get();
|
||||
double operationsSuccessful = metrics.getOperationsSuccessful().get();
|
||||
double errorPercentage = (bytesFailed <= 0)
|
||||
? 0
|
||||
: (percentageConversionFactor
|
||||
* bytesFailed
|
||||
/ (bytesFailed + bytesSuccessful));
|
||||
long periodMs = metrics.endTime - metrics.startTime;
|
||||
long periodMs = metrics.getEndTime() - metrics.getStartTime();
|
||||
|
||||
double newSleepDuration;
|
||||
|
||||
@ -238,10 +289,13 @@ public void run() {
|
||||
}
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
|
||||
if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
|
||||
return;
|
||||
}
|
||||
if (now - blobMetrics.get().getStartTime() >= analysisPeriodMs) {
|
||||
AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
|
||||
new AbfsOperationMetrics(now));
|
||||
oldMetrics.endTime = now;
|
||||
oldMetrics.setEndTime(now);
|
||||
sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
|
||||
sleepDuration);
|
||||
}
|
||||
@ -252,24 +306,4 @@ public void run() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores Abfs operation metrics during each analysis period.
|
||||
*/
|
||||
static class AbfsOperationMetrics {
|
||||
private AtomicLong bytesFailed;
|
||||
private AtomicLong bytesSuccessful;
|
||||
private AtomicLong operationsFailed;
|
||||
private AtomicLong operationsSuccessful;
|
||||
private long endTime;
|
||||
private long startTime;
|
||||
|
||||
AbfsOperationMetrics(long startTime) {
|
||||
this.startTime = startTime;
|
||||
this.bytesFailed = new AtomicLong();
|
||||
this.bytesSuccessful = new AtomicLong();
|
||||
this.operationsFailed = new AtomicLong();
|
||||
this.operationsSuccessful = new AtomicLong();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,10 +19,12 @@
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.net.HttpURLConnection;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||
|
||||
@ -38,35 +40,89 @@
|
||||
* and sleeps just enough to minimize errors, allowing optimal ingress and/or
|
||||
* egress throughput.
|
||||
*/
|
||||
public final class AbfsClientThrottlingIntercept {
|
||||
public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
AbfsClientThrottlingIntercept.class);
|
||||
private static final String RANGE_PREFIX = "bytes=";
|
||||
private static AbfsClientThrottlingIntercept singleton = null;
|
||||
private AbfsClientThrottlingAnalyzer readThrottler = null;
|
||||
private AbfsClientThrottlingAnalyzer writeThrottler = null;
|
||||
private static boolean isAutoThrottlingEnabled = false;
|
||||
private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
|
||||
private static final ReentrantLock LOCK = new ReentrantLock();
|
||||
private final AbfsClientThrottlingAnalyzer readThrottler;
|
||||
private final AbfsClientThrottlingAnalyzer writeThrottler;
|
||||
private final String accountName;
|
||||
|
||||
// Hide default constructor
|
||||
private AbfsClientThrottlingIntercept() {
|
||||
readThrottler = new AbfsClientThrottlingAnalyzer("read");
|
||||
writeThrottler = new AbfsClientThrottlingAnalyzer("write");
|
||||
public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
|
||||
this.accountName = accountName;
|
||||
this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
|
||||
this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
|
||||
LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
|
||||
}
|
||||
|
||||
public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
|
||||
if (!enableAutoThrottling) {
|
||||
return;
|
||||
}
|
||||
// Hide default constructor
|
||||
private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
|
||||
//Account name is kept as empty as same instance is shared across all accounts
|
||||
this.accountName = "";
|
||||
this.readThrottler = setAnalyzer("read", abfsConfiguration);
|
||||
this.writeThrottler = setAnalyzer("write", abfsConfiguration);
|
||||
LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the analyzer for the intercept.
|
||||
* @param name Name of the analyzer.
|
||||
* @param abfsConfiguration The configuration.
|
||||
* @return AbfsClientThrottlingAnalyzer instance.
|
||||
*/
|
||||
private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
|
||||
return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the analyzer for read operations.
|
||||
* @return AbfsClientThrottlingAnalyzer for read.
|
||||
*/
|
||||
AbfsClientThrottlingAnalyzer getReadThrottler() {
|
||||
return readThrottler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the analyzer for write operations.
|
||||
* @return AbfsClientThrottlingAnalyzer for write.
|
||||
*/
|
||||
AbfsClientThrottlingAnalyzer getWriteThrottler() {
|
||||
return writeThrottler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a singleton object of the AbfsClientThrottlingIntercept.
|
||||
* which is shared across all filesystem instances.
|
||||
* @param abfsConfiguration configuration set.
|
||||
* @return singleton object of intercept.
|
||||
*/
|
||||
static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
|
||||
if (singleton == null) {
|
||||
singleton = new AbfsClientThrottlingIntercept();
|
||||
isAutoThrottlingEnabled = true;
|
||||
LOG.debug("Client-side throttling is enabled for the ABFS file system.");
|
||||
LOCK.lock();
|
||||
try {
|
||||
if (singleton == null) {
|
||||
singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
|
||||
LOG.debug("Client-side throttling is enabled for the ABFS file system.");
|
||||
}
|
||||
} finally {
|
||||
LOCK.unlock();
|
||||
}
|
||||
}
|
||||
return singleton;
|
||||
}
|
||||
|
||||
static void updateMetrics(AbfsRestOperationType operationType,
|
||||
AbfsHttpOperation abfsHttpOperation) {
|
||||
if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
|
||||
/**
|
||||
* Updates the metrics for successful and failed read and write operations.
|
||||
* @param operationType Only applicable for read and write operations.
|
||||
* @param abfsHttpOperation Used for status code and data transferred.
|
||||
*/
|
||||
@Override
|
||||
public void updateMetrics(AbfsRestOperationType operationType,
|
||||
AbfsHttpOperation abfsHttpOperation) {
|
||||
if (abfsHttpOperation == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -82,7 +138,7 @@ static void updateMetrics(AbfsRestOperationType operationType,
|
||||
case Append:
|
||||
contentLength = abfsHttpOperation.getBytesSent();
|
||||
if (contentLength > 0) {
|
||||
singleton.writeThrottler.addBytesTransferred(contentLength,
|
||||
writeThrottler.addBytesTransferred(contentLength,
|
||||
isFailedOperation);
|
||||
}
|
||||
break;
|
||||
@ -90,7 +146,7 @@ static void updateMetrics(AbfsRestOperationType operationType,
|
||||
String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
|
||||
contentLength = getContentLengthIfKnown(range);
|
||||
if (contentLength > 0) {
|
||||
singleton.readThrottler.addBytesTransferred(contentLength,
|
||||
readThrottler.addBytesTransferred(contentLength,
|
||||
isFailedOperation);
|
||||
}
|
||||
break;
|
||||
@ -104,21 +160,18 @@ static void updateMetrics(AbfsRestOperationType operationType,
|
||||
* uses this to suspend the request, if necessary, to minimize errors and
|
||||
* maximize throughput.
|
||||
*/
|
||||
static void sendingRequest(AbfsRestOperationType operationType,
|
||||
@Override
|
||||
public void sendingRequest(AbfsRestOperationType operationType,
|
||||
AbfsCounters abfsCounters) {
|
||||
if (!isAutoThrottlingEnabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch (operationType) {
|
||||
case ReadFile:
|
||||
if (singleton.readThrottler.suspendIfNecessary()
|
||||
if (readThrottler.suspendIfNecessary()
|
||||
&& abfsCounters != null) {
|
||||
abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
|
||||
}
|
||||
break;
|
||||
case Append:
|
||||
if (singleton.writeThrottler.suspendIfNecessary()
|
||||
if (writeThrottler.suspendIfNecessary()
|
||||
&& abfsCounters != null) {
|
||||
abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
|
||||
}
|
||||
|
@ -0,0 +1,37 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {
|
||||
|
||||
public static final AbfsNoOpThrottlingIntercept INSTANCE = new AbfsNoOpThrottlingIntercept();
|
||||
|
||||
private AbfsNoOpThrottlingIntercept() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateMetrics(final AbfsRestOperationType operationType,
|
||||
final AbfsHttpOperation abfsHttpOperation) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendingRequest(final AbfsRestOperationType operationType,
|
||||
final AbfsCounters abfsCounters) {
|
||||
}
|
||||
}
|
@ -0,0 +1,139 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Stores Abfs operation metrics during each analysis period.
|
||||
*/
|
||||
class AbfsOperationMetrics {
|
||||
|
||||
/**
|
||||
* No of bytes which could not be transferred due to a failed operation.
|
||||
*/
|
||||
private final AtomicLong bytesFailed;
|
||||
|
||||
/**
|
||||
* No of bytes successfully transferred during a successful operation.
|
||||
*/
|
||||
private final AtomicLong bytesSuccessful;
|
||||
|
||||
/**
|
||||
* Total no of failed operations.
|
||||
*/
|
||||
private final AtomicLong operationsFailed;
|
||||
|
||||
/**
|
||||
* Total no of successful operations.
|
||||
*/
|
||||
private final AtomicLong operationsSuccessful;
|
||||
|
||||
/**
|
||||
* Time when collection of metrics ended.
|
||||
*/
|
||||
private long endTime;
|
||||
|
||||
/**
|
||||
* Time when the collection of metrics started.
|
||||
*/
|
||||
private final long startTime;
|
||||
|
||||
AbfsOperationMetrics(long startTime) {
|
||||
this.startTime = startTime;
|
||||
this.bytesFailed = new AtomicLong();
|
||||
this.bytesSuccessful = new AtomicLong();
|
||||
this.operationsFailed = new AtomicLong();
|
||||
this.operationsSuccessful = new AtomicLong();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return bytes failed to transfer.
|
||||
*/
|
||||
AtomicLong getBytesFailed() {
|
||||
return bytesFailed;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return bytes successfully transferred.
|
||||
*/
|
||||
AtomicLong getBytesSuccessful() {
|
||||
return bytesSuccessful;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return no of operations failed.
|
||||
*/
|
||||
AtomicLong getOperationsFailed() {
|
||||
return operationsFailed;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return no of successful operations.
|
||||
*/
|
||||
AtomicLong getOperationsSuccessful() {
|
||||
return operationsSuccessful;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return end time of metric collection.
|
||||
*/
|
||||
long getEndTime() {
|
||||
return endTime;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param endTime sets the end time.
|
||||
*/
|
||||
void setEndTime(final long endTime) {
|
||||
this.endTime = endTime;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return start time of metric collection.
|
||||
*/
|
||||
long getStartTime() {
|
||||
return startTime;
|
||||
}
|
||||
|
||||
void addBytesFailed(long bytes) {
|
||||
this.getBytesFailed().addAndGet(bytes);
|
||||
}
|
||||
|
||||
void addBytesSuccessful(long bytes) {
|
||||
this.getBytesSuccessful().addAndGet(bytes);
|
||||
}
|
||||
|
||||
void incrementOperationsFailed() {
|
||||
this.getOperationsFailed().incrementAndGet();
|
||||
}
|
||||
|
||||
void incrementOperationsSuccessful() {
|
||||
this.getOperationsSuccessful().incrementAndGet();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -45,6 +45,8 @@ public class AbfsRestOperation {
|
||||
private final AbfsRestOperationType operationType;
|
||||
// Blob FS client, which has the credentials, retry policy, and logs.
|
||||
private final AbfsClient client;
|
||||
// Return intercept instance
|
||||
private final AbfsThrottlingIntercept intercept;
|
||||
// the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
|
||||
private final String method;
|
||||
// full URL including query parameters
|
||||
@ -145,6 +147,7 @@ String getSasToken() {
|
||||
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
|
||||
this.sasToken = sasToken;
|
||||
this.abfsCounters = client.getAbfsCounters();
|
||||
this.intercept = client.getIntercept();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -241,7 +244,8 @@ private void completeExecute(TracingContext tracingContext)
|
||||
*/
|
||||
private boolean executeHttpOperation(final int retryCount,
|
||||
TracingContext tracingContext) throws AzureBlobFileSystemException {
|
||||
AbfsHttpOperation httpOperation = null;
|
||||
AbfsHttpOperation httpOperation;
|
||||
|
||||
try {
|
||||
// initialize the HTTP request and open the connection
|
||||
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
|
||||
@ -278,8 +282,7 @@ private boolean executeHttpOperation(final int retryCount,
|
||||
// dump the headers
|
||||
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
|
||||
httpOperation.getConnection().getRequestProperties());
|
||||
AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
|
||||
|
||||
intercept.sendingRequest(operationType, abfsCounters);
|
||||
if (hasRequestBody) {
|
||||
// HttpUrlConnection requires
|
||||
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
|
||||
@ -317,7 +320,7 @@ private boolean executeHttpOperation(final int retryCount,
|
||||
|
||||
return false;
|
||||
} finally {
|
||||
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
|
||||
intercept.updateMetrics(operationType, httpOperation);
|
||||
}
|
||||
|
||||
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());
|
||||
|
@ -0,0 +1,49 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* An interface for Abfs Throttling Interface.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface AbfsThrottlingIntercept {
|
||||
|
||||
/**
|
||||
* Updates the metrics for successful and failed read and write operations.
|
||||
* @param operationType Only applicable for read and write operations.
|
||||
* @param abfsHttpOperation Used for status code and data transferred.
|
||||
*/
|
||||
void updateMetrics(AbfsRestOperationType operationType,
|
||||
AbfsHttpOperation abfsHttpOperation);
|
||||
|
||||
/**
|
||||
* Called before the request is sent. Client-side throttling
|
||||
* uses this to suspend the request, if necessary, to minimize errors and
|
||||
* maximize throughput.
|
||||
* @param operationType Only applicable for read and write operations.
|
||||
* @param abfsCounters Used for counters.
|
||||
*/
|
||||
void sendingRequest(AbfsRestOperationType operationType,
|
||||
AbfsCounters abfsCounters);
|
||||
|
||||
}
|
@ -0,0 +1,102 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.util.WeakReferenceMap;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Class to get an instance of throttling intercept class per account.
|
||||
*/
|
||||
final class AbfsThrottlingInterceptFactory {
|
||||
|
||||
private AbfsThrottlingInterceptFactory() {
|
||||
}
|
||||
|
||||
private static AbfsConfiguration abfsConfig;
|
||||
|
||||
/**
|
||||
* List of references notified of loss.
|
||||
*/
|
||||
private static List<String> lostReferences = new ArrayList<>();
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
AbfsThrottlingInterceptFactory.class);
|
||||
|
||||
/**
|
||||
* Map which stores instance of ThrottlingIntercept class per account.
|
||||
*/
|
||||
private static WeakReferenceMap<String, AbfsThrottlingIntercept>
|
||||
interceptMap = new WeakReferenceMap<>(
|
||||
AbfsThrottlingInterceptFactory::factory,
|
||||
AbfsThrottlingInterceptFactory::referenceLost);
|
||||
|
||||
/**
|
||||
* Returns instance of throttling intercept.
|
||||
* @param accountName Account name.
|
||||
* @return instance of throttling intercept.
|
||||
*/
|
||||
private static AbfsClientThrottlingIntercept factory(final String accountName) {
|
||||
return new AbfsClientThrottlingIntercept(accountName, abfsConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reference lost callback.
|
||||
* @param accountName key lost.
|
||||
*/
|
||||
private static void referenceLost(String accountName) {
|
||||
lostReferences.add(accountName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an instance of AbfsThrottlingIntercept.
|
||||
*
|
||||
* @param accountName The account for which we need instance of throttling intercept.
|
||||
@param abfsConfiguration The object of abfsconfiguration class.
|
||||
* @return Instance of AbfsThrottlingIntercept.
|
||||
*/
|
||||
static synchronized AbfsThrottlingIntercept getInstance(String accountName,
|
||||
AbfsConfiguration abfsConfiguration) {
|
||||
abfsConfig = abfsConfiguration;
|
||||
AbfsThrottlingIntercept intercept;
|
||||
if (!abfsConfiguration.isAutoThrottlingEnabled()) {
|
||||
return AbfsNoOpThrottlingIntercept.INSTANCE;
|
||||
}
|
||||
// If singleton is enabled use a static instance of the intercept class for all accounts
|
||||
if (!abfsConfiguration.accountThrottlingEnabled()) {
|
||||
intercept = AbfsClientThrottlingIntercept.initializeSingleton(
|
||||
abfsConfiguration);
|
||||
} else {
|
||||
// Return the instance from the map
|
||||
intercept = interceptMap.get(accountName);
|
||||
if (intercept == null) {
|
||||
intercept = new AbfsClientThrottlingIntercept(accountName,
|
||||
abfsConfiguration);
|
||||
interceptMap.put(accountName, intercept);
|
||||
}
|
||||
}
|
||||
return intercept;
|
||||
}
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
public enum TimerFunctionality {
|
||||
RESUME,
|
||||
|
||||
SUSPEND
|
||||
}
|
||||
|
@ -767,6 +767,12 @@ Hflush() being the only documented API that can provide persistent data
|
||||
transfer, Flush() also attempting to persist buffered data will lead to
|
||||
performance issues.
|
||||
|
||||
|
||||
### <a name="accountlevelthrottlingoptions"></a> Account level throttling Options
|
||||
|
||||
`fs.azure.account.operation.idle.timeout`: This value specifies the time after which the timer for the analyzer (read or
|
||||
write) should be paused until no new request is made again. The default value for the same is 60 seconds.
|
||||
|
||||
### <a name="hnscheckconfigoptions"></a> HNS Check Options
|
||||
Config `fs.azure.account.hns.enabled` provides an option to specify whether
|
||||
the storage account is HNS enabled or not. In case the config is not provided,
|
||||
@ -877,6 +883,9 @@ when there are too many writes from the same process.
|
||||
tuned with this config considering each queued request holds a buffer. Set
|
||||
the value 3 or 4 times the value set for s.azure.write.max.concurrent.requests.
|
||||
|
||||
`fs.azure.analysis.period`: The time after which sleep duration is recomputed after analyzing metrics. The default value
|
||||
for the same is 10 seconds.
|
||||
|
||||
### <a name="securityconfigoptions"></a> Security Options
|
||||
`fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
|
||||
is made true. Irrespective of the flag, `AbfsClient` will use HTTPS if the secure
|
||||
|
@ -24,6 +24,9 @@
|
||||
public final class TestConfigurationKeys {
|
||||
public static final String FS_AZURE_ACCOUNT_NAME = "fs.azure.account.name";
|
||||
public static final String FS_AZURE_ABFS_ACCOUNT_NAME = "fs.azure.abfs.account.name";
|
||||
public static final String FS_AZURE_ABFS_ACCOUNT1_NAME = "fs.azure.abfs.account1.name";
|
||||
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
|
||||
public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key";
|
||||
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
|
||||
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
|
||||
|
@ -306,6 +306,11 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
|
||||
when(client.getAccessToken()).thenCallRealMethod();
|
||||
when(client.getSharedKeyCredentials()).thenCallRealMethod();
|
||||
when(client.createDefaultHeaders()).thenCallRealMethod();
|
||||
when(client.getAbfsConfiguration()).thenReturn(abfsConfig);
|
||||
when(client.getIntercept()).thenReturn(
|
||||
AbfsThrottlingInterceptFactory.getInstance(
|
||||
abfsConfig.getAccountName().substring(0,
|
||||
abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
|
||||
|
||||
// override baseurl
|
||||
client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
|
||||
|
@ -18,9 +18,15 @@
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ANALYSIS_PERIOD;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
@ -33,6 +39,15 @@ public class TestAbfsClientThrottlingAnalyzer {
|
||||
+ ANALYSIS_PERIOD / 10;
|
||||
private static final long MEGABYTE = 1024 * 1024;
|
||||
private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
|
||||
private AbfsConfiguration abfsConfiguration;
|
||||
|
||||
public TestAbfsClientThrottlingAnalyzer() throws IOException, IllegalAccessException {
|
||||
final Configuration configuration = new Configuration();
|
||||
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||
configuration.setInt(FS_AZURE_ANALYSIS_PERIOD, 1000);
|
||||
this.abfsConfiguration = new AbfsConfiguration(configuration,
|
||||
"dummy");
|
||||
}
|
||||
|
||||
private void sleep(long milliseconds) {
|
||||
try {
|
||||
@ -82,8 +97,7 @@ private void validateLessThanOrEqual(long maxExpected, long actual) {
|
||||
@Test
|
||||
public void testNoMetricUpdatesThenNoWaiting() {
|
||||
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
|
||||
"test",
|
||||
ANALYSIS_PERIOD);
|
||||
"test", abfsConfiguration);
|
||||
validate(0, analyzer.getSleepDuration());
|
||||
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
|
||||
validate(0, analyzer.getSleepDuration());
|
||||
@ -96,8 +110,7 @@ public void testNoMetricUpdatesThenNoWaiting() {
|
||||
@Test
|
||||
public void testOnlySuccessThenNoWaiting() {
|
||||
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
|
||||
"test",
|
||||
ANALYSIS_PERIOD);
|
||||
"test", abfsConfiguration);
|
||||
analyzer.addBytesTransferred(8 * MEGABYTE, false);
|
||||
validate(0, analyzer.getSleepDuration());
|
||||
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
|
||||
@ -112,8 +125,7 @@ public void testOnlySuccessThenNoWaiting() {
|
||||
@Test
|
||||
public void testOnlyErrorsAndWaiting() {
|
||||
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
|
||||
"test",
|
||||
ANALYSIS_PERIOD);
|
||||
"test", abfsConfiguration);
|
||||
validate(0, analyzer.getSleepDuration());
|
||||
analyzer.addBytesTransferred(4 * MEGABYTE, true);
|
||||
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
|
||||
@ -132,8 +144,7 @@ public void testOnlyErrorsAndWaiting() {
|
||||
@Test
|
||||
public void testSuccessAndErrorsAndWaiting() {
|
||||
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
|
||||
"test",
|
||||
ANALYSIS_PERIOD);
|
||||
"test", abfsConfiguration);
|
||||
validate(0, analyzer.getSleepDuration());
|
||||
analyzer.addBytesTransferred(8 * MEGABYTE, false);
|
||||
analyzer.addBytesTransferred(2 * MEGABYTE, true);
|
||||
@ -157,8 +168,7 @@ public void testSuccessAndErrorsAndWaiting() {
|
||||
@Test
|
||||
public void testManySuccessAndErrorsAndWaiting() {
|
||||
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
|
||||
"test",
|
||||
ANALYSIS_PERIOD);
|
||||
"test", abfsConfiguration);
|
||||
validate(0, analyzer.getSleepDuration());
|
||||
final int numberOfRequests = 20;
|
||||
for (int i = 0; i < numberOfRequests; i++) {
|
||||
|
@ -18,13 +18,35 @@
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_AUTOTHROTTLING;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT1_NAME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_NAME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
|
||||
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Assume;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -41,6 +63,9 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
|
||||
private final int noRetryCount = 0;
|
||||
private final int retryCount = new Random().nextInt(maxRetryCount);
|
||||
private final int retryCountBeyondMax = maxRetryCount + 1;
|
||||
private static final String TEST_PATH = "/testfile";
|
||||
private static final double MULTIPLYING_FACTOR = 1.5;
|
||||
private static final int ANALYSIS_PERIOD = 10000;
|
||||
|
||||
|
||||
public TestExponentialRetryPolicy() throws Exception {
|
||||
@ -67,6 +92,173 @@ public void testDefaultMaxIORetryCount() throws Exception {
|
||||
testMaxIOConfig(abfsConfig);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThrottlingIntercept() throws Exception {
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
final Configuration configuration = new Configuration();
|
||||
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||
configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, false);
|
||||
|
||||
// On disabling throttling AbfsNoOpThrottlingIntercept object is returned
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
|
||||
"dummy.dfs.core.windows.net");
|
||||
AbfsThrottlingIntercept intercept;
|
||||
AbfsClient abfsClient = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
|
||||
intercept = abfsClient.getIntercept();
|
||||
Assertions.assertThat(intercept)
|
||||
.describedAs("AbfsNoOpThrottlingIntercept instance expected")
|
||||
.isInstanceOf(AbfsNoOpThrottlingIntercept.class);
|
||||
|
||||
configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, true);
|
||||
configuration.setBoolean(FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED, true);
|
||||
// On disabling throttling AbfsClientThrottlingIntercept object is returned
|
||||
AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration,
|
||||
"dummy1.dfs.core.windows.net");
|
||||
AbfsClient abfsClient1 = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
|
||||
intercept = abfsClient1.getIntercept();
|
||||
Assertions.assertThat(intercept)
|
||||
.describedAs("AbfsClientThrottlingIntercept instance expected")
|
||||
.isInstanceOf(AbfsClientThrottlingIntercept.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateMultipleAccountThrottling() throws Exception {
|
||||
Configuration config = new Configuration(getRawConfiguration());
|
||||
String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
|
||||
if (accountName == null) {
|
||||
// check if accountName is set using different config key
|
||||
accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
|
||||
}
|
||||
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
|
||||
accountName != null && !accountName.isEmpty());
|
||||
|
||||
Configuration rawConfig1 = new Configuration();
|
||||
rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||
|
||||
AbfsRestOperation successOp = mock(AbfsRestOperation.class);
|
||||
AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
|
||||
when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
|
||||
when(successOp.getResult()).thenReturn(http500Op);
|
||||
|
||||
AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
|
||||
when(configuration.getAnalysisPeriod()).thenReturn(ANALYSIS_PERIOD);
|
||||
when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
|
||||
when(configuration.accountThrottlingEnabled()).thenReturn(false);
|
||||
|
||||
AbfsThrottlingIntercept instance1 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
|
||||
String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
|
||||
|
||||
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
|
||||
accountName1 != null && !accountName1.isEmpty());
|
||||
|
||||
AbfsThrottlingIntercept instance2 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
|
||||
//if singleton is enabled, for different accounts both the instances should return same value
|
||||
Assertions.assertThat(instance1)
|
||||
.describedAs(
|
||||
"if singleton is enabled, for different accounts both the instances should return same value")
|
||||
.isEqualTo(instance2);
|
||||
|
||||
when(configuration.accountThrottlingEnabled()).thenReturn(true);
|
||||
AbfsThrottlingIntercept instance3 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
|
||||
AbfsThrottlingIntercept instance4 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
|
||||
AbfsThrottlingIntercept instance5 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
|
||||
//if singleton is not enabled, for different accounts instances should return different value
|
||||
Assertions.assertThat(instance3)
|
||||
.describedAs(
|
||||
"iff singleton is not enabled, for different accounts instances should return different value")
|
||||
.isNotEqualTo(instance4);
|
||||
|
||||
//if singleton is not enabled, for same accounts instances should return same value
|
||||
Assertions.assertThat(instance3)
|
||||
.describedAs(
|
||||
"if singleton is not enabled, for same accounts instances should return same value")
|
||||
.isEqualTo(instance5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOperationOnAccountIdle() throws Exception {
|
||||
//Get the filesystem.
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
AbfsClient client = fs.getAbfsStore().getClient();
|
||||
AbfsConfiguration configuration1 = client.getAbfsConfiguration();
|
||||
Assume.assumeTrue(configuration1.isAutoThrottlingEnabled());
|
||||
Assume.assumeTrue(configuration1.accountThrottlingEnabled());
|
||||
|
||||
AbfsClientThrottlingIntercept accountIntercept
|
||||
= (AbfsClientThrottlingIntercept) client.getIntercept();
|
||||
final byte[] b = new byte[2 * MIN_BUFFER_SIZE];
|
||||
new Random().nextBytes(b);
|
||||
|
||||
Path testPath = path(TEST_PATH);
|
||||
|
||||
//Do an operation on the filesystem.
|
||||
try (FSDataOutputStream stream = fs.create(testPath)) {
|
||||
stream.write(b);
|
||||
}
|
||||
|
||||
//Don't perform any operation on the account.
|
||||
int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * MULTIPLYING_FACTOR);
|
||||
Thread.sleep(sleepTime);
|
||||
|
||||
try (FSDataInputStream streamRead = fs.open(testPath)) {
|
||||
streamRead.read(b);
|
||||
}
|
||||
|
||||
//Perform operations on another account.
|
||||
AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
|
||||
Configuration config = new Configuration(getRawConfiguration());
|
||||
String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
|
||||
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
|
||||
accountName1 != null && !accountName1.isEmpty());
|
||||
final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
|
||||
URI defaultUri1 = null;
|
||||
defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
|
||||
fs1.initialize(defaultUri1, getRawConfiguration());
|
||||
AbfsClient client1 = fs1.getAbfsStore().getClient();
|
||||
AbfsClientThrottlingIntercept accountIntercept1
|
||||
= (AbfsClientThrottlingIntercept) client1.getIntercept();
|
||||
try (FSDataOutputStream stream1 = fs1.create(testPath)) {
|
||||
stream1.write(b);
|
||||
}
|
||||
|
||||
//Verify the write analyzer for first account is idle but the read analyzer is not idle.
|
||||
Assertions.assertThat(accountIntercept.getWriteThrottler()
|
||||
.getIsOperationOnAccountIdle()
|
||||
.get())
|
||||
.describedAs("Write analyzer for first account should be idle the first time")
|
||||
.isTrue();
|
||||
|
||||
Assertions.assertThat(
|
||||
accountIntercept.getReadThrottler()
|
||||
.getIsOperationOnAccountIdle()
|
||||
.get())
|
||||
.describedAs("Read analyzer for first account should not be idle")
|
||||
.isFalse();
|
||||
|
||||
//Verify the write analyzer for second account is not idle.
|
||||
Assertions.assertThat(
|
||||
accountIntercept1.getWriteThrottler()
|
||||
.getIsOperationOnAccountIdle()
|
||||
.get())
|
||||
.describedAs("Write analyzer for second account should not be idle")
|
||||
.isFalse();
|
||||
|
||||
//Again perform an operation on the first account.
|
||||
try (FSDataOutputStream stream2 = fs.create(testPath)) {
|
||||
stream2.write(b);
|
||||
}
|
||||
|
||||
//Verify the write analyzer on first account is not idle.
|
||||
Assertions.assertThat(
|
||||
|
||||
accountIntercept.getWriteThrottler()
|
||||
.getIsOperationOnAccountIdle()
|
||||
.get())
|
||||
.describedAs(
|
||||
"Write analyzer for first account should not be idle second time")
|
||||
.isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbfsConfigConstructor() throws Exception {
|
||||
// Ensure we choose expected values that are not defaults
|
||||
|
Loading…
Reference in New Issue
Block a user