HADOOP-19033. S3A: disable checksums when fs.s3a.checksum.validation = false (#6441)

Add new option fs.s3a.checksum.validation, default false, which
is used when creating s3 clients to enable/disable checksum
validation.

When false, GET response processing is measurably faster.

Contributed by Steve Loughran.
This commit is contained in:
Steve Loughran 2024-01-17 18:34:14 +00:00 committed by GitHub
parent 9634bd31e6
commit eeb657e85f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 180 additions and 8 deletions

View File

@ -1568,4 +1568,19 @@ private Constants() {
* is true: {@value}.
*/
public static final String HTTP_SIGNER_CLASS_NAME = "fs.s3a.http.signer.class";
/**
* Should checksums be validated on download?
* This is slower and not needed on TLS connections.
* Value: {@value}.
*/
public static final String CHECKSUM_VALIDATION =
"fs.s3a.checksum.validation";
/**
* Default value of {@link #CHECKSUM_VALIDATION}.
* Value: {@value}.
*/
public static final boolean CHECKSUM_VALIDATION_DEFAULT = false;
}

View File

@ -179,11 +179,15 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
configureEndpointAndRegion(builder, parameters, conf);
S3Configuration serviceConfiguration = S3Configuration.builder()
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
.build();
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
.checksumValidationEnabled(parameters.isChecksumValidationEnabled())
.build();
final ClientOverrideConfiguration.Builder override =
createClientOverrideConfiguration(parameters, conf);
S3BaseClientBuilder s3BaseClientBuilder = builder
.overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
.overrideConfiguration(override.build())
.credentialsProvider(parameters.getCredentialSet())
.disableS3ExpressSessionAuth(!parameters.isExpressCreateSession())
.serviceConfiguration(serviceConfiguration);
@ -204,8 +208,9 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> Build
* @throws IOException any IOE raised, or translated exception
* @throws RuntimeException some failures creating an http signer
* @return the override configuration
* @throws IOException any IOE raised, or translated exception
*/
protected ClientOverrideConfiguration createClientOverrideConfiguration(
protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
S3ClientCreationParameters parameters, Configuration conf) throws IOException {
final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder =
AWSClientConfig.createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3);
@ -237,7 +242,7 @@ protected ClientOverrideConfiguration createClientOverrideConfiguration(
final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf);
clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build());
return clientOverrideConfigBuilder.build();
return clientOverrideConfigBuilder;
}
/**

View File

@ -1055,7 +1055,9 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
.withRegion(configuredRegion)
.withFipsEnabled(fipsEnabled)
.withExpressCreateSession(
conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT));
conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT))
.withChecksumValidationEnabled(
conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT));
S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
s3Client = clientFactory.createS3Client(getUri(), parameters);

View File

@ -1304,6 +1304,17 @@ public IOStatistics getIOStatistics() {
return ioStatistics;
}
/**
* Get the wrapped stream.
* This is for testing only.
*
* @return the wrapped stream, or null if there is none.
*/
@VisibleForTesting
public ResponseInputStream<GetObjectResponse> getWrappedStream() {
return wrappedStream;
}
/**
* Callbacks for input stream IO.
*/

View File

