The ABFS Filesystem and its input and output streams now implement the IOStatisticSource interface and provide IOStatistics on their interactions with Azure Storage. This includes the min/max/mean durations of all REST API calls. Contributed by Mehakmeet Singh <mehakmeet.singh@cloudera.com>
This commit is contained in:
parent
77fddcfcb1
commit
389d3034c6
@ -316,6 +316,30 @@ public final class StoreStatisticNames {
|
||||
public static final String ACTION_HTTP_GET_REQUEST
|
||||
= "action_http_get_request";
|
||||
|
||||
/**
|
||||
* An HTTP DELETE request was made: {@value}.
|
||||
*/
|
||||
public static final String ACTION_HTTP_DELETE_REQUEST
|
||||
= "action_http_delete_request";
|
||||
|
||||
/**
|
||||
* An HTTP PUT request was made: {@value}.
|
||||
*/
|
||||
public static final String ACTION_HTTP_PUT_REQUEST
|
||||
= "action_http_put_request";
|
||||
|
||||
/**
|
||||
* An HTTP PATCH request was made: {@value}.
|
||||
*/
|
||||
public static final String ACTION_HTTP_PATCH_REQUEST
|
||||
= "action_http_patch_request";
|
||||
|
||||
/**
|
||||
* An HTTP POST request was made: {@value}.
|
||||
*/
|
||||
public static final String ACTION_HTTP_POST_REQUEST
|
||||
= "action_http_post_request";
|
||||
|
||||
/**
|
||||
* An HTTP HEAD request was made: {@value}.
|
||||
*/
|
||||
|
@ -19,24 +19,23 @@
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
||||
import org.apache.hadoop.metrics2.AbstractMetric;
|
||||
import org.apache.hadoop.fs.statistics.DurationTracker;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
|
||||
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStoreBuilder;
|
||||
import org.apache.hadoop.metrics2.MetricStringBuilder;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.MetricsTag;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableMetric;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
|
||||
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
|
||||
|
||||
/**
|
||||
* Instrumentation of Abfs counters.
|
||||
@ -62,6 +61,8 @@ public class AbfsCountersImpl implements AbfsCounters {
|
||||
private final MetricsRegistry registry =
|
||||
new MetricsRegistry("abfsMetrics").setContext(CONTEXT);
|
||||
|
||||
private final IOStatisticsStore ioStatisticsStore;
|
||||
|
||||
private static final AbfsStatistic[] STATISTIC_LIST = {
|
||||
CALL_CREATE,
|
||||
CALL_OPEN,
|
||||
@ -85,7 +86,17 @@ public class AbfsCountersImpl implements AbfsCounters {
|
||||
BYTES_SENT,
|
||||
BYTES_RECEIVED,
|
||||
READ_THROTTLES,
|
||||
WRITE_THROTTLES
|
||||
WRITE_THROTTLES,
|
||||
SERVER_UNAVAILABLE
|
||||
};
|
||||
|
||||
private static final AbfsStatistic[] DURATION_TRACKER_LIST = {
|
||||
HTTP_HEAD_REQUEST,
|
||||
HTTP_GET_REQUEST,
|
||||
HTTP_DELETE_REQUEST,
|
||||
HTTP_PUT_REQUEST,
|
||||
HTTP_PATCH_REQUEST,
|
||||
HTTP_POST_REQUEST
|
||||
};
|
||||
|
||||
public AbfsCountersImpl(URI uri) {
|
||||
@ -95,9 +106,17 @@ public AbfsCountersImpl(URI uri) {
|
||||
fileSystemInstanceId.toString());
|
||||
registry.tag(METRIC_BUCKET, "Hostname from the FS URL", uri.getHost());
|
||||
|
||||
IOStatisticsStoreBuilder ioStatisticsStoreBuilder = iostatisticsStore();
|
||||
// Declaring the counters.
|
||||
for (AbfsStatistic stats : STATISTIC_LIST) {
|
||||
ioStatisticsStoreBuilder.withCounters(stats.getStatName());
|
||||
createCounter(stats);
|
||||
}
|
||||
// Declaring the DurationTrackers.
|
||||
for (AbfsStatistic durationStats : DURATION_TRACKER_LIST) {
|
||||
ioStatisticsStoreBuilder.withDurationTracking(durationStats.getStatName());
|
||||
}
|
||||
ioStatisticsStore = ioStatisticsStoreBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -149,6 +168,7 @@ private MutableCounterLong createCounter(AbfsStatistic stats) {
|
||||
*/
|
||||
@Override
|
||||
public void incrementCounter(AbfsStatistic statistic, long value) {
|
||||
ioStatisticsStore.incrementCounter(statistic.getStatName(), value);
|
||||
MutableCounterLong counter = lookupCounter(statistic.getStatName());
|
||||
if (counter != null) {
|
||||
counter.incr(value);
|
||||
@ -189,98 +209,35 @@ public String formString(String prefix, String separator, String suffix,
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
* Creating a map of all the counters for testing.
|
||||
* Map of all the counters for testing.
|
||||
*
|
||||
* @return a map of the metrics.
|
||||
* @return a map of the IOStatistics counters.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@Override
|
||||
public Map<String, Long> toMap() {
|
||||
MetricsToMap metricBuilder = new MetricsToMap(null);
|
||||
registry.snapshot(metricBuilder, true);
|
||||
return metricBuilder.getMap();
|
||||
return ioStatisticsStore.counters();
|
||||
}
|
||||
|
||||
protected static class MetricsToMap extends MetricsRecordBuilder {
|
||||
private final MetricsCollector parent;
|
||||
private final Map<String, Long> map =
|
||||
new HashMap<>();
|
||||
/**
|
||||
* Returning the instance of IOStatisticsStore used to collect the metrics
|
||||
* in AbfsCounters.
|
||||
*
|
||||
* @return instance of IOStatistics.
|
||||
*/
|
||||
@Override
|
||||
public IOStatistics getIOStatistics() {
|
||||
return ioStatisticsStore;
|
||||
}
|
||||
|
||||
MetricsToMap(MetricsCollector parent) {
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder tag(MetricsInfo info, String value) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder add(MetricsTag tag) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder add(AbstractMetric metric) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder setContext(String value) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
|
||||
return tuple(info, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
|
||||
return tuple(info, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
|
||||
return tuple(info, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
|
||||
return tuple(info, value);
|
||||
}
|
||||
|
||||
public MetricsToMap tuple(MetricsInfo info, long value) {
|
||||
return tuple(info.name(), value);
|
||||
}
|
||||
|
||||
public MetricsToMap tuple(String name, long value) {
|
||||
map.put(name, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
|
||||
return tuple(info, (long) value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
|
||||
return tuple(info, (long) value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsCollector parent() {
|
||||
return parent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the map.
|
||||
*
|
||||
* @return the map of metrics.
|
||||
*/
|
||||
public Map<String, Long> getMap() {
|
||||
return map;
|
||||
}
|
||||
/**
|
||||
* Tracks the duration of a statistic.
|
||||
*
|
||||
* @param key name of the statistic.
|
||||
* @return DurationTracker for that statistic.
|
||||
*/
|
||||
@Override
|
||||
public DurationTracker trackDuration(String key) {
|
||||
return ioStatisticsStore.trackDuration(key);
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,12 @@
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
|
||||
|
||||
/**
|
||||
* Statistic which are collected in Abfs.
|
||||
@ -73,11 +78,45 @@ public enum AbfsStatistic {
|
||||
READ_THROTTLES("read_throttles",
|
||||
"Total number of times a read operation is throttled."),
|
||||
WRITE_THROTTLES("write_throttles",
|
||||
"Total number of times a write operation is throttled.");
|
||||
"Total number of times a write operation is throttled."),
|
||||
SERVER_UNAVAILABLE("server_unavailable",
|
||||
"Total number of times HTTP 503 status code is received in response."),
|
||||
|
||||
// HTTP Duration Trackers
|
||||
HTTP_HEAD_REQUEST(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
|
||||
"Time taken to complete a HEAD request",
|
||||
AbfsHttpConstants.HTTP_METHOD_HEAD),
|
||||
HTTP_GET_REQUEST(StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
|
||||
"Time taken to complete a GET request",
|
||||
AbfsHttpConstants.HTTP_METHOD_GET),
|
||||
HTTP_DELETE_REQUEST(StoreStatisticNames.ACTION_HTTP_DELETE_REQUEST,
|
||||
"Time taken to complete a DELETE request",
|
||||
AbfsHttpConstants.HTTP_METHOD_DELETE),
|
||||
HTTP_PUT_REQUEST(StoreStatisticNames.ACTION_HTTP_PUT_REQUEST,
|
||||
"Time taken to complete a PUT request",
|
||||
AbfsHttpConstants.HTTP_METHOD_PUT),
|
||||
HTTP_PATCH_REQUEST(StoreStatisticNames.ACTION_HTTP_PATCH_REQUEST,
|
||||
"Time taken to complete a PATCH request",
|
||||
AbfsHttpConstants.HTTP_METHOD_PATCH),
|
||||
HTTP_POST_REQUEST(StoreStatisticNames.ACTION_HTTP_POST_REQUEST,
|
||||
"Time taken to complete a POST request",
|
||||
AbfsHttpConstants.HTTP_METHOD_POST);
|
||||
|
||||
private String statName;
|
||||
private String statDescription;
|
||||
|
||||
//For http call stats only.
|
||||
private String httpCall;
|
||||
private static final Map<String, String> HTTP_CALL_TO_NAME_MAP = new HashMap<>();
|
||||
|
||||
static {
|
||||
for (AbfsStatistic statistic : values()) {
|
||||
if (statistic.getHttpCall() != null) {
|
||||
HTTP_CALL_TO_NAME_MAP.put(statistic.getHttpCall(), statistic.getStatName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor of AbfsStatistic to set statistic name and description.
|
||||
*
|
||||
@ -89,6 +128,19 @@ public enum AbfsStatistic {
|
||||
this.statDescription = statDescription;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for AbfsStatistic for HTTP durationTrackers.
|
||||
*
|
||||
* @param statName Name of the statistic.
|
||||
* @param statDescription Description of the statistic.
|
||||
* @param httpCall HTTP call associated with the stat name.
|
||||
*/
|
||||
AbfsStatistic(String statName, String statDescription, String httpCall) {
|
||||
this.statName = statName;
|
||||
this.statDescription = statDescription;
|
||||
this.httpCall = httpCall;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter for statistic name.
|
||||
*
|
||||
@ -106,4 +158,23 @@ public String getStatName() {
|
||||
public String getStatDescription() {
|
||||
return statDescription;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter for http call for HTTP duration trackers.
|
||||
*
|
||||
* @return http call of a statistic.
|
||||
*/
|
||||
public String getHttpCall() {
|
||||
return httpCall;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the statistic name using the http call name.
|
||||
*
|
||||
* @param httpCall The HTTP call used to get the statistic name.
|
||||
* @return Statistic name.
|
||||
*/
|
||||
public static String getStatNameFromHttpCall(String httpCall) {
|
||||
return HTTP_CALL_TO_NAME_MAP.get(httpCall);
|
||||
}
|
||||
}
|
||||
|
@ -82,6 +82,8 @@
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
@ -93,13 +95,15 @@
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
|
||||
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
|
||||
|
||||
/**
|
||||
* A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on <a
|
||||
* href="http://store.azure.com/">Windows Azure</a>
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public class AzureBlobFileSystem extends FileSystem {
|
||||
public class AzureBlobFileSystem extends FileSystem
|
||||
implements IOStatisticsSource {
|
||||
public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class);
|
||||
private URI uri;
|
||||
private Path workingDir;
|
||||
@ -162,11 +166,8 @@ public String toString() {
|
||||
sb.append("uri=").append(uri);
|
||||
sb.append(", user='").append(abfsStore.getUser()).append('\'');
|
||||
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
|
||||
if (abfsCounters != null) {
|
||||
sb.append(", Statistics: {").append(abfsCounters.formString("{", "=",
|
||||
"}", true));
|
||||
sb.append("}");
|
||||
}
|
||||
sb.append(", \nIOStatistics: {").append(ioStatisticsToString(getIOStatistics()));
|
||||
sb.append("}");
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
@ -425,7 +426,9 @@ private void statIncrement(AbfsStatistic statistic) {
|
||||
* @param statistic the Statistic to be incremented.
|
||||
*/
|
||||
private void incrementStatistic(AbfsStatistic statistic) {
|
||||
abfsCounters.incrementCounter(statistic, 1);
|
||||
if (abfsCounters != null) {
|
||||
abfsCounters.incrementCounter(statistic, 1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -489,7 +492,9 @@ public synchronized void close() throws IOException {
|
||||
LOG.debug("AzureBlobFileSystem.close");
|
||||
IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager);
|
||||
this.isClosed = true;
|
||||
LOG.debug("Closing Abfs: " + toString());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Closing Abfs: {}", toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1311,6 +1316,12 @@ boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
|
||||
return abfsStore.getIsNamespaceEnabled();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the counter() map in IOStatistics containing all the counters
|
||||
* and their values.
|
||||
*
|
||||
* @return Map of IOStatistics counters.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
Map<String, Long> getInstrumentationMap() {
|
||||
return abfsCounters.toMap();
|
||||
@ -1331,4 +1342,14 @@ public boolean hasPathCapability(final Path path, final String capability)
|
||||
return super.hasPathCapability(p, capability);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter for IOStatistic instance in AzureBlobFilesystem.
|
||||
*
|
||||
* @return the IOStatistic instance from abfsCounters.
|
||||
*/
|
||||
@Override
|
||||
public IOStatistics getIOStatistics() {
|
||||
return abfsCounters != null ? abfsCounters.getIOStatistics() : null;
|
||||
}
|
||||
}
|
||||
|
@ -25,13 +25,16 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
|
||||
import org.apache.hadoop.fs.statistics.DurationTracker;
|
||||
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
|
||||
/**
|
||||
* An interface for Abfs counters.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface AbfsCounters {
|
||||
public interface AbfsCounters extends IOStatisticsSource, DurationTrackerFactory {
|
||||
|
||||
/**
|
||||
* Increment a AbfsStatistic by a long value.
|
||||
@ -63,4 +66,12 @@ String formString(String prefix, String separator, String suffix,
|
||||
@VisibleForTesting
|
||||
Map<String, Long> toMap();
|
||||
|
||||
/**
|
||||
* Start a DurationTracker for a request.
|
||||
*
|
||||
* @param key Name of the DurationTracker statistic.
|
||||
* @return an instance of DurationTracker.
|
||||
*/
|
||||
@Override
|
||||
DurationTracker trackDuration(String key);
|
||||
}
|
||||
|
@ -39,9 +39,6 @@
|
||||
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
|
||||
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
|
||||
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
|
||||
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
@ -485,10 +482,8 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
|
||||
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
||||
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
|
||||
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
|
||||
op = IOStatisticsBinding.trackDuration((IOStatisticsStore) ioStatistics,
|
||||
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
|
||||
() -> client.read(path, position, b, offset, length,
|
||||
tolerateOobAppends ? "*" : eTag, cachedSasToken.get()));
|
||||
op = client.read(path, position, b, offset, length,
|
||||
tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
|
||||
cachedSasToken.update(op.getSasToken());
|
||||
if (streamStatistics != null) {
|
||||
streamStatistics.remoteReadOperation();
|
||||
|
@ -45,9 +45,6 @@
|
||||
import org.apache.hadoop.fs.statistics.DurationTracker;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
|
||||
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
|
||||
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
|
||||
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
|
||||
import org.apache.hadoop.io.ElasticByteBufferPool;
|
||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
@ -450,32 +447,29 @@ private synchronized void writeCurrentBufferToService(boolean isFlush, boolean i
|
||||
}
|
||||
}
|
||||
final Future<Void> job =
|
||||
completionService.submit(IOStatisticsBinding
|
||||
.trackDurationOfCallable((IOStatisticsStore) ioStatistics,
|
||||
StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST,
|
||||
() -> {
|
||||
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
|
||||
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
||||
"writeCurrentBufferToService", "append")) {
|
||||
AppendRequestParameters.Mode
|
||||
mode = APPEND_MODE;
|
||||
if (isFlush & isClose) {
|
||||
mode = FLUSH_CLOSE_MODE;
|
||||
} else if (isFlush) {
|
||||
mode = FLUSH_MODE;
|
||||
}
|
||||
AppendRequestParameters reqParams = new AppendRequestParameters(
|
||||
offset, 0, bytesLength, mode, false, leaseId);
|
||||
AbfsRestOperation op = client.append(path, bytes, reqParams,
|
||||
cachedSasToken.get());
|
||||
cachedSasToken.update(op.getSasToken());
|
||||
perfInfo.registerResult(op.getResult());
|
||||
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
||||
perfInfo.registerSuccess(true);
|
||||
return null;
|
||||
}
|
||||
})
|
||||
);
|
||||
completionService.submit(() -> {
|
||||
AbfsPerfTracker tracker =
|
||||
client.getAbfsPerfTracker();
|
||||
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
||||
"writeCurrentBufferToService", "append")) {
|
||||
AppendRequestParameters.Mode
|
||||
mode = APPEND_MODE;
|
||||
if (isFlush & isClose) {
|
||||
mode = FLUSH_CLOSE_MODE;
|
||||
} else if (isFlush) {
|
||||
mode = FLUSH_MODE;
|
||||
}
|
||||
AppendRequestParameters reqParams = new AppendRequestParameters(
|
||||
offset, 0, bytesLength, mode, false, leaseId);
|
||||
AbfsRestOperation op = client.append(path, bytes, reqParams,
|
||||
cachedSasToken.get());
|
||||
cachedSasToken.update(op.getSasToken());
|
||||
perfInfo.registerResult(op.getResult());
|
||||
byteBufferPool.putBuffer(ByteBuffer.wrap(bytes));
|
||||
perfInfo.registerSuccess(true);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
if (outputStreamStatistics != null) {
|
||||
if (job.isCancelled()) {
|
||||
|
@ -19,12 +19,12 @@
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
|
||||
|
||||
/**
|
||||
* The AbfsRestOperation for Rest AbfsClient.
|
||||
@ -167,12 +168,30 @@ String getSasToken() {
|
||||
this.abfsCounters = client.getAbfsCounters();
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a AbfsRestOperation. Track the Duration of a request if
|
||||
* abfsCounters isn't null.
|
||||
*
|
||||
*/
|
||||
public void execute() throws AzureBlobFileSystemException {
|
||||
|
||||
try {
|
||||
IOStatisticsBinding.trackDurationOfInvocation(abfsCounters,
|
||||
AbfsStatistic.getStatNameFromHttpCall(method),
|
||||
() -> completeExecute());
|
||||
} catch (AzureBlobFileSystemException aze) {
|
||||
throw aze;
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException("Error while tracking Duration of an "
|
||||
+ "AbfsRestOperation call", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the REST operation with retry, by issuing one or more
|
||||
* HTTP operations.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public void execute() throws AzureBlobFileSystemException {
|
||||
private void completeExecute() throws AzureBlobFileSystemException {
|
||||
// see if we have latency reports from the previous requests
|
||||
String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency();
|
||||
if (latencyHeader != null && !latencyHeader.isEmpty()) {
|
||||
@ -259,6 +278,8 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
|
||||
&& httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) {
|
||||
incrementCounter(AbfsStatistic.BYTES_RECEIVED,
|
||||
httpOperation.getBytesReceived());
|
||||
} else if (httpOperation.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
|
||||
incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1);
|
||||
}
|
||||
} catch (UnknownHostException ex) {
|
||||
String hostname = null;
|
||||
|
@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_DELETE_REQUEST;
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_GET_REQUEST;
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_HEAD_REQUEST;
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PUT_REQUEST;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
|
||||
|
||||
public class ITestAbfsDurationTrackers extends AbstractAbfsIntegrationTest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestAbfsDurationTrackers.class);
|
||||
private static final AbfsStatistic[] HTTP_DURATION_TRACKER_LIST = {
|
||||
HTTP_HEAD_REQUEST,
|
||||
HTTP_GET_REQUEST,
|
||||
HTTP_DELETE_REQUEST,
|
||||
HTTP_PUT_REQUEST,
|
||||
};
|
||||
|
||||
public ITestAbfsDurationTrackers() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check if DurationTrackers for Abfs HTTP calls work correctly and
|
||||
* track the duration of the http calls.
|
||||
*/
|
||||
@Test
|
||||
public void testAbfsHttpCallsDurations() throws IOException {
|
||||
describe("test to verify if the DurationTrackers for abfs http calls "
|
||||
+ "work as expected.");
|
||||
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
Path testFilePath = path(getMethodName());
|
||||
|
||||
// Declaring output and input stream.
|
||||
AbfsOutputStream out = null;
|
||||
AbfsInputStream in = null;
|
||||
try {
|
||||
// PUT the file.
|
||||
out = createAbfsOutputStreamWithFlushEnabled(fs, testFilePath);
|
||||
out.write('a');
|
||||
out.hflush();
|
||||
|
||||
// GET the file.
|
||||
in = fs.getAbfsStore().openFileForRead(testFilePath, fs.getFsStatistics());
|
||||
int res = in.read();
|
||||
LOG.info("Result of Read: {}", res);
|
||||
|
||||
// DELETE the file.
|
||||
fs.delete(testFilePath, false);
|
||||
|
||||
// extract the IOStatistics from the filesystem.
|
||||
IOStatistics ioStatistics = extractStatistics(fs);
|
||||
LOG.info(ioStatisticsToPrettyString(ioStatistics));
|
||||
assertDurationTracker(ioStatistics);
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A method to assert that all the DurationTrackers for the http calls are
|
||||
* working correctly.
|
||||
*
|
||||
* @param ioStatistics the IOStatisticsSource in use.
|
||||
*/
|
||||
private void assertDurationTracker(IOStatistics ioStatistics) {
|
||||
for (AbfsStatistic abfsStatistic : HTTP_DURATION_TRACKER_LIST) {
|
||||
Assertions.assertThat(lookupMeanStatistic(ioStatistics,
|
||||
abfsStatistic.getStatName() + StoreStatisticNames.SUFFIX_MEAN).mean())
|
||||
.describedAs("The DurationTracker Named " + abfsStatistic.getStatName()
|
||||
+ " Doesn't match the expected value.")
|
||||
.isGreaterThan(0.0);
|
||||
}
|
||||
}
|
||||
}
|
@ -31,8 +31,14 @@
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
|
||||
|
||||
public class ITestAbfsInputStreamStatistics
|
||||
extends AbstractAbfsIntegrationTest {
|
||||
private static final int OPERATIONS = 10;
|
||||
@ -386,12 +392,13 @@ public void testActionHttpGetRequest() throws IOException {
|
||||
abfsInputStream =
|
||||
abfss.openFileForRead(actionHttpGetRequestPath, fs.getFsStatistics());
|
||||
abfsInputStream.read();
|
||||
AbfsInputStreamStatisticsImpl abfsInputStreamStatistics =
|
||||
(AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics();
|
||||
|
||||
LOG.info("AbfsInputStreamStats info: {}", abfsInputStreamStatistics.toString());
|
||||
IOStatistics ioStatistics = extractStatistics(fs);
|
||||
LOG.info("AbfsInputStreamStats info: {}",
|
||||
ioStatisticsToPrettyString(ioStatistics));
|
||||
Assertions.assertThat(
|
||||
abfsInputStreamStatistics.getActionHttpGetRequest())
|
||||
lookupMeanStatistic(ioStatistics,
|
||||
AbfsStatistic.HTTP_GET_REQUEST.getStatName()
|
||||
+ StoreStatisticNames.SUFFIX_MEAN).mean())
|
||||
.describedAs("Mismatch in time taken by a GET request")
|
||||
.isGreaterThan(0.0);
|
||||
} finally {
|
||||
|
@ -28,6 +28,12 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
|
||||
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
|
||||
|
||||
/**
|
||||
* Test AbfsOutputStream statistics.
|
||||
@ -241,10 +247,13 @@ public void testAbfsOutputStreamDurationTrackerPutRequest() throws IOException {
|
||||
outputStream.write('a');
|
||||
outputStream.hflush();
|
||||
|
||||
AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics =
|
||||
getAbfsOutputStreamStatistics(outputStream);
|
||||
LOG.info("AbfsOutputStreamStats info: {}", abfsOutputStreamStatistics.toString());
|
||||
Assertions.assertThat(abfsOutputStreamStatistics.getTimeSpentOnPutRequest())
|
||||
IOStatistics ioStatistics = extractStatistics(fs);
|
||||
LOG.info("AbfsOutputStreamStats info: {}",
|
||||
ioStatisticsToPrettyString(ioStatistics));
|
||||
Assertions.assertThat(
|
||||
lookupMeanStatistic(ioStatistics,
|
||||
AbfsStatistic.HTTP_PUT_REQUEST.getStatName()
|
||||
+ StoreStatisticNames.SUFFIX_MEAN).mean())
|
||||
.describedAs("Mismatch in timeSpentOnPutRequest DurationTracker")
|
||||
.isGreaterThan(0.0);
|
||||
}
|
||||
|
@ -26,6 +26,7 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
|
||||
/**
|
||||
* Tests AzureBlobFileSystem Statistics.
|
||||
@ -46,14 +47,21 @@ public void testInitialStatsValues() throws IOException {
|
||||
|
||||
AbfsCounters abfsCounters =
|
||||
new AbfsCountersImpl(getFileSystem().getUri());
|
||||
Map<String, Long> metricMap = abfsCounters.toMap();
|
||||
IOStatistics ioStatistics = abfsCounters.getIOStatistics();
|
||||
|
||||
for (Map.Entry<String, Long> entry : metricMap.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
Long value = entry.getValue();
|
||||
//Initial value verification for counters
|
||||
for (Map.Entry<String, Long> entry : ioStatistics.counters().entrySet()) {
|
||||
checkInitialValue(entry.getKey(), entry.getValue(), 0);
|
||||
}
|
||||
|
||||
//Verify if initial value of statistic is 0.
|
||||
checkInitialValue(key, value);
|
||||
//Initial value verification for gauges
|
||||
for (Map.Entry<String, Long> entry : ioStatistics.gauges().entrySet()) {
|
||||
checkInitialValue(entry.getKey(), entry.getValue(), 0);
|
||||
}
|
||||
|
||||
//Initial value verifications for DurationTrackers
|
||||
for (Map.Entry<String, Long> entry : ioStatistics.maximums().entrySet()) {
|
||||
checkInitialValue(entry.getKey(), entry.getValue(), -1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -251,8 +259,10 @@ Testing exists() calls and rename calls. Since both were called 2
|
||||
*
|
||||
* @param statName name of the statistic to be checked.
|
||||
* @param statValue value of the statistic.
|
||||
* @param expectedInitialValue initial value expected from this statistic.
|
||||
*/
|
||||
private void checkInitialValue(String statName, long statValue) {
|
||||
assertEquals("Mismatch in " + statName, 0, statValue);
|
||||
private void checkInitialValue(String statName, long statValue,
|
||||
long expectedInitialValue) {
|
||||
assertEquals("Mismatch in " + statName, expectedInitialValue, statValue);
|
||||
}
|
||||
}
|
||||
|
@ -39,13 +39,12 @@ public ITestAbfsStreamStatistics() throws Exception {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);
|
||||
|
||||
private static final int LARGE_NUMBER_OF_OPS = 999999;
|
||||
private static final int LARGE_NUMBER_OF_OPS = 99;
|
||||
|
||||
/***
|
||||
* Testing {@code incrementReadOps()} in class {@code AbfsInputStream} and
|
||||
* {@code incrementWriteOps()} in class {@code AbfsOutputStream}.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testAbfsStreamOps() throws Exception {
|
||||
|
@ -21,13 +21,31 @@
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
||||
import org.apache.hadoop.fs.statistics.DurationTracker;
|
||||
import org.apache.hadoop.fs.statistics.IOStatistics;
|
||||
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PATCH_REQUEST;
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_POST_REQUEST;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
|
||||
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
|
||||
|
||||
public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestAbfsNetworkStatistics.class);
|
||||
private static final int LARGE_OPERATIONS = 1000;
|
||||
private static final AbfsStatistic[] HTTP_DURATION_TRACKER_LIST = {
|
||||
HTTP_POST_REQUEST,
|
||||
HTTP_PATCH_REQUEST
|
||||
};
|
||||
|
||||
public TestAbfsNetworkStatistics() throws Exception {
|
||||
}
|
||||
@ -64,4 +82,58 @@ public void testAbfsThrottlingStatistics() throws IOException {
|
||||
assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS,
|
||||
metricMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check if the DurationTrackers are tracking as expected whilst
|
||||
* doing some work.
|
||||
*/
|
||||
@Test
|
||||
public void testAbfsNetworkDurationTrackers()
|
||||
throws IOException, InterruptedException {
|
||||
describe("Test to verify the actual values of DurationTrackers are "
|
||||
+ "greater than 0.0 while tracking some work.");
|
||||
|
||||
AbfsCounters abfsCounters = new AbfsCountersImpl(getFileSystem().getUri());
|
||||
// Start dummy work for the DurationTrackers and start tracking.
|
||||
try (DurationTracker ignoredPatch =
|
||||
abfsCounters.trackDuration(AbfsStatistic.getStatNameFromHttpCall(AbfsHttpConstants.HTTP_METHOD_PATCH));
|
||||
DurationTracker ignoredPost =
|
||||
abfsCounters.trackDuration(AbfsStatistic.getStatNameFromHttpCall(AbfsHttpConstants.HTTP_METHOD_POST))
|
||||
) {
|
||||
// Emulates doing some work.
|
||||
Thread.sleep(10);
|
||||
LOG.info("Execute some Http requests...");
|
||||
}
|
||||
|
||||
// Extract the iostats from the abfsCounters instance.
|
||||
IOStatistics ioStatistics = extractStatistics(abfsCounters);
|
||||
// Asserting that the durationTrackers have mean > 0.0.
|
||||
for (AbfsStatistic abfsStatistic : HTTP_DURATION_TRACKER_LIST) {
|
||||
Assertions.assertThat(lookupMeanStatistic(ioStatistics,
|
||||
abfsStatistic.getStatName() + StoreStatisticNames.SUFFIX_MEAN).mean())
|
||||
.describedAs("The DurationTracker Named " + abfsStatistic.getStatName()
|
||||
+ " Doesn't match the expected value")
|
||||
.isGreaterThan(0.0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check if abfs counter for HTTP 503 statusCode works correctly
|
||||
* when incremented.
|
||||
*/
|
||||
@Test
|
||||
public void testAbfsHTTP503ErrorCounter() throws IOException {
|
||||
describe("tests to verify the expected value of the HTTP 503 error "
|
||||
+ "counter is equal to number of times incremented.");
|
||||
|
||||
AbfsCounters abfsCounters = new AbfsCountersImpl(getFileSystem().getUri());
|
||||
// Incrementing the server_unavailable counter.
|
||||
for (int i = 0; i < LARGE_OPERATIONS; i++) {
|
||||
abfsCounters.incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1);
|
||||
}
|
||||
// Getting the IOStatistics counter map from abfsCounters.
|
||||
Map<String, Long> metricsMap = abfsCounters.toMap();
|
||||
assertAbfsStatistics(AbfsStatistic.SERVER_UNAVAILABLE, LARGE_OPERATIONS,
|
||||
metricsMap);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user