From d168d3ffeee15ea71786263d7eaa60dc92c4d3a0 Mon Sep 17 00:00:00 2001
From: Anmol Asrani
Date: Thu, 23 May 2024 19:40:10 +0530
Subject: [PATCH] HADOOP-18325: ABFS: Add correlated metric support for ABFS operations (#6314)

Adds support for metric collection at the filesystem instance level.
Metrics are pushed to the store when a filesystem instance is closed,
covering all operations that used that instance.

Collected metrics include:
- Number of requests that succeeded without any retries.
- Count of requests that succeeded after a given number of retries (x retries).
- Number of requests subjected to throttling.
- Number of requests that failed despite exhausting all retry attempts.

Implementation details:
Logic is added to the AbfsClient to push metrics through an additional
request whenever no requests have been sent to the backend for a defined
idle period, so metrics keep flowing even while an instance stays open.
Together with the push on close, this gives a continuous view of success
rates, retry behaviour, throttling, and exhausted-retry failures per
filesystem instance.

Contributed by Anmol Asrani
---
 .../fs/azurebfs/AbfsBackoffMetrics.java       | 312 ++++
 .../hadoop/fs/azurebfs/AbfsConfiguration.java |  45 ++
 .../hadoop/fs/azurebfs/AbfsCountersImpl.java  | 102 +++-
 .../fs/azurebfs/AzureBlobFileSystem.java      |  15 +-
 .../azurebfs/constants/ConfigurationKeys.java |   7 +
 .../constants/FileSystemConfigurations.java   |   6 +
 .../constants/HttpHeaderConfigurations.java   |   1 +
 .../services/AzureServiceErrorCode.java       |  18 +-
 .../fs/azurebfs/services/AbfsClient.java      | 195 ++++
 .../azurebfs/services/AbfsClientContext.java  |   2 +-
 .../fs/azurebfs/services/AbfsCounters.java    |  11 +
 .../fs/azurebfs/services/AbfsInputStream.java |  18 +-
 .../services/AbfsReadFooterMetrics.java       | 549 ++++++
 .../azurebfs/services/AbfsRestOperation.java  | 212 ++++-
 .../azurebfs/services/TimerFunctionality.java |   4 +-
 .../fs/azurebfs/utils/MetricFormat.java       |  36 ++
 .../fs/azurebfs/utils/TracingContext.java     |  22 +-
 .../hadoop-azure/src/site/markdown/abfs.md    |  43 ++
 .../ITestAbfsInputStreamStatistics.java       |   1 -
 .../azurebfs/ITestAbfsReadFooterMetrics.java  | 385 ++++++
 .../ITestAzureBlobFileSystemListStatus.java   |   9 +-
 .../azurebfs/services/AbfsClientTestUtil.java |   9 +-
 .../fs/azurebfs/services/ITestAbfsClient.java |  21 +-
 .../services/TestAbfsInputStream.java         |   7 +-
 .../services/TestAbfsRestOperation.java       |  81 +++
 .../TestAbfsRestOperationMockFailures.java    |   3 +-
 26 files changed, 2042 insertions(+), 72 deletions(-)
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsBackoffMetrics.java
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/MetricFormat.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java
 create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsBackoffMetrics.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsBackoffMetrics.java new file mode 100644 index 0000000000..37dbdfffee --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsBackoffMetrics.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THOUSAND; + +public class AbfsBackoffMetrics { + + private AtomicLong numberOfRequestsSucceeded; + + private AtomicLong minBackoff; + + private AtomicLong maxBackoff; + + private AtomicLong totalRequests; + + private AtomicLong totalBackoff; + + private String retryCount; + + private AtomicLong numberOfIOPSThrottledRequests; + + private AtomicLong numberOfBandwidthThrottledRequests; + + private AtomicLong numberOfOtherThrottledRequests; + + private AtomicLong numberOfNetworkFailedRequests; + + private AtomicLong maxRetryCount; + + private AtomicLong totalNumberOfRequests; + + private AtomicLong numberOfRequestsSucceededWithoutRetrying; + + private AtomicLong numberOfRequestsFailed; + + private final Map metricsMap + = new ConcurrentHashMap<>(); + + public AbfsBackoffMetrics() { + initializeMap(); + this.numberOfIOPSThrottledRequests = new AtomicLong(); + this.numberOfBandwidthThrottledRequests = new AtomicLong(); + this.numberOfOtherThrottledRequests = new AtomicLong(); + this.totalNumberOfRequests = new AtomicLong(); + this.maxRetryCount = new AtomicLong(); + this.numberOfRequestsSucceededWithoutRetrying = new AtomicLong(); + this.numberOfRequestsFailed = new AtomicLong(); + this.numberOfNetworkFailedRequests = new AtomicLong(); + } + + public AbfsBackoffMetrics(String retryCount) { + this.retryCount = retryCount; + this.numberOfRequestsSucceeded = new AtomicLong(); + this.minBackoff = new AtomicLong(Long.MAX_VALUE); + this.maxBackoff = new AtomicLong(); + this.totalRequests = new AtomicLong(); + this.totalBackoff = new AtomicLong(); + } + + private void initializeMap() { + ArrayList retryCountList = new ArrayList( + Arrays.asList("1", "2", "3", "4", "5_15", "15_25", "25AndAbove")); + for (String s : retryCountList) { + metricsMap.put(s, new AbfsBackoffMetrics(s)); + } + } + + public long getNumberOfRequestsSucceeded() { + return this.numberOfRequestsSucceeded.get(); + } + + public void setNumberOfRequestsSucceeded(long numberOfRequestsSucceeded) { + this.numberOfRequestsSucceeded.set(numberOfRequestsSucceeded); + } + + public void 
incrementNumberOfRequestsSucceeded() { + this.numberOfRequestsSucceeded.getAndIncrement(); + } + + public long getMinBackoff() { + return this.minBackoff.get(); + } + + public void setMinBackoff(long minBackoff) { + this.minBackoff.set(minBackoff); + } + + public long getMaxBackoff() { + return this.maxBackoff.get(); + } + + public void setMaxBackoff(long maxBackoff) { + this.maxBackoff.set(maxBackoff); + } + + public long getTotalRequests() { + return this.totalRequests.get(); + } + + public void incrementTotalRequests() { + this.totalRequests.incrementAndGet(); + } + + public void setTotalRequests(long totalRequests) { + this.totalRequests.set(totalRequests); + } + + public long getTotalBackoff() { + return this.totalBackoff.get(); + } + + public void setTotalBackoff(long totalBackoff) { + this.totalBackoff.set(totalBackoff); + } + + public String getRetryCount() { + return this.retryCount; + } + + public long getNumberOfIOPSThrottledRequests() { + return this.numberOfIOPSThrottledRequests.get(); + } + + public void setNumberOfIOPSThrottledRequests(long numberOfIOPSThrottledRequests) { + this.numberOfIOPSThrottledRequests.set(numberOfIOPSThrottledRequests); + } + + public void incrementNumberOfIOPSThrottledRequests() { + this.numberOfIOPSThrottledRequests.getAndIncrement(); + } + + public long getNumberOfBandwidthThrottledRequests() { + return this.numberOfBandwidthThrottledRequests.get(); + } + + public void setNumberOfBandwidthThrottledRequests(long numberOfBandwidthThrottledRequests) { + this.numberOfBandwidthThrottledRequests.set(numberOfBandwidthThrottledRequests); + } + + public void incrementNumberOfBandwidthThrottledRequests() { + this.numberOfBandwidthThrottledRequests.getAndIncrement(); + } + + public long getNumberOfOtherThrottledRequests() { + return this.numberOfOtherThrottledRequests.get(); + } + + public void setNumberOfOtherThrottledRequests(long numberOfOtherThrottledRequests) { + this.numberOfOtherThrottledRequests.set(numberOfOtherThrottledRequests); + } + + public void incrementNumberOfOtherThrottledRequests() { + this.numberOfOtherThrottledRequests.getAndIncrement(); + } + + public long getMaxRetryCount() { + return this.maxRetryCount.get(); + } + + public void setMaxRetryCount(long maxRetryCount) { + this.maxRetryCount.set(maxRetryCount); + } + + public void incrementMaxRetryCount() { + this.maxRetryCount.getAndIncrement(); + } + + public long getTotalNumberOfRequests() { + return this.totalNumberOfRequests.get(); + } + + public void setTotalNumberOfRequests(long totalNumberOfRequests) { + this.totalNumberOfRequests.set(totalNumberOfRequests); + } + + public void incrementTotalNumberOfRequests() { + this.totalNumberOfRequests.getAndIncrement(); + } + + public Map getMetricsMap() { + return metricsMap; + } + + public long getNumberOfRequestsSucceededWithoutRetrying() { + return this.numberOfRequestsSucceededWithoutRetrying.get(); + } + + public void setNumberOfRequestsSucceededWithoutRetrying(long numberOfRequestsSucceededWithoutRetrying) { + this.numberOfRequestsSucceededWithoutRetrying.set(numberOfRequestsSucceededWithoutRetrying); + } + + public void incrementNumberOfRequestsSucceededWithoutRetrying() { + this.numberOfRequestsSucceededWithoutRetrying.getAndIncrement(); + } + + public long getNumberOfRequestsFailed() { + return this.numberOfRequestsFailed.get(); + } + + public void setNumberOfRequestsFailed(long numberOfRequestsFailed) { + this.numberOfRequestsFailed.set(numberOfRequestsFailed); + } + + public void incrementNumberOfRequestsFailed() { + 
this.numberOfRequestsFailed.getAndIncrement();
+  }
+
+  public long getNumberOfNetworkFailedRequests() {
+    return this.numberOfNetworkFailedRequests.get();
+  }
+
+  public void setNumberOfNetworkFailedRequests(long numberOfNetworkFailedRequests) {
+    this.numberOfNetworkFailedRequests.set(numberOfNetworkFailedRequests);
+  }
+
+  public void incrementNumberOfNetworkFailedRequests() {
+    this.numberOfNetworkFailedRequests.getAndIncrement();
+  }
+
+  /*
+   Acronyms:
+   1. RCTSI: request count that succeeded in x retries
+   2. MMA: min/max/average (the backoff or sleep time between two requests)
+   3. s: seconds
+   4. BWT: number of bandwidth-throttled requests
+   5. IT: number of IOPS-throttled requests
+   6. OT: number of other throttled requests
+   7. NFR: number of requests that failed due to network errors
+   8. %RT: percentage of requests that were throttled
+   9. TRNR: total number of requests that succeeded without retrying
+   10. TRF: total number of requests that failed
+   11. TR: total number of requests made
+   12. MRC: max retry count across all requests
+   */
+  @Override
+  public String toString() {
+    StringBuilder metricString = new StringBuilder();
+    long totalRequestsThrottled = getNumberOfBandwidthThrottledRequests()
+        + getNumberOfIOPSThrottledRequests()
+        + getNumberOfOtherThrottledRequests();
+    double percentageOfRequestsThrottled =
+        ((double) totalRequestsThrottled / getTotalNumberOfRequests()) * HUNDRED;
+    for (Map.Entry<String, AbfsBackoffMetrics> entry : metricsMap.entrySet()) {
+      metricString.append("$RCTSI$_").append(entry.getKey())
+          .append("R_").append("=")
+          .append(entry.getValue().getNumberOfRequestsSucceeded());
+      long totalRequests = entry.getValue().getTotalRequests();
+      if (totalRequests > 0) {
+        metricString.append("$MMA$_").append(entry.getKey())
+            .append("R_").append("=")
+            .append(String.format("%.3f",
+                (double) entry.getValue().getMinBackoff() / THOUSAND))
+            .append("s")
+            .append(String.format("%.3f",
+                (double) entry.getValue().getMaxBackoff() / THOUSAND))
+            .append("s")
+            .append(String.format("%.3f",
+                ((double) entry.getValue().getTotalBackoff() / totalRequests)
+                    / THOUSAND))
+            .append("s");
+      } else {
+        metricString.append("$MMA$_").append(entry.getKey())
+            .append("R_").append("=0s");
+      }
+    }
+    metricString.append("$BWT=")
+        .append(getNumberOfBandwidthThrottledRequests())
+        .append("$IT=")
+        .append(getNumberOfIOPSThrottledRequests())
+        .append("$OT=")
+        .append(getNumberOfOtherThrottledRequests())
+        .append("$RT=")
+        .append(String.format("%.3f", percentageOfRequestsThrottled))
+        .append("$NFR=")
+        .append(getNumberOfNetworkFailedRequests())
+        .append("$TRNR=")
+        .append(getNumberOfRequestsSucceededWithoutRetrying())
+        .append("$TRF=")
+        .append(getNumberOfRequestsFailed())
+        .append("$TR=")
+        .append(getTotalNumberOfRequests())
+        .append("$MRC=")
+        .append(getMaxRetryCount());
+
+    return metricString.toString();
+  }
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index a1b6fc12a5..6e5e772e18 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Field;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import
org.apache.hadoop.util.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -291,6 +292,26 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING) private boolean enableAutoThrottling; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_IDLE_TIMEOUT, + DefaultValue = DEFAULT_METRIC_IDLE_TIMEOUT_MS) + private int metricIdleTimeout; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ANALYSIS_TIMEOUT, + DefaultValue = DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS) + private int metricAnalysisTimeout; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_URI, + DefaultValue = EMPTY_STRING) + private String metricUri; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_NAME, + DefaultValue = EMPTY_STRING) + private String metricAccount; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_METRIC_ACCOUNT_KEY, + DefaultValue = EMPTY_STRING) + private String metricAccountKey; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT, DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS) private int accountOperationIdleTimeout; @@ -818,6 +839,26 @@ public boolean isAutoThrottlingEnabled() { return this.enableAutoThrottling; } + public int getMetricIdleTimeout() { + return this.metricIdleTimeout; + } + + public int getMetricAnalysisTimeout() { + return this.metricAnalysisTimeout; + } + + public String getMetricUri() { + return metricUri; + } + + public String getMetricAccount() { + return metricAccount; + } + + public String getMetricAccountKey() { + return metricAccountKey; + } + public int getAccountOperationIdleTimeout() { return accountOperationIdleTimeout; } @@ -854,6 +895,10 @@ public TracingHeaderFormat getTracingHeaderFormat() { return getEnum(FS_AZURE_TRACINGHEADER_FORMAT, TracingHeaderFormat.ALL_ID_FORMAT); } + public MetricFormat getMetricFormat() { + return getEnum(FS_AZURE_METRIC_FORMAT, MetricFormat.EMPTY); + } + public AuthType getAuthType(String accountName) { return getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java index 67ee8e90ef..c4d3e05cdb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java @@ -21,10 +21,12 @@ import java.net.URI; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.VisibleForTesting; - import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; +import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics; +import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; @@ -34,8 +36,42 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableMetric; -import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT; +import static 
org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_APPEND; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_CREATE; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_CREATE_NON_RECURSIVE; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_DELETE; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_EXIST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_DELEGATION_TOKEN; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_LIST_STATUS; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_MKDIRS; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_OPEN; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_RENAME; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.DIRECTORIES_CREATED; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.DIRECTORIES_DELETED; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.ERROR_IGNORED; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.FILES_CREATED; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.FILES_DELETED; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_DELETE_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_GET_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_HEAD_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PATCH_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_POST_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PUT_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.METADATA_INCOMPLETE_RENAME_FAILURES; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.READ_THROTTLES; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_RECOVERY; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SERVER_UNAVAILABLE; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.WRITE_THROTTLES; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; +import static org.apache.hadoop.util.Time.now; + /** * Instrumentation of Abfs counters. 
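
As context for the counter changes that follow: the metric pipeline introduced by this patch is switched on purely through configuration. A minimal sketch of enabling it (illustrative only: the account name, key, and URI are placeholders, and INTERNAL_METRIC_FORMAT is the MetricFormat value that turns on both backoff and read-footer metrics, as the initializeMetrics() switch below shows):

    import org.apache.hadoop.conf.Configuration;

    public class AbfsMetricConfigSketch {
      public static Configuration withAbfsMetrics(Configuration conf) {
        // Selects which metrics to collect; EMPTY (the default) disables collection.
        conf.set("fs.azure.metric.format", "INTERNAL_METRIC_FORMAT");
        // Must be fully qualified (contain a '.'), or AbfsClient rejects it.
        conf.set("fs.azure.metric.account.name", "metricsacct.dfs.core.windows.net");
        conf.set("fs.azure.metric.account.key", "<account-key>");
        // Endpoint that receives the metric HEAD calls.
        conf.set("fs.azure.metric.uri", "https://metricsacct.dfs.core.windows.net");
        return conf;
      }
    }
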
@@ -63,6 +99,12 @@ public class AbfsCountersImpl implements AbfsCounters {

   private final IOStatisticsStore ioStatisticsStore;

+  private AbfsBackoffMetrics abfsBackoffMetrics = null;
+
+  private AbfsReadFooterMetrics abfsReadFooterMetrics = null;
+
+  private AtomicLong lastExecutionTime = null;
+
   private static final AbfsStatistic[] STATISTIC_LIST = {
       CALL_CREATE,
       CALL_OPEN,
@@ -91,7 +133,6 @@ public class AbfsCountersImpl implements AbfsCounters {
       RENAME_RECOVERY,
       METADATA_INCOMPLETE_RENAME_FAILURES,
       RENAME_PATH_ATTEMPTS
-
   };

   private static final AbfsStatistic[] DURATION_TRACKER_LIST = {
@@ -121,6 +162,25 @@ public AbfsCountersImpl(URI uri) {
       ioStatisticsStoreBuilder.withDurationTracking(durationStats.getStatName());
     }
     ioStatisticsStore = ioStatisticsStoreBuilder.build();
+    lastExecutionTime = new AtomicLong(now());
+  }
+
+  @Override
+  public void initializeMetrics(MetricFormat metricFormat) {
+    switch (metricFormat) {
+    case INTERNAL_BACKOFF_METRIC_FORMAT:
+      abfsBackoffMetrics = new AbfsBackoffMetrics();
+      break;
+    case INTERNAL_FOOTER_METRIC_FORMAT:
+      abfsReadFooterMetrics = new AbfsReadFooterMetrics();
+      break;
+    case INTERNAL_METRIC_FORMAT:
+      abfsBackoffMetrics = new AbfsBackoffMetrics();
+      abfsReadFooterMetrics = new AbfsReadFooterMetrics();
+      break;
+    default:
+      break;
+    }
   }

   /**
@@ -188,6 +248,21 @@ private MetricsRegistry getRegistry() {
     return registry;
   }

+  @Override
+  public AbfsBackoffMetrics getAbfsBackoffMetrics() {
+    return abfsBackoffMetrics;
+  }
+
+  @Override
+  public AtomicLong getLastExecutionTime() {
+    return lastExecutionTime;
+  }
+
+  @Override
+  public AbfsReadFooterMetrics getAbfsReadFooterMetrics() {
+    return abfsReadFooterMetrics;
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -244,4 +319,25 @@ public IOStatistics getIOStatistics() {
   public DurationTracker trackDuration(String key) {
     return ioStatisticsStore.trackDuration(key);
   }
+
+  @Override
+  public String toString() {
+    String metric = "";
+    if (abfsBackoffMetrics != null) {
+      long totalNoRequests = getAbfsBackoffMetrics().getTotalNumberOfRequests();
+      if (totalNoRequests > 0) {
+        metric += "#BO:" + getAbfsBackoffMetrics().toString();
+      }
+    }
+    if (abfsReadFooterMetrics != null) {
+      Map<String, AbfsReadFooterMetrics> metricsMap = getAbfsReadFooterMetrics().getMetricsMap();
+      if (metricsMap != null && !metricsMap.isEmpty()) {
+        String readFooterMetric = getAbfsReadFooterMetrics().toString();
+        if (!readFooterMetric.isEmpty()) {
+          metric += "#FO:" + readFooterMetric;
+        }
+      }
+    }
+    return metric;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 51ba90f8e0..7ca960d569 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-
 import javax.annotation.Nullable;

 import org.apache.hadoop.classification.VisibleForTesting;
@@ -50,7 +49,6 @@ import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -700,6 +698,18 @@ public
synchronized void close() throws IOException {
     if (isClosed) {
       return;
     }
+    if (abfsStore.getClient().isMetricCollectionEnabled()) {
+      TracingContext tracingMetricContext = new TracingContext(
+          clientCorrelationId,
+          fileSystemId, FSOperationType.GET_ATTR, true,
+          tracingHeaderFormat,
+          listener, abfsCounters.toString());
+      try {
+        getAbfsClient().getMetricCall(tracingMetricContext);
+      } catch (IOException e) {
+        throw new IOException("Error while sending the filesystem metrics on close", e);
+      }
+    }
     // does all the delete-on-exit calls, and may be slow.
     super.close();
     LOG.debug("AzureBlobFileSystem.close");
@@ -1680,3 +1690,4 @@ public IOStatistics getIOStatistics() {
     return abfsCounters != null ? abfsCounters.getIOStatistics() : null;
   }
 }
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index b667934c39..299cc5c9c4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -41,6 +41,10 @@ public final class ConfigurationKeys {
    */
   public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
+  public static final String FS_AZURE_METRIC_ACCOUNT_NAME = "fs.azure.metric.account.name";
+  public static final String FS_AZURE_METRIC_ACCOUNT_KEY = "fs.azure.metric.account.key";
+  public static final String FS_AZURE_METRIC_URI = "fs.azure.metric.uri";
+
   public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
   public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
   public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
@@ -150,6 +154,8 @@ public final class ConfigurationKeys {
   public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
   public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
   public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
+  public static final String FS_AZURE_METRIC_IDLE_TIMEOUT = "fs.azure.metric.idle.timeout";
+  public static final String FS_AZURE_METRIC_ANALYSIS_TIMEOUT = "fs.azure.metric.analysis.timeout";
   public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
   public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
   public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
@@ -190,6 +196,7 @@ public final class ConfigurationKeys {
    * character constraints are not satisfied.
**/ public static final String FS_AZURE_CLIENT_CORRELATIONID = "fs.azure.client.correlationid"; public static final String FS_AZURE_TRACINGHEADER_FORMAT = "fs.azure.tracingheader.format"; + public static final String FS_AZURE_METRIC_FORMAT = "fs.azure.metric.format"; public static final String FS_AZURE_CLUSTER_NAME = "fs.azure.cluster.name"; public static final String FS_AZURE_CLUSTER_TYPE = "fs.azure.cluster.type"; public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 0af485bbe5..ade0dc39cf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -108,6 +108,8 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ENABLE_FLUSH = true; public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true; public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true; + public static final int DEFAULT_METRIC_IDLE_TIMEOUT_MS = 60_000; + public static final int DEFAULT_METRIC_ANALYSIS_TIMEOUT_MS = 60_000; public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true; public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000; public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000; @@ -161,5 +163,9 @@ public final class FileSystemConfigurations { */ public static final int RATE_LIMIT_DEFAULT = 1_000; + public static final int ZERO = 0; + public static final int HUNDRED = 100; + public static final long THOUSAND = 1000L; + private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index 84a94b994c..b3c2b21d3c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -43,6 +43,7 @@ public final class HttpHeaderConfigurations { public static final String USER_AGENT = "User-Agent"; public static final String X_HTTP_METHOD_OVERRIDE = "X-HTTP-Method-Override"; public static final String X_MS_CLIENT_REQUEST_ID = "x-ms-client-request-id"; + public static final String X_MS_FECLIENT_METRICS = "x-ms-feclient-metrics"; public static final String X_MS_EXISTING_RESOURCE_TYPE = "x-ms-existing-resource-type"; public static final String X_MS_DATE = "x-ms-date"; public static final String X_MS_REQUEST_ID = "x-ms-request-id"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index 12e687c15b..439caabe23 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -21,7 +21,8 @@ import 
java.net.HttpURLConnection; import java.util.ArrayList; import java.util.List; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -60,6 +61,9 @@ public enum AzureServiceErrorCode { private final String errorCode; private final int httpStatusCode; private final String errorMessage; + + private static final Logger LOG1 = LoggerFactory.getLogger(AzureServiceErrorCode.class); + AzureServiceErrorCode(String errorCode, int httpStatusCodes, String errorMessage) { this.errorCode = errorCode; this.httpStatusCode = httpStatusCodes; @@ -105,7 +109,6 @@ public static AzureServiceErrorCode getAzureServiceCode(int httpStatusCode, Stri return azureServiceErrorCode; } } - return UNKNOWN; } @@ -113,16 +116,15 @@ public static AzureServiceErrorCode getAzureServiceCode(int httpStatusCode, Stri if (errorCode == null || errorCode.isEmpty() || httpStatusCode == UNKNOWN.httpStatusCode || errorMessage == null || errorMessage.isEmpty()) { return UNKNOWN; } - + String[] errorMessages = errorMessage.split(System.lineSeparator(), 2); for (AzureServiceErrorCode azureServiceErrorCode : AzureServiceErrorCode.values()) { - if (azureServiceErrorCode.httpStatusCode == httpStatusCode - && errorCode.equalsIgnoreCase(azureServiceErrorCode.errorCode) - && errorMessage.equalsIgnoreCase(azureServiceErrorCode.errorMessage) - ) { + if (azureServiceErrorCode.getStatusCode() == httpStatusCode + && azureServiceErrorCode.getErrorCode().equalsIgnoreCase(errorCode) + && azureServiceErrorCode.getErrorMessage() + .equalsIgnoreCase(errorMessages[0])) { return azureServiceErrorCode; } } - return UNKNOWN; } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 1ab1c7a0af..f4ff181357 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -22,24 +22,31 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; +import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; import java.net.URLEncoder; +import java.net.UnknownHostException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Locale; +import java.util.Timer; +import java.util.TimerTask; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; import org.apache.hadoop.fs.azurebfs.utils.NamespaceUtil; import org.apache.hadoop.fs.store.LogExactlyOnce; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions; @@ -115,6 +122,13 @@ public class AbfsClient implements Closeable { 
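
The getAzureServiceCode() rewrite above compares only the first line of the service error message (via errorMessage.split(System.lineSeparator(), 2)), so a match still succeeds when the service returns a multi-line message. A small illustration, with an invented message body:

    // Only the first line participates in the case-insensitive comparison.
    String errorMessage = "Operation could not be completed within the specified time."
        + System.lineSeparator()
        + "RequestId:00000000-0000-0000-0000-000000000000";
    String firstLine = errorMessage.split(System.lineSeparator(), 2)[0];
    // firstLine -> "Operation could not be completed within the specified time."
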
private AccessTokenProvider tokenProvider; private SASTokenProvider sasTokenProvider; private final AbfsCounters abfsCounters; + private final Timer timer; + private final String abfsMetricUrl; + private boolean isMetricCollectionEnabled = false; + private final MetricFormat metricFormat; + private final AtomicBoolean isMetricCollectionStopped; + private final int metricAnalysisPeriod; + private final int metricIdlePeriod; private EncryptionContextProvider encryptionContextProvider = null; private EncryptionType encryptionType = EncryptionType.NONE; private final AbfsThrottlingIntercept intercept; @@ -123,6 +137,9 @@ public class AbfsClient implements Closeable { private Boolean isNamespaceEnabled; private boolean renameResilience; + private TimerTask runningTimerTask; + private boolean isSendMetricCall; + private SharedKeyCredentials metricSharedkeyCredentials = null; /** * logging the rename failure if metadata is in an incomplete state. @@ -181,6 +198,35 @@ private AbfsClient(final URL baseUrl, new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build(); this.executorService = MoreExecutors.listeningDecorator( HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf)); + this.metricFormat = abfsConfiguration.getMetricFormat(); + this.isMetricCollectionStopped = new AtomicBoolean(false); + this.metricAnalysisPeriod = abfsConfiguration.getMetricAnalysisTimeout(); + this.metricIdlePeriod = abfsConfiguration.getMetricIdleTimeout(); + if (!metricFormat.toString().equals("")) { + isMetricCollectionEnabled = true; + abfsCounters.initializeMetrics(metricFormat); + String metricAccountName = abfsConfiguration.getMetricAccount(); + int dotIndex = metricAccountName.indexOf(AbfsHttpConstants.DOT); + if (dotIndex <= 0) { + throw new InvalidUriException( + metricAccountName + " - account name is not fully qualified."); + } + String metricAccountKey = abfsConfiguration.getMetricAccountKey(); + try { + metricSharedkeyCredentials = new SharedKeyCredentials(metricAccountName.substring(0, dotIndex), + metricAccountKey); + } catch (IllegalArgumentException e) { + throw new IOException("Exception while initializing metric credentials " + e); + } + } + this.timer = new Timer( + "abfs-timer-client", true); + if (isMetricCollectionEnabled) { + timer.schedule(new TimerTaskImpl(), + metricIdlePeriod, + metricIdlePeriod); + } + this.abfsMetricUrl = abfsConfiguration.getMetricUri(); } public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, @@ -207,6 +253,10 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent @Override public void close() throws IOException { + if (runningTimerTask != null) { + runningTimerTask.cancel(); + timer.purge(); + } if (tokenProvider instanceof Closeable) { IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider); @@ -246,6 +296,10 @@ SharedKeyCredentials getSharedKeyCredentials() { return sharedKeyCredentials; } + SharedKeyCredentials getMetricSharedkeyCredentials() { + return metricSharedkeyCredentials; + } + public void setEncryptionType(EncryptionType encryptionType) { this.encryptionType = encryptionType; } @@ -1057,7 +1111,6 @@ public AbfsRestOperation getPathStatus(final String path, final ContextEncryptionAdapter contextEncryptionAdapter) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); String operation = 
SASTokenProvider.GET_PROPERTIES_OPERATION; if (!includeProperties) { @@ -1318,7 +1371,6 @@ public AbfsRestOperation getAclStatus(final String path, TracingContext tracingC public AbfsRestOperation getAclStatus(final String path, final boolean useUPN, TracingContext tracingContext) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(useUPN)); @@ -1435,6 +1487,7 @@ private String appendSASTokenToQuery(String path, return sasToken; } + @VisibleForTesting private URL createRequestUrl(final String query) throws AzureBlobFileSystemException { return createRequestUrl(EMPTY_STRING, query); } @@ -1442,7 +1495,12 @@ private URL createRequestUrl(final String query) throws AzureBlobFileSystemExcep @VisibleForTesting protected URL createRequestUrl(final String path, final String query) throws AzureBlobFileSystemException { - final String base = baseUrl.toString(); + return createRequestUrl(baseUrl, path, query); + } + + @VisibleForTesting + protected URL createRequestUrl(final URL baseUrl, final String path, final String query) + throws AzureBlobFileSystemException { String encodedPath = path; try { encodedPath = urlEncode(path); @@ -1452,7 +1510,10 @@ protected URL createRequestUrl(final String path, final String query) } final StringBuilder sb = new StringBuilder(); - sb.append(base); + if (baseUrl == null) { + throw new InvalidUriException("URL provided is null"); + } + sb.append(baseUrl.toString()); sb.append(encodedPath); sb.append(query); @@ -1460,7 +1521,7 @@ protected URL createRequestUrl(final String path, final String query) try { url = new URL(sb.toString()); } catch (MalformedURLException ex) { - throw new InvalidUriException(sb.toString()); + throw new InvalidUriException("URL is malformed" + sb.toString()); } return url; } @@ -1693,7 +1754,7 @@ void setIsNamespaceEnabled(final Boolean isNamespaceEnabled) { * Getter for abfsCounters from AbfsClient. * @return AbfsCounters instance. */ - protected AbfsCounters getAbfsCounters() { + public AbfsCounters getAbfsCounters() { return abfsCounters; } @@ -1731,6 +1792,128 @@ protected AccessTokenProvider getTokenProvider() { return tokenProvider; } + /** + * Retrieves a TracingContext object configured for metric tracking. + * This method creates a TracingContext object with the validated client correlation ID, + * the host name of the local machine (or "UnknownHost" if unable to determine), + * the file system operation type set to GET_ATTR, and additional configuration parameters + * for metric tracking. + * The TracingContext is intended for use in tracking metrics related to Azure Blob FileSystem (ABFS) operations. + * + * @return A TracingContext object configured for metric tracking. + */ + private TracingContext getMetricTracingContext() { + String hostName; + try { + hostName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + hostName = "UnknownHost"; + } + return new TracingContext(TracingContext.validateClientCorrelationID( + abfsConfiguration.getClientCorrelationId()), + hostName, FSOperationType.GET_ATTR, true, + abfsConfiguration.getTracingHeaderFormat(), + null, abfsCounters.toString()); + } + + /** + * Synchronized method to suspend or resume timer. + * @param timerFunctionality resume or suspend. 
+   * @param timerTask the TimerTask to cancel when suspending.
+   * @return true if the timer task was cancelled and metric collection
+   *         suspended; false otherwise.
+   */
+  boolean timerOrchestrator(TimerFunctionality timerFunctionality, TimerTask timerTask) {
+    switch (timerFunctionality) {
+    case RESUME:
+      if (isMetricCollectionStopped.get()) {
+        synchronized (this) {
+          if (isMetricCollectionStopped.get()) {
+            resumeTimer();
+          }
+        }
+      }
+      break;
+    case SUSPEND:
+      long now = System.currentTimeMillis();
+      long lastExecutionTime = abfsCounters.getLastExecutionTime().get();
+      if (isMetricCollectionEnabled && (now - lastExecutionTime >= metricAnalysisPeriod)) {
+        synchronized (this) {
+          if (!isMetricCollectionStopped.get()) {
+            timerTask.cancel();
+            timer.purge();
+            isMetricCollectionStopped.set(true);
+            return true;
+          }
+        }
+      }
+      break;
+    default:
+      break;
+    }
+    return false;
+  }
+
+  private void resumeTimer() {
+    isMetricCollectionStopped.set(false);
+    timer.schedule(new TimerTaskImpl(),
+        metricIdlePeriod,
+        metricIdlePeriod);
+  }
+
+  /**
+   * Initiates a metric call to the Azure Blob FileSystem (ABFS) metric store.
+   * This performs a HEAD request for file system properties against the
+   * configured metric URL, using default headers and query parameters.
+   *
+   * @param tracingContext The tracing context to be used for capturing tracing information.
+   * @throws IOException if the metric call fails.
+   */
+  public void getMetricCall(TracingContext tracingContext) throws IOException {
+    this.isSendMetricCall = true;
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+    abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
+
+    final URL url = createRequestUrl(new URL(abfsMetricUrl), EMPTY_STRING, abfsUriQueryBuilder.toString());
+
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.GetFileSystemProperties,
+        HTTP_METHOD_HEAD,
+        url,
+        requestHeaders);
+    try {
+      op.execute(tracingContext);
+    } finally {
+      this.isSendMetricCall = false;
+    }
+  }
+
+  public boolean isSendMetricCall() {
+    return isSendMetricCall;
+  }
+
+  public boolean isMetricCollectionEnabled() {
+    return isMetricCollectionEnabled;
+  }
+
+  class TimerTaskImpl extends TimerTask {
+    TimerTaskImpl() {
+      runningTimerTask = this;
+    }
+
+    @Override
+    public void run() {
+      try {
+        if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
+          try {
+            getMetricCall(getMetricTracingContext());
+          } finally {
+            // Re-initialize the metric stores after each push attempt.
+            abfsCounters.initializeMetrics(metricFormat);
+          }
+        }
+      } catch (IOException e) {
+        // Metric push is best-effort; never let it break the timer thread.
+        LOG.debug("Error while sending the ABFS metrics", e);
+      }
+    }
+  }
+
   /**
    * Creates an AbfsRestOperation with additional parameters for buffer and SAS token.
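
In short, the timer machinery above works as follows: a daemon Timer fires every metricIdlePeriod; once the client has been idle past the analysis window, timerOrchestrator(SUSPEND, ...) cancels the task and the pending metrics are pushed via getMetricCall(), while a later request resumes the timer. A standalone sketch of the same idle-push pattern (simplified names, no suspend/resume handshake; not the ABFS implementation itself):

    import java.util.Timer;
    import java.util.TimerTask;
    import java.util.concurrent.atomic.AtomicLong;

    public class IdleMetricPusher {
      private final Timer timer = new Timer("metric-pusher", true); // daemon thread
      private final AtomicLong lastRequestTime =
          new AtomicLong(System.currentTimeMillis());

      public IdleMetricPusher(long idlePeriodMs) {
        timer.schedule(new TimerTask() {
          @Override
          public void run() {
            long idleFor = System.currentTimeMillis() - lastRequestTime.get();
            if (idleFor >= idlePeriodMs) {
              pushMetrics(); // analogous to AbfsClient#getMetricCall
            }
          }
        }, idlePeriodMs, idlePeriodMs);
      }

      public void recordRequest() { // called on every backend request
        lastRequestTime.set(System.currentTimeMillis());
      }

      private void pushMetrics() {
        // In the patch, this is a HEAD request carrying the serialized metrics.
      }
    }
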
* diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java index 0a5182a699..baf79e7dd8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java @@ -53,7 +53,7 @@ public AbfsPerfTracker getAbfsPerfTracker() { return abfsPerfTracker; } - public AbfsCounters getAbfsCounters() { + AbfsCounters getAbfsCounters() { return abfsCounters; } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java index d01a3598af..65e5fa29a1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java @@ -19,12 +19,15 @@ package org.apache.hadoop.fs.azurebfs.services; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics; import org.apache.hadoop.fs.azurebfs.AbfsStatistic; +import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.statistics.IOStatisticsSource; @@ -74,4 +77,12 @@ String formString(String prefix, String separator, String suffix, */ @Override DurationTracker trackDuration(String key); + + void initializeMetrics(MetricFormat metricFormat); + + AbfsBackoffMetrics getAbfsBackoffMetrics(); + + AbfsReadFooterMetrics getAbfsReadFooterMetrics(); + + AtomicLong getLastExecutionTime(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 19c67a8358..cacd3b092e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -110,15 +110,15 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private int bCursorBkp; private long fCursorBkp; private long fCursorAfterLastReadBkp; - + private final AbfsReadFooterMetrics abfsReadFooterMetrics; /** Stream statistics. */ private final AbfsInputStreamStatistics streamStatistics; private long bytesFromReadAhead; // bytes read from readAhead; for testing private long bytesFromRemoteRead; // bytes read remotely; for testing private Listener listener; - private final AbfsInputStreamContext context; private IOStatistics ioStatistics; + private String filePathIdentifier; /** * This is the actual position within the object, used by * lazy seek to decide whether to seek on the next read or not. @@ -141,9 +141,6 @@ public AbfsInputStream( this.path = path; this.contentLength = contentLength; this.bufferSize = abfsInputStreamContext.getReadBufferSize(); - /* - * FooterReadSize should not be more than bufferSize. 
- */ this.footerReadSize = Math.min(bufferSize, abfsInputStreamContext.getFooterReadBufferSize()); this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); @@ -157,12 +154,19 @@ public AbfsInputStream( this.cachedSasToken = new CachedSASToken( abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); this.streamStatistics = abfsInputStreamContext.getStreamStatistics(); + this.abfsReadFooterMetrics = client.getAbfsCounters().getAbfsReadFooterMetrics(); this.inputStreamId = createInputStreamId(); this.tracingContext = new TracingContext(tracingContext); this.tracingContext.setOperation(FSOperationType.READ); this.tracingContext.setStreamID(inputStreamId); this.context = abfsInputStreamContext; readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize(); + if (abfsReadFooterMetrics != null) { + this.filePathIdentifier = eTag + path; + synchronized (this) { + abfsReadFooterMetrics.updateMap(filePathIdentifier); + } + } this.fsBackRef = abfsInputStreamContext.getFsBackRef(); contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter(); @@ -253,6 +257,9 @@ public synchronized int read(final byte[] b, final int off, final int len) throw // go back and read from buffer is fCursor - limit. // There maybe case that we read less than requested data. long filePosAtStartOfBuffer = fCursor - limit; + if (abfsReadFooterMetrics != null) { + abfsReadFooterMetrics.checkMetricUpdate(filePathIdentifier, len, contentLength, nextReadPos); + } if (nextReadPos >= filePosAtStartOfBuffer && nextReadPos <= fCursor) { // Determining position in buffer from where data is to be read. bCursor = (int) (nextReadPos - filePosAtStartOfBuffer); @@ -339,7 +346,6 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO if (firstRead) { firstRead = false; } - if (bytesRead == -1) { return -1; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java new file mode 100644 index 0000000000..5abb97cd9c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java @@ -0,0 +1,549 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; + +public class AbfsReadFooterMetrics { + private final AtomicBoolean isParquetFile; + private final AtomicBoolean isParquetEvaluated; + private final AtomicBoolean isLenUpdated; + private String sizeReadByFirstRead; + private String offsetDiffBetweenFirstAndSecondRead; + private final AtomicLong fileLength; + private double avgFileLength; + private double avgReadLenRequested; + private final AtomicBoolean collectMetrics; + private final AtomicBoolean collectMetricsForNextRead; + private final AtomicBoolean collectLenMetrics; + private final AtomicLong dataLenRequested; + private final AtomicLong offsetOfFirstRead; + private final AtomicInteger readCount; + private final ConcurrentSkipListMap metricsMap; + private static final String FOOTER_LENGTH = "20"; + + public AbfsReadFooterMetrics() { + this.isParquetFile = new AtomicBoolean(false); + this.isParquetEvaluated = new AtomicBoolean(false); + this.isLenUpdated = new AtomicBoolean(false); + this.fileLength = new AtomicLong(); + this.readCount = new AtomicInteger(0); + this.offsetOfFirstRead = new AtomicLong(); + this.collectMetrics = new AtomicBoolean(false); + this.collectMetricsForNextRead = new AtomicBoolean(false); + this.collectLenMetrics = new AtomicBoolean(false); + this.dataLenRequested = new AtomicLong(0); + this.metricsMap = new ConcurrentSkipListMap<>(); + } + + public Map getMetricsMap() { + return metricsMap; + } + + private boolean getIsParquetFile() { + return isParquetFile.get(); + } + + public void setIsParquetFile(boolean isParquetFile) { + this.isParquetFile.set(isParquetFile); + } + + private String getSizeReadByFirstRead() { + return sizeReadByFirstRead; + } + + public void setSizeReadByFirstRead(final String sizeReadByFirstRead) { + this.sizeReadByFirstRead = sizeReadByFirstRead; + } + + private String getOffsetDiffBetweenFirstAndSecondRead() { + return offsetDiffBetweenFirstAndSecondRead; + } + + public void setOffsetDiffBetweenFirstAndSecondRead(final String offsetDiffBetweenFirstAndSecondRead) { + this.offsetDiffBetweenFirstAndSecondRead + = offsetDiffBetweenFirstAndSecondRead; + } + + private long getFileLength() { + return fileLength.get(); + } + + private void setFileLength(long fileLength) { + this.fileLength.set(fileLength); + } + + private double getAvgFileLength() { + return avgFileLength; + } + + public void setAvgFileLength(final double avgFileLength) { + this.avgFileLength = avgFileLength; + } + + private double getAvgReadLenRequested() { + return avgReadLenRequested; + } + + public void setAvgReadLenRequested(final double avgReadLenRequested) { + this.avgReadLenRequested = avgReadLenRequested; + } + + private boolean getCollectMetricsForNextRead() { + return collectMetricsForNextRead.get(); + } + + private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead) { + this.collectMetricsForNextRead.set(collectMetricsForNextRead); + } + + private long getOffsetOfFirstRead() { + return offsetOfFirstRead.get(); + } + + private void setOffsetOfFirstRead(long offsetOfFirstRead) { + this.offsetOfFirstRead.set(offsetOfFirstRead); + } + + private int 
getReadCount() { + return readCount.get(); + } + + private void setReadCount(int readCount) { + this.readCount.set(readCount); + } + + private int incrementReadCount() { + this.readCount.incrementAndGet(); + return getReadCount(); + } + + private boolean getCollectLenMetrics() { + return collectLenMetrics.get(); + } + + private void setCollectLenMetrics(boolean collectLenMetrics) { + this.collectLenMetrics.set(collectLenMetrics); + + } + + private long getDataLenRequested() { + return dataLenRequested.get(); + } + + private void setDataLenRequested(long dataLenRequested) { + this.dataLenRequested.set(dataLenRequested); + } + + private void updateDataLenRequested(long dataLenRequested){ + this.dataLenRequested.addAndGet(dataLenRequested); + } + + private boolean getCollectMetrics() { + return collectMetrics.get(); + } + + private void setCollectMetrics(boolean collectMetrics) { + this.collectMetrics.set(collectMetrics); + } + + private boolean getIsParquetEvaluated() { + return isParquetEvaluated.get(); + } + + private void setIsParquetEvaluated(boolean isParquetEvaluated) { + this.isParquetEvaluated.set(isParquetEvaluated); + } + + private boolean getIsLenUpdated() { + return isLenUpdated.get(); + } + + private void setIsLenUpdated(boolean isLenUpdated) { + this.isLenUpdated.set(isLenUpdated); + } + + /** + * Updates the metrics map with an entry for the specified file if it doesn't already exist. + * + * @param filePathIdentifier The unique identifier for the file. + */ + public void updateMap(String filePathIdentifier) { + // If the file is not already in the metrics map, add it with a new AbfsReadFooterMetrics object. + metricsMap.computeIfAbsent(filePathIdentifier, key -> new AbfsReadFooterMetrics()); + } + + /** + * Checks and updates metrics for a specific file identified by filePathIdentifier. + * If the metrics do not exist for the file, they are initialized. + * + * @param filePathIdentifier The unique identifier for the file. + * @param len The length of the read operation. + * @param contentLength The total content length of the file. + * @param nextReadPos The position of the next read operation. + */ + public void checkMetricUpdate(final String filePathIdentifier, final int len, final long contentLength, + final long nextReadPos) { + AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent( + filePathIdentifier, key -> new AbfsReadFooterMetrics()); + if (readFooterMetrics.getReadCount() == 0 + || (readFooterMetrics.getReadCount() >= 1 + && readFooterMetrics.getCollectMetrics())) { + updateMetrics(filePathIdentifier, len, contentLength, nextReadPos); + } + } + + /** + * Updates metrics for a specific file identified by filePathIdentifier. + * + * @param filePathIdentifier The unique identifier for the file. + * @param len The length of the read operation. + * @param contentLength The total content length of the file. + * @param nextReadPos The position of the next read operation. + */ + private void updateMetrics(final String filePathIdentifier, final int len, final long contentLength, + final long nextReadPos) { + AbfsReadFooterMetrics readFooterMetrics = metricsMap.get(filePathIdentifier); + + // Create a new AbfsReadFooterMetrics object if it doesn't exist in the metricsMap. 
+ if (readFooterMetrics == null) { + readFooterMetrics = new AbfsReadFooterMetrics(); + metricsMap.put(filePathIdentifier, readFooterMetrics); + } + + int readCount; + synchronized (this) { + readCount = readFooterMetrics.incrementReadCount(); + } + + if (readCount == 1) { + // Update metrics for the first read. + updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len, contentLength); + } + + synchronized (this) { + if (readFooterMetrics.getCollectLenMetrics()) { + readFooterMetrics.updateDataLenRequested(len); + } + } + + if (readCount == 2) { + // Update metrics for the second read. + updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len); + } + } + + /** + * Updates metrics for the first read operation. + * + * @param readFooterMetrics The metrics object to update. + * @param nextReadPos The position of the next read operation. + * @param len The length of the read operation. + * @param contentLength The total content length of the file. + */ + private void updateMetricsOnFirstRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len, long contentLength) { + if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH) * ONE_KB) { + readFooterMetrics.setCollectMetrics(true); + readFooterMetrics.setCollectMetricsForNextRead(true); + readFooterMetrics.setOffsetOfFirstRead(nextReadPos); + readFooterMetrics.setSizeReadByFirstRead(len + "_" + Math.abs(contentLength - nextReadPos)); + readFooterMetrics.setFileLength(contentLength); + } + } + + /** + * Updates metrics for the second read operation. + * + * @param readFooterMetrics The metrics object to update. + * @param nextReadPos The position of the next read operation. + * @param len The length of the read operation. + */ + private void updateMetricsOnSecondRead(AbfsReadFooterMetrics readFooterMetrics, long nextReadPos, int len) { + if (readFooterMetrics.getCollectMetricsForNextRead()) { + long offsetDiff = Math.abs(nextReadPos - readFooterMetrics.getOffsetOfFirstRead()); + readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + offsetDiff); + readFooterMetrics.setCollectLenMetrics(true); + } + } + + /** + * Check if the given file should be marked as a Parquet file. + * + * @param metrics The metrics to evaluate. + * @return True if the file meets the criteria for being marked as a Parquet file, false otherwise. + */ + private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) { + return metrics.getCollectMetrics() + && metrics.getReadCount() >= 2 + && !metrics.getIsParquetEvaluated() + && haveEqualValues(metrics.getSizeReadByFirstRead()) + && haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead()); + } + + /** + * Check if two values are equal, considering they are in the format "value1_value2". + * + * @param value The value to check. + * @return True if the two parts of the value are equal, false otherwise. + */ + private boolean haveEqualValues(String value) { + String[] parts = value.split("_"); + return parts.length == 2 && parts[0].equals(parts[1]); + } + + /** + * Mark the given metrics as a Parquet file and update related values. + * + * @param metrics The metrics to mark as Parquet.
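+ * Once marked, only the first component of each underscore-delimited pair is retained, since equal components are what identified the Parquet read pattern.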
+ */ + private void markAsParquet(AbfsReadFooterMetrics metrics) { + metrics.setIsParquetFile(true); + String[] parts = metrics.getSizeReadByFirstRead().split("_"); + metrics.setSizeReadByFirstRead(parts[0]); + parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_"); + metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]); + metrics.setIsParquetEvaluated(true); + } + + /** + * Check each metric in the provided map and mark them as Parquet files if they meet the criteria. + * + * @param metricsMap The map containing metrics to evaluate. + */ + public void checkIsParquet(Map metricsMap) { + for (Map.Entry entry : metricsMap.entrySet()) { + AbfsReadFooterMetrics readFooterMetrics = entry.getValue(); + if (shouldMarkAsParquet(readFooterMetrics)) { + markAsParquet(readFooterMetrics); + metricsMap.replace(entry.getKey(), readFooterMetrics); + } + } + } + + /** + * Updates the average read length requested for metrics of all files in the metrics map. + * If the metrics indicate that the update is needed, it calculates the average read length and updates the metrics. + * + * @param metricsMap A map containing metrics for different files with unique identifiers. + */ + private void updateLenRequested(Map metricsMap) { + for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) { + if (shouldUpdateLenRequested(readFooterMetrics)) { + int readReqCount = readFooterMetrics.getReadCount() - 2; + readFooterMetrics.setAvgReadLenRequested( + (double) readFooterMetrics.getDataLenRequested() / readReqCount); + readFooterMetrics.setIsLenUpdated(true); + } + } + } + + /** + * Checks whether the average read length requested should be updated for the given metrics. + * + * The method returns true if the following conditions are met: + * - Metrics collection is enabled. + * - The number of read counts is greater than 2. + * - The average read length has not been updated previously. + * + * @param readFooterMetrics The metrics object to evaluate. + * @return True if the average read length should be updated, false otherwise. + */ + private boolean shouldUpdateLenRequested(AbfsReadFooterMetrics readFooterMetrics) { + return readFooterMetrics.getCollectMetrics() + && readFooterMetrics.getReadCount() > 2 + && !readFooterMetrics.getIsLenUpdated(); + } + + /** + * Calculates the average metrics from a list of AbfsReadFooterMetrics and sets the values in the provided 'avgParquetReadFooterMetrics' object. + * + * @param isParquetList The list of AbfsReadFooterMetrics to compute the averages from. + * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics object to store the computed average values. + * + * This method calculates various average metrics from the provided list and sets them in the 'avgParquetReadFooterMetrics' object. 
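+ * Averages are computed with streams; an empty list would fall back to 0.0 through orElse.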
+ * The metrics include: + * - Size read by the first read + * - Offset difference between the first and second read + * - Average file length + * - Average requested read length + */ + private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isParquetList, + AbfsReadFooterMetrics avgParquetReadFooterMetrics) { + avgParquetReadFooterMetrics.setSizeReadByFirstRead( + String.format("%.3f", isParquetList.stream() + .map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble( + Double::parseDouble).average().orElse(0.0))); + avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead( + String.format("%.3f", isParquetList.stream() + .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead) + .mapToDouble(Double::parseDouble).average().orElse(0.0))); + avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream() + .mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0)); + avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream() + .map(AbfsReadFooterMetrics::getAvgReadLenRequested) + .mapToDouble(Double::doubleValue).average().orElse(0.0)); + } + + /** + * Calculates the average metrics from a list of non-Parquet AbfsReadFooterMetrics instances. + * + * This method takes a list of AbfsReadFooterMetrics representing non-Parquet reads and calculates + * the average values for the size read by the first read and the offset difference between the first + * and second read. The averages are then set in the provided AbfsReadFooterMetrics instance. + * + * @param isNonParquetList A list of AbfsReadFooterMetrics instances representing non-Parquet reads. + * @param avgNonParquetReadFooterMetrics The AbfsReadFooterMetrics instance to store the calculated averages. + * It is assumed that the size of the list is at least 1, and the first + * element of the list is used to determine the size of arrays. + * The instance is modified in-place with the calculated averages.
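+ * Each underscore-delimited component is averaged independently across the list and re-joined with the same delimiter.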
+ * + * + **/ + private void getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> isNonParquetList, + AbfsReadFooterMetrics avgNonParquetReadFooterMetrics) { + int size = isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length; + double[] store = new double[2 * size]; + // Calculating sum of individual values + isNonParquetList.forEach(abfsReadFooterMetrics -> { + String[] firstReadSize = abfsReadFooterMetrics.getSizeReadByFirstRead().split("_"); + String[] offDiffFirstSecondRead = abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_"); + + for (int i = 0; i < firstReadSize.length; i++) { + store[i] += Long.parseLong(firstReadSize[i]); + store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]); + } + }); + + // Calculating averages and creating formatted strings + StringJoiner firstReadSize = new StringJoiner("_"); + StringJoiner offDiffFirstSecondRead = new StringJoiner("_"); + + for (int j = 0; j < size; j++) { + firstReadSize.add(String.format("%.3f", store[j] / isNonParquetList.size())); + offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] / isNonParquetList.size())); + } + + avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString()); + avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString()); + avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream() + .mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0)); + avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream() + .mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0)); + } + + /* + Acronyms: + 1.FR :- First Read (In case of parquet we only maintain the size requested by application for + the first read, in case of non parquet we maintain a string separated by "_" delimiter where the first + substring represents the len requested for first read and the second substring represents the seek pointer difference from the + end of the file.) + 2.SR :- Second Read (In case of parquet we only maintain the size requested by application for + the second read, in case of non parquet we maintain a string separated by "_" delimiter where the first + substring represents the len requested for second read and the second substring represents the seek pointer difference from the + offset of the first read.) + 3.FL :- Total length of the file requested for read + 4.RL :- Average length requested per read, computed over the reads made after the first two. + */ + public String getReadFooterMetrics(AbfsReadFooterMetrics avgReadFooterMetrics) { + String readFooterMetric = ""; + if (avgReadFooterMetrics.getIsParquetFile()) { + readFooterMetric += "$Parquet:"; + } else { + readFooterMetric += "$NonParquet:"; + } + readFooterMetric += "$FR=" + avgReadFooterMetrics.getSizeReadByFirstRead() + + "$SR=" + + avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead() + + "$FL=" + String.format("%.3f", + avgReadFooterMetrics.getAvgFileLength()) + + "$RL=" + String.format("%.3f", + avgReadFooterMetrics.getAvgReadLenRequested()); + return readFooterMetric; + } + +/** + * Retrieves and aggregates read footer metrics for both Parquet and non-Parquet files from a list + * of AbfsReadFooterMetrics instances. The function calculates the average metrics separately for + * Parquet and non-Parquet files and returns a formatted string containing the aggregated metrics. + * + * @param readFooterMetricsList A list of AbfsReadFooterMetrics instances containing read footer metrics + * for both Parquet and non-Parquet files.
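+ * Non-Parquet entries with fewer than two reads are skipped, since both reads are needed for the averages.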
+ * + * @return A formatted string containing the aggregated read footer metrics for both Parquet and non-Parquet files. + * + **/ +private String getFooterMetrics(List readFooterMetricsList) { + List isParquetList = new ArrayList<>(); + List isNonParquetList = new ArrayList<>(); + for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) { + if (abfsReadFooterMetrics.getIsParquetFile()) { + isParquetList.add(abfsReadFooterMetrics); + } else { + if (abfsReadFooterMetrics.getReadCount() >= 2) { + isNonParquetList.add(abfsReadFooterMetrics); + } + } + } + AbfsReadFooterMetrics avgParquetReadFooterMetrics = new AbfsReadFooterMetrics(); + AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new AbfsReadFooterMetrics(); + String readFooterMetric = ""; + if (!isParquetList.isEmpty()) { + avgParquetReadFooterMetrics.setIsParquetFile(true); + getParquetReadFooterMetricsAverage(isParquetList, avgParquetReadFooterMetrics); + readFooterMetric += getReadFooterMetrics(avgParquetReadFooterMetrics); + } + if (!isNonParquetList.isEmpty()) { + avgNonparquetReadFooterMetrics.setIsParquetFile(false); + getNonParquetReadFooterMetricsAverage(isNonParquetList, avgNonparquetReadFooterMetrics); + readFooterMetric += getReadFooterMetrics(avgNonparquetReadFooterMetrics); + } + return readFooterMetric; +} + + + @Override + public String toString() { + Map metricsMap = getMetricsMap(); + List readFooterMetricsList = new ArrayList<>(); + if (metricsMap != null && !(metricsMap.isEmpty())) { + checkIsParquet(metricsMap); + updateLenRequested(metricsMap); + for (Map.Entry entry : metricsMap.entrySet()) { + AbfsReadFooterMetrics abfsReadFooterMetrics = entry.getValue(); + if (abfsReadFooterMetrics.getCollectMetrics()) { + readFooterMetricsList.add(entry.getValue()); + } + } + } + String readFooterMetrics = ""; + if (!readFooterMetricsList.isEmpty()) { + readFooterMetrics = getFooterMetrics(readFooterMetricsList); + } + return readFooterMetrics; + } +} + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 4abe9a574a..c696bd8e68 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -37,6 +37,12 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import java.util.Map; +import org.apache.hadoop.fs.azurebfs.AbfsBackoffMetrics; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO; +import static org.apache.hadoop.util.Time.now; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION; @@ -68,17 +74,20 @@ public class AbfsRestOperation { private final String sasToken; private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); - + private static final Logger LOG1 = LoggerFactory.getLogger(AbfsRestOperation.class); // For uploads, this is the request entity body. For downloads, // this will hold the response entity body. 
private byte[] buffer; private int bufferOffset; private int bufferLength; private int retryCount = 0; - + private boolean isThrottledRequest = false; + private long maxRetryCount = 0L; + private final int maxIoRetries; private AbfsHttpOperation result; - private AbfsCounters abfsCounters; - + private final AbfsCounters abfsCounters; + private AbfsBackoffMetrics abfsBackoffMetrics; + private Map metricsMap; /** * This variable contains the reason of last API call within the same * AbfsRestOperation object. @@ -124,6 +133,11 @@ String getSasToken() { return sasToken; } + private static final int MIN_FIRST_RANGE = 1; + private static final int MAX_FIRST_RANGE = 5; + private static final int MAX_SECOND_RANGE = 15; + private static final int MAX_THIRD_RANGE = 25; + /** * Initializes a new REST operation. * @@ -165,6 +179,13 @@ String getSasToken() { || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); this.sasToken = sasToken; this.abfsCounters = client.getAbfsCounters(); + if (abfsCounters != null) { + this.abfsBackoffMetrics = abfsCounters.getAbfsBackoffMetrics(); + } + if (abfsBackoffMetrics != null) { + this.metricsMap = abfsBackoffMetrics.getMetricsMap(); + } + this.maxIoRetries = client.getAbfsConfiguration().getMaxIoRetries(); this.intercept = client.getIntercept(); this.retryPolicy = client.getExponentialRetryPolicy(); } @@ -196,7 +217,6 @@ String getSasToken() { this.buffer = buffer; this.bufferOffset = bufferOffset; this.bufferLength = bufferLength; - this.abfsCounters = client.getAbfsCounters(); } /** @@ -206,11 +226,12 @@ String getSasToken() { */ public void execute(TracingContext tracingContext) throws AzureBlobFileSystemException { - // Since this might be a sub-sequential or parallel rest operation // triggered by a single file system call, using a new tracing context. lastUsedTracingContext = createNewTracingContext(tracingContext); try { + abfsCounters.getLastExecutionTime().set(now()); + client.timerOrchestrator(TimerFunctionality.RESUME, null); IOStatisticsBinding.trackDurationOfInvocation(abfsCounters, AbfsStatistic.getStatNameFromHttpCall(method), () -> completeExecute(lastUsedTracingContext)); @@ -241,6 +262,12 @@ void completeExecute(TracingContext tracingContext) retryCount = 0; retryPolicy = client.getExponentialRetryPolicy(); LOG.debug("First execution of REST operation - {}", operationType); + long sleepDuration = 0L; + if (abfsBackoffMetrics != null) { + synchronized (this) { + abfsBackoffMetrics.incrementTotalNumberOfRequests(); + } + } while (!executeHttpOperation(retryCount, tracingContext)) { try { ++retryCount; @@ -248,12 +275,17 @@ void completeExecute(TracingContext tracingContext) long retryInterval = retryPolicy.getRetryInterval(retryCount); LOG.debug("Rest operation {} failed with failureReason: {}. 
Retrying with retryCount = {}, retryPolicy: {} and sleepInterval: {}", operationType, failureReason, retryCount, retryPolicy.getAbbreviation(), retryInterval); + if (abfsBackoffMetrics != null) { + updateBackoffTimeMetrics(retryCount, sleepDuration); + } Thread.sleep(retryInterval); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } - + if (abfsBackoffMetrics != null) { + updateBackoffMetrics(retryCount, result.getStatusCode()); + } int status = result.getStatusCode(); /* If even after exhausting all retries, the http status code has an @@ -272,6 +304,30 @@ void completeExecute(TracingContext tracingContext) LOG.trace("{} REST operation complete", operationType); } + @VisibleForTesting + void updateBackoffMetrics(int retryCount, int statusCode) { + if (abfsBackoffMetrics != null) { + if (statusCode < HttpURLConnection.HTTP_OK + || statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR) { + synchronized (this) { + if (retryCount >= maxIoRetries) { + abfsBackoffMetrics.incrementNumberOfRequestsFailed(); + } + } + } else { + synchronized (this) { + if (retryCount > ZERO && retryCount <= maxIoRetries) { + maxRetryCount = Math.max(abfsBackoffMetrics.getMaxRetryCount(), retryCount); + abfsBackoffMetrics.setMaxRetryCount(maxRetryCount); + updateCount(retryCount); + } else { + abfsBackoffMetrics.incrementNumberOfRequestsSucceededWithoutRetrying(); + } + } + } + } + } + @VisibleForTesting String getClientLatency() { return client.getAbfsPerfTracker().getClientLatency(); @@ -315,7 +371,35 @@ private boolean executeHttpOperation(final int retryCount, } httpOperation.processResponse(buffer, bufferOffset, bufferLength); - incrementCounter(AbfsStatistic.GET_RESPONSES, 1); + if (!isThrottledRequest && httpOperation.getStatusCode() + >= HttpURLConnection.HTTP_INTERNAL_ERROR) { + isThrottledRequest = true; + AzureServiceErrorCode serviceErrorCode = + AzureServiceErrorCode.getAzureServiceCode( + httpOperation.getStatusCode(), + httpOperation.getStorageErrorCode(), + httpOperation.getStorageErrorMessage()); + LOG1.trace("Service code is " + serviceErrorCode + " status code is " + + httpOperation.getStatusCode() + " error code is " + + httpOperation.getStorageErrorCode() + + " error message is " + httpOperation.getStorageErrorMessage()); + if (abfsBackoffMetrics != null) { + synchronized (this) { + if (serviceErrorCode.equals( + AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT) + || serviceErrorCode.equals( + AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT)) { + abfsBackoffMetrics.incrementNumberOfBandwidthThrottledRequests(); + } else if (serviceErrorCode.equals( + AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT)) { + abfsBackoffMetrics.incrementNumberOfIOPSThrottledRequests(); + } else { + abfsBackoffMetrics.incrementNumberOfOtherThrottledRequests(); + } + } + } + } + incrementCounter(AbfsStatistic.GET_RESPONSES, 1); //Only increment bytesReceived counter when the status code is 2XX. if (httpOperation.getStatusCode() >= HttpURLConnection.HTTP_OK && httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) { @@ -351,7 +435,13 @@ private boolean executeHttpOperation(final int retryCount, retryPolicy = client.getRetryPolicy(failureReason); LOG.warn("Unknown host name: {}. 
Retrying to resolve the host name...", hostname); + if (abfsBackoffMetrics != null) { + synchronized (this) { + abfsBackoffMetrics.incrementNumberOfNetworkFailedRequests(); + } + } if (!retryPolicy.shouldRetry(retryCount, -1)) { + updateBackoffMetrics(retryCount, httpOperation.getStatusCode()); throw new InvalidAbfsRestOperationException(ex, retryCount); } return false; @@ -360,13 +450,17 @@ private boolean executeHttpOperation(final int retryCount, if (LOG.isDebugEnabled()) { LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex); } - + if (abfsBackoffMetrics != null) { + synchronized (this) { + abfsBackoffMetrics.incrementNumberOfNetworkFailedRequests(); + } + } failureReason = RetryReason.getAbbreviation(ex, -1, ""); retryPolicy = client.getRetryPolicy(failureReason); if (!retryPolicy.shouldRetry(retryCount, -1)) { + updateBackoffMetrics(retryCount, httpOperation.getStatusCode()); throw new InvalidAbfsRestOperationException(ex, retryCount); } - return false; } finally { int statusCode = httpOperation.getStatusCode(); @@ -388,26 +482,30 @@ private boolean executeHttpOperation(final int retryCount, */ @VisibleForTesting public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign) throws IOException { - switch(client.getAuthType()) { - case Custom: - case OAuth: - LOG.debug("Authenticating request with OAuth2 access token"); - httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, - client.getAccessToken()); - break; - case SAS: - // do nothing; the SAS token should already be appended to the query string - httpOperation.setMaskForSAS(); //mask sig/oid from url for logs - break; - case SharedKey: - default: - // sign the HTTP request - LOG.debug("Signing request with shared key"); - // sign the HTTP request - client.getSharedKeyCredentials().signRequest( - httpOperation.getConnection(), - bytesToSign); - break; + if (client.isSendMetricCall()) { + client.getMetricSharedkeyCredentials().signRequest(httpOperation.getConnection(), bytesToSign); + } else { + switch (client.getAuthType()) { + case Custom: + case OAuth: + LOG.debug("Authenticating request with OAuth2 access token"); + httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + break; + case SAS: + // do nothing; the SAS token should already be appended to the query string + httpOperation.setMaskForSAS(); //mask sig/oid from url for logs + break; + case SharedKey: + default: + // sign the HTTP request + LOG.debug("Signing request with shared key"); + // sign the HTTP request + client.getSharedKeyCredentials().signRequest( + httpOperation.getConnection(), + bytesToSign); + break; + } } } @@ -436,6 +534,60 @@ private void incrementCounter(AbfsStatistic statistic, long value) { } } + /** + * Updates the count metrics based on the provided retry count. + * @param retryCount The retry count used to determine the metrics category. + * + * This method increments the number of succeeded requests for the specified retry count. + */ + private void updateCount(int retryCount){ + String retryCounter = getKey(retryCount); + metricsMap.get(retryCounter).incrementNumberOfRequestsSucceeded(); + } + + /** + * Updates backoff time metrics based on the provided retry count and sleep duration. + * @param retryCount The retry count used to determine the metrics category. + * @param sleepDuration The duration of sleep during backoff. 
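+ * Metrics are tracked per retry-count bucket (see getKey), so each bucket keeps its own min, max and total backoff.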
+ * + * This method calculates and updates various backoff time metrics, including minimum, maximum, + * and total backoff time, as well as the total number of requests for the specified retry count. + */ + private void updateBackoffTimeMetrics(int retryCount, long sleepDuration) { + synchronized (this) { + String retryCounter = getKey(retryCount); + AbfsBackoffMetrics abfsBackoffMetrics = metricsMap.get(retryCounter); + long minBackoffTime = Math.min(abfsBackoffMetrics.getMinBackoff(), sleepDuration); + long maxBackoffForTime = Math.max(abfsBackoffMetrics.getMaxBackoff(), sleepDuration); + long totalBackoffTime = abfsBackoffMetrics.getTotalBackoff() + sleepDuration; + abfsBackoffMetrics.incrementTotalRequests(); + abfsBackoffMetrics.setMinBackoff(minBackoffTime); + abfsBackoffMetrics.setMaxBackoff(maxBackoffForTime); + abfsBackoffMetrics.setTotalBackoff(totalBackoffTime); + metricsMap.put(retryCounter, abfsBackoffMetrics); + } + } + + /** + * Generates a key based on the provided retry count to categorize metrics. + * + * @param retryCount The retry count used to determine the key. + * @return A string key representing the metrics category for the given retry count. + * + * This method categorizes retry counts into different ranges and assigns a corresponding key. + */ + private String getKey(int retryCount) { + if (retryCount >= MIN_FIRST_RANGE && retryCount < MAX_FIRST_RANGE) { + return Integer.toString(retryCount); + } else if (retryCount >= MAX_FIRST_RANGE && retryCount < MAX_SECOND_RANGE) { + return "5_15"; + } else if (retryCount >= MAX_SECOND_RANGE && retryCount < MAX_THIRD_RANGE) { + return "15_25"; + } else { + return "25AndAbove"; + } + } + /** * Updating Client Side Throttling Metrics for relevant response status codes. * Following criteria is used to decide based on status code and failure reason. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java index bf7da69ec4..ca94c7f86b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java @@ -18,9 +18,11 @@ package org.apache.hadoop.fs.azurebfs.services; +/** + * Enum for timer functionality. + */ public enum TimerFunctionality { RESUME, - SUSPEND } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/MetricFormat.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/MetricFormat.java new file mode 100644 index 0000000000..48c216ff6e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/MetricFormat.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.utils; + +public enum MetricFormat { + INTERNAL_BACKOFF_METRIC_FORMAT, // tracing header IDs followed by the backoff metrics + + INTERNAL_FOOTER_METRIC_FORMAT, // tracing header IDs followed by the footer metrics + + INTERNAL_METRIC_FORMAT, // tracing header IDs followed by both backoff and footer metrics + + EMPTY; + + @Override + public String toString() { + return this == EMPTY ? "" : this.name(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 3c54c204dd..b0a9a021c5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -63,6 +63,8 @@ public class TracingContext { private Listener listener = null; // null except when testing //final concatenated ID list set into x-ms-client-request-id header private String header = EMPTY_STRING; + private String metricResults = EMPTY_STRING; + private String metricHeader = EMPTY_STRING; /** * If {@link #primaryRequestId} is null, this field shall be set equal @@ -112,6 +114,15 @@ public TracingContext(String clientCorrelationID, String fileSystemID, } } + public TracingContext(String clientCorrelationID, String fileSystemID, + FSOperationType opType, boolean needsPrimaryReqId, + TracingHeaderFormat tracingHeaderFormat, Listener listener, String metricResults) { + this(clientCorrelationID, fileSystemID, opType, needsPrimaryReqId, tracingHeaderFormat, + listener); + this.metricResults = metricResults; + } + + public TracingContext(TracingContext originalTracingContext) { this.fileSystemID = originalTracingContext.fileSystemID; this.streamID = originalTracingContext.streamID; @@ -123,8 +134,8 @@ public TracingContext(TracingContext originalTracingContext) { if (originalTracingContext.listener != null) { this.listener = originalTracingContext.listener.getClone(); } + this.metricResults = originalTracingContext.metricResults; } - public static String validateClientCorrelationID(String clientCorrelationID) { if ((clientCorrelationID.length() > MAX_CLIENT_CORRELATION_ID_LENGTH) || (!clientCorrelationID.matches(CLIENT_CORRELATION_ID_PATTERN))) { @@ -181,17 +192,24 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail + getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID + ":" + opType + ":" + retryCount; header = addFailureReasons(header, previousFailure, retryPolicyAbbreviation); + metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : ""; break; case TWO_ID_FORMAT: header = clientCorrelationID + ":" + clientRequestId; + metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : ""; break; default: - header = clientRequestId; //case SINGLE_ID_FORMAT + //case SINGLE_ID_FORMAT + header = clientRequestId; + metricHeader += !(metricResults.trim().isEmpty()) ?
metricResults : ""; } if (listener != null) { //for testing listener.callTracingHeaderValidator(header, format); } httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header); + if (!metricHeader.equals(EMPTY_STRING)) { + httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_FECLIENT_METRICS, metricHeader); + } /* * In case the primaryRequestId is an empty-string and if it is the first try to * API call (previousFailure shall be null), maintain the last part of clientRequestId's diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 008cb14354..c0e20dfe16 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -1052,6 +1052,49 @@ Note that these performance numbers are also sent back to the ADLS Gen 2 API end in the `x-ms-abfs-client-latency` HTTP headers in subsequent requests. Azure uses these settings to track their end-to-end latency. +### Driver Metric Options + +Config `fs.azure.metric.format` provides an option to select the format of IDs included in the `header` for metrics. +This config accepts a String value corresponding to the following enum options. +`INTERNAL_METRIC_FORMAT` : backoff + footer metrics +`INTERNAL_BACKOFF_METRIC_FORMAT` : backoff metrics +`INTERNAL_FOOTER_METRIC_FORMAT` : footer metrics +`EMPTY` : default + +`fs.azure.metric.account.name`: This configuration parameter specifies the name of the account used to push the +metrics to the backend. A separate account can be configured for pushing metrics, or the existing account on which +other requests are made can be reused. + +```xml +<property> + <name>fs.azure.metric.account.name</name> + <value>METRICACCOUNTNAME.dfs.core.windows.net</value> +</property> +``` + +`fs.azure.metric.account.key`: This is the access key for the storage account used for pushing metrics to the store. + +```xml +<property> + <name>fs.azure.metric.account.key</name> + <value>ACCOUNTKEY</value> +</property> +``` + +`fs.azure.metric.uri`: This configuration provides the URI in the format of +'https://`METRICACCOUNTNAME`.dfs.core.windows.net/`CONTAINERNAME`'. This should be part of the config in order to +prevent extra calls to create the filesystem; an existing filesystem is used to push the metrics.
+ +```xml +<property> + <name>fs.azure.metric.uri</name> + <value>https://METRICACCOUNTNAME.dfs.core.windows.net/CONTAINERNAME</value> +</property> +``` + ## Troubleshooting The problems associated with the connector usually come down to, in order diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index e8cbeb1255..afc92c111a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -34,7 +34,6 @@ import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.StoreStatisticNames; import org.apache.hadoop.io.IOUtils; - import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java new file mode 100644 index 0000000000..0071b90771 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadFooterMetrics.java @@ -0,0 +1,385 @@ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ +package org.apache.hadoop.fs.azurebfs; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; +import org.junit.Test; + +import java.io.IOException; +import java.util.Random; +import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.statistics.IOStatisticsLogging; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; +import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; + +public class ITestAbfsReadFooterMetrics extends AbstractAbfsScaleTest { + + public ITestAbfsReadFooterMetrics() throws Exception { + } + + private static final String TEST_PATH = "/testfile"; + private static final String SLEEP_PERIOD = "90000"; + + /** + * Integration test for reading footer metrics with both Parquet and non-Parquet reads. + */ + @Test + public void testReadFooterMetricsWithParquetAndNonParquet() throws Exception { + testReadWriteAndSeek(8 * ONE_MB, DEFAULT_READ_BUFFER_SIZE, ONE_KB, 4 * ONE_KB); + } + + /** + * Builds the configuration for the test file system: sets the footer metric format and the given buffer size. + * + * @param bufferSize Buffer size to set for write and read operations. + * @return Configuration to use when creating the file system instance. + */ + private Configuration getConfiguration(int bufferSize) { + final Configuration configuration = getRawConfiguration(); + configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_FOOTER_METRIC_FORMAT)); + configuration.setInt(AZURE_READ_BUFFER_SIZE, bufferSize); + configuration.setInt(AZURE_WRITE_BUFFER_SIZE, bufferSize); + return configuration; + } + + /** + * Writes data to the specified file path in the AzureBlobFileSystem. + * + * @param fs AzureBlobFileSystem instance. + * @param testPath Path to the file. + * @param data Data to write to the file. + */ + private void writeDataToFile(AzureBlobFileSystem fs, Path testPath, byte[] data) throws IOException { + FSDataOutputStream stream = fs.create(testPath); + try { + stream.write(data); + } finally { + stream.close(); + } + IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream); + } + + /** + * Asserts that the actual metrics obtained from the AzureBlobFileSystem match the expected metrics string. + * + * @param fs AzureBlobFileSystem instance. + * @param expectedMetrics Expected metrics string.
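+ * The comparison relies on AbfsReadFooterMetrics#toString(), which aggregates the per-file metrics into the formatted metric string.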
+ */ + private void assertMetricsEquality(AzureBlobFileSystem fs, String expectedMetrics) { + AbfsReadFooterMetrics actualMetrics = fs.getAbfsClient().getAbfsCounters().getAbfsReadFooterMetrics(); + assertNotNull("AbfsReadFooterMetrics is null", actualMetrics); + assertEquals("The computed metrics differs from the actual metrics", expectedMetrics, actualMetrics.toString()); + } + + /** + * Test for reading footer metrics with a non-Parquet file. + */ + @Test + public void testReadFooterMetrics() throws Exception { + // Initialize AzureBlobFileSystem and set buffer size for configuration. + int bufferSize = MIN_BUFFER_SIZE; + Configuration configuration = getConfiguration(bufferSize); + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); + + // Generate random data to write to the test file. + final byte[] b = new byte[2 * bufferSize]; + new Random().nextBytes(b); + + // Set up the test file path. + Path testPath = path(TEST_PATH); + + // Write random data to the test file. + writeDataToFile(fs, testPath, b); + + // Initialize a buffer for reading data. + final byte[] readBuffer = new byte[2 * bufferSize]; + int result; + + // Initialize statistics source for logging. + IOStatisticsSource statisticsSource = null; + + try (FSDataInputStream inputStream = fs.open(testPath)) { + // Register a listener for tracing header validation. + statisticsSource = inputStream; + ((AbfsInputStream) inputStream.getWrappedStream()).registerListener( + new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.READ, true, 0, + ((AbfsInputStream) inputStream.getWrappedStream()) + .getStreamID())); + + // Perform the first read operation with seek. + inputStream.seek(bufferSize); + result = inputStream.read(readBuffer, bufferSize, bufferSize); + assertNotEquals(-1, result); + + // To test tracingHeader for case with bypassReadAhead == true + inputStream.seek(0); + byte[] temp = new byte[5]; + int t = inputStream.read(temp, 0, 1); + + // Seek back to the beginning and perform another read operation. + inputStream.seek(0); + result = inputStream.read(readBuffer, 0, bufferSize); + } + + // Log IO statistics at the INFO level. + IOStatisticsLogging.logIOStatisticsAtLevel(LOG, + IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource); + + // Ensure data is read successfully and matches the written data. + assertNotEquals("data read in final read()", -1, result); + assertArrayEquals(readBuffer, b); + + // Get non-Parquet metrics and assert metrics equality. + AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics(); + String metrics = nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics); + assertMetricsEquality(fs, metrics); + + // Close the AzureBlobFileSystem. + fs.close(); + } + + /** + * Generates and returns an instance of AbfsReadFooterMetrics for non-Parquet files. 
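+ * The hard-coded values mirror the read pattern these tests drive against a file of 2 * MIN_BUFFER_SIZE bytes.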
+ */ + private AbfsReadFooterMetrics getNonParquetMetrics() { + AbfsReadFooterMetrics nonParquetMetrics = new AbfsReadFooterMetrics(); + nonParquetMetrics.setIsParquetFile(false); + nonParquetMetrics.setSizeReadByFirstRead("16384.000_16384.000"); + nonParquetMetrics.setOffsetDiffBetweenFirstAndSecondRead("1.000_16384.000"); + nonParquetMetrics.setAvgFileLength(Double.parseDouble("32768.000")); + nonParquetMetrics.setAvgReadLenRequested(Double.parseDouble("16384.000")); + return nonParquetMetrics; + } + + /** + * Generates and returns an instance of AbfsReadFooterMetrics for parquet files. + */ + private AbfsReadFooterMetrics getParquetMetrics() { + AbfsReadFooterMetrics parquetMetrics = new AbfsReadFooterMetrics(); + parquetMetrics.setIsParquetFile(true); + parquetMetrics.setSizeReadByFirstRead("1024.000"); + parquetMetrics.setOffsetDiffBetweenFirstAndSecondRead("4096.000"); + parquetMetrics.setAvgFileLength(Double.parseDouble("8388608.000")); + parquetMetrics.setAvgReadLenRequested(0.000); + return parquetMetrics; + } + + /** + * Test for reading, writing, and seeking with footer metrics. + * + * This method performs the integration test for reading, writing, and seeking operations + * with footer metrics. It creates an AzureBlobFileSystem, configures it, writes random data + * to a test file, performs read and seek operations, and checks the footer metrics for both + * Parquet and non-Parquet scenarios. + * + * @param fileSize Size of the test file. + * @param bufferSize Size of the buffer used for read and write operations. + * @param seek1 The position to seek to in the test file. + * @param seek2 Additional position to seek to in the test file (if not 0). + */ + private void testReadWriteAndSeek(int fileSize, int bufferSize, Integer seek1, Integer seek2) throws Exception { + // Create an AzureBlobFileSystem instance. + Configuration configuration = getConfiguration(bufferSize); + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); + + // Generate random data to write to the test file. + final byte[] b = new byte[fileSize]; + new Random().nextBytes(b); + + // Define the path for the test file. + Path testPath = path("/testfile"); + + // Write the random data to the test file. + writeDataToFile(fs, testPath, b); + + // Initialize a buffer for reading. + final byte[] readBuffer = new byte[fileSize]; + + // Initialize a source for IO statistics. + IOStatisticsSource statisticsSource = null; + + // Open an input stream for the test file. + FSDataInputStream inputStream = fs.open(testPath); + statisticsSource = inputStream; + + // Register a listener for tracing headers. + ((AbfsInputStream) inputStream.getWrappedStream()).registerListener( + new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.READ, true, 0, + ((AbfsInputStream) inputStream.getWrappedStream()) + .getStreamID())); + + // Seek to the specified position in the test file and read data. + inputStream.seek(fileSize - seek1); + inputStream.read(readBuffer, 0, seek1); + + // If seek2 is non-zero, perform an additional seek and read. + if (seek2 != 0) { + inputStream.seek(fileSize - seek1 - seek2); + inputStream.read(readBuffer, 0, seek2); + } + + // Close the input stream. + inputStream.close(); + + // Set a new buffer size for read and write operations. 
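+ // The second file is read with MIN_BUFFER_SIZE so that its reads follow the non-Parquet pattern asserted below.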
+ int bufferSize1 = MIN_BUFFER_SIZE; + abfsConfiguration.setWriteBufferSize(bufferSize1); + abfsConfiguration.setReadBufferSize(bufferSize1); + + // Generate new random data for a second test file. + final byte[] b1 = new byte[2 * bufferSize1]; + new Random().nextBytes(b1); + + // Define the path for the second test file. + Path testPath1 = path("/testfile1"); + + // Write the new random data to the second test file. + writeDataToFile(fs, testPath1, b1); + + // Initialize a buffer for reading from the second test file. + final byte[] readBuffer1 = new byte[2 * bufferSize1]; + + // Open an input stream for the second test file. + FSDataInputStream inputStream1 = fs.open(testPath1); + statisticsSource = inputStream1; + + // Register a listener for tracing headers. + ((AbfsInputStream) inputStream1.getWrappedStream()).registerListener( + new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.READ, true, 0, + ((AbfsInputStream) inputStream1.getWrappedStream()) + .getStreamID())); + + // Seek to a position in the second test file and read data. + inputStream1.seek(bufferSize1); + inputStream1.read(readBuffer1, bufferSize1, bufferSize1); + + // To test tracingHeader for case with bypassReadAhead == true. + inputStream1.seek(0); + byte[] temp = new byte[5]; + int t = inputStream1.read(temp, 0, 1); + + // Seek to the beginning of the second test file and read data. + inputStream1.seek(0); + inputStream1.read(readBuffer1, 0, bufferSize1); + + // Close the input stream for the second test file. + inputStream1.close(); + + // Get footer metrics for both Parquet and non-Parquet scenarios. + AbfsReadFooterMetrics parquetMetrics = getParquetMetrics(); + AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics(); + + // Concatenate and assert the metrics equality. + String metrics = parquetMetrics.getReadFooterMetrics(parquetMetrics); + metrics += nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics); + assertMetricsEquality(fs, metrics); + + // Close the AzureBlobFileSystem instance. + fs.close(); + } + + /** + * Test for reading footer metrics with an idle period. + * + * This method tests reading footer metrics with an idle period. It creates an AzureBlobFileSystem, + * configures it, writes random data to a test file, performs read operations, introduces an idle + * period, and checks the footer metrics for non-Parquet scenarios. + * + */ + @Test + public void testMetricWithIdlePeriod() throws Exception { + // Set the buffer size for the test. + int bufferSize = MIN_BUFFER_SIZE; + Configuration configuration = getConfiguration(bufferSize); + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration(); + + // Generate random data to write to the test file. + final byte[] b = new byte[2 * bufferSize]; + new Random().nextBytes(b); + + // Define the path for the test file. + Path testPath = path(TEST_PATH); + + // Write the random data to the test file. + writeDataToFile(fs, testPath, b); + + // Initialize a buffer for reading. + final byte[] readBuffer = new byte[2 * bufferSize]; + + // Initialize a source for IO statistics. + IOStatisticsSource statisticsSource = null; + + // Open an input stream for the test file. + try (FSDataInputStream inputStream = fs.open(testPath)) { + // Register a listener for tracing headers. 
+ ((AbfsInputStream) inputStream.getWrappedStream()).registerListener( + new TracingHeaderValidator(abfsConfiguration.getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.READ, true, 0, + ((AbfsInputStream) inputStream.getWrappedStream()) + .getStreamID())); + + // Seek to the specified position in the test file and read data. + inputStream.seek(bufferSize); + inputStream.read(readBuffer, bufferSize, bufferSize); + + // Introduce an idle period by sleeping. + int sleepPeriod = Integer.parseInt(SLEEP_PERIOD); + Thread.sleep(sleepPeriod); + + // To test tracingHeader for case with bypassReadAhead == true. + inputStream.seek(0); + byte[] temp = new byte[5]; + int t = inputStream.read(temp, 0, 1); + + // Seek to the beginning of the test file and read data. + inputStream.seek(0); + inputStream.read(readBuffer, 0, bufferSize); + + // Get and assert the footer metrics for non-Parquet scenarios. + AbfsReadFooterMetrics nonParquetMetrics = getNonParquetMetrics(); + String metrics = nonParquetMetrics.getReadFooterMetrics(nonParquetMetrics); + assertMetricsEquality(fs, metrics); + + // Introduce an additional idle period by sleeping. + Thread.sleep(sleepPeriod); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java index b374193e9b..8cdd355e00 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java @@ -80,8 +80,8 @@ public ITestAzureBlobFileSystemListStatus() throws Exception { public void testListPath() throws Exception { Configuration config = new Configuration(this.getRawConfiguration()); config.set(AZURE_LIST_MAX_RESULTS, "5000"); - try (final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem - .newInstance(getFileSystem().getUri(), config)) { + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem + .newInstance(getFileSystem().getUri(), config); final List> tasks = new ArrayList<>(); ExecutorService es = Executors.newFixedThreadPool(10); @@ -108,7 +108,10 @@ public Void call() throws Exception { fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0)); FileStatus[] files = fs.listStatus(new Path("/")); assertEquals(TEST_FILES_NUMBER, files.length /* user directory */); - } + fs.registerListener( + new TracingHeaderValidator(getConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.GET_ATTR, true, 0)); + fs.close(); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index b1b093d670..2b60cb57fd 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; import java.util.HashSet; @@ -27,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import 
org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.assertj.core.api.Assertions; import org.mockito.AdditionalMatchers; import org.mockito.Mockito; @@ -112,9 +115,13 @@ public static void addGeneralMockBehaviourToRestOpAndHttpOp(final AbfsRestOperat public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClient, final ExponentialRetryPolicy exponentialRetryPolicy, final StaticRetryPolicy staticRetryPolicy, - final AbfsThrottlingIntercept intercept) throws IOException { + final AbfsThrottlingIntercept intercept) throws IOException, URISyntaxException { Mockito.doReturn(OAuth).when(abfsClient).getAuthType(); Mockito.doReturn("").when(abfsClient).getAccessToken(); + AbfsConfiguration abfsConfiguration = Mockito.mock(AbfsConfiguration.class); + Mockito.doReturn(abfsConfiguration).when(abfsClient).getAbfsConfiguration(); + AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); + Mockito.doReturn(abfsCounters).when(abfsClient).getAbfsCounters(); Mockito.doReturn(intercept).when(abfsClient).getIntercept(); Mockito.doNothing() diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index c16bbf7c53..ca2ea92388 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -22,11 +22,14 @@ import java.lang.reflect.Field; import java.net.HttpURLConnection; import java.net.ProtocolException; +import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; import java.util.List; import java.util.Random; import java.util.regex.Pattern; +import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.assertj.core.api.Assertions; import org.junit.Test; import org.mockito.Mockito; @@ -133,8 +136,9 @@ public ITestAbfsClient() throws Exception { } private String getUserAgentString(AbfsConfiguration config, - boolean includeSSLProvider) throws IOException { - AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().build(); + boolean includeSSLProvider) throws IOException, URISyntaxException { + AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); + AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build(); AbfsClient client = new AbfsClient(new URL("https://azure.com"), null, config, (AccessTokenProvider) null, null, abfsClientContext); String sslProviderName = null; @@ -175,7 +179,7 @@ private void verifyBasicInfo(String userAgentStr) { @Test public void verifyUserAgentPrefix() - throws IOException, IllegalAccessException { + throws IOException, IllegalAccessException, URISyntaxException { final Configuration configuration = new Configuration(); configuration.addResource(TEST_CONFIGURATION_FILE_NAME); configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, FS_AZURE_USER_AGENT_PREFIX); @@ -209,7 +213,7 @@ public void verifyUserAgentPrefix() */ @Test public void verifyUserAgentExpectHeader() - throws IOException, IllegalAccessException { + throws IOException, IllegalAccessException, URISyntaxException { final Configuration configuration = new Configuration(); configuration.addResource(TEST_CONFIGURATION_FILE_NAME); configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, FS_AZURE_USER_AGENT_PREFIX); @@ -315,18 +319,20 
@@ public void verifyUserAgentClusterType() throws Exception { public static AbfsClient createTestClientFromCurrentContext( AbfsClient baseAbfsClientInstance, - AbfsConfiguration abfsConfig) throws IOException { + AbfsConfiguration abfsConfig) throws IOException, URISyntaxException { AuthType currentAuthType = abfsConfig.getAuthType( abfsConfig.getAccountName()); AbfsPerfTracker tracker = new AbfsPerfTracker("test", abfsConfig.getAccountName(), abfsConfig); + AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsPerfTracker(tracker) .withExponentialRetryPolicy( new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries())) + .withAbfsCounters(abfsCounters) .build(); // Create test AbfsClient @@ -352,6 +358,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, AbfsConfiguration abfsConfig) throws Exception { AuthType currentAuthType = abfsConfig.getAuthType( abfsConfig.getAccountName()); + AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); org.junit.Assume.assumeTrue( (currentAuthType == AuthType.SharedKey) @@ -372,14 +379,18 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, when(client.createDefaultUriQueryBuilder()).thenCallRealMethod(); when(client.createRequestUrl(any(), any())).thenCallRealMethod(); + when(client.createRequestUrl(any(), any(), any())).thenCallRealMethod(); when(client.getAccessToken()).thenCallRealMethod(); when(client.getSharedKeyCredentials()).thenCallRealMethod(); when(client.createDefaultHeaders()).thenCallRealMethod(); when(client.getAbfsConfiguration()).thenReturn(abfsConfig); + when(client.getIntercept()).thenReturn( AbfsThrottlingInterceptFactory.getInstance( abfsConfig.getAccountName().substring(0, abfsConfig.getAccountName().indexOf(DOT)), abfsConfig)); + when(client.getAbfsCounters()).thenReturn(abfsCounters); + // override baseurl client = ITestAbfsClient.setAbfsClientField(client, "abfsConfiguration", abfsConfig); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 9027e56c9c..e4ed9881ff 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -19,11 +19,14 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.Optional; import java.util.Random; import java.util.concurrent.ExecutionException; +import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; @@ -98,9 +101,11 @@ private AbfsRestOperation getMockRestOp() { return op; } - private AbfsClient getMockAbfsClient() { + private AbfsClient getMockAbfsClient() throws URISyntaxException { // Mock failure for client.read() AbfsClient client = mock(AbfsClient.class); + AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); + Mockito.doReturn(abfsCounters).when(client).getAbfsCounters(); AbfsPerfTracker tracker = new AbfsPerfTracker( "test", this.getAccountName(), diff --git 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java
new file mode 100644
index 0000000000..683528b9c5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperation.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
+import org.junit.Test;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.DeletePath;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.junit.Assert;
+import java.net.HttpURLConnection;
+
+public class TestAbfsRestOperation extends
+    AbstractAbfsIntegrationTest {
+
+  public TestAbfsRestOperation() throws Exception {
+  }
+
+  /**
+   * Test for backoff retry metrics.
+   *
+   * Creates an AzureBlobFileSystem with the internal backoff metric format
+   * enabled, obtains an AbfsClient and an AbfsRestOperation, and then feeds
+   * the operation a series of mocked retry counts to verify that the
+   * backoff metrics are updated as expected.
+   */
+  @Test
+  public void testBackoffRetryMetrics() throws Exception {
+    // Create an AzureBlobFileSystem instance.
+    final Configuration configuration = getRawConfiguration();
+    configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_BACKOFF_METRIC_FORMAT));
+    final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+    AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+
+    // Get an instance of AbfsClient and AbfsRestOperation.
+    AbfsClient testClient = super.getAbfsClient(super.getAbfsStore(fs));
+    AbfsRestOperation op = ITestAbfsClient.getRestOp(
+        DeletePath, testClient, HTTP_METHOD_DELETE,
+        ITestAbfsClient.getTestUrl(testClient, "/NonExistingPath"), ITestAbfsClient.getTestRequestHeaders(testClient));
+
+    // Mock retry counts and status code.
+    ArrayList<String> retryCounts = new ArrayList<>(Arrays.asList("35", "28", "31", "45", "10", "2", "9"));
+    int statusCode = HttpURLConnection.HTTP_UNAVAILABLE;
+
+    // Update backoff metrics.
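+    // Each updateBackoffMetrics() call below simulates one request that
+    // finally returned 503 after the given number of retries. Assuming the
+    // default maximum retry count of 30, the mocked counts 35, 31 and 45
+    // exceed the limit, so three requests should be recorded as failed.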
+    for (String retryCount : retryCounts) {
+      op.updateBackoffMetrics(Integer.parseInt(retryCount), statusCode);
+    }
+
+    // For a retry count greater than the configured maximum, the request should fail.
+    Assert.assertEquals("Number of failed requests does not match expected value.",
+        "3", String.valueOf(testClient.getAbfsCounters().getAbfsBackoffMetrics().getNumberOfRequestsFailed()));
+
+    // Close the AzureBlobFileSystem.
+    fs.close();
+  }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
index 078b42cf0d..966b34f872 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java
@@ -23,7 +23,6 @@
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -308,7 +307,7 @@ private void testClientRequestIdForStatusRetry(int status,
     int[] statusCount = new int[1];
     statusCount[0] = 0;
     Mockito.doAnswer(answer -> {
-      if (statusCount[0] <= 5) {
+      if (statusCount[0] <= 10) {
         statusCount[0]++;
         return status;
       }