HADOOP-18325: ABFS: Add correlated metric support for ABFS operations (#6314)

Adds support for metric collection at the filesystem instance level.
Metrics are pushed to the store when a filesystem instance is closed, covering all operations
that used that instance.

Collected Metrics:

- Number of successful requests without any retries.
- Count of requests that succeeded after a specified number of retries (x retries).
- Request count subjected to throttling.
- Number of requests that failed after exhausting all retry attempts.

Implementation Details:

Adds logic in the AbfsClient to push metrics through an additional request when no requests
have been sent to the backend for a configured idle period.

These changes enable monitoring and analysis of filesystem interactions: success rates, retry behaviour, throttling, and requests that fail even after all retries. The idle-period push keeps that picture continuous and accurate even when the instance is otherwise quiet.
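For illustration, a minimal configuration sketch (a hypothetical example, not part of the commit) using the keys introduced here. The account name, key, and URI are placeholders, and INTERNAL_METRIC_FORMAT is assumed to be the MetricFormat constant that enables both metric families, as used in AbfsCountersImpl:

import org.apache.hadoop.conf.Configuration;

public class AbfsMetricConfigExample {
  public static Configuration metricConf() {
    Configuration conf = new Configuration();
    // Collect both backoff and read-footer metrics.
    conf.set("fs.azure.metric.format", "INTERNAL_METRIC_FORMAT");
    // Hypothetical, fully qualified account that receives the metric push.
    conf.set("fs.azure.metric.account.name", "metricsaccount.dfs.core.windows.net");
    conf.set("fs.azure.metric.account.key", "<base64-account-key>");
    conf.set("fs.azure.metric.uri", "https://metricsaccount.dfs.core.windows.net");
    // Push metrics after 60 seconds without backend traffic (the default).
    conf.setInt("fs.azure.metric.idle.timeout", 60_000);
    return conf;
  }
}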

Contributed by Anmol Asrani
Anmol Asrani 2024-05-23 19:40:10 +05:30 committed by GitHub
parent d876505b67
commit d168d3ffee
26 changed files with 2042 additions and 72 deletions

View File

@@ -0,0 +1,312 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THOUSAND;
public class AbfsBackoffMetrics {
private AtomicLong numberOfRequestsSucceeded;
private AtomicLong minBackoff;
private AtomicLong maxBackoff;
private AtomicLong totalRequests;
private AtomicLong totalBackoff;
private String retryCount;
private AtomicLong numberOfIOPSThrottledRequests;
private AtomicLong numberOfBandwidthThrottledRequests;
private AtomicLong numberOfOtherThrottledRequests;
private AtomicLong numberOfNetworkFailedRequests;
private AtomicLong maxRetryCount;
private AtomicLong totalNumberOfRequests;
private AtomicLong numberOfRequestsSucceededWithoutRetrying;
private AtomicLong numberOfRequestsFailed;
private final Map<String, AbfsBackoffMetrics> metricsMap
= new ConcurrentHashMap<>();
public AbfsBackoffMetrics() {
initializeMap();
this.numberOfIOPSThrottledRequests = new AtomicLong();
this.numberOfBandwidthThrottledRequests = new AtomicLong();
this.numberOfOtherThrottledRequests = new AtomicLong();
this.totalNumberOfRequests = new AtomicLong();
this.maxRetryCount = new AtomicLong();
this.numberOfRequestsSucceededWithoutRetrying = new AtomicLong();
this.numberOfRequestsFailed = new AtomicLong();
this.numberOfNetworkFailedRequests = new AtomicLong();
}
public AbfsBackoffMetrics(String retryCount) {
this.retryCount = retryCount;
this.numberOfRequestsSucceeded = new AtomicLong();
this.minBackoff = new AtomicLong(Long.MAX_VALUE);
this.maxBackoff = new AtomicLong();
this.totalRequests = new AtomicLong();
this.totalBackoff = new AtomicLong();
}
private void initializeMap() {
ArrayList<String> retryCountList = new ArrayList<String>(
Arrays.asList("1", "2", "3", "4", "5_15", "15_25", "25AndAbove"));
for (String s : retryCountList) {
metricsMap.put(s, new AbfsBackoffMetrics(s));
}
}
public long getNumberOfRequestsSucceeded() {
return this.numberOfRequestsSucceeded.get();
}
public void setNumberOfRequestsSucceeded(long numberOfRequestsSucceeded) {
this.numberOfRequestsSucceeded.set(numberOfRequestsSucceeded);
}
public void incrementNumberOfRequestsSucceeded() {
this.numberOfRequestsSucceeded.getAndIncrement();
}
public long getMinBackoff() {
return this.minBackoff.get();
}
public void setMinBackoff(long minBackoff) {
this.minBackoff.set(minBackoff);
}
public long getMaxBackoff() {
return this.maxBackoff.get();
}
public void setMaxBackoff(long maxBackoff) {
this.maxBackoff.set(maxBackoff);
}
public long getTotalRequests() {
return this.totalRequests.get();
}
public void incrementTotalRequests() {
this.totalRequests.incrementAndGet();
}
public void setTotalRequests(long totalRequests) {
this.totalRequests.set(totalRequests);
}
public long getTotalBackoff() {
return this.totalBackoff.get();
}
public void setTotalBackoff(long totalBackoff) {
this.totalBackoff.set(totalBackoff);
}
public String getRetryCount() {
return this.retryCount;
}
public long getNumberOfIOPSThrottledRequests() {
return this.numberOfIOPSThrottledRequests.get();
}
public void setNumberOfIOPSThrottledRequests(long numberOfIOPSThrottledRequests) {
this.numberOfIOPSThrottledRequests.set(numberOfIOPSThrottledRequests);
}
public void incrementNumberOfIOPSThrottledRequests() {
this.numberOfIOPSThrottledRequests.getAndIncrement();
}
public long getNumberOfBandwidthThrottledRequests() {
return this.numberOfBandwidthThrottledRequests.get();
}
public void setNumberOfBandwidthThrottledRequests(long numberOfBandwidthThrottledRequests) {
this.numberOfBandwidthThrottledRequests.set(numberOfBandwidthThrottledRequests);
}
public void incrementNumberOfBandwidthThrottledRequests() {
this.numberOfBandwidthThrottledRequests.getAndIncrement();
}
public long getNumberOfOtherThrottledRequests() {
return this.numberOfOtherThrottledRequests.get();
}
public void setNumberOfOtherThrottledRequests(long numberOfOtherThrottledRequests) {
this.numberOfOtherThrottledRequests.set(numberOfOtherThrottledRequests);
}
public void incrementNumberOfOtherThrottledRequests() {
this.numberOfOtherThrottledRequests.getAndIncrement();
}
public long getMaxRetryCount() {
return this.maxRetryCount.get();
}
public void setMaxRetryCount(long maxRetryCount) {
this.maxRetryCount.set(maxRetryCount);
}
public void incrementMaxRetryCount() {
this.maxRetryCount.getAndIncrement();
}
public long getTotalNumberOfRequests() {
return this.totalNumberOfRequests.get();
}
public void setTotalNumberOfRequests(long totalNumberOfRequests) {
this.totalNumberOfRequests.set(totalNumberOfRequests);
}
public void incrementTotalNumberOfRequests() {
this.totalNumberOfRequests.getAndIncrement();
}
public Map<String, AbfsBackoffMetrics> getMetricsMap() {
return metricsMap;
}
public long getNumberOfRequestsSucceededWithoutRetrying() {
return this.numberOfRequestsSucceededWithoutRetrying.get();
}
public void setNumberOfRequestsSucceededWithoutRetrying(long numberOfRequestsSucceededWithoutRetrying) {
this.numberOfRequestsSucceededWithoutRetrying.set(numberOfRequestsSucceededWithoutRetrying);
}
public void incrementNumberOfRequestsSucceededWithoutRetrying() {
this.numberOfRequestsSucceededWithoutRetrying.getAndIncrement();
}
public long getNumberOfRequestsFailed() {
return this.numberOfRequestsFailed.get();
}
public void setNumberOfRequestsFailed(long numberOfRequestsFailed) {
this.numberOfRequestsFailed.set(numberOfRequestsFailed);
}
public void incrementNumberOfRequestsFailed() {
this.numberOfRequestsFailed.getAndIncrement();
}
public long getNumberOfNetworkFailedRequests() {
return this.numberOfNetworkFailedRequests.get();
}
public void setNumberOfNetworkFailedRequests(long numberOfNetworkFailedRequests) {
this.numberOfNetworkFailedRequests.set(numberOfNetworkFailedRequests);
}
public void incrementNumberOfNetworkFailedRequests() {
this.numberOfNetworkFailedRequests.getAndIncrement();
}
/*
Acronyms :-
1.RCTSI :- Request count that succeeded in x retries
2.MMA :- Min Max Average (This refers to the backoff or sleep time between 2 requests)
3.s :- seconds
4.BWT :- Number of Bandwidth throttled requests
5.IT :- Number of IOPS throttled requests
6.OT :- Number of Other throttled requests
7.NFR :- Number of requests which failed due to network errors
8.%RT :- Percentage of requests that are throttled
9.TRNR :- Total number of requests which succeeded without retrying
10.TRF :- Total number of requests which failed
11.TR :- Total number of requests which were made
12.MRC :- Max retry count across all requests
*/
@Override
public String toString() {
StringBuilder metricString = new StringBuilder();
long totalRequestsThrottled = getNumberOfBandwidthThrottledRequests()
+ getNumberOfIOPSThrottledRequests()
+ getNumberOfOtherThrottledRequests();
double percentageOfRequestsThrottled =
((double) totalRequestsThrottled / getTotalNumberOfRequests()) * HUNDRED;
for (Map.Entry<String, AbfsBackoffMetrics> entry : metricsMap.entrySet()) {
metricString.append("$RCTSI$_").append(entry.getKey())
.append("R_").append("=")
.append(entry.getValue().getNumberOfRequestsSucceeded());
long totalRequests = entry.getValue().getTotalRequests();
if (totalRequests > 0) {
metricString.append("$MMA$_").append(entry.getKey())
.append("R_").append("=")
.append(String.format("%.3f",
(double) entry.getValue().getMinBackoff() / THOUSAND))
.append("s")
.append(String.format("%.3f",
(double) entry.getValue().getMaxBackoff() / THOUSAND))
.append("s")
.append(String.format("%.3f",
((double) entry.getValue().getTotalBackoff() / totalRequests)
/ THOUSAND))
.append("s");
} else {
metricString.append("$MMA$_").append(entry.getKey())
.append("R_").append("=0s");
}
}
metricString.append("$BWT=")
.append(getNumberOfBandwidthThrottledRequests())
.append("$IT=")
.append(getNumberOfIOPSThrottledRequests())
.append("$OT=")
.append(getNumberOfOtherThrottledRequests())
.append("$RT=")
.append(String.format("%.3f", percentageOfRequestsThrottled))
.append("$NFR=")
.append(getNumberOfNetworkFailedRequests())
.append("$TRNR=")
.append(getNumberOfRequestsSucceededWithoutRetrying())
.append("$TRF=")
.append(getNumberOfRequestsFailed())
.append("$TR=")
.append(getTotalNumberOfRequests())
.append("$MRC=")
.append(getMaxRetryCount());
return metricString.toString();
}
}
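As an illustration of the serialization above, a hypothetical driver (not part of the commit) that populates a few counters and prints the resulting string; all values are invented:

import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;

public class BackoffMetricsExample {
  public static void main(String[] args) {
    AbfsBackoffMetrics metrics = new AbfsBackoffMetrics();
    metrics.setTotalNumberOfRequests(10);
    metrics.incrementNumberOfRequestsSucceededWithoutRetrying();
    metrics.incrementNumberOfBandwidthThrottledRequests();
    // Per-retry buckets are keyed by retry count: "1".."4", "5_15", "15_25", "25AndAbove".
    AbfsBackoffMetrics bucket = metrics.getMetricsMap().get("1");
    bucket.incrementNumberOfRequestsSucceeded();
    bucket.incrementTotalRequests();
    bucket.setMinBackoff(500L);   // milliseconds; rendered as 0.500s
    bucket.setMaxBackoff(1500L);
    bucket.setTotalBackoff(1500L);
    // The "1"-retry bucket contributes "$RCTSI$_1R_=1$MMA$_1R_=0.500s1.500s1.500s"
    // and the aggregate tail is "$BWT=1$IT=0$OT=0$RT=10.000$NFR=0$TRNR=1$TRF=0$TR=10$MRC=0".
    System.out.println(metrics);
  }
}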

View File

@@ -22,6 +22,7 @@
import java.lang.reflect.Field;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.lang3.StringUtils;
@@ -291,6 +292,26 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_IDLE_TIMEOUT,
DefaultValue = DEFAULT_METRIC_IDLE_TIMEOUT_MS)
private int metricIdleTimeout;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ANALYSIS_TIMEOUT,
DefaultValue = DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS)
private int metricAnalysisTimeout;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_URI,
DefaultValue = EMPTY_STRING)
private String metricUri;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_NAME,
DefaultValue = EMPTY_STRING)
private String metricAccount;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_KEY,
DefaultValue = EMPTY_STRING)
private String metricAccountKey;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
private int accountOperationIdleTimeout;
@@ -818,6 +839,26 @@ public boolean isAutoThrottlingEnabled() {
return this.enableAutoThrottling;
}
public int getMetricIdleTimeout() {
return this.metricIdleTimeout;
}
public int getMetricAnalysisTimeout() {
return this.metricAnalysisTimeout;
}
public String getMetricUri() {
return metricUri;
}
public String getMetricAccount() {
return metricAccount;
}
public String getMetricAccountKey() {
return metricAccountKey;
}
public int getAccountOperationIdleTimeout() {
return accountOperationIdleTimeout;
}
@@ -854,6 +895,10 @@ public TracingHeaderFormat getTracingHeaderFormat() {
return getEnum(FS_AZURE_TRACINGHEADER_FORMAT, TracingHeaderFormat.ALL_ID_FORMAT);
}
public MetricFormat getMetricFormat() {
return getEnum(FS_AZURE_METRIC_FORMAT, MetricFormat.EMPTY);
}
public AuthType getAuthType(String accountName) {
return getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
}

View File

@@ -21,10 +21,12 @@
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@@ -34,8 +36,42 @@
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableMetric;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_APPEND;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_CREATE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_CREATE_NON_RECURSIVE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_DELETE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_EXIST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_DELEGATION_TOKEN;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_LIST_STATUS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_MKDIRS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_OPEN;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_RENAME;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.DIRECTORIES_CREATED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.DIRECTORIES_DELETED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.ERROR_IGNORED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.FILES_CREATED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.FILES_DELETED;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_DELETE_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_HEAD_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PATCH_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_POST_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PUT_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.READ_THROTTLES;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SERVER_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.WRITE_THROTTLES;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
import static org.apache.hadoop.util.Time.now;
/**
* Instrumentation of Abfs counters.
@@ -63,6 +99,12 @@ public class AbfsCountersImpl implements AbfsCounters {
private final IOStatisticsStore ioStatisticsStore;
private AbfsBackoffMetrics abfsBackoffMetrics = null;
private AbfsReadFooterMetrics abfsReadFooterMetrics = null;
private AtomicLong lastExecutionTime = null;
private static final AbfsStatistic[] STATISTIC_LIST = {
CALL_CREATE,
CALL_OPEN,
@@ -91,7 +133,6 @@ public class AbfsCountersImpl implements AbfsCounters {
RENAME_RECOVERY,
METADATA_INCOMPLETE_RENAME_FAILURES,
RENAME_PATH_ATTEMPTS
};
private static final AbfsStatistic[] DURATION_TRACKER_LIST = {
@@ -121,6 +162,25 @@ public AbfsCountersImpl(URI uri) {
ioStatisticsStoreBuilder.withDurationTracking(durationStats.getStatName());
}
ioStatisticsStore = ioStatisticsStoreBuilder.build();
lastExecutionTime = new AtomicLong(now());
}
@Override
public void initializeMetrics(MetricFormat metricFormat) {
switch (metricFormat) {
case INTERNAL_BACKOFF_METRIC_FORMAT:
abfsBackoffMetrics = new AbfsBackoffMetrics();
break;
case INTERNAL_FOOTER_METRIC_FORMAT:
abfsReadFooterMetrics = new AbfsReadFooterMetrics();
break;
case INTERNAL_METRIC_FORMAT:
abfsBackoffMetrics = new AbfsBackoffMetrics();
abfsReadFooterMetrics = new AbfsReadFooterMetrics();
break;
default:
break;
}
}
/**
@@ -188,6 +248,21 @@ private MetricsRegistry getRegistry() {
return registry;
}
@Override
public AbfsBackoffMetrics getAbfsBackoffMetrics() {
return abfsBackoffMetrics;
}
@Override
public AtomicLong getLastExecutionTime() {
return lastExecutionTime;
}
@Override
public AbfsReadFooterMetrics getAbfsReadFooterMetrics() {
return abfsReadFooterMetrics;
}
/**
* {@inheritDoc}
*
@@ -244,4 +319,25 @@ public IOStatistics getIOStatistics() {
public DurationTracker trackDuration(String key) {
return ioStatisticsStore.trackDuration(key);
}
@Override
public String toString() {
String metric = "";
if (abfsBackoffMetrics != null) {
long totalNoRequests = getAbfsBackoffMetrics().getTotalNumberOfRequests();
if (totalNoRequests > 0) {
metric += "#BO:" + getAbfsBackoffMetrics().toString();
}
}
if (abfsReadFooterMetrics != null) {
Map<String, AbfsReadFooterMetrics> metricsMap = getAbfsReadFooterMetrics().getMetricsMap();
if (metricsMap != null && !(metricsMap.isEmpty())) {
String readFooterMetric = getAbfsReadFooterMetrics().toString();
if (!readFooterMetric.equals("")) {
metric += "#FO:" + getAbfsReadFooterMetrics().toString();
}
}
}
return metric;
}
}
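For reference, when both metric families hold data the pushed string is simply the two sections concatenated, e.g. (abbreviated, with invented values): #BO:$RCTSI$_1R_=1...$MRC=2#FO:$Parquet:$FR=8192.000... — an empty section is omitted entirely, as the null and emptiness checks above show.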

View File

@@ -41,7 +41,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.hadoop.classification.VisibleForTesting;
@@ -50,7 +49,6 @@
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -700,6 +698,18 @@ public synchronized void close() throws IOException {
if (isClosed) {
return;
}
if (abfsStore.getClient().isMetricCollectionEnabled()) {
TracingContext tracingMetricContext = new TracingContext(
clientCorrelationId,
fileSystemId, FSOperationType.GET_ATTR, true,
tracingHeaderFormat,
listener, abfsCounters.toString());
try {
getAbfsClient().getMetricCall(tracingMetricContext);
} catch (IOException e) {
throw new IOException(e);
}
}
// does all the delete-on-exit calls, and may be slow.
super.close();
LOG.debug("AzureBlobFileSystem.close");
@@ -1680,3 +1690,4 @@ public IOStatistics getIOStatistics() {
return abfsCounters != null ? abfsCounters.getIOStatistics() : null;
}
}

View File

@@ -41,6 +41,10 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_METRIC_ACCOUNT_NAME = "fs.azure.metric.account.name";
public static final String FS_AZURE_METRIC_ACCOUNT_KEY = "fs.azure.metric.account.key";
public static final String FS_AZURE_METRIC_URI = "fs.azure.metric.uri";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
@@ -150,6 +154,8 @@ public final class ConfigurationKeys {
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_METRIC_IDLE_TIMEOUT = "fs.azure.metric.idle.timeout";
public static final String FS_AZURE_METRIC_ANALYSIS_TIMEOUT = "fs.azure.metric.analysis.timeout";
public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
@@ -190,6 +196,7 @@ public final class ConfigurationKeys {
* character constraints are not satisfied. **/
public static final String FS_AZURE_CLIENT_CORRELATIONID = "fs.azure.client.correlationid";
public static final String FS_AZURE_TRACINGHEADER_FORMAT = "fs.azure.tracingheader.format";
public static final String FS_AZURE_METRIC_FORMAT = "fs.azure.metric.format";
public static final String FS_AZURE_CLUSTER_NAME = "fs.azure.cluster.name";
public static final String FS_AZURE_CLUSTER_TYPE = "fs.azure.cluster.type";
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";

View File

@@ -108,6 +108,8 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
public static final int DEFAULT_METRIC_IDLE_TIMEOUT_MS = 60_000;
public static final int DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS = 60_000;
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;
@@ -161,5 +163,9 @@
*/
public static final int RATE_LIMIT_DEFAULT = 1_000;
public static final int ZERO = 0;
public static final int HUNDRED = 100;
public static final long THOUSAND = 1000L;
private FileSystemConfigurations() {}
}

View File

@@ -43,6 +43,7 @@ public final class HttpHeaderConfigurations {
public static final String USER_AGENT = "User-Agent";
public static final String X_HTTP_METHOD_OVERRIDE = "X-HTTP-Method-Override";
public static final String X_MS_CLIENT_REQUEST_ID = "x-ms-client-request-id";
public static final String X_MS_FECLIENT_METRICS = "x-ms-feclient-metrics";
public static final String X_MS_EXISTING_RESOURCE_TYPE = "x-ms-existing-resource-type";
public static final String X_MS_DATE = "x-ms-date";
public static final String X_MS_REQUEST_ID = "x-ms-request-id";

View File

@@ -21,7 +21,8 @@
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -60,6 +61,9 @@ public enum AzureServiceErrorCode {
private final String errorCode;
private final int httpStatusCode;
private final String errorMessage;
private static final Logger LOG1 = LoggerFactory.getLogger(AzureServiceErrorCode.class);
AzureServiceErrorCode(String errorCode, int httpStatusCodes, String errorMessage) {
this.errorCode = errorCode;
this.httpStatusCode = httpStatusCodes;
@@ -105,7 +109,6 @@ public static AzureServiceErrorCode getAzureServiceCode(int httpStatusCode, Stri
return azureServiceErrorCode;
}
}
return UNKNOWN;
}
@@ -113,16 +116,15 @@ public static AzureServiceErrorCode getAzureServiceCode(int httpStatusCode, Stri
if (errorCode == null || errorCode.isEmpty() || httpStatusCode == UNKNOWN.httpStatusCode || errorMessage == null || errorMessage.isEmpty()) {
return UNKNOWN;
}
String[] errorMessages = errorMessage.split(System.lineSeparator(), 2);
for (AzureServiceErrorCode azureServiceErrorCode : AzureServiceErrorCode.values()) {
if (azureServiceErrorCode.httpStatusCode == httpStatusCode
&& errorCode.equalsIgnoreCase(azureServiceErrorCode.errorCode)
&& errorMessage.equalsIgnoreCase(azureServiceErrorCode.errorMessage)
) {
if (azureServiceErrorCode.getStatusCode() == httpStatusCode
&& azureServiceErrorCode.getErrorCode().equalsIgnoreCase(errorCode)
&& azureServiceErrorCode.getErrorMessage()
.equalsIgnoreCase(errorMessages[0])) {
return azureServiceErrorCode;
}
}
return UNKNOWN;
}
}
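To illustrate the revised matching, a sketch under the assumption that an enum entry exists with code "ServerBusy", status 503, and canonical message "Ingress is over the account limit." (the constant itself is not shown in this hunk):

String serviceMessage = "Ingress is over the account limit."
    + System.lineSeparator()
    + "RequestId:0123 Time:2024-05-23T00:00:00Z"; // trailing lines are ignored by the match
AzureServiceErrorCode code = AzureServiceErrorCode.getAzureServiceCode(
    503, "ServerBusy", serviceMessage);
// Only errorMessages[0] is compared, so the multi-line service payload still
// resolves to the matching constant instead of UNKNOWN.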

View File

@@ -22,24 +22,31 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.azurebfs.utils.NamespaceUtil;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
@@ -115,6 +122,13 @@ public class AbfsClient implements Closeable {
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
private final Timer timer;
private final String abfsMetricUrl;
private boolean isMetricCollectionEnabled = false;
private final MetricFormat metricFormat;
private final AtomicBoolean isMetricCollectionStopped;
private final int metricAnalysisPeriod;
private final int metricIdlePeriod;
private EncryptionContextProvider encryptionContextProvider = null;
private EncryptionType encryptionType = EncryptionType.NONE;
private final AbfsThrottlingIntercept intercept;
@@ -123,6 +137,9 @@ public class AbfsClient implements Closeable {
private Boolean isNamespaceEnabled;
private boolean renameResilience;
private TimerTask runningTimerTask;
private boolean isSendMetricCall;
private SharedKeyCredentials metricSharedkeyCredentials = null;
/**
* logging the rename failure if metadata is in an incomplete state.
@@ -181,6 +198,35 @@ private AbfsClient(final URL baseUrl,
new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build();
this.executorService = MoreExecutors.listeningDecorator(
HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf));
this.metricFormat = abfsConfiguration.getMetricFormat();
this.isMetricCollectionStopped = new AtomicBoolean(false);
this.metricAnalysisPeriod = abfsConfiguration.getMetricAnalysisTimeout();
this.metricIdlePeriod = abfsConfiguration.getMetricIdleTimeout();
if (!metricFormat.toString().equals("")) {
isMetricCollectionEnabled = true;
abfsCounters.initializeMetrics(metricFormat);
String metricAccountName = abfsConfiguration.getMetricAccount();
int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT);
if (dotIndex <= 0) {
throw new InvalidUriException(
metricAccountName + " - account name is not fully qualified.");
}
String metricAccountKey = abfsConfiguration.getMetricAccountKey();
try {
metricSharedkeyCredentials = new SharedKeyCredentials(metricAccountName.substring(0, dotIndex),
metricAccountKey);
} catch (IllegalArgumentException e) {
throw new IOException("Exception while initializing metric credentials", e);
}
}
this.timer = new Timer(
"abfs-timer-client", true);
if (isMetricCollectionEnabled) {
timer.schedule(new TimerTaskImpl(),
metricIdlePeriod,
metricIdlePeriod);
}
this.abfsMetricUrl = abfsConfiguration.getMetricUri();
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
@@ -207,6 +253,10 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent
@Override
public void close() throws IOException {
if (runningTimerTask != null) {
runningTimerTask.cancel();
timer.purge();
}
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG,
(Closeable) tokenProvider);
@@ -246,6 +296,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
return sharedKeyCredentials;
}
SharedKeyCredentials getMetricSharedkeyCredentials() {
return metricSharedkeyCredentials;
}
public void setEncryptionType(EncryptionType encryptionType) {
this.encryptionType = encryptionType;
}
@@ -1057,7 +1111,6 @@ public AbfsRestOperation getPathStatus(final String path,
final ContextEncryptionAdapter contextEncryptionAdapter)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
String operation = SASTokenProvider.GET_PROPERTIES_OPERATION;
if (!includeProperties) {
@@ -1318,7 +1371,6 @@ public AbfsRestOperation getAclStatus(final String path, TracingContext tracingC
public AbfsRestOperation getAclStatus(final String path, final boolean useUPN,
TracingContext tracingContext) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL);
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(useUPN));
@@ -1435,6 +1487,7 @@ private String appendSASTokenToQuery(String path,
return sasToken;
}
@VisibleForTesting
private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
return createRequestUrl(EMPTY_STRING, query);
}
@@ -1442,7 +1495,12 @@ private URL createRequestUrl(final String query) throws AzureBlobFileSystemExcep
@VisibleForTesting
protected URL createRequestUrl(final String path, final String query)
throws AzureBlobFileSystemException {
final String base = baseUrl.toString();
return createRequestUrl(baseUrl, path, query);
}
@VisibleForTesting
protected URL createRequestUrl(final URL baseUrl, final String path, final String query)
throws AzureBlobFileSystemException {
String encodedPath = path;
try {
encodedPath = urlEncode(path);
@@ -1452,7 +1510,10 @@ protected URL createRequestUrl(final String path, final String query)
}
final StringBuilder sb = new StringBuilder();
sb.append(base);
if (baseUrl == null) {
throw new InvalidUriException("URL provided is null");
}
sb.append(baseUrl.toString());
sb.append(encodedPath);
sb.append(query);
@@ -1460,7 +1521,7 @@ protected URL createRequestUrl(final String path, final String query)
try {
url = new URL(sb.toString());
} catch (MalformedURLException ex) {
throw new InvalidUriException(sb.toString());
throw new InvalidUriException("URL is malformed: " + sb.toString());
}
return url;
}
@@ -1693,7 +1754,7 @@ void setIsNamespaceEnabled(final Boolean isNamespaceEnabled) {
* Getter for abfsCounters from AbfsClient.
* @return AbfsCounters instance.
*/
protected AbfsCounters getAbfsCounters() {
public AbfsCounters getAbfsCounters() {
return abfsCounters;
}
@@ -1731,6 +1792,128 @@ protected AccessTokenProvider getTokenProvider() {
return tokenProvider;
}
/**
* Retrieves a TracingContext object configured for metric tracking.
* This method creates a TracingContext object with the validated client correlation ID,
* the host name of the local machine (or "UnknownHost" if unable to determine),
* the file system operation type set to GET_ATTR, and additional configuration parameters
* for metric tracking.
* The TracingContext is intended for use in tracking metrics related to Azure Blob FileSystem (ABFS) operations.
*
* @return A TracingContext object configured for metric tracking.
*/
private TracingContext getMetricTracingContext() {
String hostName;
try {
hostName = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
hostName = "UnknownHost";
}
return new TracingContext(TracingContext.validateClientCorrelationID(
abfsConfiguration.getClientCorrelationId()),
hostName, FSOperationType.GET_ATTR, true,
abfsConfiguration.getTracingHeaderFormat(),
null, abfsCounters.toString());
}
/**
* Synchronized method to suspend or resume timer.
* @param timerFunctionality resume or suspend.
* @param timerTask The timertask object.
* @return true or false.
*/
boolean timerOrchestrator(TimerFunctionality timerFunctionality, TimerTask timerTask) {
switch (timerFunctionality) {
case RESUME:
if (isMetricCollectionStopped.get()) {
synchronized (this) {
if (isMetricCollectionStopped.get()) {
resumeTimer();
}
}
}
break;
case SUSPEND:
long now = System.currentTimeMillis();
long lastExecutionTime = abfsCounters.getLastExecutionTime().get();
if (isMetricCollectionEnabled && (now - lastExecutionTime >= metricAnalysisPeriod)) {
synchronized (this) {
if (!isMetricCollectionStopped.get()) {
timerTask.cancel();
timer.purge();
isMetricCollectionStopped.set(true);
return true;
}
}
}
break;
default:
break;
}
return false;
}
private void resumeTimer() {
isMetricCollectionStopped.set(false);
timer.schedule(new TimerTaskImpl(),
metricIdlePeriod,
metricIdlePeriod);
}
/**
* Initiates a metric call to the Azure Blob FileSystem (ABFS) for retrieving file system properties.
* This method performs a HEAD request to the specified metric URL, using default headers and query parameters.
*
* @param tracingContext The tracing context to be used for capturing tracing information.
* @throws IOException throws IOException.
*/
public void getMetricCall(TracingContext tracingContext) throws IOException {
this.isSendMetricCall = true;
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(new URL(abfsMetricUrl), EMPTY_STRING, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.GetFileSystemProperties,
HTTP_METHOD_HEAD,
url,
requestHeaders);
try {
op.execute(tracingContext);
} finally {
this.isSendMetricCall = false;
}
}
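// Illustration (not part of the commit): given the wiring above, the push amounts to a
// HEAD request against the metric account, with the serialized counters assumed to
// travel in the x-ms-feclient-metrics header added in HttpHeaderConfigurations:
//   HEAD https://metricsaccount.dfs.core.windows.net/?resource=filesystem
//   x-ms-feclient-metrics: #BO:$RCTSI$_1R_=1...#FO:$Parquet:...
// The account name and metric contents here are invented placeholders.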
public boolean isSendMetricCall() {
return isSendMetricCall;
}
public boolean isMetricCollectionEnabled() {
return isMetricCollectionEnabled;
}
class TimerTaskImpl extends TimerTask {
TimerTaskImpl() {
runningTimerTask = this;
}
@Override
public void run() {
try {
if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
try {
getMetricCall(getMetricTracingContext());
} finally {
abfsCounters.initializeMetrics(metricFormat);
}
}
} catch (IOException e) {
// The metric push is best-effort; swallow the failure so the timer thread survives.
}
}
}
/**
* Creates an AbfsRestOperation with additional parameters for buffer and SAS token.
*

View File

@@ -53,7 +53,7 @@ public AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}
public AbfsCounters getAbfsCounters() {
AbfsCounters getAbfsCounters() {
return abfsCounters;
}
}

View File

@@ -19,12 +19,15 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
@@ -74,4 +77,12 @@ String formString(String prefix, String separator, String suffix,
*/
@Override
DurationTracker trackDuration(String key);
void initializeMetrics(MetricFormat metricFormat);
AbfsBackoffMetrics getAbfsBackoffMetrics();
AbfsReadFooterMetrics getAbfsReadFooterMetrics();
AtomicLong getLastExecutionTime();
}

View File

@@ -110,15 +110,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private int bCursorBkp;
private long fCursorBkp;
private long fCursorAfterLastReadBkp;
private final AbfsReadFooterMetrics abfsReadFooterMetrics;
/** Stream statistics. */
private final AbfsInputStreamStatistics streamStatistics;
private long bytesFromReadAhead; // bytes read from readAhead; for testing
private long bytesFromRemoteRead; // bytes read remotely; for testing
private Listener listener;
private final AbfsInputStreamContext context;
private IOStatistics ioStatistics;
private String filePathIdentifier;
/**
* This is the actual position within the object, used by
* lazy seek to decide whether to seek on the next read or not.
@@ -141,9 +141,6 @@ public AbfsInputStream(
this.path = path;
this.contentLength = contentLength;
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
/*
* FooterReadSize should not be more than bufferSize.
*/
this.footerReadSize = Math.min(bufferSize, abfsInputStreamContext.getFooterReadBufferSize());
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
@@ -157,12 +154,19 @@ public AbfsInputStream(
this.cachedSasToken = new CachedSASToken(
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
this.abfsReadFooterMetrics = client.getAbfsCounters().getAbfsReadFooterMetrics();
this.inputStreamId = createInputStreamId();
this.tracingContext = new TracingContext(tracingContext);
this.tracingContext.setOperation(FSOperationType.READ);
this.tracingContext.setStreamID(inputStreamId);
this.context = abfsInputStreamContext;
readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
if (abfsReadFooterMetrics != null) {
this.filePathIdentifier = eTag + path;
synchronized (this) {
abfsReadFooterMetrics.updateMap(filePathIdentifier);
}
}
this.fsBackRef = abfsInputStreamContext.getFsBackRef();
contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter();
@@ -253,6 +257,9 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
// go back and read from buffer is fCursor - limit.
// There maybe case that we read less than requested data.
long filePosAtStartOfBuffer = fCursor - limit;
if (abfsReadFooterMetrics != null) {
abfsReadFooterMetrics.checkMetricUpdate(filePathIdentifier, len, contentLength, nextReadPos);
}
if (nextReadPos >= filePosAtStartOfBuffer && nextReadPos <= fCursor) {
// Determining position in buffer from where data is to be read.
bCursor = (int) (nextReadPos - filePosAtStartOfBuffer);
@@ -339,7 +346,6 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
if (firstRead) {
firstRead = false;
}
if (bytesRead == -1) {
return -1;
}

View File

@@ -0,0 +1,549 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
public class AbfsReadFooterMetrics {
private final AtomicBoolean isParquetFile;
private final AtomicBoolean isParquetEvaluated;
private final AtomicBoolean isLenUpdated;
private String sizeReadByFirstRead;
private String offsetDiffBetweenFirstAndSecondRead;
private final AtomicLong fileLength;
private double avgFileLength;
private double avgReadLenRequested;
private final AtomicBoolean collectMetrics;
private final AtomicBoolean collectMetricsForNextRead;
private final AtomicBoolean collectLenMetrics;
private final AtomicLong dataLenRequested;
private final AtomicLong offsetOfFirstRead;
private final AtomicInteger readCount;
private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics> metricsMap;
private static final String FOOTER_LENGTH = "20";
public AbfsReadFooterMetrics() {
this.isParquetFile = new AtomicBoolean(false);
this.isParquetEvaluated = new AtomicBoolean(false);
this.isLenUpdated = new AtomicBoolean(false);
this.fileLength = new AtomicLong();
this.readCount = new AtomicInteger(0);
this.offsetOfFirstRead = new AtomicLong();
this.collectMetrics = new AtomicBoolean(false);
this.collectMetricsForNextRead = new AtomicBoolean(false);
this.collectLenMetrics = new AtomicBoolean(false);
this.dataLenRequested = new AtomicLong(0);
this.metricsMap = new ConcurrentSkipListMap<>();
}
public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
return metricsMap;
}
private boolean getIsParquetFile() {
return isParquetFile.get();
}
public void setIsParquetFile(boolean isParquetFile) {
this.isParquetFile.set(isParquetFile);
}
private String getSizeReadByFirstRead() {
return sizeReadByFirstRead;
}
public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
this.sizeReadByFirstRead = sizeReadByFirstRead;
}
private String getOffsetDiffBetweenFirstAndSecondRead() {
return offsetDiffBetweenFirstAndSecondRead;
}
public void setOffsetDiffBetweenFirstAndSecondRead(final String offsetDiffBetweenFirstAndSecondRead) {
this.offsetDiffBetweenFirstAndSecondRead
= offsetDiffBetweenFirstAndSecondRead;
}
private long getFileLength() {
return fileLength.get();
}
private void setFileLength(long fileLength) {
this.fileLength.set(fileLength);
}
private double getAvgFileLength() {
return avgFileLength;
}
public void setAvgFileLength(final double avgFileLength) {
this.avgFileLength = avgFileLength;
}
private double getAvgReadLenRequested() {
return avgReadLenRequested;
}
public void setAvgReadLenRequested(final double avgReadLenRequested) {
this.avgReadLenRequested = avgReadLenRequested;
}
private boolean getCollectMetricsForNextRead() {
return collectMetricsForNextRead.get();
}
private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead) {
this.collectMetricsForNextRead.set(collectMetricsForNextRead);
}
private long getOffsetOfFirstRead() {
return offsetOfFirstRead.get();
}
private void setOffsetOfFirstRead(long offsetOfFirstRead) {
this.offsetOfFirstRead.set(offsetOfFirstRead);
}
private int getReadCount() {
return readCount.get();
}
private void setReadCount(int readCount) {
this.readCount.set(readCount);
}
private int incrementReadCount() {
this.readCount.incrementAndGet();
return getReadCount();
}
private boolean getCollectLenMetrics() {
return collectLenMetrics.get();
}
private void setCollectLenMetrics(boolean collectLenMetrics) {
this.collectLenMetrics.set(collectLenMetrics);
}
private long getDataLenRequested() {
return dataLenRequested.get();
}
private void setDataLenRequested(long dataLenRequested) {
this.dataLenRequested.set(dataLenRequested);
}
private void updateDataLenRequested(long dataLenRequested){
this.dataLenRequested.addAndGet(dataLenRequested);
}
private boolean getCollectMetrics() {
return collectMetrics.get();
}
private void setCollectMetrics(boolean collectMetrics) {
this.collectMetrics.set(collectMetrics);
}
private boolean getIsParquetEvaluated() {
return isParquetEvaluated.get();
}
private void setIsParquetEvaluated(boolean isParquetEvaluated) {
this.isParquetEvaluated.set(isParquetEvaluated);
}
private boolean getIsLenUpdated() {
return isLenUpdated.get();
}
private void setIsLenUpdated(boolean isLenUpdated) {
this.isLenUpdated.set(isLenUpdated);
}
/**
* Updates the metrics map with an entry for the specified file if it doesn't already exist.
*
* @param filePathIdentifier The unique identifier for the file.
*/
public void updateMap(String filePathIdentifier) {
// If the file is not already in the metrics map, add it with a new AbfsReadFooterMetrics object.
metricsMap.computeIfAbsent(filePathIdentifier, key -> new AbfsReadFooterMetrics());
}
/**
* Checks and updates metrics for a specific file identified by filePathIdentifier.
* If the metrics do not exist for the file, they are initialized.
*
* @param filePathIdentifier The unique identifier for the file.
* @param len The length of the read operation.
* @param contentLength The total content length of the file.
* @param nextReadPos The position of the next read operation.
*/
public void checkMetricUpdate(final String filePathIdentifier, final int len, final long contentLength,
final long nextReadPos) {
AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
filePathIdentifier, key -> new AbfsReadFooterMetrics());
if (readFooterMetrics.getReadCount() == 0
|| (readFooterMetrics.getReadCount() >= 1
&& readFooterMetrics.getCollectMetrics())) {
updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
}
}
/**
* Updates metrics for a specific file identified by filePathIdentifier.
*
* @param filePathIdentifier The unique identifier for the file.
* @param len The length of the read operation.
* @param contentLength The total content length of the file.
* @param nextReadPos The position of the next read operation.
*/
private void updateMetrics(final String filePathIdentifier, final int len, final long contentLength,
final long nextReadPos) {
AbfsReadFooterMetrics readFooterMetrics = metricsMap.get(filePathIdentifier);
// Create a new AbfsReadFooterMetrics object if it doesn't exist in the metricsMap.
if (readFooterMetrics == null) {
readFooterMetrics = new AbfsReadFooterMetrics();
metricsMap.put(filePathIdentifier, readFooterMetrics);
}
int readCount;
synchronized (this) {
readCount = readFooterMetrics.incrementReadCount();
}
if (readCount == 1) {
// Update metrics for the first read.
updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len, contentLength);
}
synchronized (this) {
if (readFooterMetrics.getCollectLenMetrics()) {
readFooterMetrics.updateDataLenRequested(len);
}
}
if (readCount == 2) {
// Update metrics for the second read.
updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
}
}
/**
* Updates metrics for the first read operation.
*
* @param readFooterMetrics The metrics object to update.
* @param nextReadPos The position of the next read operation.
* @param len The length of the read operation.
* @param contentLength The total content length of the file.
*/
private void updateMetricsOnFirstRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len, long contentLength) {
if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH) * ONE_KB) {
readFooterMetrics.setCollectMetrics(true);
readFooterMetrics.setCollectMetricsForNextRead(true);
readFooterMetrics.setOffsetOfFirstRead(nextReadPos);
readFooterMetrics.setSizeReadByFirstRead(len + "_" + Math.abs(contentLength - nextReadPos));
readFooterMetrics.setFileLength(contentLength);
}
}
/**
* Updates metrics for the second read operation.
*
* @param readFooterMetrics The metrics object to update.
* @param nextReadPos The position of the next read operation.
* @param len The length of the read operation.
*/
private void updateMetricsOnSecondRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len) {
if (readFooterMetrics.getCollectMetricsForNextRead()) {
long offsetDiff = Math.abs(nextReadPos - readFooterMetrics.getOffsetOfFirstRead());
readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + offsetDiff);
readFooterMetrics.setCollectLenMetrics(true);
}
}
/**
* Check if the given file should be marked as a Parquet file.
*
* @param metrics The metrics to evaluate.
* @return True if the file meet the criteria for being marked as a Parquet file, false otherwise.
*/
private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
return metrics.getCollectMetrics()
&& metrics.getReadCount() >= 2
&& !metrics.getIsParquetEvaluated()
&& haveEqualValues(metrics.getSizeReadByFirstRead())
&& haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
}
/**
* Check if two values are equal, considering they are in the format "value1_value2".
*
* @param value The value to check.
* @return True if the two parts of the value are equal, false otherwise.
*/
private boolean haveEqualValues(String value) {
String[] parts = value.split("_");
return parts.length == 2 && parts[0].equals(parts[1]);
}
/**
* Mark the given metrics as a Parquet file and update related values.
*
* @param metrics The metrics to mark as Parquet.
*/
private void markAsParquet(AbfsReadFooterMetrics metrics) {
metrics.setIsParquetFile(true);
String[] parts = metrics.getSizeReadByFirstRead().split("_");
metrics.setSizeReadByFirstRead(parts[0]);
parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
metrics.setIsParquetEvaluated(true);
}
/**
* Check each metric in the provided map and mark them as Parquet files if they meet the criteria.
*
* @param metricsMap The map containing metrics to evaluate.
*/
public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
for (Map.Entry<String, AbfsReadFooterMetrics> entry : metricsMap.entrySet()) {
AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
if (shouldMarkAsParquet(readFooterMetrics)) {
markAsParquet(readFooterMetrics);
metricsMap.replace(entry.getKey(), readFooterMetrics);
}
}
}
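// Worked example (invented values): a first read of the last 8 KB of a file gives
// nextReadPos = contentLength - 8192 and len = 8192, so sizeReadByFirstRead is
// "8192_8192". haveEqualValues sees equal halves on either side of "_"; if the
// second read repeats the pattern, shouldMarkAsParquet returns true and
// markAsParquet collapses each pair to its first value.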
/**
* Updates the average read length requested for metrics of all files in the metrics map.
* If the metrics indicate that the update is needed, it calculates the average read length and updates the metrics.
*
* @param metricsMap A map containing metrics for different files with unique identifiers.
*/
private void updateLenRequested(Map<String, AbfsReadFooterMetrics> metricsMap) {
for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
if (shouldUpdateLenRequested(readFooterMetrics)) {
int readReqCount = readFooterMetrics.getReadCount() - 2;
readFooterMetrics.setAvgReadLenRequested(
(double) readFooterMetrics.getDataLenRequested() / readReqCount);
readFooterMetrics.setIsLenUpdated(true);
}
}
}
/**
* Checks whether the average read length requested should be updated for the given metrics.
*
* The method returns true if the following conditions are met:
* - Metrics collection is enabled.
* - The number of read counts is greater than 2.
* - The average read length has not been updated previously.
*
* @param readFooterMetrics The metrics object to evaluate.
* @return True if the average read length should be updated, false otherwise.
*/
private boolean shouldUpdateLenRequested(AbfsReadFooterMetrics readFooterMetrics) {
return readFooterMetrics.getCollectMetrics()
&& readFooterMetrics.getReadCount() > 2
&& !readFooterMetrics.getIsLenUpdated();
}
/**
* Calculates the average metrics from a list of AbfsReadFooterMetrics and sets the values in the provided 'avgParquetReadFooterMetrics' object.
*
* @param isParquetList The list of AbfsReadFooterMetrics to compute the averages from.
* @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics object to store the computed average values.
*
* This method calculates various average metrics from the provided list and sets them in the 'avgParquetReadFooterMetrics' object.
* The metrics include:
* - Size read by the first read
* - Offset difference between the first and second read
* - Average file length
* - Average requested read length
*/
private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isParquetList,
AbfsReadFooterMetrics avgParquetReadFooterMetrics){
avgParquetReadFooterMetrics.setSizeReadByFirstRead(
String.format("%.3f", isParquetList.stream()
.map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble(
Double::parseDouble).average().orElse(0.0)));
avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
String.format("%.3f", isParquetList.stream()
.map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
.mapToDouble(Double::parseDouble).average().orElse(0.0)));
avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
.mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream().
map(AbfsReadFooterMetrics::getAvgReadLenRequested).
mapToDouble(Double::doubleValue).average().orElse(0.0));
}
/**
* Calculates the average metrics from a list of non-Parquet AbfsReadFooterMetrics instances.
*
* This method takes a list of AbfsReadFooterMetrics representing non-Parquet reads and calculates
* the average values for the size read by the first read and the offset difference between the first
* and second read. The averages are then set in the provided AbfsReadFooterMetrics instance.
*
* @param isNonParquetList A list of AbfsReadFooterMetrics instances representing non-Parquet reads.
* @param avgNonParquetReadFooterMetrics The AbfsReadFooterMetrics instance to store the calculated averages.
* It is assumed that the size of the list is at least 1, and the first
* element of the list is used to determine the size of arrays.
* The instance is modified in-place with the calculated averages.
*
*
**/
private void getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isNonParquetList,
AbfsReadFooterMetrics avgNonParquetReadFooterMetrics) {
int size = isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
double[] store = new double[2 * size];
// Calculating sum of individual values
isNonParquetList.forEach(abfsReadFooterMetrics -> {
String[] firstReadSize = abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
String[] offDiffFirstSecondRead = abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
for (int i = 0; i < firstReadSize.length; i++) {
store[i] += Long.parseLong(firstReadSize[i]);
store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]);
}
});
// Calculating averages and creating formatted strings
StringJoiner firstReadSize = new StringJoiner("_");
StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
for (int j = 0; j < size; j++) {
firstReadSize.add(String.format("%.3f", store[j] / isNonParquetList.size()));
offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] / isNonParquetList.size()));
}
avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString());
avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString());
avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream()
.mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream()
.mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
}
  /*
  Acronyms:
  1. FR :- First Read. For Parquet files we store only the size requested by the
  application for the first read; for non-Parquet files we store a "_"-delimited
  string whose first substring is the length requested for the first read and whose
  second substring is the seek-pointer difference from the end of the file.
  2. SR :- Second Read. For Parquet files we store only the size requested by the
  application for the second read; for non-Parquet files we store a "_"-delimited
  string whose first substring is the length requested for the second read and whose
  second substring is the seek-pointer difference from the offset of the first read.
  3. FL :- Total length of the file requested for read.
  */
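  // Illustrative examples of the rendered string; the shape comes from getReadFooterMetrics()
  // below, the sample values from ITestAbfsReadFooterMetrics (they are not normative):
  //   Parquet:     $Parquet:$FR=1024.000$SR=4096.000$FL=8388608.000$RL=0.000
  //   non-Parquet: $NonParquet:$FR=16384.000_16384.000$SR=1.000_16384.000$FL=32768.000$RL=16384.000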
public String getReadFooterMetrics(AbfsReadFooterMetrics avgReadFooterMetrics) {
String readFooterMetric = "";
if (avgReadFooterMetrics.getIsParquetFile()) {
readFooterMetric += "$Parquet:";
} else {
readFooterMetric += "$NonParquet:";
}
readFooterMetric += "$FR=" + avgReadFooterMetrics.getSizeReadByFirstRead()
+ "$SR="
+ avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead()
+ "$FL=" + String.format("%.3f",
avgReadFooterMetrics.getAvgFileLength())
+ "$RL=" + String.format("%.3f",
avgReadFooterMetrics.getAvgReadLenRequested());
return readFooterMetric;
}
  /**
   * Retrieves and aggregates read footer metrics for both Parquet and non-Parquet files from a list
   * of AbfsReadFooterMetrics instances. The function calculates the average metrics separately for
   * Parquet and non-Parquet files and returns a formatted string containing the aggregated metrics.
   *
   * @param readFooterMetricsList A list of AbfsReadFooterMetrics instances containing read footer
   *                              metrics for both Parquet and non-Parquet files.
   * @return A formatted string containing the aggregated read footer metrics for both Parquet and
   *         non-Parquet files.
   */
private String getFooterMetrics(List<AbfsReadFooterMetrics> readFooterMetricsList) {
List<AbfsReadFooterMetrics> isParquetList = new ArrayList<>();
List<AbfsReadFooterMetrics> isNonParquetList = new ArrayList<>();
    for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) {
      if (abfsReadFooterMetrics.getIsParquetFile()) {
        isParquetList.add(abfsReadFooterMetrics);
      } else if (abfsReadFooterMetrics.getReadCount() >= 2) {
        isNonParquetList.add(abfsReadFooterMetrics);
      }
    }
AbfsReadFooterMetrics avgParquetReadFooterMetrics = new AbfsReadFooterMetrics();
AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new AbfsReadFooterMetrics();
String readFooterMetric = "";
if (!isParquetList.isEmpty()) {
avgParquetReadFooterMetrics.setIsParquetFile(true);
getParquetReadFooterMetricsAverage(isParquetList, avgParquetReadFooterMetrics);
readFooterMetric += getReadFooterMetrics(avgParquetReadFooterMetrics);
}
if (!isNonParquetList.isEmpty()) {
avgNonparquetReadFooterMetrics.setIsParquetFile(false);
getNonParquetReadFooterMetricsAverage(isNonParquetList, avgNonparquetReadFooterMetrics);
readFooterMetric += getReadFooterMetrics(avgNonparquetReadFooterMetrics);
}
return readFooterMetric;
}
@Override
public String toString() {
Map<String, AbfsReadFooterMetrics> metricsMap = getMetricsMap();
List<AbfsReadFooterMetrics> readFooterMetricsList = new ArrayList<>();
    if (metricsMap != null && !metricsMap.isEmpty()) {
checkIsParquet(metricsMap);
updateLenRequested(metricsMap);
for (Map.Entry<String, AbfsReadFooterMetrics> entry : metricsMap.entrySet()) {
AbfsReadFooterMetrics abfsReadFooterMetrics = entry.getValue();
if (abfsReadFooterMetrics.getCollectMetrics()) {
readFooterMetricsList.add(entry.getValue());
}
}
}
String readFooterMetrics = "";
if (!readFooterMetricsList.isEmpty()) {
readFooterMetrics = getFooterMetrics(readFooterMetricsList);
}
return readFooterMetrics;
}
}

View File

@ -37,6 +37,12 @@
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import java.util.Map;
import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
@ -68,17 +74,20 @@ public class AbfsRestOperation {
private final String sasToken;
private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
private static final Logger LOG1 = LoggerFactory.getLogger(AbfsRestOperation.class);
// For uploads, this is the request entity body. For downloads,
// this will hold the response entity body.
private byte[] buffer;
private int bufferOffset;
private int bufferLength;
private int retryCount = 0;
private boolean isThrottledRequest = false;
private long maxRetryCount = 0L;
private final int maxIoRetries;
private AbfsHttpOperation result;
private AbfsCounters abfsCounters;
private final AbfsCounters abfsCounters;
private AbfsBackoffMetrics abfsBackoffMetrics;
private Map<String, AbfsBackoffMetrics> metricsMap;
/**
* This variable contains the reason of last API call within the same
* AbfsRestOperation object.
@ -124,6 +133,11 @@ String getSasToken() {
return sasToken;
}
private static final int MIN_FIRST_RANGE = 1;
private static final int MAX_FIRST_RANGE = 5;
private static final int MAX_SECOND_RANGE = 15;
private static final int MAX_THIRD_RANGE = 25;
/**
* Initializes a new REST operation.
*
@ -165,6 +179,13 @@ String getSasToken() {
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
this.sasToken = sasToken;
this.abfsCounters = client.getAbfsCounters();
if (abfsCounters != null) {
this.abfsBackoffMetrics = abfsCounters.getAbfsBackoffMetrics();
}
if (abfsBackoffMetrics != null) {
this.metricsMap = abfsBackoffMetrics.getMetricsMap();
}
this.maxIoRetries = client.getAbfsConfiguration().getMaxIoRetries();
this.intercept = client.getIntercept();
this.retryPolicy = client.getExponentialRetryPolicy();
}
@ -196,7 +217,6 @@ String getSasToken() {
this.buffer = buffer;
this.bufferOffset = bufferOffset;
this.bufferLength = bufferLength;
this.abfsCounters = client.getAbfsCounters();
}
/**
@ -206,11 +226,12 @@ String getSasToken() {
*/
public void execute(TracingContext tracingContext)
throws AzureBlobFileSystemException {
    // Since this might be a subsequent or parallel rest operation
    // triggered by a single file system call, use a new tracing context.
lastUsedTracingContext = createNewTracingContext(tracingContext);
try {
abfsCounters.getLastExecutionTime().set(now());
client.timerOrchestrator(TimerFunctionality.RESUME, null);
IOStatisticsBinding.trackDurationOfInvocation(abfsCounters,
AbfsStatistic.getStatNameFromHttpCall(method),
() -> completeExecute(lastUsedTracingContext));
@ -241,6 +262,12 @@ void completeExecute(TracingContext tracingContext)
retryCount = 0;
retryPolicy = client.getExponentialRetryPolicy();
LOG.debug("First execution of REST operation - {}", operationType);
if (abfsBackoffMetrics != null) {
synchronized (this) {
abfsBackoffMetrics.incrementTotalNumberOfRequests();
}
}
while (!executeHttpOperation(retryCount, tracingContext)) {
try {
++retryCount;
@ -248,12 +275,17 @@ void completeExecute(TracingContext tracingContext)
long retryInterval = retryPolicy.getRetryInterval(retryCount);
LOG.debug("Rest operation {} failed with failureReason: {}. Retrying with retryCount = {}, retryPolicy: {} and sleepInterval: {}",
operationType, failureReason, retryCount, retryPolicy.getAbbreviation(), retryInterval);
if (abfsBackoffMetrics != null) {
          updateBackoffTimeMetrics(retryCount, retryInterval);
}
Thread.sleep(retryInterval);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
if (abfsBackoffMetrics != null) {
updateBackoffMetrics(retryCount, result.getStatusCode());
}
int status = result.getStatusCode();
/*
If even after exhausting all retries, the http status code has an
@ -272,6 +304,30 @@ void completeExecute(TracingContext tracingContext)
LOG.trace("{} REST operation complete", operationType);
}
@VisibleForTesting
void updateBackoffMetrics(int retryCount, int statusCode) {
if (abfsBackoffMetrics != null) {
if (statusCode < HttpURLConnection.HTTP_OK
|| statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) {
synchronized (this) {
if (retryCount >= maxIoRetries) {
abfsBackoffMetrics.incrementNumberOfRequestsFailed();
}
}
} else {
synchronized (this) {
if (retryCount > ZERO && retryCount <= maxIoRetries) {
maxRetryCount = Math.max(abfsBackoffMetrics.getMaxRetryCount(), retryCount);
abfsBackoffMetrics.setMaxRetryCount(maxRetryCount);
updateCount(retryCount);
} else {
abfsBackoffMetrics.incrementNumberOfRequestsSucceededWithoutRetrying();
}
}
}
}
}
@VisibleForTesting
String getClientLatency() {
return client.getAbfsPerfTracker().getClientLatency();
@ -315,7 +371,35 @@ private boolean executeHttpOperation(final int retryCount,
}
httpOperation.processResponse(buffer, bufferOffset, bufferLength);
incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
if (!isThrottledRequest && httpOperation.getStatusCode()
>= HttpURLConnection.HTTP_INTERNAL_ERROR) {
isThrottledRequest = true;
AzureServiceErrorCode serviceErrorCode =
AzureServiceErrorCode.getAzureServiceCode(
httpOperation.getStatusCode(),
httpOperation.getStorageErrorCode(),
httpOperation.getStorageErrorMessage());
LOG1.trace("Service code is " + serviceErrorCode + " status code is "
+ httpOperation.getStatusCode() + " error code is "
+ httpOperation.getStorageErrorCode()
+ " error message is " + httpOperation.getStorageErrorMessage());
if (abfsBackoffMetrics != null) {
synchronized (this) {
if (serviceErrorCode.equals(
AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT)
|| serviceErrorCode.equals(
AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT)) {
abfsBackoffMetrics.incrementNumberOfBandwidthThrottledRequests();
} else if (serviceErrorCode.equals(
AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT)) {
abfsBackoffMetrics.incrementNumberOfIOPSThrottledRequests();
} else {
abfsBackoffMetrics.incrementNumberOfOtherThrottledRequests();
}
}
}
}
//Only increment bytesReceived counter when the status code is 2XX.
if (httpOperation.getStatusCode() >= HttpURLConnection.HTTP_OK
&& httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) {
@ -351,7 +435,13 @@ private boolean executeHttpOperation(final int retryCount,
retryPolicy = client.getRetryPolicy(failureReason);
LOG.warn("Unknown host name: {}. Retrying to resolve the host name...",
hostname);
if (abfsBackoffMetrics != null) {
synchronized (this) {
abfsBackoffMetrics.incrementNumberOfNetworkFailedRequests();
}
}
if (!retryPolicy.shouldRetry(retryCount, -1)) {
updateBackoffMetrics(retryCount, httpOperation.getStatusCode());
throw new InvalidAbfsRestOperationException(ex, retryCount);
}
return false;
@ -360,13 +450,17 @@ private boolean executeHttpOperation(final int retryCount,
if (LOG.isDebugEnabled()) {
LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex);
}
if (abfsBackoffMetrics != null) {
synchronized (this) {
abfsBackoffMetrics.incrementNumberOfNetworkFailedRequests();
}
}
failureReason = RetryReason.getAbbreviation(ex, -1, "");
retryPolicy = client.getRetryPolicy(failureReason);
if (!retryPolicy.shouldRetry(retryCount, -1)) {
updateBackoffMetrics(retryCount, httpOperation.getStatusCode());
throw new InvalidAbfsRestOperationException(ex, retryCount);
}
return false;
} finally {
int statusCode = httpOperation.getStatusCode();
@ -388,26 +482,30 @@ private boolean executeHttpOperation(final int retryCount,
*/
@VisibleForTesting
public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign) throws IOException {
switch(client.getAuthType()) {
case Custom:
case OAuth:
LOG.debug("Authenticating request with OAuth2 access token");
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
client.getAccessToken());
break;
case SAS:
// do nothing; the SAS token should already be appended to the query string
httpOperation.setMaskForSAS(); //mask sig/oid from url for logs
break;
case SharedKey:
default:
// sign the HTTP request
LOG.debug("Signing request with shared key");
// sign the HTTP request
client.getSharedKeyCredentials().signRequest(
httpOperation.getConnection(),
bytesToSign);
break;
if (client.isSendMetricCall()) {
client.getMetricSharedkeyCredentials().signRequest(httpOperation.getConnection(), bytesToSign);
} else {
switch (client.getAuthType()) {
case Custom:
case OAuth:
LOG.debug("Authenticating request with OAuth2 access token");
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
client.getAccessToken());
break;
case SAS:
// do nothing; the SAS token should already be appended to the query string
httpOperation.setMaskForSAS(); //mask sig/oid from url for logs
break;
case SharedKey:
default:
// sign the HTTP request
LOG.debug("Signing request with shared key");
// sign the HTTP request
client.getSharedKeyCredentials().signRequest(
httpOperation.getConnection(),
bytesToSign);
break;
}
}
}
@ -436,6 +534,60 @@ private void incrementCounter(AbfsStatistic statistic, long value) {
}
}
  /**
   * Updates the count metrics based on the provided retry count.
   *
   * This method increments the number of succeeded requests for the specified retry count.
   *
   * @param retryCount The retry count used to determine the metrics category.
   */
  private void updateCount(int retryCount) {
String retryCounter = getKey(retryCount);
metricsMap.get(retryCounter).incrementNumberOfRequestsSucceeded();
}
  /**
   * Updates backoff time metrics based on the provided retry count and sleep duration.
   *
   * This method calculates and updates various backoff time metrics, including minimum, maximum,
   * and total backoff time, as well as the total number of requests for the specified retry count.
   *
   * @param retryCount The retry count used to determine the metrics category.
   * @param sleepDuration The duration of sleep during backoff.
   */
  private void updateBackoffTimeMetrics(int retryCount, long sleepDuration) {
    synchronized (this) {
      String retryCounter = getKey(retryCount);
      // Distinct local name, so the per-range entry is not confused with the
      // enclosing abfsBackoffMetrics field.
      AbfsBackoffMetrics metricsForRange = metricsMap.get(retryCounter);
      long minBackoffTime = Math.min(metricsForRange.getMinBackoff(), sleepDuration);
      long maxBackoffTime = Math.max(metricsForRange.getMaxBackoff(), sleepDuration);
      long totalBackoffTime = metricsForRange.getTotalBackoff() + sleepDuration;
      metricsForRange.incrementTotalRequests();
      metricsForRange.setMinBackoff(minBackoffTime);
      metricsForRange.setMaxBackoff(maxBackoffTime);
      metricsForRange.setTotalBackoff(totalBackoffTime);
      metricsMap.put(retryCounter, metricsForRange);
    }
  }
}
  /**
   * Generates a key based on the provided retry count to categorize metrics.
   *
   * This method categorizes retry counts into different ranges and assigns a corresponding key:
   * for example, a retry count of 3 maps to "3", 7 to "5_15", 20 to "15_25" and 30 to "25AndAbove".
   *
   * @param retryCount The retry count used to determine the key.
   * @return A string key representing the metrics category for the given retry count.
   */
private String getKey(int retryCount) {
if (retryCount >= MIN_FIRST_RANGE && retryCount < MAX_FIRST_RANGE) {
return Integer.toString(retryCount);
} else if (retryCount >= MAX_FIRST_RANGE && retryCount < MAX_SECOND_RANGE) {
return "5_15";
} else if (retryCount >= MAX_SECOND_RANGE && retryCount < MAX_THIRD_RANGE) {
return "15_25";
} else {
return "25AndAbove";
}
}
/**
* Updating Client Side Throttling Metrics for relevant response status codes.
   * The following criteria, based on status code and failure reason, are used to decide.

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.fs.azurebfs.services;
/**
* Class for Timer Functionality.
*/
public enum TimerFunctionality {
RESUME,
SUSPEND
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.utils;
public enum MetricFormat {
INTERNAL_BACKOFF_METRIC_FORMAT, // <client-correlation-id>:<client-req-id>:<filesystem-id>
// :<backoff-metric-results>
INTERNAL_FOOTER_METRIC_FORMAT, // <client-correlation-id>:<client-req-id>:<filesystem-id>
// :<footer-metric-results>
INTERNAL_METRIC_FORMAT, // <client-correlation-id>:<client-req-id>:<filesystem-id>
// :<backoff-metric-results>:<footer-metric-results>
EMPTY;
@Override
public String toString() {
return this == EMPTY ? "" : this.name();
}
}

View File

@ -63,6 +63,8 @@ public class TracingContext {
private Listener listener = null; // null except when testing
//final concatenated ID list set into x-ms-client-request-id header
private String header = EMPTY_STRING;
private String metricResults = EMPTY_STRING;
private String metricHeader = EMPTY_STRING;
/**
* If {@link #primaryRequestId} is null, this field shall be set equal
@ -112,6 +114,15 @@ public TracingContext(String clientCorrelationID, String fileSystemID,
}
}
public TracingContext(String clientCorrelationID, String fileSystemID,
FSOperationType opType, boolean needsPrimaryReqId,
TracingHeaderFormat tracingHeaderFormat, Listener listener, String metricResults) {
this(clientCorrelationID, fileSystemID, opType, needsPrimaryReqId, tracingHeaderFormat,
listener);
this.metricResults = metricResults;
}
public TracingContext(TracingContext originalTracingContext) {
this.fileSystemID = originalTracingContext.fileSystemID;
this.streamID = originalTracingContext.streamID;
@ -123,8 +134,8 @@ public TracingContext(TracingContext originalTracingContext) {
if (originalTracingContext.listener != null) {
this.listener = originalTracingContext.listener.getClone();
}
this.metricResults = originalTracingContext.metricResults;
}
public static String validateClientCorrelationID(String clientCorrelationID) {
if ((clientCorrelationID.length() > MAX_CLIENT_CORRELATION_ID_LENGTH)
|| (!clientCorrelationID.matches(CLIENT_CORRELATION_ID_PATTERN))) {
@ -181,17 +192,24 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail
+ getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID
+ ":" + opType + ":" + retryCount;
header = addFailureReasons(header, previousFailure, retryPolicyAbbreviation);
metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
break;
case TWO_ID_FORMAT:
header = clientCorrelationID + ":" + clientRequestId;
metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
break;
default:
header = clientRequestId; //case SINGLE_ID_FORMAT
//case SINGLE_ID_FORMAT
header = clientRequestId;
metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : "";
}
if (listener != null) { //for testing
listener.callTracingHeaderValidator(header, format);
}
httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header);
if (!metricHeader.equals(EMPTY_STRING)) {
httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_FECLIENT_METRICS, metricHeader);
}
/*
* In case the primaryRequestId is an empty-string and if it is the first try to
* API call (previousFailure shall be null), maintain the last part of clientRequestId's

View File

@ -1052,6 +1052,49 @@ Note that these performance numbers are also sent back to the ADLS Gen 2 API end
in the `x-ms-abfs-client-latency` HTTP headers in subsequent requests. Azure uses these
settings to track their end-to-end latency.
### <a name="drivermetricoptions"></a> Driver Metric Options
The config `fs.azure.metric.format` selects the format of the IDs included in the `header` for metrics.
It accepts a String value corresponding to one of the following enum options:

- `INTERNAL_METRIC_FORMAT` : backoff + footer metrics
- `INTERNAL_BACKOFF_METRIC_FORMAT` : backoff metrics only
- `INTERNAL_FOOTER_METRIC_FORMAT` : footer metrics only
- `EMPTY` : default (no metrics header is sent)
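With `INTERNAL_METRIC_FORMAT`, the metric header takes the shape
`<client-correlation-id>:<client-req-id>:<filesystem-id>:<backoff-metric-results>:<footer-metric-results>`.
For example, to collect both backoff and footer metrics, set the format as below (an illustrative
snippet following the same pattern as the other settings in this section):

```xml
<property>
  <name>fs.azure.metric.format</name>
  <value>INTERNAL_METRIC_FORMAT</value>
</property>
```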
`fs.azure.metric.account.name`: This configuration parameter specifies the name of the account
used to push metrics to the backend. A separate account can be configured for pushing metrics to
the store, or the existing account on which other requests are made can be reused.
```xml
<property>
<name>fs.azure.metric.account.name</name>
<value>METRICACCOUNTNAME.dfs.core.windows.net</value>
</property>
```
`fs.azure.metric.account.key`: This is the access key for the storage account used for pushing metrics to the store.
```xml
<property>
<name>fs.azure.metric.account.key</name>
<value>ACCOUNTKEY</value>
</property>
```
`fs.azure.metric.uri`: This configuration provides the URI in the format 'https://`<accountname>`
.dfs.core.windows.net/`<containername>`'. This should be part of the config in order to prevent
extra calls to create the filesystem; an existing filesystem is used to push the metrics.
```xml
<property>
<name>fs.azure.metric.uri</name>
<value>https://METRICACCOUNTNAME.dfs.core.windows.net/CONTAINERNAME</value>
</property>
```
## <a name="troubleshooting"></a> Troubleshooting
The problems associated with the connector usually come down to, in order

View File

@ -34,7 +34,6 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;

View File

@ -0,0 +1,385 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.junit.Test;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest {
public ITestAbfsReadFooterMetrics() throws Exception {
}
private static final String TEST_PATH = "/testfile";
private static final String SLEEP_PERIOD = "90000";
/**
* Integration test for reading footer metrics with both Parquet and non-Parquet reads.
*/
@Test
public void testReadFooterMetricsWithParquetAndNonParquet() throws Exception {
testReadWriteAndSeek(8 * ONE_MB, DEFAULT_READ_BUFFER_SIZE, ONE_KB, 4 * ONE_KB);
}
/**
   * Builds the test configuration with the footer metric format and the given buffer size.
   *
   * @param bufferSize Buffer size to set for write and read operations.
   * @return The Configuration used to create the filesystem instance.
*/
private Configuration getConfiguration(int bufferSize) {
final Configuration configuration = getRawConfiguration();
configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_FOOTER_METRIC_FORMAT));
configuration.setInt(AZURE_READ_BUFFER_SIZE, bufferSize);
configuration.setInt(AZURE_WRITE_BUFFER_SIZE, bufferSize);
return configuration;
}
/**
* Writes data to the specified file path in the AzureBlobFileSystem.
*
* @param fs AzureBlobFileSystem instance.
* @param testPath Path to the file.
* @param data Data to write to the file.
*/
private void writeDataToFile(AzureBlobFileSystem fs, Path testPath, byte[] data) throws IOException {
FSDataOutputStream stream = fs.create(testPath);
try {
stream.write(data);
} finally {
stream.close();
}
IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
}
/**
* Asserts that the actual metrics obtained from the AzureBlobFileSystem match the expected metrics string.
*
* @param fs AzureBlobFileSystem instance.
* @param expectedMetrics Expected metrics string.
*/
private void assertMetricsEquality(AzureBlobFileSystem fs, String expectedMetrics) {
AbfsReadFooterMetrics actualMetrics = fs.getAbfsClient().getAbfsCounters().getAbfsReadFooterMetrics();
assertNotNull("AbfsReadFooterMetrics is null", actualMetrics);
assertEquals("The computed metrics differs from the actual metrics", expectedMetrics, actualMetrics.toString());
}
/**
* Test for reading footer metrics with a non-Parquet file.
*/
@Test
public void testReadFooterMetrics() throws Exception {
// Initialize AzureBlobFileSystem and set buffer size for configuration.
int bufferSize = MIN_BUFFER_SIZE;
Configuration configuration = getConfiguration(bufferSize);
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
// Generate random data to write to the test file.
final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b);
// Set up the test file path.
Path testPath = path(TEST_PATH);
// Write random data to the test file.
writeDataToFile(fs, testPath, b);
// Initialize a buffer for reading data.
final byte[] readBuffer = new byte[2 * bufferSize];
int result;
// Initialize statistics source for logging.
IOStatisticsSource statisticsSource = null;
try (FSDataInputStream inputStream = fs.open(testPath)) {
// Register a listener for tracing header validation.
statisticsSource = inputStream;
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, true, 0,
((AbfsInputStream) inputStream.getWrappedStream())
.getStreamID()));
// Perform the first read operation with seek.
inputStream.seek(bufferSize);
result = inputStream.read(readBuffer, bufferSize, bufferSize);
assertNotEquals(-1, result);
// To test tracingHeader for case with bypassReadAhead == true
inputStream.seek(0);
byte[] temp = new byte[5];
int t = inputStream.read(temp, 0, 1);
// Seek back to the beginning and perform another read operation.
inputStream.seek(0);
result = inputStream.read(readBuffer, 0, bufferSize);
}
// Log IO statistics at the INFO level.
IOStatisticsLogging.logIOStatisticsAtLevel(LOG,
IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
// Ensure data is read successfully and matches the written data.
assertNotEquals("data read in final read()", -1, result);
assertArrayEquals(readBuffer, b);
// Get non-Parquet metrics and assert metrics equality.
AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
String metrics = nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics);
assertMetricsEquality(fs, metrics);
// Close the AzureBlobFileSystem.
fs.close();
}
/**
* Generates and returns an instance of AbfsReadFooterMetrics for non-Parquet files.
*/
private AbfsReadFooterMetrics getNonParquetMetrics() {
AbfsReadFooterMetrics nonParquetMetrics = new AbfsReadFooterMetrics();
nonParquetMetrics.setIsParquetFile(false);
nonParquetMetrics.setSizeReadByFirstRead("16384.000_16384.000");
nonParquetMetrics.setOffsetDiffBetweenFirstAndSecondRead("1.000_16384.000");
nonParquetMetrics.setAvgFileLength(Double.parseDouble("32768.000"));
nonParquetMetrics.setAvgReadLenRequested(Double.parseDouble("16384.000"));
return nonParquetMetrics;
}
/**
* Generates and returns an instance of AbfsReadFooterMetrics for parquet files.
*/
private AbfsReadFooterMetrics getParquetMetrics() {
AbfsReadFooterMetrics parquetMetrics = new AbfsReadFooterMetrics();
parquetMetrics.setIsParquetFile(true);
parquetMetrics.setSizeReadByFirstRead("1024.000");
parquetMetrics.setOffsetDiffBetweenFirstAndSecondRead("4096.000");
parquetMetrics.setAvgFileLength(Double.parseDouble("8388608.000"));
parquetMetrics.setAvgReadLenRequested(0.000);
return parquetMetrics;
}
/**
* Test for reading, writing, and seeking with footer metrics.
*
* This method performs the integration test for reading, writing, and seeking operations
* with footer metrics. It creates an AzureBlobFileSystem, configures it, writes random data
* to a test file, performs read and seek operations, and checks the footer metrics for both
* Parquet and non-Parquet scenarios.
*
* @param fileSize Size of the test file.
* @param bufferSize Size of the buffer used for read and write operations.
* @param seek1 The position to seek to in the test file.
* @param seek2 Additional position to seek to in the test file (if not 0).
*/
private void testReadWriteAndSeek(int fileSize, int bufferSize, Integer seek1, Integer seek2) throws Exception {
// Create an AzureBlobFileSystem instance.
Configuration configuration = getConfiguration(bufferSize);
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
// Generate random data to write to the test file.
final byte[] b = new byte[fileSize];
new Random().nextBytes(b);
// Define the path for the test file.
Path testPath = path("/testfile");
// Write the random data to the test file.
writeDataToFile(fs, testPath, b);
// Initialize a buffer for reading.
final byte[] readBuffer = new byte[fileSize];
// Initialize a source for IO statistics.
IOStatisticsSource statisticsSource = null;
// Open an input stream for the test file.
FSDataInputStream inputStream = fs.open(testPath);
statisticsSource = inputStream;
// Register a listener for tracing headers.
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, true, 0,
((AbfsInputStream) inputStream.getWrappedStream())
.getStreamID()));
// Seek to the specified position in the test file and read data.
inputStream.seek(fileSize - seek1);
inputStream.read(readBuffer, 0, seek1);
// If seek2 is non-zero, perform an additional seek and read.
if (seek2 != 0) {
inputStream.seek(fileSize - seek1 - seek2);
inputStream.read(readBuffer, 0, seek2);
}
// Close the input stream.
inputStream.close();
// Set a new buffer size for read and write operations.
int bufferSize1 = MIN_BUFFER_SIZE;
abfsConfiguration.setWriteBufferSize(bufferSize1);
abfsConfiguration.setReadBufferSize(bufferSize1);
// Generate new random data for a second test file.
final byte[] b1 = new byte[2 * bufferSize1];
new Random().nextBytes(b1);
// Define the path for the second test file.
Path testPath1 = path("/testfile1");
// Write the new random data to the second test file.
writeDataToFile(fs, testPath1, b1);
// Initialize a buffer for reading from the second test file.
final byte[] readBuffer1 = new byte[2 * bufferSize1];
// Open an input stream for the second test file.
FSDataInputStream inputStream1 = fs.open(testPath1);
statisticsSource = inputStream1;
// Register a listener for tracing headers.
((AbfsInputStream) inputStream1.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, true, 0,
((AbfsInputStream) inputStream1.getWrappedStream())
.getStreamID()));
// Seek to a position in the second test file and read data.
inputStream1.seek(bufferSize1);
inputStream1.read(readBuffer1, bufferSize1, bufferSize1);
// To test tracingHeader for case with bypassReadAhead == true.
inputStream1.seek(0);
byte[] temp = new byte[5];
int t = inputStream1.read(temp, 0, 1);
// Seek to the beginning of the second test file and read data.
inputStream1.seek(0);
inputStream1.read(readBuffer1, 0, bufferSize1);
// Close the input stream for the second test file.
inputStream1.close();
// Get footer metrics for both Parquet and non-Parquet scenarios.
AbfsReadFooterMetrics parquetMetrics = getParquetMetrics();
AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
// Concatenate and assert the metrics equality.
String metrics = parquetMetrics.getReadFooterMetrics(parquetMetrics);
metrics += nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics);
assertMetricsEquality(fs, metrics);
// Close the AzureBlobFileSystem instance.
fs.close();
}
/**
* Test for reading footer metrics with an idle period.
*
* This method tests reading footer metrics with an idle period. It creates an AzureBlobFileSystem,
* configures it, writes random data to a test file, performs read operations, introduces an idle
* period, and checks the footer metrics for non-Parquet scenarios.
*
*/
@Test
public void testMetricWithIdlePeriod() throws Exception {
// Set the buffer size for the test.
int bufferSize = MIN_BUFFER_SIZE;
Configuration configuration = getConfiguration(bufferSize);
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
// Generate random data to write to the test file.
final byte[] b = new byte[2 * bufferSize];
new Random().nextBytes(b);
// Define the path for the test file.
Path testPath = path(TEST_PATH);
// Write the random data to the test file.
writeDataToFile(fs, testPath, b);
// Initialize a buffer for reading.
final byte[] readBuffer = new byte[2 * bufferSize];
// Initialize a source for IO statistics.
IOStatisticsSource statisticsSource = null;
// Open an input stream for the test file.
try (FSDataInputStream inputStream = fs.open(testPath)) {
// Register a listener for tracing headers.
((AbfsInputStream) inputStream.getWrappedStream()).registerListener(
new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.READ, true, 0,
((AbfsInputStream) inputStream.getWrappedStream())
.getStreamID()));
// Seek to the specified position in the test file and read data.
inputStream.seek(bufferSize);
inputStream.read(readBuffer, bufferSize, bufferSize);
      // Introduce an idle period by sleeping.
      Thread.sleep(SLEEP_PERIOD_MS);
// To test tracingHeader for case with bypassReadAhead == true.
inputStream.seek(0);
byte[] temp = new byte[5];
int t = inputStream.read(temp, 0, 1);
// Seek to the beginning of the test file and read data.
inputStream.seek(0);
inputStream.read(readBuffer, 0, bufferSize);
// Get and assert the footer metrics for non-Parquet scenarios.
AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics();
String metrics = nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics);
assertMetricsEquality(fs, metrics);
      // Introduce an additional idle period by sleeping.
      Thread.sleep(SLEEP_PERIOD_MS);
}
}
}

View File

@ -80,8 +80,8 @@ public ITestAzureBlobFileSystemListStatus() throws Exception {
public void testListPath() throws Exception {
Configuration config = new Configuration(this.getRawConfiguration());
config.set(AZURE_LIST_MAX_RESULTS, "5000");
try (final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
.newInstance(getFileSystem().getUri(), config)) {
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
.newInstance(getFileSystem().getUri(), config);
final List<Future<Void>> tasks = new ArrayList<>();
ExecutorService es = Executors.newFixedThreadPool(10);
@ -108,7 +108,10 @@ public Void call() throws Exception {
fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0));
FileStatus[] files = fs.listStatus(new Path("/"));
assertEquals(TEST_FILES_NUMBER, files.length /* user directory */);
}
fs.registerListener(
new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.GET_ATTR, true, 0));
fs.close();
}
/**

View File

@ -20,6 +20,8 @@
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
@ -27,6 +29,7 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
import org.assertj.core.api.Assertions;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
@ -112,9 +115,13 @@ public static void addGeneralMockBehaviourToRestOpAndHttpOp(final AbfsRestOperat
public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClient,
final ExponentialRetryPolicy exponentialRetryPolicy,
final StaticRetryPolicy staticRetryPolicy,
final AbfsThrottlingIntercept intercept) throws IOException {
final AbfsThrottlingIntercept intercept) throws IOException, URISyntaxException {
Mockito.doReturn(OAuth).when(abfsClient).getAuthType();
Mockito.doReturn("").when(abfsClient).getAccessToken();
AbfsConfiguration abfsConfiguration = Mockito.mock(AbfsConfiguration.class);
Mockito.doReturn(abfsConfiguration).when(abfsClient).getAbfsConfiguration();
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
Mockito.doReturn(abfsCounters).when(abfsClient).getAbfsCounters();
Mockito.doReturn(intercept).when(abfsClient).getIntercept();
Mockito.doNothing()

View File

@ -22,11 +22,14 @@
import java.lang.reflect.Field;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.List;
import java.util.Random;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
@ -133,8 +136,9 @@ public ITestAbfsClient() throws Exception {
}
private String getUserAgentString(AbfsConfiguration config,
boolean includeSSLProvider) throws IOException {
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().build();
boolean includeSSLProvider) throws IOException, URISyntaxException {
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
config, (AccessTokenProvider) null, null, abfsClientContext);
String sslProviderName = null;
@ -175,7 +179,7 @@ private void verifyBasicInfo(String userAgentStr) {
@Test
public void verifyUserAgentPrefix()
throws IOException, IllegalAccessException {
throws IOException, IllegalAccessException, URISyntaxException {
final Configuration configuration = new Configuration();
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, FS_AZURE_USER_AGENT_PREFIX);
@ -209,7 +213,7 @@ public void verifyUserAgentPrefix()
*/
@Test
public void verifyUserAgentExpectHeader()
throws IOException, IllegalAccessException {
throws IOException, IllegalAccessException, URISyntaxException {
final Configuration configuration = new Configuration();
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, FS_AZURE_USER_AGENT_PREFIX);
@ -315,18 +319,20 @@ public void verifyUserAgentClusterType() throws Exception {
public static AbfsClient createTestClientFromCurrentContext(
AbfsClient baseAbfsClientInstance,
AbfsConfiguration abfsConfig) throws IOException {
AbfsConfiguration abfsConfig) throws IOException, URISyntaxException {
AuthType currentAuthType = abfsConfig.getAuthType(
abfsConfig.getAccountName());
AbfsPerfTracker tracker = new AbfsPerfTracker("test",
abfsConfig.getAccountName(),
abfsConfig);
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientContext abfsClientContext =
new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
.withExponentialRetryPolicy(
new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()))
.withAbfsCounters(abfsCounters)
.build();
// Create test AbfsClient
@ -352,6 +358,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
AbfsConfiguration abfsConfig) throws Exception {
AuthType currentAuthType = abfsConfig.getAuthType(
abfsConfig.getAccountName());
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
org.junit.Assume.assumeTrue(
(currentAuthType == AuthType.SharedKey)
@ -372,14 +379,18 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
when(client.createDefaultUriQueryBuilder()).thenCallRealMethod();
when(client.createRequestUrl(any(), any())).thenCallRealMethod();
when(client.createRequestUrl(any(), any(), any())).thenCallRealMethod();
when(client.getAccessToken()).thenCallRealMethod();
when(client.getSharedKeyCredentials()).thenCallRealMethod();
when(client.createDefaultHeaders()).thenCallRealMethod();
when(client.getAbfsConfiguration()).thenReturn(abfsConfig);
when(client.getIntercept()).thenReturn(
AbfsThrottlingInterceptFactory.getInstance(
abfsConfig.getAccountName().substring(0,
abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
when(client.getAbfsCounters()).thenReturn(abfsCounters);
// override baseurl
client = ITestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
abfsConfig);

View File

@ -19,11 +19,14 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
@ -98,9 +101,11 @@ private AbfsRestOperation getMockRestOp() {
return op;
}
private AbfsClient getMockAbfsClient() {
private AbfsClient getMockAbfsClient() throws URISyntaxException {
// Mock failure for client.read()
AbfsClient client = mock(AbfsClient.class);
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
Mockito.doReturn(abfsCounters).when(client).getAbfsCounters();
AbfsPerfTracker tracker = new AbfsPerfTracker(
"test",
this.getAccountName(),

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.junit.Test;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import java.util.ArrayList;
import java.util.Arrays;
import org.junit.Assert;
import java.net.HttpURLConnection;
public class TestAbfsRestOperation extends AbstractAbfsIntegrationTest {
public TestAbfsRestOperation() throws Exception {
}
/**
* Test for backoff retry metrics.
*
* This method tests backoff retry metrics by creating an AzureBlobFileSystem, initializing an
* AbfsClient, and performing mock operations on an AbfsRestOperation. The method then updates
* backoff metrics using the AbfsRestOperation.
*
*/
@Test
public void testBackoffRetryMetrics() throws Exception {
// Create an AzureBlobFileSystem instance.
final Configuration configuration = getRawConfiguration();
configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_BACKOFF_METRIC_FORMAT));
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
// Get an instance of AbfsClient and AbfsRestOperation.
AbfsClient testClient = super.getAbfsClient(super.getAbfsStore(fs));
AbfsRestOperation op = ITestAbfsClient.getRestOp(
DeletePath, testClient, HTTP_METHOD_DELETE,
ITestAbfsClient.getTestUrl(testClient, "/NonExistingPath"), ITestAbfsClient.getTestRequestHeaders(testClient));
// Mock retry counts and status code.
ArrayList<String> retryCounts = new ArrayList<>(Arrays.asList("35", "28", "31", "45", "10", "2", "9"));
int statusCode = HttpURLConnection.HTTP_UNAVAILABLE;
// Update backoff metrics.
for (String retryCount : retryCounts) {
op.updateBackoffMetrics(Integer.parseInt(retryCount), statusCode);
}
    // For retry counts at or above the configured maximum, the request is counted as failed.
Assert.assertEquals("Number of failed requests does not match expected value.",
"3", String.valueOf(testClient.getAbfsCounters().getAbfsBackoffMetrics().getNumberOfRequestsFailed()));
// Close the AzureBlobFileSystem.
fs.close();
}
}

View File

@ -23,7 +23,6 @@
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;
@ -308,7 +307,7 @@ private void testClientRequestIdForStatusRetry(int status,
int[] statusCount = new int[1];
statusCount[0] = 0;
Mockito.doAnswer(answer -> {
if (statusCount[0] <= 5) {
if (statusCount[0] <= 10) {
statusCount[0]++;
return status;
}