HADOOP-18883. [ABFS]: Expect-100 JDK bug resolution: prevent multiple server calls (#6022)

Address JDK bug JDK-8314978 related to handling of HTTP 100
responses. 

https://bugs.openjdk.org/browse/JDK-8314978

In AbfsRestOperation, after calling sendRequest() we invoke processResponse()
on the AbfsHttpOperation.
Even if conn.getOutputStream() fails due to an expect-100 error,
we consume the exception and let the code proceed.
Processing the response may then call getHeaderField() / getHeaderFields() /
getHeaderFieldLong() after getOutputStream() has failed; each of these
invocations triggers another server call.

This commit prevents those calls:
if connection.getOutputStream() fails due to an Expect-100 error,
the ABFS client no longer invokes getHeaderField(), getHeaderFields(),
getHeaderFieldLong() or getInputStream().

getResponseCode() remains safe to call: on this failure the JDK stores the
response code in the HttpURLConnection object before throwing, so reading it
does not trigger another server call.
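
To make the behaviour concrete, here is a minimal sketch of that guard pattern
against a plain HttpURLConnection, outside the ABFS codebase; the endpoint,
class name and payload are made up for illustration. On an Expect-100 rejection
only the cached response code and message are read, and header/body parsing is
skipped so that no second request is issued.

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URL;

public class Expect100GuardSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical endpoint, used only to illustrate the pattern.
    HttpURLConnection conn =
        (HttpURLConnection) new URL("https://example.com/upload").openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);
    // Streaming upload so the body write goes through the Expect handshake.
    conn.setFixedLengthStreamingMode(3);
    conn.setRequestProperty("Expect", "100-continue");

    boolean disconnectedOnError = false;
    try {
      OutputStream out = conn.getOutputStream();
      out.write(new byte[]{1, 2, 3});
      out.close();
    } catch (ProtocolException e) {
      // JDK-8314978: the connection has already been torn down here. Calling
      // getHeaderField()/getHeaderFields()/getHeaderFieldLong()/getInputStream()
      // would silently re-issue the request to the server. (The ABFS client
      // additionally checks the exception message against EXPECT_100_JDK_ERROR.)
      disconnectedOnError = true;
      int status = conn.getResponseCode();        // safe: cached before the throw
      String reason = conn.getResponseMessage();  // safe for the same reason
      System.out.println("Expect: 100-continue rejected with " + status + " " + reason);
    }

    if (!disconnectedOnError) {
      // Headers and body are only parsed when the output stream was obtained.
      System.out.println("ETag: " + conn.getHeaderField("ETag"));
      conn.getInputStream().close();
    }
  }
}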

Contributed by Pranav Saxena
Author: Pranav Saxena, 2024-01-21 11:14:54 -08:00 (committed by GitHub)
Commit: 7dc166ddc7, parent: d274f778c1
6 changed files with 109 additions and 9 deletions


@@ -69,6 +69,7 @@ public final class AbfsHttpConstants {
    * and should qualify for retry.
    */
   public static final int HTTP_CONTINUE = 100;
+  public static final String EXPECT_100_JDK_ERROR = "Server rejected operation";
 
   // Abfs generic constants
   public static final String SINGLE_WHITE_SPACE = " ";


@@ -22,12 +22,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
+import java.net.ProtocolException;
 import java.net.URL;
 import java.util.List;
 
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
@@ -43,6 +45,7 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
@@ -83,6 +86,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
   private long sendRequestTimeMs;
   private long recvResponseTimeMs;
   private boolean shouldMask = false;
+  private boolean connectionDisconnectedOnError = false;
 
   public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
       final URL url,
@@ -324,14 +328,26 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
        */
       outputStream = getConnOutputStream();
     } catch (IOException e) {
-      /* If getOutputStream fails with an exception and expect header
-         is enabled, we return back without throwing an exception to
-         the caller. The caller is responsible for setting the correct status code.
-         If expect header is not enabled, we throw back the exception.
+      connectionDisconnectedOnError = true;
+      /* If getOutputStream fails with an expect-100 exception , we return back
+         without throwing an exception to the caller. Else, we throw back the exception.
        */
       String expectHeader = getConnProperty(EXPECT);
-      if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
+      if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)
+          && e instanceof ProtocolException
+          && EXPECT_100_JDK_ERROR.equals(e.getMessage())) {
         LOG.debug("Getting output stream failed with expect header enabled, returning back ", e);
+        /*
+         * In case expect-100 assertion has failed, headers and inputStream should not
+         * be parsed. Reason being, conn.getHeaderField(), conn.getHeaderFields(),
+         * conn.getInputStream() will lead to repeated server call.
+         * ref: https://bugs.openjdk.org/browse/JDK-8314978.
+         * Reading conn.responseCode() and conn.getResponseMessage() is safe in
+         * case of Expect-100 error. Reason being, in JDK, it stores the responseCode
+         * in the HttpUrlConnection object before throwing exception to the caller.
+         */
+        this.statusCode = getConnResponseCode();
+        this.statusDescription = getConnResponseMessage();
         return;
       } else {
         LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e);
@@ -364,7 +380,17 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
    * @throws IOException if an error occurs.
    */
   public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException {
+    if (connectionDisconnectedOnError) {
+      LOG.debug("This connection was not successful or has been disconnected, "
+          + "hence not parsing headers and inputStream");
+      return;
+    }
+    processConnHeadersAndInputStreams(buffer, offset, length);
+  }
+
+  void processConnHeadersAndInputStreams(final byte[] buffer,
+      final int offset,
+      final int length) throws IOException {
     // get the response
     long startTime = 0;
     startTime = System.nanoTime();
@@ -608,6 +634,11 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
     return connection.getResponseMessage();
   }
 
+  @VisibleForTesting
+  Boolean getConnectionDisconnectedOnError() {
+    return connectionDisconnectedOnError;
+  }
+
   public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
     /**
      * Creates an instance to represent fixed results.


@@ -338,7 +338,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
      */
     AppendRequestParameters reqParams = new AppendRequestParameters(
         offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
-    AbfsRestOperation op = client.append(path,
+    AbfsRestOperation op = getClient().append(path,
         blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
         contextEncryptionAdapter, new TracingContext(tracingContext));
     cachedSasToken.update(op.getSasToken());
@@ -655,7 +655,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
         "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = client.flush(path, offset, retainUncommitedData,
+      AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData,
           isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
           new TracingContext(tracingContext));
       cachedSasToken.update(op.getSasToken());
@@ -795,4 +795,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
   ListeningExecutorService getExecutorService() {
     return executorService;
   }
+
+  @VisibleForTesting
+  AbfsClient getClient() {
+    return client;
+  }
 }


@@ -48,6 +48,7 @@ import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -586,7 +587,7 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
         .getConnResponseMessage();
 
     // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
-    Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+    Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
         .when(abfsHttpOperation)
         .getConnOutputStream();


@@ -18,15 +18,19 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URL;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
@@ -34,6 +38,8 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.test.LambdaTestUtils;
 
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED;
+
 /**
  * Test create operation.
  */
@@ -148,6 +154,61 @@ public class ITestAbfsOutputStream extends AbstractAbfsIntegrationTest {
     }
   }
 
+  @Test
+  public void testExpect100ContinueFailureInAppend() throws Exception {
+    Configuration configuration = new Configuration(getRawConfiguration());
+    configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true");
+    AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+        configuration);
+    Path path = new Path("/testFile");
+    AbfsOutputStream os = Mockito.spy(
+        (AbfsOutputStream) fs.create(path).getWrappedStream());
+    AbfsClient spiedClient = Mockito.spy(os.getClient());
+    AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
+    mockSetupForAppend(httpOpForAppendTest, spiedClient);
+    Mockito.doReturn(spiedClient).when(os).getClient();
+    fs.delete(path, true);
+    os.write(1);
+    LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
+      os.close();
+    });
+    Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError())
+        .describedAs("First try from AbfsClient will have expect-100 "
+            + "header and should fail with expect-100 error.").isTrue();
+    Mockito.verify(httpOpForAppendTest[0], Mockito.times(0))
+        .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
+            Mockito.anyInt(), Mockito.anyInt());
+
+    Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError())
+        .describedAs("The retried operation from AbfsClient should not "
+            + "fail with expect-100 error. The retried operation does not have"
+            + "expect-100 header.").isFalse();
+    Mockito.verify(httpOpForAppendTest[1], Mockito.times(1))
+        .processConnHeadersAndInputStreams(Mockito.any(byte[].class),
+            Mockito.anyInt(), Mockito.anyInt());
+  }
+
+  private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest,
+      final AbfsClient spiedClient) {
+    int[] index = new int[1];
+    index[0] = 0;
+    Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
+          AbfsRestOperation op = Mockito.spy(
+              (AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod());
+          Mockito.doAnswer(createHttpOpInvocation -> {
+            httpOpForAppendTest[index[0]] = Mockito.spy(
+                (AbfsHttpOperation) createHttpOpInvocation.callRealMethod());
+            return httpOpForAppendTest[index[0]++];
+          }).when(op).createHttpOperation();
+          return op;
+        })
+        .when(spiedClient)
+        .getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class),
+            Mockito.anyString(), Mockito.any(
+                URL.class), Mockito.anyList(), Mockito.any(byte[].class),
+            Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class));
+  }
+
   /**
    * Separate method to create an outputStream using a local FS instance so
    * that once this method has returned, the FS instance can be eligible for GC.


@@ -49,6 +49,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_OK;
 import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -232,7 +233,7 @@ public class ITestAbfsRestOperation extends AbstractAbfsIntegrationTest {
     Mockito.doReturn(responseMessage)
         .when(abfsHttpOperation)
         .getConnResponseMessage();
-    Mockito.doThrow(new ProtocolException("Server rejected Operation"))
+    Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
         .when(abfsHttpOperation)
         .getConnOutputStream();
     break;