diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java index 97ea2a6486..1c6ce17a38 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java @@ -19,9 +19,12 @@ package org.apache.hadoop.fs.azurebfs.services; import java.net.HttpURLConnection; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; + /** * Throttles Azure Blob File System read and write operations to achieve maximum * throughput by minimizing errors. The errors occur when the account ingress @@ -37,6 +40,7 @@ public final class AbfsClientThrottlingIntercept { private static final Logger LOG = LoggerFactory.getLogger( AbfsClientThrottlingIntercept.class); + private static final String RANGE_PREFIX = "bytes="; private static AbfsClientThrottlingIntercept singleton = null; private AbfsClientThrottlingAnalyzer readThrottler = null; private AbfsClientThrottlingAnalyzer writeThrottler = null; @@ -82,7 +86,8 @@ static void updateMetrics(AbfsRestOperationType operationType, } break; case ReadFile: - contentLength = abfsHttpOperation.getBytesReceived(); + String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE); + contentLength = getContentLengthIfKnown(range); if (contentLength > 0) { singleton.readThrottler.addBytesTransferred(contentLength, isFailedOperation); @@ -114,4 +119,17 @@ static void sendingRequest(AbfsRestOperationType operationType) { break; } } -} \ No newline at end of file + + private static long getContentLengthIfKnown(String range) { + long contentLength = 0; + // Format is "bytes=%d-%d" + if (range != null && range.startsWith(RANGE_PREFIX)) { + String[] offsets = range.substring(RANGE_PREFIX.length()).split("-"); + if (offsets.length == 2) { + contentLength = Long.parseLong(offsets[1]) - Long.parseLong(offsets[0]) + + 1; + } + } + return contentLength; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 9a7187996d..3f5717ee7e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -156,9 +156,10 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS client.getAccessToken()); } + AbfsClientThrottlingIntercept.sendingRequest(operationType); + if (hasRequestBody) { // HttpUrlConnection requires - AbfsClientThrottlingIntercept.sendingRequest(operationType); httpOperation.sendRequest(buffer, bufferOffset, bufferLength); }