HADOOP-18146: ABFS: Added changes for expect hundred continue header #4039
This change lets the client react pre-emptively to server load without getting to 503 and the exponential backoff which follows. This stops performance suffering so much as capacity limits are approached for an account. Contributed by Anmol Asranii
This commit is contained in:
parent
2b156c2b32
commit
6306f5b2bc
@ -48,7 +48,11 @@
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
|
||||
<suppress checks="ParameterNumber|VisibilityModifier"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]ITestSmallWriteOptimization.java"/>
|
||||
<suppress checks="VisibilityModifier"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]ITestAbfsRestOperation.java"/>
|
||||
<!-- allow tests to use _ for ordering. -->
|
||||
<suppress checks="MethodName"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
|
||||
<suppress checks="ParameterNumber"
|
||||
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
|
||||
</suppressions>
|
||||
|
@ -117,6 +117,11 @@ public class AbfsConfiguration{
|
||||
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
|
||||
private boolean optimizeFooterRead;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(
|
||||
ConfigurationKey = FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED,
|
||||
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED)
|
||||
private boolean isExpectHeaderEnabled;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
|
||||
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
|
||||
private boolean accountThrottlingEnabled;
|
||||
@ -706,6 +711,10 @@ public String getAppendBlobDirs() {
|
||||
return this.azureAppendBlobDirs;
|
||||
}
|
||||
|
||||
public boolean isExpectHeaderEnabled() {
|
||||
return this.isExpectHeaderEnabled;
|
||||
}
|
||||
|
||||
public boolean accountThrottlingEnabled() {
|
||||
return accountThrottlingEnabled;
|
||||
}
|
||||
|
@ -693,6 +693,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
|
||||
}
|
||||
return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
|
||||
.withWriteBufferSize(bufferSize)
|
||||
.enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled())
|
||||
.enableFlush(abfsConfiguration.isFlushEnabled())
|
||||
.enableSmallWriteOptimization(abfsConfiguration.isSmallWriteOptimizationEnabled())
|
||||
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
|
||||
|
@ -64,6 +64,11 @@ public final class AbfsHttpConstants {
|
||||
public static final String HTTP_METHOD_PATCH = "PATCH";
|
||||
public static final String HTTP_METHOD_POST = "POST";
|
||||
public static final String HTTP_METHOD_PUT = "PUT";
|
||||
/**
|
||||
* All status codes less than http 100 signify error
|
||||
* and should qualify for retry.
|
||||
*/
|
||||
public static final int HTTP_CONTINUE = 100;
|
||||
|
||||
// Abfs generic constants
|
||||
public static final String SINGLE_WHITE_SPACE = " ";
|
||||
@ -103,6 +108,9 @@ public final class AbfsHttpConstants {
|
||||
public static final String DEFAULT_SCOPE = "default:";
|
||||
public static final String PERMISSION_FORMAT = "%04d";
|
||||
public static final String SUPER_USER = "$superuser";
|
||||
// The HTTP 100 Continue informational status response code indicates that everything so far
|
||||
// is OK and that the client should continue with the request or ignore it if it is already finished.
|
||||
public static final String HUNDRED_CONTINUE = "100-continue";
|
||||
|
||||
public static final char CHAR_FORWARD_SLASH = '/';
|
||||
public static final char CHAR_EXCLAMATION_POINT = '!';
|
||||
|
@ -35,6 +35,11 @@ public final class ConfigurationKeys {
|
||||
* path to determine HNS status.
|
||||
*/
|
||||
public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
|
||||
/**
|
||||
* Enable or disable expect hundred continue header.
|
||||
* Value: {@value}.
|
||||
*/
|
||||
public static final String FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = "fs.azure.account.expect.header.enabled";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
|
||||
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
|
||||
|
@ -32,7 +32,7 @@
|
||||
public final class FileSystemConfigurations {
|
||||
|
||||
public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
|
||||
|
||||
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
|
||||
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
|
||||
|
||||
private static final int SIXTY_SECONDS = 60 * 1000;
|
||||
|
@ -70,6 +70,7 @@ public final class HttpHeaderConfigurations {
|
||||
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
|
||||
public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
|
||||
public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
|
||||
public static final String EXPECT = "Expect";
|
||||
|
||||
private HttpHeaderConfigurations() {}
|
||||
}
|
||||
|
@ -30,6 +30,9 @@
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class InvalidAbfsRestOperationException extends AbfsRestOperationException {
|
||||
|
||||
private static final String ERROR_MESSAGE = "InvalidAbfsRestOperationException";
|
||||
|
||||
public InvalidAbfsRestOperationException(
|
||||
final Exception innerException) {
|
||||
super(
|
||||
@ -37,7 +40,23 @@ public InvalidAbfsRestOperationException(
|
||||
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
|
||||
innerException != null
|
||||
? innerException.toString()
|
||||
: "InvalidAbfsRestOperationException",
|
||||
: ERROR_MESSAGE,
|
||||
innerException);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the retry count along with the exception.
|
||||
* @param innerException The inner exception which is originally caught.
|
||||
* @param retryCount The retry count when the exception was thrown.
|
||||
*/
|
||||
public InvalidAbfsRestOperationException(
|
||||
final Exception innerException, int retryCount) {
|
||||
super(
|
||||
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
|
||||
AzureServiceErrorCode.UNKNOWN.getErrorCode(),
|
||||
innerException != null
|
||||
? innerException.toString()
|
||||
: ERROR_MESSAGE + " RetryCount: " + retryCount,
|
||||
innerException);
|
||||
}
|
||||
}
|
||||
|
@ -34,19 +34,22 @@ public enum Mode {
|
||||
private final Mode mode;
|
||||
private final boolean isAppendBlob;
|
||||
private final String leaseId;
|
||||
private boolean isExpectHeaderEnabled;
|
||||
|
||||
public AppendRequestParameters(final long position,
|
||||
final int offset,
|
||||
final int length,
|
||||
final Mode mode,
|
||||
final boolean isAppendBlob,
|
||||
final String leaseId) {
|
||||
final String leaseId,
|
||||
final boolean isExpectHeaderEnabled) {
|
||||
this.position = position;
|
||||
this.offset = offset;
|
||||
this.length = length;
|
||||
this.mode = mode;
|
||||
this.isAppendBlob = isAppendBlob;
|
||||
this.leaseId = leaseId;
|
||||
this.isExpectHeaderEnabled = isExpectHeaderEnabled;
|
||||
}
|
||||
|
||||
public long getPosition() {
|
||||
@ -72,4 +75,12 @@ public boolean isAppendBlob() {
|
||||
public String getLeaseId() {
|
||||
return this.leaseId;
|
||||
}
|
||||
|
||||
public boolean isExpectHeaderEnabled() {
|
||||
return isExpectHeaderEnabled;
|
||||
}
|
||||
|
||||
public void setExpectHeaderEnabled(boolean expectHeaderEnabled) {
|
||||
isExpectHeaderEnabled = expectHeaderEnabled;
|
||||
}
|
||||
}
|
||||
|
@ -77,6 +77,7 @@
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
|
||||
|
||||
/**
|
||||
@ -656,6 +657,9 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
|
||||
throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
addCustomerProvidedKeyHeaders(requestHeaders);
|
||||
if (reqParams.isExpectHeaderEnabled()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
|
||||
}
|
||||
// JDK7 does not support PATCH, so to workaround the issue we will use
|
||||
// PUT and specify the real method in the X-Http-Method-Override header.
|
||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
|
||||
@ -681,29 +685,7 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
|
||||
abfsUriQueryBuilder, cachedSasToken);
|
||||
|
||||
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
|
||||
final AbfsRestOperation op = new AbfsRestOperation(
|
||||
AbfsRestOperationType.Append,
|
||||
this,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders,
|
||||
buffer,
|
||||
reqParams.getoffset(),
|
||||
reqParams.getLength(),
|
||||
sasTokenForReuse);
|
||||
try {
|
||||
op.execute(tracingContext);
|
||||
} catch (AzureBlobFileSystemException e) {
|
||||
// If we have no HTTP response, throw the original exception.
|
||||
if (!op.hasResult()) {
|
||||
throw e;
|
||||
}
|
||||
if (reqParams.isAppendBlob()
|
||||
&& appendSuccessCheckOp(op, path,
|
||||
(reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
|
||||
final AbfsRestOperation successOp = new AbfsRestOperation(
|
||||
AbfsRestOperationType.Append,
|
||||
this,
|
||||
final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders,
|
||||
@ -711,6 +693,41 @@ && appendSuccessCheckOp(op, path,
|
||||
reqParams.getoffset(),
|
||||
reqParams.getLength(),
|
||||
sasTokenForReuse);
|
||||
try {
|
||||
op.execute(tracingContext);
|
||||
} catch (AzureBlobFileSystemException e) {
|
||||
/*
|
||||
If the http response code indicates a user error we retry
|
||||
the same append request with expect header being disabled.
|
||||
When "100-continue" header is enabled but a non Http 100 response comes,
|
||||
the response message might not get set correctly by the server.
|
||||
So, this handling is to avoid breaking of backward compatibility
|
||||
if someone has taken dependency on the exception message,
|
||||
which is created using the error string present in the response header.
|
||||
*/
|
||||
int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
|
||||
if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
|
||||
LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
|
||||
reqParams.setExpectHeaderEnabled(false);
|
||||
return this.append(path, buffer, reqParams, cachedSasToken,
|
||||
tracingContext);
|
||||
}
|
||||
// If we have no HTTP response, throw the original exception.
|
||||
if (!op.hasResult()) {
|
||||
throw e;
|
||||
}
|
||||
if (reqParams.isAppendBlob()
|
||||
&& appendSuccessCheckOp(op, path,
|
||||
(reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
|
||||
final AbfsRestOperation successOp = getAbfsRestOperationForAppend(
|
||||
AbfsRestOperationType.Append,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders,
|
||||
buffer,
|
||||
reqParams.getoffset(),
|
||||
reqParams.getLength(),
|
||||
sasTokenForReuse);
|
||||
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
|
||||
return successOp;
|
||||
}
|
||||
@ -720,6 +737,48 @@ && appendSuccessCheckOp(op, path,
|
||||
return op;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the rest operation for append.
|
||||
* @param operationType The AbfsRestOperationType.
|
||||
* @param httpMethod specifies the httpMethod.
|
||||
* @param url specifies the url.
|
||||
* @param requestHeaders This includes the list of request headers.
|
||||
* @param buffer The buffer to write into.
|
||||
* @param bufferOffset The buffer offset.
|
||||
* @param bufferLength The buffer Length.
|
||||
* @param sasTokenForReuse The sasToken.
|
||||
* @return AbfsRestOperation op.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
AbfsRestOperation getAbfsRestOperationForAppend(final AbfsRestOperationType operationType,
|
||||
final String httpMethod,
|
||||
final URL url,
|
||||
final List<AbfsHttpHeader> requestHeaders,
|
||||
final byte[] buffer,
|
||||
final int bufferOffset,
|
||||
final int bufferLength,
|
||||
final String sasTokenForReuse) {
|
||||
return new AbfsRestOperation(
|
||||
operationType,
|
||||
this,
|
||||
httpMethod,
|
||||
url,
|
||||
requestHeaders,
|
||||
buffer,
|
||||
bufferOffset,
|
||||
bufferLength, sasTokenForReuse);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the status code lies in the range of user error.
|
||||
* @param responseStatusCode http response status code.
|
||||
* @return True or False.
|
||||
*/
|
||||
private boolean checkUserError(int responseStatusCode) {
|
||||
return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
|
||||
&& responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
// For AppendBlob its possible that the append succeeded in the backend but the request failed.
|
||||
// However a retry would fail with an InvalidQueryParameterValue
|
||||
// (as the current offset would be unacceptable).
|
||||
|
@ -28,6 +28,8 @@
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
|
||||
|
||||
/**
|
||||
* Throttles Azure Blob File System read and write operations to achieve maximum
|
||||
* throughput by minimizing errors. The errors occur when the account ingress
|
||||
@ -60,7 +62,7 @@ public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsC
|
||||
|
||||
// Hide default constructor
|
||||
private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
|
||||
//Account name is kept as empty as same instance is shared across all accounts
|
||||
// Account name is kept as empty as same instance is shared across all accounts.
|
||||
this.accountName = "";
|
||||
this.readThrottler = setAnalyzer("read", abfsConfiguration);
|
||||
this.writeThrottler = setAnalyzer("write", abfsConfiguration);
|
||||
@ -114,6 +116,18 @@ static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsC
|
||||
return singleton;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the metrics for the case when response code signifies throttling
|
||||
* but there are some expected bytes to be sent.
|
||||
* @param isThrottledOperation returns true if status code is HTTP_UNAVAILABLE
|
||||
* @param abfsHttpOperation Used for status code and data transferred.
|
||||
* @return true if the operation is throttled and has some bytes to transfer.
|
||||
*/
|
||||
private boolean updateBytesTransferred(boolean isThrottledOperation,
|
||||
AbfsHttpOperation abfsHttpOperation) {
|
||||
return isThrottledOperation && abfsHttpOperation.getExpectedBytesToBeSent() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the metrics for successful and failed read and write operations.
|
||||
* @param operationType Only applicable for read and write operations.
|
||||
@ -134,9 +148,22 @@ public void updateMetrics(AbfsRestOperationType operationType,
|
||||
boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
|
||||
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
|
||||
|
||||
// If status code is 503, it is considered as a throttled operation.
|
||||
boolean isThrottledOperation = (status == HTTP_UNAVAILABLE);
|
||||
|
||||
switch (operationType) {
|
||||
case Append:
|
||||
contentLength = abfsHttpOperation.getBytesSent();
|
||||
if (contentLength == 0) {
|
||||
/*
|
||||
Signifies the case where we could not update the bytesSent due to
|
||||
throttling but there were some expectedBytesToBeSent.
|
||||
*/
|
||||
if (updateBytesTransferred(isThrottledOperation, abfsHttpOperation)) {
|
||||
LOG.debug("Updating metrics due to throttling for path {}", abfsHttpOperation.getConnUrl().getPath());
|
||||
contentLength = abfsHttpOperation.getExpectedBytesToBeSent();
|
||||
}
|
||||
}
|
||||
if (contentLength > 0) {
|
||||
writeThrottler.addBytesTransferred(contentLength,
|
||||
isFailedOperation);
|
||||
|
@ -42,6 +42,9 @@
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
|
||||
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
|
||||
|
||||
/**
|
||||
* Represents an HTTP operation.
|
||||
*/
|
||||
@ -72,6 +75,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
||||
|
||||
// metrics
|
||||
private int bytesSent;
|
||||
private int expectedBytesToBeSent;
|
||||
private long bytesReceived;
|
||||
|
||||
// optional trace enabled metrics
|
||||
@ -154,6 +158,10 @@ public int getBytesSent() {
|
||||
return bytesSent;
|
||||
}
|
||||
|
||||
public int getExpectedBytesToBeSent() {
|
||||
return expectedBytesToBeSent;
|
||||
}
|
||||
|
||||
public long getBytesReceived() {
|
||||
return bytesReceived;
|
||||
}
|
||||
@ -281,7 +289,7 @@ public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttp
|
||||
this.connection.setRequestMethod(method);
|
||||
|
||||
for (AbfsHttpHeader header : requestHeaders) {
|
||||
this.connection.setRequestProperty(header.getName(), header.getValue());
|
||||
setRequestProperty(header.getName(), header.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@ -313,13 +321,44 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
|
||||
if (this.isTraceEnabled) {
|
||||
startTime = System.nanoTime();
|
||||
}
|
||||
try (OutputStream outputStream = this.connection.getOutputStream()) {
|
||||
// update bytes sent before they are sent so we may observe
|
||||
// attempted sends as well as successful sends via the
|
||||
// accompanying statusCode
|
||||
OutputStream outputStream = null;
|
||||
// Updates the expected bytes to be sent based on length.
|
||||
this.expectedBytesToBeSent = length;
|
||||
try {
|
||||
try {
|
||||
/* Without expect header enabled, if getOutputStream() throws
|
||||
an exception, it gets caught by the restOperation. But with
|
||||
expect header enabled we return back without throwing an exception
|
||||
for the correct response code processing.
|
||||
*/
|
||||
outputStream = getConnOutputStream();
|
||||
} catch (IOException e) {
|
||||
/* If getOutputStream fails with an exception and expect header
|
||||
is enabled, we return back without throwing an exception to
|
||||
the caller. The caller is responsible for setting the correct status code.
|
||||
If expect header is not enabled, we throw back the exception.
|
||||
*/
|
||||
String expectHeader = getConnProperty(EXPECT);
|
||||
if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
|
||||
LOG.debug("Getting output stream failed with expect header enabled, returning back ", e);
|
||||
return;
|
||||
} else {
|
||||
LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
// update bytes sent for successful as well as failed attempts via the
|
||||
// accompanying statusCode.
|
||||
this.bytesSent = length;
|
||||
|
||||
// If this fails with or without expect header enabled,
|
||||
// it throws an IOException.
|
||||
outputStream.write(buffer, offset, length);
|
||||
} finally {
|
||||
// Closing the opened output stream
|
||||
if (outputStream != null) {
|
||||
outputStream.close();
|
||||
}
|
||||
if (this.isTraceEnabled) {
|
||||
this.sendRequestTimeMs = elapsedTimeMs(startTime);
|
||||
}
|
||||
@ -343,13 +382,13 @@ public void processResponse(final byte[] buffer, final int offset, final int len
|
||||
startTime = System.nanoTime();
|
||||
}
|
||||
|
||||
this.statusCode = this.connection.getResponseCode();
|
||||
this.statusCode = getConnResponseCode();
|
||||
|
||||
if (this.isTraceEnabled) {
|
||||
this.recvResponseTimeMs = elapsedTimeMs(startTime);
|
||||
}
|
||||
|
||||
this.statusDescription = this.connection.getResponseMessage();
|
||||
this.statusDescription = getConnResponseMessage();
|
||||
|
||||
this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID);
|
||||
if (this.requestId == null) {
|
||||
@ -542,6 +581,58 @@ private boolean isNullInputStream(InputStream stream) {
|
||||
return stream == null ? true : false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection request property for a key.
|
||||
* @param key The request property key.
|
||||
* @return request peoperty value.
|
||||
*/
|
||||
String getConnProperty(String key) {
|
||||
return connection.getRequestProperty(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection url.
|
||||
* @return url.
|
||||
*/
|
||||
URL getConnUrl() {
|
||||
return connection.getURL();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection request method.
|
||||
* @return request method.
|
||||
*/
|
||||
String getConnRequestMethod() {
|
||||
return connection.getRequestMethod();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection response code.
|
||||
* @return response code.
|
||||
* @throws IOException
|
||||
*/
|
||||
Integer getConnResponseCode() throws IOException {
|
||||
return connection.getResponseCode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection output stream.
|
||||
* @return output stream.
|
||||
* @throws IOException
|
||||
*/
|
||||
OutputStream getConnOutputStream() throws IOException {
|
||||
return connection.getOutputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the connection response message.
|
||||
* @return response message.
|
||||
* @throws IOException
|
||||
*/
|
||||
String getConnResponseMessage() throws IOException {
|
||||
return connection.getResponseMessage();
|
||||
}
|
||||
|
||||
public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
|
||||
/**
|
||||
* Creates an instance to represent fixed results.
|
||||
|
@ -80,6 +80,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
|
||||
private boolean disableOutputStreamFlush;
|
||||
private boolean enableSmallWriteOptimization;
|
||||
private boolean isAppendBlob;
|
||||
private boolean isExpectHeaderEnabled;
|
||||
private volatile IOException lastError;
|
||||
|
||||
private long lastFlushOffset;
|
||||
@ -133,6 +134,7 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
|
||||
this.position = abfsOutputStreamContext.getPosition();
|
||||
this.closed = false;
|
||||
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
|
||||
this.isExpectHeaderEnabled = abfsOutputStreamContext.isExpectHeaderEnabled();
|
||||
this.disableOutputStreamFlush = abfsOutputStreamContext
|
||||
.isDisableOutputStreamFlush();
|
||||
this.enableSmallWriteOptimization
|
||||
@ -327,7 +329,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
|
||||
* leaseId - The AbfsLeaseId for this request.
|
||||
*/
|
||||
AppendRequestParameters reqParams = new AppendRequestParameters(
|
||||
offset, 0, bytesLength, mode, false, leaseId);
|
||||
offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
|
||||
AbfsRestOperation op =
|
||||
client.append(path, blockUploadData.toByteArray(), reqParams,
|
||||
cachedSasToken.get(), new TracingContext(tracingContext));
|
||||
@ -573,7 +575,7 @@ private void writeAppendBlobCurrentBufferToService() throws IOException {
|
||||
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
|
||||
"writeCurrentBufferToService", "append")) {
|
||||
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
|
||||
bytesLength, APPEND_MODE, true, leaseId);
|
||||
bytesLength, APPEND_MODE, true, leaseId, isExpectHeaderEnabled);
|
||||
AbfsRestOperation op = client.append(path, uploadData.toByteArray(), reqParams,
|
||||
cachedSasToken.get(), new TracingContext(tracingContext));
|
||||
cachedSasToken.update(op.getSasToken());
|
||||
|
@ -33,6 +33,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
|
||||
|
||||
private boolean enableFlush;
|
||||
|
||||
private boolean enableExpectHeader;
|
||||
|
||||
private boolean enableSmallWriteOptimization;
|
||||
|
||||
private boolean disableOutputStreamFlush;
|
||||
@ -78,6 +80,11 @@ public AbfsOutputStreamContext enableFlush(final boolean enableFlush) {
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsOutputStreamContext enableExpectHeader(final boolean enableExpectHeader) {
|
||||
this.enableExpectHeader = enableExpectHeader;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsOutputStreamContext enableSmallWriteOptimization(final boolean enableSmallWriteOptimization) {
|
||||
this.enableSmallWriteOptimization = enableSmallWriteOptimization;
|
||||
return this;
|
||||
@ -184,6 +191,10 @@ public boolean isEnableFlush() {
|
||||
return enableFlush;
|
||||
}
|
||||
|
||||
public boolean isExpectHeaderEnabled() {
|
||||
return enableExpectHeader;
|
||||
}
|
||||
|
||||
public boolean isDisableOutputStreamFlush() {
|
||||
return disableOutputStreamFlush;
|
||||
}
|
||||
|
@ -25,6 +25,7 @@
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -38,6 +39,8 @@
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
|
||||
|
||||
/**
|
||||
* The AbfsRestOperation for Rest AbfsClient.
|
||||
*/
|
||||
@ -236,11 +239,21 @@ private void completeExecute(TracingContext tracingContext)
|
||||
}
|
||||
}
|
||||
|
||||
if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) {
|
||||
int status = result.getStatusCode();
|
||||
/*
|
||||
If even after exhausting all retries, the http status code has an
|
||||
invalid value it qualifies for InvalidAbfsRestOperationException.
|
||||
All http status code less than 1xx range are considered as invalid
|
||||
status codes.
|
||||
*/
|
||||
if (status < HTTP_CONTINUE) {
|
||||
throw new InvalidAbfsRestOperationException(null, retryCount);
|
||||
}
|
||||
|
||||
if (status >= HttpURLConnection.HTTP_BAD_REQUEST) {
|
||||
throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(),
|
||||
result.getStorageErrorMessage(), null, result);
|
||||
}
|
||||
|
||||
LOG.trace("{} REST operation complete", operationType);
|
||||
}
|
||||
|
||||
@ -268,7 +281,7 @@ private boolean executeHttpOperation(final int retryCount,
|
||||
case Custom:
|
||||
case OAuth:
|
||||
LOG.debug("Authenticating request with OAuth2 access token");
|
||||
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
|
||||
httpOperation.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
|
||||
client.getAccessToken());
|
||||
break;
|
||||
case SAS:
|
||||
@ -319,7 +332,7 @@ private boolean executeHttpOperation(final int retryCount,
|
||||
LOG.warn("Unknown host name: {}. Retrying to resolve the host name...",
|
||||
hostname);
|
||||
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
|
||||
throw new InvalidAbfsRestOperationException(ex);
|
||||
throw new InvalidAbfsRestOperationException(ex, retryCount);
|
||||
}
|
||||
return false;
|
||||
} catch (IOException ex) {
|
||||
@ -330,12 +343,25 @@ private boolean executeHttpOperation(final int retryCount,
|
||||
failureReason = RetryReason.getAbbreviation(ex, -1, "");
|
||||
|
||||
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
|
||||
throw new InvalidAbfsRestOperationException(ex);
|
||||
throw new InvalidAbfsRestOperationException(ex, retryCount);
|
||||
}
|
||||
|
||||
return false;
|
||||
} finally {
|
||||
intercept.updateMetrics(operationType, httpOperation);
|
||||
int status = httpOperation.getStatusCode();
|
||||
/*
|
||||
A status less than 300 (2xx range) or greater than or equal
|
||||
to 500 (5xx range) should contribute to throttling metrics being updated.
|
||||
Less than 200 or greater than or equal to 500 show failed operations. 2xx
|
||||
range contributes to successful operations. 3xx range is for redirects
|
||||
and 4xx range is for user errors. These should not be a part of
|
||||
throttling backoff computation.
|
||||
*/
|
||||
boolean updateMetricsResponseCode = (status < HttpURLConnection.HTTP_MULT_CHOICE
|
||||
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
|
||||
if (updateMetricsResponseCode) {
|
||||
intercept.updateMetrics(operationType, httpOperation);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);
|
||||
|
@ -24,6 +24,8 @@
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
|
||||
|
||||
/**
|
||||
* Retry policy used by AbfsClient.
|
||||
* */
|
||||
@ -118,7 +120,9 @@ public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final
|
||||
|
||||
/**
|
||||
* Returns if a request should be retried based on the retry count, current response,
|
||||
* and the current strategy.
|
||||
* and the current strategy. The valid http status code lies in the range of 1xx-5xx.
|
||||
* But an invalid status code might be set due to network or timeout kind of issues.
|
||||
* Such invalid status code also qualify for retry.
|
||||
*
|
||||
* @param retryCount The current retry attempt count.
|
||||
* @param statusCode The status code of the response, or -1 for socket error.
|
||||
@ -126,7 +130,7 @@ public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final
|
||||
*/
|
||||
public boolean shouldRetry(final int retryCount, final int statusCode) {
|
||||
return retryCount < this.retryCount
|
||||
&& (statusCode == -1
|
||||
&& (statusCode < HTTP_CONTINUE
|
||||
|| statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
|
||||
|| (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
|
||||
&& statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
|
||||
|
@ -139,6 +139,10 @@ public void setOperation(FSOperationType operation) {
|
||||
this.opType = operation;
|
||||
}
|
||||
|
||||
public int getRetryCount() {
|
||||
return retryCount;
|
||||
}
|
||||
|
||||
public void setRetryCount(int retryCount) {
|
||||
this.retryCount = retryCount;
|
||||
}
|
||||
|
@ -767,6 +767,17 @@ Hflush() being the only documented API that can provide persistent data
|
||||
transfer, Flush() also attempting to persist buffered data will lead to
|
||||
performance issues.
|
||||
|
||||
### <a name="100continueconfigoptions"></a> Hundred Continue Options
|
||||
|
||||
`fs.azure.account.expect.header.enabled`: This configuration parameter is used
|
||||
to specify whether you wish to send a expect 100 continue header with each
|
||||
append request or not. It is configured to true by default. This flag configures
|
||||
the client to check with the Azure store before uploading a block of data from
|
||||
an output stream. This allows the client to throttle back gracefully -before
|
||||
actually attempting to upload the block. In experiments this provides
|
||||
significant throughput improvements under heavy load. For more information :
|
||||
- https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Expect
|
||||
|
||||
|
||||
### <a name="accountlevelthrottlingoptions"></a> Account level throttling Options
|
||||
|
||||
|
@ -42,7 +42,7 @@
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
|
||||
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
|
||||
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
|
||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
||||
@ -254,7 +254,7 @@ public Hashtable<String, String> call() throws Exception {
|
||||
}
|
||||
|
||||
public AccessTokenProvider getAccessTokenProvider(final AzureBlobFileSystem fs) {
|
||||
return TestAbfsClient.getAccessTokenProvider(fs.getAbfsStore().getClient());
|
||||
return ITestAbfsClient.getAccessTokenProvider(fs.getAbfsStore().getClient());
|
||||
}
|
||||
|
||||
public void loadConfiguredFileSystem() throws Exception {
|
||||
|
@ -43,7 +43,7 @@
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
|
||||
|
||||
@ -362,7 +362,7 @@ public void testNegativeScenariosForCreateOverwriteDisabled()
|
||||
// Get mock AbfsClient with current config
|
||||
AbfsClient
|
||||
mockClient
|
||||
= TestAbfsClient.getMockAbfsClient(
|
||||
= ITestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
fs.getAbfsStore().getAbfsConfiguration());
|
||||
|
||||
|
@ -35,7 +35,7 @@
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsPerfTracker;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TestMockHelpers;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
@ -176,7 +176,7 @@ public void testDeleteIdempotency() throws Exception {
|
||||
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
AbfsClient abfsClient = fs.getAbfsStore().getClient();
|
||||
AbfsClient testClient = TestAbfsClient.createTestClientFromCurrentContext(
|
||||
AbfsClient testClient = ITestAbfsClient.createTestClientFromCurrentContext(
|
||||
abfsClient,
|
||||
abfsConfig);
|
||||
|
||||
@ -223,7 +223,7 @@ public void testDeleteIdempotency() throws Exception {
|
||||
public void testDeleteIdempotencyTriggerHttp404() throws Exception {
|
||||
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
AbfsClient client = TestAbfsClient.createTestClientFromCurrentContext(
|
||||
AbfsClient client = ITestAbfsClient.createTestClientFromCurrentContext(
|
||||
fs.getAbfsStore().getClient(),
|
||||
this.getConfiguration());
|
||||
|
||||
@ -242,7 +242,7 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
|
||||
getTestTracingContext(fs, true)));
|
||||
|
||||
// mock idempotency check to mimic retried case
|
||||
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
|
||||
AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
this.getConfiguration());
|
||||
AzureBlobFileSystemStore mockStore = mock(AzureBlobFileSystemStore.class);
|
||||
@ -257,10 +257,10 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
|
||||
|
||||
// Case 2: Mimic retried case
|
||||
// Idempotency check on Delete always returns success
|
||||
AbfsRestOperation idempotencyRetOp = TestAbfsClient.getRestOp(
|
||||
AbfsRestOperation idempotencyRetOp = ITestAbfsClient.getRestOp(
|
||||
DeletePath, mockClient, HTTP_METHOD_DELETE,
|
||||
TestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
|
||||
TestAbfsClient.getTestRequestHeaders(mockClient));
|
||||
ITestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
|
||||
ITestAbfsClient.getTestRequestHeaders(mockClient));
|
||||
idempotencyRetOp.hardSetResult(HTTP_OK);
|
||||
|
||||
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
|
||||
|
@ -203,7 +203,7 @@ public void testAppendWithCPK() throws Exception {
|
||||
// Trying to append with correct CPK headers
|
||||
AppendRequestParameters appendRequestParameters =
|
||||
new AppendRequestParameters(
|
||||
0, 0, 5, Mode.APPEND_MODE, false, null);
|
||||
0, 0, 5, Mode.APPEND_MODE, false, null, true);
|
||||
byte[] buffer = getRandomBytesArray(5);
|
||||
AbfsClient abfsClient = fs.getAbfsClient();
|
||||
AbfsRestOperation abfsRestOperation = abfsClient
|
||||
@ -248,7 +248,7 @@ public void testAppendWithoutCPK() throws Exception {
|
||||
// Trying to append without CPK headers
|
||||
AppendRequestParameters appendRequestParameters =
|
||||
new AppendRequestParameters(
|
||||
0, 0, 5, Mode.APPEND_MODE, false, null);
|
||||
0, 0, 5, Mode.APPEND_MODE, false, null, true);
|
||||
byte[] buffer = getRandomBytesArray(5);
|
||||
AbfsClient abfsClient = fs.getAbfsClient();
|
||||
AbfsRestOperation abfsRestOperation = abfsClient
|
||||
|
@ -20,20 +20,43 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.URL;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
|
||||
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
@ -59,14 +82,19 @@
|
||||
* Test useragent of abfs client.
|
||||
*
|
||||
*/
|
||||
public final class TestAbfsClient {
|
||||
public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
|
||||
|
||||
private static final String ACCOUNT_NAME = "bogusAccountName.dfs.core.windows.net";
|
||||
private static final String FS_AZURE_USER_AGENT_PREFIX = "Partner Service";
|
||||
private static final String TEST_PATH = "/testfile";
|
||||
public static final int REDUCED_RETRY_COUNT = 2;
|
||||
public static final int REDUCED_BACKOFF_INTERVAL = 100;
|
||||
public static final int BUFFER_LENGTH = 5;
|
||||
public static final int BUFFER_OFFSET = 0;
|
||||
|
||||
private final Pattern userAgentStringPattern;
|
||||
|
||||
public TestAbfsClient(){
|
||||
public ITestAbfsClient() throws Exception {
|
||||
StringBuilder regEx = new StringBuilder();
|
||||
regEx.append("^");
|
||||
regEx.append(APN_VERSION);
|
||||
@ -124,7 +152,7 @@ public void verifybBasicInfo() throws Exception {
|
||||
}
|
||||
|
||||
private void verifybBasicInfo(String userAgentStr) {
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string [" + userAgentStr
|
||||
+ "] should be of the pattern: " + this.userAgentStringPattern.pattern())
|
||||
.matches(this.userAgentStringPattern)
|
||||
@ -153,7 +181,7 @@ public void verifyUserAgentPrefix()
|
||||
String userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should contain " + FS_AZURE_USER_AGENT_PREFIX)
|
||||
.contains(FS_AZURE_USER_AGENT_PREFIX);
|
||||
|
||||
@ -163,7 +191,7 @@ public void verifyUserAgentPrefix()
|
||||
userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should not contain " + FS_AZURE_USER_AGENT_PREFIX)
|
||||
.doesNotContain(FS_AZURE_USER_AGENT_PREFIX);
|
||||
}
|
||||
@ -179,14 +207,14 @@ public void verifyUserAgentWithoutSSLProvider() throws Exception {
|
||||
String userAgentStr = getUserAgentString(abfsConfiguration, true);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should contain sslProvider")
|
||||
.contains(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName());
|
||||
|
||||
userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should not contain sslProvider")
|
||||
.doesNotContain(DelegatingSSLSocketFactory.getDefaultFactory().getProviderName());
|
||||
}
|
||||
@ -202,7 +230,7 @@ public void verifyUserAgentClusterName() throws Exception {
|
||||
String userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should contain cluster name")
|
||||
.contains(clusterName);
|
||||
|
||||
@ -212,7 +240,7 @@ public void verifyUserAgentClusterName() throws Exception {
|
||||
userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should not contain cluster name")
|
||||
.doesNotContain(clusterName)
|
||||
.describedAs("User-Agent string should contain UNKNOWN as cluster name config is absent")
|
||||
@ -230,7 +258,7 @@ public void verifyUserAgentClusterType() throws Exception {
|
||||
String userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should contain cluster type")
|
||||
.contains(clusterType);
|
||||
|
||||
@ -240,7 +268,7 @@ public void verifyUserAgentClusterType() throws Exception {
|
||||
userAgentStr = getUserAgentString(abfsConfiguration, false);
|
||||
|
||||
verifybBasicInfo(userAgentStr);
|
||||
assertThat(userAgentStr)
|
||||
Assertions.assertThat(userAgentStr)
|
||||
.describedAs("User-Agent string should not contain cluster type")
|
||||
.doesNotContain(clusterType)
|
||||
.describedAs("User-Agent string should contain UNKNOWN as cluster type config is absent")
|
||||
@ -311,24 +339,23 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
|
||||
AbfsThrottlingInterceptFactory.getInstance(
|
||||
abfsConfig.getAccountName().substring(0,
|
||||
abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
|
||||
|
||||
// override baseurl
|
||||
client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "abfsConfiguration",
|
||||
abfsConfig);
|
||||
|
||||
// override baseurl
|
||||
client = TestAbfsClient.setAbfsClientField(client, "baseUrl",
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "baseUrl",
|
||||
baseAbfsClientInstance.getBaseUrl());
|
||||
|
||||
// override auth provider
|
||||
if (currentAuthType == AuthType.SharedKey) {
|
||||
client = TestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "sharedKeyCredentials",
|
||||
new SharedKeyCredentials(
|
||||
abfsConfig.getAccountName().substring(0,
|
||||
abfsConfig.getAccountName().indexOf(DOT)),
|
||||
abfsConfig.getStorageAccountKey()));
|
||||
} else {
|
||||
client = TestAbfsClient.setAbfsClientField(client, "tokenProvider",
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "tokenProvider",
|
||||
abfsConfig.getTokenProvider());
|
||||
}
|
||||
|
||||
@ -336,7 +363,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
|
||||
String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
|
||||
+ "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
|
||||
+ "UNKNOWN/UNKNOWN) MSFT";
|
||||
client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
|
||||
client = ITestAbfsClient.setAbfsClientField(client, "userAgent", userAgent);
|
||||
|
||||
return client;
|
||||
}
|
||||
@ -404,4 +431,156 @@ public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
|
||||
public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
|
||||
return client.getTokenProvider();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test helper method to get random bytes array.
|
||||
* @param length The length of byte buffer.
|
||||
* @return byte buffer.
|
||||
*/
|
||||
private byte[] getRandomBytesArray(int length) {
|
||||
final byte[] b = new byte[length];
|
||||
new Random().nextBytes(b);
|
||||
return b;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to verify that client retries append request without
|
||||
* expect header enabled if append with expect header enabled fails
|
||||
* with 4xx kind of error.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testExpectHundredContinue() throws Exception {
|
||||
// Get the filesystem.
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
|
||||
final Configuration configuration = new Configuration();
|
||||
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||
AbfsClient abfsClient = fs.getAbfsStore().getClient();
|
||||
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
|
||||
configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
|
||||
|
||||
// Update the configuration with reduced retry count and reduced backoff interval.
|
||||
AbfsConfiguration abfsConfig
|
||||
= TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
|
||||
abfsConfiguration,
|
||||
REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
|
||||
|
||||
// Gets the client.
|
||||
AbfsClient testClient = Mockito.spy(
|
||||
ITestAbfsClient.createTestClientFromCurrentContext(
|
||||
abfsClient,
|
||||
abfsConfig));
|
||||
|
||||
// Create the append request params with expect header enabled initially.
|
||||
AppendRequestParameters appendRequestParameters
|
||||
= new AppendRequestParameters(
|
||||
BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
|
||||
AppendRequestParameters.Mode.APPEND_MODE, false, null, true);
|
||||
|
||||
byte[] buffer = getRandomBytesArray(BUFFER_LENGTH);
|
||||
|
||||
// Create a test container to upload the data.
|
||||
Path testPath = path(TEST_PATH);
|
||||
fs.create(testPath);
|
||||
String finalTestPath = testPath.toString()
|
||||
.substring(testPath.toString().lastIndexOf("/"));
|
||||
|
||||
// Creates a list of request headers.
|
||||
final List<AbfsHttpHeader> requestHeaders
|
||||
= ITestAbfsClient.getTestRequestHeaders(testClient);
|
||||
requestHeaders.add(
|
||||
new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
|
||||
if (appendRequestParameters.isExpectHeaderEnabled()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
|
||||
}
|
||||
|
||||
// Updates the query parameters.
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder
|
||||
= testClient.createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION,
|
||||
Long.toString(appendRequestParameters.getPosition()));
|
||||
|
||||
// Creates the url for the specified path.
|
||||
URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
|
||||
|
||||
// Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
|
||||
AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
|
||||
AbfsRestOperationType.Append,
|
||||
testClient,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders, buffer,
|
||||
appendRequestParameters.getoffset(),
|
||||
appendRequestParameters.getLength(), null));
|
||||
|
||||
AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url,
|
||||
HTTP_METHOD_PUT, requestHeaders));
|
||||
|
||||
// Sets the expect request property if expect header is enabled.
|
||||
if (appendRequestParameters.isExpectHeaderEnabled()) {
|
||||
Mockito.doReturn(HUNDRED_CONTINUE).when(abfsHttpOperation)
|
||||
.getConnProperty(EXPECT);
|
||||
}
|
||||
|
||||
HttpURLConnection urlConnection = mock(HttpURLConnection.class);
|
||||
Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
|
||||
.any(), Mockito.any());
|
||||
Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
|
||||
Mockito.doReturn(url).when(urlConnection).getURL();
|
||||
Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection();
|
||||
|
||||
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito
|
||||
.any(), Mockito.any());
|
||||
Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl();
|
||||
|
||||
// Give user error code 404 when processResponse is called.
|
||||
Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod();
|
||||
Mockito.doReturn(HTTP_NOT_FOUND).when(abfsHttpOperation).getConnResponseCode();
|
||||
Mockito.doReturn("Resource Not Found")
|
||||
.when(abfsHttpOperation)
|
||||
.getConnResponseMessage();
|
||||
|
||||
// Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
|
||||
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
|
||||
.when(abfsHttpOperation)
|
||||
.getConnOutputStream();
|
||||
|
||||
// Sets the httpOperation for the rest operation.
|
||||
Mockito.doReturn(abfsHttpOperation)
|
||||
.when(op)
|
||||
.createHttpOperation();
|
||||
|
||||
// Mock the restOperation for the client.
|
||||
Mockito.doReturn(op)
|
||||
.when(testClient)
|
||||
.getAbfsRestOperationForAppend(Mockito.any(),
|
||||
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
|
||||
Mockito.nullable(int.class), Mockito.nullable(int.class),
|
||||
Mockito.any());
|
||||
|
||||
TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
|
||||
"abcde", FSOperationType.APPEND,
|
||||
TracingHeaderFormat.ALL_ID_FORMAT, null));
|
||||
|
||||
// Check that expect header is enabled before the append call.
|
||||
Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled())
|
||||
.describedAs("The expect header is not true before the append call")
|
||||
.isTrue();
|
||||
|
||||
intercept(AzureBlobFileSystemException.class,
|
||||
() -> testClient.append(finalTestPath, buffer, appendRequestParameters, null, tracingContext));
|
||||
|
||||
// Verify that the request was not exponentially retried because of user error.
|
||||
Assertions.assertThat(tracingContext.getRetryCount())
|
||||
.describedAs("The retry count is incorrect")
|
||||
.isEqualTo(0);
|
||||
|
||||
// Verify that the same request was retried with expect header disabled.
|
||||
Assertions.assertThat(appendRequestParameters.isExpectHeaderEnabled())
|
||||
.describedAs("The expect header is not false")
|
||||
.isFalse();
|
||||
}
|
||||
}
|
@ -0,0 +1,358 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.ProtocolException;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class ITestAbfsRestOperation extends AbstractAbfsIntegrationTest {
|
||||
|
||||
// Specifies whether getOutputStream() or write() throws IOException.
|
||||
public enum ErrorType {OUTPUTSTREAM, WRITE};
|
||||
|
||||
private static final int HTTP_EXPECTATION_FAILED = 417;
|
||||
private static final int HTTP_ERROR = 0;
|
||||
private static final int ZERO = 0;
|
||||
private static final int REDUCED_RETRY_COUNT = 2;
|
||||
private static final int REDUCED_BACKOFF_INTERVAL = 100;
|
||||
private static final int BUFFER_LENGTH = 5;
|
||||
private static final int BUFFER_OFFSET = 0;
|
||||
private static final String TEST_PATH = "/testfile";
|
||||
|
||||
// Specifies whether the expect header is enabled or not.
|
||||
@Parameterized.Parameter
|
||||
public boolean expectHeaderEnabled;
|
||||
|
||||
// Gives the http response code.
|
||||
@Parameterized.Parameter(1)
|
||||
public int responseCode;
|
||||
|
||||
// Gives the http response message.
|
||||
@Parameterized.Parameter(2)
|
||||
public String responseMessage;
|
||||
|
||||
// Gives the errorType based on the enum.
|
||||
@Parameterized.Parameter(3)
|
||||
public ErrorType errorType;
|
||||
|
||||
// The intercept.
|
||||
private AbfsThrottlingIntercept intercept;
|
||||
|
||||
/*
|
||||
HTTP_OK = 200,
|
||||
HTTP_UNAVAILABLE = 503,
|
||||
HTTP_NOT_FOUND = 404,
|
||||
HTTP_EXPECTATION_FAILED = 417,
|
||||
HTTP_ERROR = 0.
|
||||
*/
|
||||
@Parameterized.Parameters(name = "expect={0}-code={1}-ErrorType={3}")
|
||||
public static Iterable<Object[]> params() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{true, HTTP_OK, "OK", ErrorType.WRITE},
|
||||
{false, HTTP_OK, "OK", ErrorType.WRITE},
|
||||
{true, HTTP_UNAVAILABLE, "ServerBusy", ErrorType.OUTPUTSTREAM},
|
||||
{true, HTTP_NOT_FOUND, "Resource Not Found", ErrorType.OUTPUTSTREAM},
|
||||
{true, HTTP_EXPECTATION_FAILED, "Expectation Failed", ErrorType.OUTPUTSTREAM},
|
||||
{true, HTTP_ERROR, "Error", ErrorType.OUTPUTSTREAM}
|
||||
});
|
||||
}
|
||||
|
||||
public ITestAbfsRestOperation() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test helper method to get random bytes array.
|
||||
* @param length The length of byte buffer
|
||||
* @return byte buffer
|
||||
*/
|
||||
private byte[] getRandomBytesArray(int length) {
|
||||
final byte[] b = new byte[length];
|
||||
new Random().nextBytes(b);
|
||||
return b;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gives the AbfsRestOperation.
|
||||
* @return abfsRestOperation.
|
||||
*/
|
||||
private AbfsRestOperation getRestOperation() throws Exception {
|
||||
// Get the filesystem.
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
|
||||
final Configuration configuration = new Configuration();
|
||||
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||
AbfsClient abfsClient = fs.getAbfsStore().getClient();
|
||||
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
|
||||
configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME));
|
||||
|
||||
// Update the configuration with reduced retry count and reduced backoff interval.
|
||||
AbfsConfiguration abfsConfig
|
||||
= TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
|
||||
abfsConfiguration,
|
||||
REDUCED_RETRY_COUNT, REDUCED_BACKOFF_INTERVAL);
|
||||
|
||||
intercept = Mockito.mock(AbfsThrottlingIntercept.class);
|
||||
Mockito.doNothing().when(intercept).updateMetrics(Mockito.any(), Mockito.any());
|
||||
|
||||
// Gets the client.
|
||||
AbfsClient testClient = Mockito.spy(ITestAbfsClient.createTestClientFromCurrentContext(
|
||||
abfsClient,
|
||||
abfsConfig));
|
||||
|
||||
Mockito.doReturn(intercept).when(testClient).getIntercept();
|
||||
|
||||
// Expect header is enabled or not based on the parameter.
|
||||
AppendRequestParameters appendRequestParameters
|
||||
= new AppendRequestParameters(
|
||||
BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH,
|
||||
AppendRequestParameters.Mode.APPEND_MODE, false, null,
|
||||
expectHeaderEnabled);
|
||||
|
||||
byte[] buffer = getRandomBytesArray(5);
|
||||
|
||||
// Create a test container to upload the data.
|
||||
Path testPath = path(TEST_PATH);
|
||||
fs.create(testPath);
|
||||
String finalTestPath = testPath.toString().substring(testPath.toString().lastIndexOf("/"));
|
||||
|
||||
// Creates a list of request headers.
|
||||
final List<AbfsHttpHeader> requestHeaders = ITestAbfsClient.getTestRequestHeaders(testClient);
|
||||
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH));
|
||||
if (appendRequestParameters.isExpectHeaderEnabled()) {
|
||||
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE));
|
||||
}
|
||||
|
||||
// Updates the query parameters.
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = testClient.createDefaultUriQueryBuilder();
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(appendRequestParameters.getPosition()));
|
||||
|
||||
// Creates the url for the specified path.
|
||||
URL url = testClient.createRequestUrl(finalTestPath, abfsUriQueryBuilder.toString());
|
||||
|
||||
// Create a mock of the AbfsRestOperation to set the urlConnection in the corresponding httpOperation.
|
||||
AbfsRestOperation op = Mockito.spy(new AbfsRestOperation(
|
||||
AbfsRestOperationType.Append,
|
||||
testClient,
|
||||
HTTP_METHOD_PUT,
|
||||
url,
|
||||
requestHeaders, buffer,
|
||||
appendRequestParameters.getoffset(),
|
||||
appendRequestParameters.getLength(), null));
|
||||
|
||||
AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders));
|
||||
|
||||
// Sets the expect request property if expect header is enabled.
|
||||
if (expectHeaderEnabled) {
|
||||
Mockito.doReturn(HUNDRED_CONTINUE)
|
||||
.when(abfsHttpOperation)
|
||||
.getConnProperty(EXPECT);
|
||||
}
|
||||
|
||||
HttpURLConnection urlConnection = mock(HttpURLConnection.class);
|
||||
Mockito.doNothing().when(urlConnection).setRequestProperty(Mockito
|
||||
.any(), Mockito.any());
|
||||
Mockito.doReturn(HTTP_METHOD_PUT).when(urlConnection).getRequestMethod();
|
||||
Mockito.doReturn(url).when(urlConnection).getURL();
|
||||
Mockito.doReturn(urlConnection).when(abfsHttpOperation).getConnection();
|
||||
|
||||
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito
|
||||
.any(), Mockito.any());
|
||||
Mockito.doReturn(url).when(abfsHttpOperation).getConnUrl();
|
||||
Mockito.doReturn(HTTP_METHOD_PUT).when(abfsHttpOperation).getConnRequestMethod();
|
||||
|
||||
switch (errorType) {
|
||||
case OUTPUTSTREAM:
|
||||
// If the getOutputStream() throws IOException and Expect Header is
|
||||
// enabled, it returns back to processResponse and hence we have
|
||||
// mocked the response code and the response message to check different
|
||||
// behaviour based on response code.
|
||||
Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode();
|
||||
Mockito.doReturn(responseMessage)
|
||||
.when(abfsHttpOperation)
|
||||
.getConnResponseMessage();
|
||||
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
|
||||
.when(abfsHttpOperation)
|
||||
.getConnOutputStream();
|
||||
break;
|
||||
case WRITE:
|
||||
// If write() throws IOException and Expect Header is
|
||||
// enabled or not, it should throw back the exception.
|
||||
OutputStream outputStream = Mockito.spy(new OutputStream() {
|
||||
@Override
|
||||
public void write(final int i) throws IOException {
|
||||
}
|
||||
});
|
||||
Mockito.doReturn(outputStream).when(abfsHttpOperation).getConnOutputStream();
|
||||
Mockito.doThrow(new IOException())
|
||||
.when(outputStream)
|
||||
.write(buffer, appendRequestParameters.getoffset(),
|
||||
appendRequestParameters.getLength());
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
// Sets the httpOperation for the rest operation.
|
||||
Mockito.doReturn(abfsHttpOperation)
|
||||
.when(op)
|
||||
.createHttpOperation();
|
||||
return op;
|
||||
}
|
||||
|
||||
void assertTraceContextState(int retryCount, int assertRetryCount, int bytesSent, int assertBytesSent,
|
||||
int expectedBytesSent, int assertExpectedBytesSent) {
|
||||
// Assert that the request is retried or not.
|
||||
Assertions.assertThat(retryCount)
|
||||
.describedAs("The retry count is incorrect")
|
||||
.isEqualTo(assertRetryCount);
|
||||
|
||||
// Assert that metrics will be updated correctly.
|
||||
Assertions.assertThat(bytesSent)
|
||||
.describedAs("The bytes sent is incorrect")
|
||||
.isEqualTo(assertBytesSent);
|
||||
Assertions.assertThat(expectedBytesSent)
|
||||
.describedAs("The expected bytes sent is incorrect")
|
||||
.isEqualTo(assertExpectedBytesSent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the functionalities based on whether getOutputStream() or write()
|
||||
* throws exception and what is the corresponding response code.
|
||||
*/
|
||||
@Test
|
||||
public void testExpectHundredContinue() throws Exception {
|
||||
// Gets the AbfsRestOperation.
|
||||
AbfsRestOperation op = getRestOperation();
|
||||
AbfsHttpOperation httpOperation = op.createHttpOperation();
|
||||
|
||||
TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
|
||||
"abcde", FSOperationType.APPEND,
|
||||
TracingHeaderFormat.ALL_ID_FORMAT, null));
|
||||
|
||||
switch (errorType) {
|
||||
case WRITE:
|
||||
// If write() throws IOException and Expect Header is
|
||||
// enabled or not, it should throw back the exception
|
||||
// which is caught and exponential retry logic comes into place.
|
||||
intercept(IOException.class,
|
||||
() -> op.execute(tracingContext));
|
||||
|
||||
// Asserting update of metrics and retries.
|
||||
assertTraceContextState(tracingContext.getRetryCount(), REDUCED_RETRY_COUNT, httpOperation.getBytesSent(), BUFFER_LENGTH,
|
||||
0, 0);
|
||||
break;
|
||||
case OUTPUTSTREAM:
|
||||
switch (responseCode) {
|
||||
case HTTP_UNAVAILABLE:
|
||||
// In the case of 503 i.e. throttled case, we should retry.
|
||||
intercept(IOException.class,
|
||||
() -> op.execute(tracingContext));
|
||||
|
||||
// Asserting update of metrics and retries.
|
||||
assertTraceContextState(tracingContext.getRetryCount(), REDUCED_RETRY_COUNT, httpOperation.getBytesSent(), ZERO,
|
||||
httpOperation.getExpectedBytesToBeSent(), BUFFER_LENGTH);
|
||||
|
||||
// Verifies that update Metrics call is made for throttle case and for the first without retry +
|
||||
// for the retried cases as well.
|
||||
Mockito.verify(intercept, times(REDUCED_RETRY_COUNT + 1))
|
||||
.updateMetrics(Mockito.any(), Mockito.any());
|
||||
break;
|
||||
case HTTP_ERROR:
|
||||
// In the case of http status code 0 i.e. ErrorType case, we should retry.
|
||||
intercept(IOException.class,
|
||||
() -> op.execute(tracingContext));
|
||||
|
||||
// Asserting update of metrics and retries.
|
||||
assertTraceContextState(tracingContext.getRetryCount(), REDUCED_RETRY_COUNT, httpOperation.getBytesSent(),
|
||||
ZERO, 0, 0);
|
||||
|
||||
// Verifies that update Metrics call is made for ErrorType case and for the first without retry +
|
||||
// for the retried cases as well.
|
||||
Mockito.verify(intercept, times(REDUCED_RETRY_COUNT + 1))
|
||||
.updateMetrics(Mockito.any(), Mockito.any());
|
||||
break;
|
||||
case HTTP_NOT_FOUND:
|
||||
case HTTP_EXPECTATION_FAILED:
|
||||
// In the case of 4xx ErrorType. i.e. user ErrorType, retry should not happen.
|
||||
intercept(AzureBlobFileSystemException.class,
|
||||
() -> op.execute(tracingContext));
|
||||
|
||||
// Asserting update of metrics and retries.
|
||||
assertTraceContextState(tracingContext.getRetryCount(), ZERO, 0,
|
||||
0, 0, 0);
|
||||
|
||||
// Verifies that update Metrics call is not made for user ErrorType case.
|
||||
Mockito.verify(intercept, never())
|
||||
.updateMetrics(Mockito.any(), Mockito.any());
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
@ -72,6 +72,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
|
||||
boolean isFlushEnabled,
|
||||
boolean disableOutputStreamFlush,
|
||||
boolean isAppendBlob,
|
||||
boolean isExpectHeaderEnabled,
|
||||
AbfsClient client,
|
||||
String path,
|
||||
TracingContext tracingContext,
|
||||
@ -89,6 +90,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
|
||||
|
||||
return new AbfsOutputStreamContext(2)
|
||||
.withWriteBufferSize(writeBufferSize)
|
||||
.enableExpectHeader(isExpectHeaderEnabled)
|
||||
.enableFlush(isFlushEnabled)
|
||||
.disableOutputStreamFlush(disableOutputStreamFlush)
|
||||
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
|
||||
@ -129,6 +131,7 @@ public void verifyShortWriteRequest() throws Exception {
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
@ -149,9 +152,9 @@ public void verifyShortWriteRequest() throws Exception {
|
||||
out.hsync();
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, WRITE_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, WRITE_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null);
|
||||
WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
|
||||
@ -190,6 +193,7 @@ public void verifyWriteRequest() throws Exception {
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
tracingContext,
|
||||
@ -203,9 +207,9 @@ public void verifyWriteRequest() throws Exception {
|
||||
out.close();
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(),
|
||||
@ -264,6 +268,7 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
tracingContext,
|
||||
@ -277,9 +282,9 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
|
||||
out.close();
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
@ -335,6 +340,7 @@ public void verifyWriteRequestOfBufferSize() throws Exception {
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
@ -350,9 +356,9 @@ public void verifyWriteRequestOfBufferSize() throws Exception {
|
||||
Thread.sleep(1000);
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
@ -390,6 +396,7 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
@ -405,9 +412,9 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
|
||||
Thread.sleep(1000);
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, true, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, true, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
@ -449,6 +456,7 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
@ -464,9 +472,9 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
|
||||
out.hflush();
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
@ -518,6 +526,7 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
client,
|
||||
PATH,
|
||||
new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
|
||||
@ -535,9 +544,9 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
|
||||
Thread.sleep(1000);
|
||||
|
||||
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
|
||||
BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true);
|
||||
|
||||
verify(client, times(1)).append(
|
||||
eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(TracingContext.class));
|
||||
|
@ -58,7 +58,7 @@ public void testRenameFailuresDueToIncompleteMetadata() throws Exception {
|
||||
String destNoParentPath = "/NoParent/Dest";
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
|
||||
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
|
||||
AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
fs.getAbfsStore().getAbfsConfiguration());
|
||||
|
||||
|
@ -103,7 +103,7 @@ public void testThrottlingIntercept() throws Exception {
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
|
||||
"dummy.dfs.core.windows.net");
|
||||
AbfsThrottlingIntercept intercept;
|
||||
AbfsClient abfsClient = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
|
||||
AbfsClient abfsClient = ITestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
|
||||
intercept = abfsClient.getIntercept();
|
||||
Assertions.assertThat(intercept)
|
||||
.describedAs("AbfsNoOpThrottlingIntercept instance expected")
|
||||
@ -114,7 +114,7 @@ public void testThrottlingIntercept() throws Exception {
|
||||
// On disabling throttling AbfsClientThrottlingIntercept object is returned
|
||||
AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration,
|
||||
"dummy1.dfs.core.windows.net");
|
||||
AbfsClient abfsClient1 = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
|
||||
AbfsClient abfsClient1 = ITestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
|
||||
intercept = abfsClient1.getIntercept();
|
||||
Assertions.assertThat(intercept)
|
||||
.describedAs("AbfsClientThrottlingIntercept instance expected")
|
||||
|
Loading…
Reference in New Issue
Block a user