@ -176,6 +176,11 @@ final class S3ClientCreationParameters {
*/
private boolean expressCreateSession = S3EXPRESS_CREATE_SESSION_DEFAULT;
/**
* Enable checksum validation.
*/
private boolean checksumValidationEnabled;
/**
* Is FIPS enabled?
*/
@ -451,6 +456,20 @@ public S3ClientCreationParameters withExpressCreateSession(final boolean value)
return this;
}
/**
* Set builder value.
* @param value new value
* @return the builder
*/
public S3ClientCreationParameters withChecksumValidationEnabled(final boolean value) {
checksumValidationEnabled = value;
return this;
}
public boolean isChecksumValidationEnabled() {
return checksumValidationEnabled;
}
@Override
public String toString() {
return "S3ClientCreationParameters{" +
@ -464,6 +483,7 @@ public String toString() {
", multipartCopy=" + multipartCopy +
", region='" + region + '\'' +
", expressCreateSession=" + expressCreateSession +
", checksumValidationEnabled=" + checksumValidationEnabled +
'}';
}

View File

@ -73,13 +73,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.internal.io.ChecksumValidatingInputStream;
import software.amazon.awssdk.services.s3.internal.checksums.S3ChecksumValidatingInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import java.io.Closeable;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
@ -1663,6 +1669,54 @@ public static S3AInputStream getS3AInputStream(
}
}
/**
* Get the inner stream of a FilterInputStream.
* Uses reflection to access a protected field.
* @param fis input stream.
* @return the inner stream.
*/
public static InputStream getInnerStream(FilterInputStream fis) {
try {
final Field field = FilterInputStream.class.getDeclaredField("in");
field.setAccessible(true);
return (InputStream) field.get(fis);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new AssertionError("Failed to get inner stream: " + e, e);
}
}
/**
* Get the innermost stream of a chain of FilterInputStreams.
* This allows tests into the internals of an AWS SDK stream chain.
* @param fis input stream.
* @return the inner stream.
*/
public static InputStream getInnermostStream(FilterInputStream fis) {
InputStream inner = fis;
while (inner instanceof FilterInputStream) {
inner = getInnerStream((FilterInputStream) inner);
}
return inner;
}
/**
* Verify that an s3a stream is not checksummed.
* The inner stream must be active.
*/
public static void assertStreamIsNotChecksummed(final S3AInputStream wrappedS3A) {
final ResponseInputStream<GetObjectResponse> wrappedStream =
wrappedS3A.getWrappedStream();
Assertions.assertThat(wrappedStream)
.describedAs("wrapped stream is not open: call read() on %s", wrappedS3A)
.isNotNull();
final InputStream inner = getInnermostStream(wrappedStream);
Assertions.assertThat(inner)
.describedAs("innermost stream of %s", wrappedS3A)
.isNotInstanceOf(ChecksumValidatingInputStream.class)
.isNotInstanceOf(S3ChecksumValidatingInputStream.class);
}
/**
* Disable Prefetching streams from S3AFileSystem in tests.
* @param conf Configuration to remove the prefetch property from.

View File

@ -20,6 +20,7 @@
import java.io.EOFException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@ -29,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
@ -45,8 +47,15 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
@ -79,6 +88,16 @@ public ITestS3AOpenCost() {
super(true);
}
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
CHECKSUM_VALIDATION);
conf.setBoolean(CHECKSUM_VALIDATION, false);
disableFilesystemCaching(conf);
return conf;
}
/**
* Setup creates a test file, saves is status and length
* to fields.
@ -139,6 +158,34 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
assertEquals("bytes read from file", fileLength, readLen);
}
@Test
public void testStreamIsNotChecksummed() throws Throwable {
describe("Verify that an opened stream is not checksummed");
S3AFileSystem fs = getFileSystem();
// open the file
try (FSDataInputStream in = verifyMetrics(() ->
fs.openFile(testFile)
.must(FS_OPTION_OPENFILE_READ_POLICY,
FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
.mustLong(FS_OPTION_OPENFILE_LENGTH, fileLength)
.build()
.get(),
always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 0))) {
// if prefetching is enabled, skip this test
final InputStream wrapped = in.getWrappedStream();
if (!(wrapped instanceof S3AInputStream)) {
skip("Not an S3AInputStream: " + wrapped);
}
// open the stream.
in.read();
// now examine the innermost stream and make sure it doesn't have a checksum
assertStreamIsNotChecksummed(getS3AInputStream(in));
}
}
@Test
public void testOpenFileShorterLength() throws Throwable {
// do a second read with the length declared as short.

View File

@ -43,6 +43,7 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
@ -84,6 +85,11 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
*/
public static final int ATTEMPTS = 10;
/**
* Should checksums be enabled?
*/
public static final boolean CHECKSUMS = false;
/**
* Test FS with a tiny connection pool and
* no recovery.
@ -102,6 +108,7 @@ public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
ASYNC_DRAIN_THRESHOLD,
CHECKSUM_VALIDATION,
ESTABLISH_TIMEOUT,
INPUT_FADVISE,
MAX_ERROR_RETRIES,
@ -111,7 +118,7 @@ public Configuration createConfiguration() {
REQUEST_TIMEOUT,
RETRY_LIMIT,
SOCKET_TIMEOUT);
conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS);
return conf;
}
@ -132,6 +139,7 @@ public void setup() throws Exception {
conf.setInt(MAX_ERROR_RETRIES, 1);
conf.setInt(READAHEAD_RANGE, READAHEAD);
conf.setInt(RETRY_LIMIT, 1);
conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS);
setDurationAsSeconds(conf, ESTABLISH_TIMEOUT,
Duration.ofSeconds(1));
@ -221,12 +229,22 @@ private static long lookupCounter(
*/
private static void assertReadPolicy(final FSDataInputStream in,
final S3AInputPolicy policy) {
S3AInputStream inner = (S3AInputStream) in.getWrappedStream();
S3AInputStream inner = getS3AInputStream(in);
Assertions.assertThat(inner.getInputPolicy())
.describedAs("input policy of %s", inner)
.isEqualTo(policy);
}
/**
* Extract the inner stream from an FSDataInputStream.
* Because prefetching is disabled, this is always an S3AInputStream.
* @param in input stream
* @return the inner stream cast to an S3AInputStream.
*/
private static S3AInputStream getS3AInputStream(final FSDataInputStream in) {
return (S3AInputStream) in.getWrappedStream();
}
/**
* Test stream close performance/behavior with unbuffer
* aborting rather than draining.