diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java index ef04feca69..b6b08fe009 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java @@ -316,6 +316,30 @@ public final class StoreStatisticNames { public static final String ACTION_HTTP_GET_REQUEST = "action_http_get_request"; + /** + * An HTTP DELETE request was made: {@value}. + */ + public static final String ACTION_HTTP_DELETE_REQUEST + = "action_http_delete_request"; + + /** + * An HTTP PUT request was made: {@value}. + */ + public static final String ACTION_HTTP_PUT_REQUEST + = "action_http_put_request"; + + /** + * An HTTP PATCH request was made: {@value}. + */ + public static final String ACTION_HTTP_PATCH_REQUEST + = "action_http_patch_request"; + + /** + * An HTTP POST request was made: {@value}. + */ + public static final String ACTION_HTTP_POST_REQUEST + = "action_http_post_request"; + /** * An HTTP HEAD request was made: {@value}. */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java index 357f53b611..a86913a007 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java @@ -19,24 +19,23 @@ package org.apache.hadoop.fs.azurebfs; import java.net.URI; -import java.util.HashMap; import java.util.Map; import java.util.UUID; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; -import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStoreBuilder; import org.apache.hadoop.metrics2.MetricStringBuilder; -import org.apache.hadoop.metrics2.MetricsCollector; -import org.apache.hadoop.metrics2.MetricsInfo; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.metrics2.MetricsTag; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableMetric; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore; /** * Instrumentation of Abfs counters. @@ -62,6 +61,8 @@ public class AbfsCountersImpl implements AbfsCounters { private final MetricsRegistry registry = new MetricsRegistry("abfsMetrics").setContext(CONTEXT); + private final IOStatisticsStore ioStatisticsStore; + private static final AbfsStatistic[] STATISTIC_LIST = { CALL_CREATE, CALL_OPEN, @@ -85,7 +86,17 @@ public class AbfsCountersImpl implements AbfsCounters { BYTES_SENT, BYTES_RECEIVED, READ_THROTTLES, - WRITE_THROTTLES + WRITE_THROTTLES, + SERVER_UNAVAILABLE + }; + + private static final AbfsStatistic[] DURATION_TRACKER_LIST = { + HTTP_HEAD_REQUEST, + HTTP_GET_REQUEST, + HTTP_DELETE_REQUEST, + HTTP_PUT_REQUEST, + HTTP_PATCH_REQUEST, + HTTP_POST_REQUEST }; public AbfsCountersImpl(URI uri) { @@ -95,9 +106,17 @@ public AbfsCountersImpl(URI uri) { fileSystemInstanceId.toString()); registry.tag(METRIC_BUCKET, "Hostname from the FS URL", uri.getHost()); + IOStatisticsStoreBuilder ioStatisticsStoreBuilder = iostatisticsStore(); + // Declaring the counters. for (AbfsStatistic stats : STATISTIC_LIST) { + ioStatisticsStoreBuilder.withCounters(stats.getStatName()); createCounter(stats); } + // Declaring the DurationTrackers. + for (AbfsStatistic durationStats : DURATION_TRACKER_LIST) { + ioStatisticsStoreBuilder.withDurationTracking(durationStats.getStatName()); + } + ioStatisticsStore = ioStatisticsStoreBuilder.build(); } /** @@ -149,6 +168,7 @@ private MutableCounterLong createCounter(AbfsStatistic stats) { */ @Override public void incrementCounter(AbfsStatistic statistic, long value) { + ioStatisticsStore.incrementCounter(statistic.getStatName(), value); MutableCounterLong counter = lookupCounter(statistic.getStatName()); if (counter != null) { counter.incr(value); @@ -189,98 +209,35 @@ public String formString(String prefix, String separator, String suffix, /** * {@inheritDoc} * - * Creating a map of all the counters for testing. + * Map of all the counters for testing. * - * @return a map of the metrics. + * @return a map of the IOStatistics counters. */ @VisibleForTesting @Override public Map toMap() { - MetricsToMap metricBuilder = new MetricsToMap(null); - registry.snapshot(metricBuilder, true); - return metricBuilder.getMap(); + return ioStatisticsStore.counters(); } - protected static class MetricsToMap extends MetricsRecordBuilder { - private final MetricsCollector parent; - private final Map map = - new HashMap<>(); + /** + * Returning the instance of IOStatisticsStore used to collect the metrics + * in AbfsCounters. + * + * @return instance of IOStatistics. + */ + @Override + public IOStatistics getIOStatistics() { + return ioStatisticsStore; + } - MetricsToMap(MetricsCollector parent) { - this.parent = parent; - } - - @Override - public MetricsRecordBuilder tag(MetricsInfo info, String value) { - return this; - } - - @Override - public MetricsRecordBuilder add(MetricsTag tag) { - return this; - } - - @Override - public MetricsRecordBuilder add(AbstractMetric metric) { - return this; - } - - @Override - public MetricsRecordBuilder setContext(String value) { - return this; - } - - @Override - public MetricsRecordBuilder addCounter(MetricsInfo info, int value) { - return tuple(info, value); - } - - @Override - public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { - return tuple(info, value); - } - - @Override - public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { - return tuple(info, value); - } - - @Override - public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { - return tuple(info, value); - } - - public MetricsToMap tuple(MetricsInfo info, long value) { - return tuple(info.name(), value); - } - - public MetricsToMap tuple(String name, long value) { - map.put(name, value); - return this; - } - - @Override - public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { - return tuple(info, (long) value); - } - - @Override - public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { - return tuple(info, (long) value); - } - - @Override - public MetricsCollector parent() { - return parent; - } - - /** - * Get the map. - * - * @return the map of metrics. - */ - public Map getMap() { - return map; - } + /** + * Tracks the duration of a statistic. + * + * @param key name of the statistic. + * @return DurationTracker for that statistic. + */ + @Override + public DurationTracker trackDuration(String key) { + return ioStatisticsStore.trackDuration(key); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java index 2935cd7543..bb65b0c902 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java @@ -18,7 +18,12 @@ package org.apache.hadoop.fs.azurebfs; +import java.util.HashMap; +import java.util.Map; + import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; /** * Statistic which are collected in Abfs. @@ -73,11 +78,45 @@ public enum AbfsStatistic { READ_THROTTLES("read_throttles", "Total number of times a read operation is throttled."), WRITE_THROTTLES("write_throttles", - "Total number of times a write operation is throttled."); + "Total number of times a write operation is throttled."), + SERVER_UNAVAILABLE("server_unavailable", + "Total number of times HTTP 503 status code is received in response."), + + // HTTP Duration Trackers + HTTP_HEAD_REQUEST(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST, + "Time taken to complete a HEAD request", + AbfsHttpConstants.HTTP_METHOD_HEAD), + HTTP_GET_REQUEST(StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + "Time taken to complete a GET request", + AbfsHttpConstants.HTTP_METHOD_GET), + HTTP_DELETE_REQUEST(StoreStatisticNames.ACTION_HTTP_DELETE_REQUEST, + "Time taken to complete a DELETE request", + AbfsHttpConstants.HTTP_METHOD_DELETE), + HTTP_PUT_REQUEST(StoreStatisticNames.ACTION_HTTP_PUT_REQUEST, + "Time taken to complete a PUT request", + AbfsHttpConstants.HTTP_METHOD_PUT), + HTTP_PATCH_REQUEST(StoreStatisticNames.ACTION_HTTP_PATCH_REQUEST, + "Time taken to complete a PATCH request", + AbfsHttpConstants.HTTP_METHOD_PATCH), + HTTP_POST_REQUEST(StoreStatisticNames.ACTION_HTTP_POST_REQUEST, + "Time taken to complete a POST request", + AbfsHttpConstants.HTTP_METHOD_POST); private String statName; private String statDescription; + //For http call stats only. + private String httpCall; + private static final Map HTTP_CALL_TO_NAME_MAP = new HashMap<>(); + + static { + for (AbfsStatistic statistic : values()) { + if (statistic.getHttpCall() != null) { + HTTP_CALL_TO_NAME_MAP.put(statistic.getHttpCall(), statistic.getStatName()); + } + } + } + /** * Constructor of AbfsStatistic to set statistic name and description. * @@ -89,6 +128,19 @@ public enum AbfsStatistic { this.statDescription = statDescription; } + /** + * Constructor for AbfsStatistic for HTTP durationTrackers. + * + * @param statName Name of the statistic. + * @param statDescription Description of the statistic. + * @param httpCall HTTP call associated with the stat name. + */ + AbfsStatistic(String statName, String statDescription, String httpCall) { + this.statName = statName; + this.statDescription = statDescription; + this.httpCall = httpCall; + } + /** * Getter for statistic name. * @@ -106,4 +158,23 @@ public String getStatName() { public String getStatDescription() { return statDescription; } + + /** + * Getter for http call for HTTP duration trackers. + * + * @return http call of a statistic. + */ + public String getHttpCall() { + return httpCall; + } + + /** + * Get the statistic name using the http call name. + * + * @param httpCall The HTTP call used to get the statistic name. + * @return Statistic name. + */ + public static String getStatNameFromHttpCall(String httpCall) { + return HTTP_CALL_TO_NAME_MAP.get(httpCall); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 9ad85b5bc4..877198186e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -82,6 +82,8 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; @@ -93,13 +95,15 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString; /** * A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on Windows Azure */ @InterfaceStability.Evolving -public class AzureBlobFileSystem extends FileSystem { +public class AzureBlobFileSystem extends FileSystem + implements IOStatisticsSource { public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class); private URI uri; private Path workingDir; @@ -162,11 +166,8 @@ public String toString() { sb.append("uri=").append(uri); sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); - if (abfsCounters != null) { - sb.append(", Statistics: {").append(abfsCounters.formString("{", "=", - "}", true)); - sb.append("}"); - } + sb.append(", \nIOStatistics: {").append(ioStatisticsToString(getIOStatistics())); + sb.append("}"); sb.append('}'); return sb.toString(); } @@ -425,7 +426,9 @@ private void statIncrement(AbfsStatistic statistic) { * @param statistic the Statistic to be incremented. */ private void incrementStatistic(AbfsStatistic statistic) { - abfsCounters.incrementCounter(statistic, 1); + if (abfsCounters != null) { + abfsCounters.incrementCounter(statistic, 1); + } } /** @@ -489,7 +492,9 @@ public synchronized void close() throws IOException { LOG.debug("AzureBlobFileSystem.close"); IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager); this.isClosed = true; - LOG.debug("Closing Abfs: " + toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("Closing Abfs: {}", toString()); + } } @Override @@ -1311,6 +1316,12 @@ boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException { return abfsStore.getIsNamespaceEnabled(); } + /** + * Returns the counter() map in IOStatistics containing all the counters + * and their values. + * + * @return Map of IOStatistics counters. + */ @VisibleForTesting Map getInstrumentationMap() { return abfsCounters.toMap(); @@ -1331,4 +1342,14 @@ public boolean hasPathCapability(final Path path, final String capability) return super.hasPathCapability(p, capability); } } + + /** + * Getter for IOStatistic instance in AzureBlobFilesystem. + * + * @return the IOStatistic instance from abfsCounters. + */ + @Override + public IOStatistics getIOStatistics() { + return abfsCounters != null ? abfsCounters.getIOStatistics() : null; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java index 73996f5df8..2dac63b166 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java @@ -25,13 +25,16 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.azurebfs.AbfsStatistic; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; /** * An interface for Abfs counters. */ @InterfaceAudience.Private @InterfaceStability.Unstable -public interface AbfsCounters { +public interface AbfsCounters extends IOStatisticsSource, DurationTrackerFactory { /** * Increment a AbfsStatistic by a long value. @@ -63,4 +66,12 @@ String formString(String prefix, String separator, String suffix, @VisibleForTesting Map toMap(); + /** + * Start a DurationTracker for a request. + * + * @param key Name of the DurationTracker statistic. + * @return an instance of DurationTracker. + */ + @Override + DurationTracker trackDuration(String key); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 0dd3dcf065..c05ba0d947 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -39,9 +39,6 @@ import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; -import org.apache.hadoop.fs.statistics.StoreStatisticNames; -import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; -import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import static java.lang.Math.max; import static java.lang.Math.min; @@ -485,10 +482,8 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); - op = IOStatisticsBinding.trackDuration((IOStatisticsStore) ioStatistics, - StoreStatisticNames.ACTION_HTTP_GET_REQUEST, - () -> client.read(path, position, b, offset, length, - tolerateOobAppends ? "*" : eTag, cachedSasToken.get())); + op = client.read(path, position, b, offset, length, + tolerateOobAppends ? "*" : eTag, cachedSasToken.get()); cachedSasToken.update(op.getSasToken()); if (streamStatistics != null) { streamStatistics.remoteReadOperation(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 80b35ee4d3..110764c5df 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -45,9 +45,6 @@ import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSource; -import org.apache.hadoop.fs.statistics.StreamStatisticNames; -import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; -import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FSExceptionMessages; @@ -450,32 +447,29 @@ private synchronized void writeCurrentBufferToService(boolean isFlush, boolean i } } final Future job = - completionService.submit(IOStatisticsBinding - .trackDurationOfCallable((IOStatisticsStore) ioStatistics, - StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST, - () -> { - AbfsPerfTracker tracker = client.getAbfsPerfTracker(); - try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, - "writeCurrentBufferToService", "append")) { - AppendRequestParameters.Mode - mode = APPEND_MODE; - if (isFlush & isClose) { - mode = FLUSH_CLOSE_MODE; - } else if (isFlush) { - mode = FLUSH_MODE; - } - AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false, leaseId); - AbfsRestOperation op = client.append(path, bytes, reqParams, - cachedSasToken.get()); - cachedSasToken.update(op.getSasToken()); - perfInfo.registerResult(op.getResult()); - byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); - perfInfo.registerSuccess(true); - return null; - } - }) - ); + completionService.submit(() -> { + AbfsPerfTracker tracker = + client.getAbfsPerfTracker(); + try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, + "writeCurrentBufferToService", "append")) { + AppendRequestParameters.Mode + mode = APPEND_MODE; + if (isFlush & isClose) { + mode = FLUSH_CLOSE_MODE; + } else if (isFlush) { + mode = FLUSH_MODE; + } + AppendRequestParameters reqParams = new AppendRequestParameters( + offset, 0, bytesLength, mode, false, leaseId); + AbfsRestOperation op = client.append(path, bytes, reqParams, + cachedSasToken.get()); + cachedSasToken.update(op.getSasToken()); + perfInfo.registerResult(op.getResult()); + byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); + perfInfo.registerSuccess(true); + return null; + } + }); if (outputStreamStatistics != null) { if (job.isCancelled()) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index b046cbc03a..4c24c37a0d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -19,12 +19,12 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.HttpURLConnection; import java.net.URL; import java.net.UnknownHostException; import java.util.List; -import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; /** * The AbfsRestOperation for Rest AbfsClient. @@ -167,12 +168,30 @@ String getSasToken() { this.abfsCounters = client.getAbfsCounters(); } + /** + * Execute a AbfsRestOperation. Track the Duration of a request if + * abfsCounters isn't null. + * + */ + public void execute() throws AzureBlobFileSystemException { + + try { + IOStatisticsBinding.trackDurationOfInvocation(abfsCounters, + AbfsStatistic.getStatNameFromHttpCall(method), + () -> completeExecute()); + } catch (AzureBlobFileSystemException aze) { + throw aze; + } catch (IOException e) { + throw new UncheckedIOException("Error while tracking Duration of an " + + "AbfsRestOperation call", e); + } + } + /** * Executes the REST operation with retry, by issuing one or more * HTTP operations. */ - @VisibleForTesting - public void execute() throws AzureBlobFileSystemException { + private void completeExecute() throws AzureBlobFileSystemException { // see if we have latency reports from the previous requests String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency(); if (latencyHeader != null && !latencyHeader.isEmpty()) { @@ -259,6 +278,8 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS && httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) { incrementCounter(AbfsStatistic.BYTES_RECEIVED, httpOperation.getBytesReceived()); + } else if (httpOperation.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) { + incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1); } } catch (UnknownHostException ex) { String hostname = null; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsDurationTrackers.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsDurationTrackers.java new file mode 100644 index 0000000000..c8b687d233 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsDurationTrackers.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +import org.apache.hadoop.io.IOUtils; + +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_DELETE_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_GET_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_HEAD_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PUT_REQUEST; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; + +public class ITestAbfsDurationTrackers extends AbstractAbfsIntegrationTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestAbfsDurationTrackers.class); + private static final AbfsStatistic[] HTTP_DURATION_TRACKER_LIST = { + HTTP_HEAD_REQUEST, + HTTP_GET_REQUEST, + HTTP_DELETE_REQUEST, + HTTP_PUT_REQUEST, + }; + + public ITestAbfsDurationTrackers() throws Exception { + } + + /** + * Test to check if DurationTrackers for Abfs HTTP calls work correctly and + * track the duration of the http calls. + */ + @Test + public void testAbfsHttpCallsDurations() throws IOException { + describe("test to verify if the DurationTrackers for abfs http calls " + + "work as expected."); + + AzureBlobFileSystem fs = getFileSystem(); + Path testFilePath = path(getMethodName()); + + // Declaring output and input stream. + AbfsOutputStream out = null; + AbfsInputStream in = null; + try { + // PUT the file. + out = createAbfsOutputStreamWithFlushEnabled(fs, testFilePath); + out.write('a'); + out.hflush(); + + // GET the file. + in = fs.getAbfsStore().openFileForRead(testFilePath, fs.getFsStatistics()); + int res = in.read(); + LOG.info("Result of Read: {}", res); + + // DELETE the file. + fs.delete(testFilePath, false); + + // extract the IOStatistics from the filesystem. + IOStatistics ioStatistics = extractStatistics(fs); + LOG.info(ioStatisticsToPrettyString(ioStatistics)); + assertDurationTracker(ioStatistics); + } finally { + IOUtils.cleanupWithLogger(LOG, out, in); + } + } + + /** + * A method to assert that all the DurationTrackers for the http calls are + * working correctly. + * + * @param ioStatistics the IOStatisticsSource in use. + */ + private void assertDurationTracker(IOStatistics ioStatistics) { + for (AbfsStatistic abfsStatistic : HTTP_DURATION_TRACKER_LIST) { + Assertions.assertThat(lookupMeanStatistic(ioStatistics, + abfsStatistic.getStatName() + StoreStatisticNames.SUFFIX_MEAN).mean()) + .describedAs("The DurationTracker Named " + abfsStatistic.getStatName() + + " Doesn't match the expected value.") + .isGreaterThan(0.0); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index a33a76ecef..4d47d5a6ed 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -31,8 +31,14 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; import org.apache.hadoop.io.IOUtils; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; + public class ITestAbfsInputStreamStatistics extends AbstractAbfsIntegrationTest { private static final int OPERATIONS = 10; @@ -386,12 +392,13 @@ public void testActionHttpGetRequest() throws IOException { abfsInputStream = abfss.openFileForRead(actionHttpGetRequestPath, fs.getFsStatistics()); abfsInputStream.read(); - AbfsInputStreamStatisticsImpl abfsInputStreamStatistics = - (AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics(); - - LOG.info("AbfsInputStreamStats info: {}", abfsInputStreamStatistics.toString()); + IOStatistics ioStatistics = extractStatistics(fs); + LOG.info("AbfsInputStreamStats info: {}", + ioStatisticsToPrettyString(ioStatistics)); Assertions.assertThat( - abfsInputStreamStatistics.getActionHttpGetRequest()) + lookupMeanStatistic(ioStatistics, + AbfsStatistic.HTTP_GET_REQUEST.getStatName() + + StoreStatisticNames.SUFFIX_MEAN).mean()) .describedAs("Mismatch in time taken by a GET request") .isGreaterThan(0.0); } finally { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java index 392e80a0a7..8be997ce69 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java @@ -28,6 +28,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; + +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; /** * Test AbfsOutputStream statistics. @@ -241,10 +247,13 @@ public void testAbfsOutputStreamDurationTrackerPutRequest() throws IOException { outputStream.write('a'); outputStream.hflush(); - AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = - getAbfsOutputStreamStatistics(outputStream); - LOG.info("AbfsOutputStreamStats info: {}", abfsOutputStreamStatistics.toString()); - Assertions.assertThat(abfsOutputStreamStatistics.getTimeSpentOnPutRequest()) + IOStatistics ioStatistics = extractStatistics(fs); + LOG.info("AbfsOutputStreamStats info: {}", + ioStatisticsToPrettyString(ioStatistics)); + Assertions.assertThat( + lookupMeanStatistic(ioStatistics, + AbfsStatistic.HTTP_PUT_REQUEST.getStatName() + + StoreStatisticNames.SUFFIX_MEAN).mean()) .describedAs("Mismatch in timeSpentOnPutRequest DurationTracker") .isGreaterThan(0.0); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java index 42205807c1..ccfc9f4539 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.statistics.IOStatistics; /** * Tests AzureBlobFileSystem Statistics. @@ -46,14 +47,21 @@ public void testInitialStatsValues() throws IOException { AbfsCounters abfsCounters = new AbfsCountersImpl(getFileSystem().getUri()); - Map metricMap = abfsCounters.toMap(); + IOStatistics ioStatistics = abfsCounters.getIOStatistics(); - for (Map.Entry entry : metricMap.entrySet()) { - String key = entry.getKey(); - Long value = entry.getValue(); + //Initial value verification for counters + for (Map.Entry entry : ioStatistics.counters().entrySet()) { + checkInitialValue(entry.getKey(), entry.getValue(), 0); + } - //Verify if initial value of statistic is 0. - checkInitialValue(key, value); + //Initial value verification for gauges + for (Map.Entry entry : ioStatistics.gauges().entrySet()) { + checkInitialValue(entry.getKey(), entry.getValue(), 0); + } + + //Initial value verifications for DurationTrackers + for (Map.Entry entry : ioStatistics.maximums().entrySet()) { + checkInitialValue(entry.getKey(), entry.getValue(), -1); } } @@ -251,8 +259,10 @@ Testing exists() calls and rename calls. Since both were called 2 * * @param statName name of the statistic to be checked. * @param statValue value of the statistic. + * @param expectedInitialValue initial value expected from this statistic. */ - private void checkInitialValue(String statName, long statValue) { - assertEquals("Mismatch in " + statName, 0, statValue); + private void checkInitialValue(String statName, long statValue, + long expectedInitialValue) { + assertEquals("Mismatch in " + statName, expectedInitialValue, statValue); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java index 395a456124..7eadb4bb8f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStreamStatistics.java @@ -39,13 +39,12 @@ public ITestAbfsStreamStatistics() throws Exception { private static final Logger LOG = LoggerFactory.getLogger(ITestAbfsStreamStatistics.class); - private static final int LARGE_NUMBER_OF_OPS = 999999; + private static final int LARGE_NUMBER_OF_OPS = 99; /*** * Testing {@code incrementReadOps()} in class {@code AbfsInputStream} and * {@code incrementWriteOps()} in class {@code AbfsOutputStream}. * - * @throws Exception */ @Test public void testAbfsStreamOps() throws Exception { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java index 0639cf2f82..628ad30863 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java @@ -21,13 +21,31 @@ import java.io.IOException; import java.util.Map; +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; + +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PATCH_REQUEST; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_POST_REQUEST; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic; public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestAbfsNetworkStatistics.class); private static final int LARGE_OPERATIONS = 1000; + private static final AbfsStatistic[] HTTP_DURATION_TRACKER_LIST = { + HTTP_POST_REQUEST, + HTTP_PATCH_REQUEST + }; public TestAbfsNetworkStatistics() throws Exception { } @@ -64,4 +82,58 @@ public void testAbfsThrottlingStatistics() throws IOException { assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS, metricMap); } + + /** + * Test to check if the DurationTrackers are tracking as expected whilst + * doing some work. + */ + @Test + public void testAbfsNetworkDurationTrackers() + throws IOException, InterruptedException { + describe("Test to verify the actual values of DurationTrackers are " + + "greater than 0.0 while tracking some work."); + + AbfsCounters abfsCounters = new AbfsCountersImpl(getFileSystem().getUri()); + // Start dummy work for the DurationTrackers and start tracking. + try (DurationTracker ignoredPatch = + abfsCounters.trackDuration(AbfsStatistic.getStatNameFromHttpCall(AbfsHttpConstants.HTTP_METHOD_PATCH)); + DurationTracker ignoredPost = + abfsCounters.trackDuration(AbfsStatistic.getStatNameFromHttpCall(AbfsHttpConstants.HTTP_METHOD_POST)) + ) { + // Emulates doing some work. + Thread.sleep(10); + LOG.info("Execute some Http requests..."); + } + + // Extract the iostats from the abfsCounters instance. + IOStatistics ioStatistics = extractStatistics(abfsCounters); + // Asserting that the durationTrackers have mean > 0.0. + for (AbfsStatistic abfsStatistic : HTTP_DURATION_TRACKER_LIST) { + Assertions.assertThat(lookupMeanStatistic(ioStatistics, + abfsStatistic.getStatName() + StoreStatisticNames.SUFFIX_MEAN).mean()) + .describedAs("The DurationTracker Named " + abfsStatistic.getStatName() + + " Doesn't match the expected value") + .isGreaterThan(0.0); + } + } + + /** + * Test to check if abfs counter for HTTP 503 statusCode works correctly + * when incremented. + */ + @Test + public void testAbfsHTTP503ErrorCounter() throws IOException { + describe("tests to verify the expected value of the HTTP 503 error " + + "counter is equal to number of times incremented."); + + AbfsCounters abfsCounters = new AbfsCountersImpl(getFileSystem().getUri()); + // Incrementing the server_unavailable counter. + for (int i = 0; i < LARGE_OPERATIONS; i++) { + abfsCounters.incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1); + } + // Getting the IOStatistics counter map from abfsCounters. + Map metricsMap = abfsCounters.toMap(); + assertAbfsStatistics(AbfsStatistic.SERVER_UNAVAILABLE, LARGE_OPERATIONS, + metricsMap); + } }