diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 636438afef..4408cf68a4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1568,4 +1568,19 @@ private Constants() { * is true: {@value}. */ public static final String HTTP_SIGNER_CLASS_NAME = "fs.s3a.http.signer.class"; + + /** + * Should checksums be validated on download? + * This is slower and not needed on TLS connections. + * Value: {@value}. + */ + public static final String CHECKSUM_VALIDATION = + "fs.s3a.checksum.validation"; + + /** + * Default value of {@link #CHECKSUM_VALIDATION}. + * Value: {@value}. + */ + public static final boolean CHECKSUM_VALIDATION_DEFAULT = false; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index 0fde93e654..0a3267a9fe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -179,11 +179,15 @@ private , ClientT> Build configureEndpointAndRegion(builder, parameters, conf); S3Configuration serviceConfiguration = S3Configuration.builder() - .pathStyleAccessEnabled(parameters.isPathStyleAccess()) - .build(); + .pathStyleAccessEnabled(parameters.isPathStyleAccess()) + .checksumValidationEnabled(parameters.isChecksumValidationEnabled()) + .build(); + + final ClientOverrideConfiguration.Builder override = + createClientOverrideConfiguration(parameters, conf); S3BaseClientBuilder s3BaseClientBuilder = builder - .overrideConfiguration(createClientOverrideConfiguration(parameters, conf)) + .overrideConfiguration(override.build()) 
.credentialsProvider(parameters.getCredentialSet()) .disableS3ExpressSessionAuth(!parameters.isExpressCreateSession()) .serviceConfiguration(serviceConfiguration); @@ -204,8 +208,9 @@ private , ClientT> Build * @throws IOException any IOE raised, or translated exception * @throws RuntimeException some failures creating an http signer * @return the override configuration + * @throws IOException any IOE raised, or translated exception */ - protected ClientOverrideConfiguration createClientOverrideConfiguration( + protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration( S3ClientCreationParameters parameters, Configuration conf) throws IOException { final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder = AWSClientConfig.createClientConfigBuilder(conf, AWS_SERVICE_IDENTIFIER_S3); @@ -237,7 +242,7 @@ protected ClientOverrideConfiguration createClientOverrideConfiguration( final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf); clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build()); - return clientOverrideConfigBuilder.build(); + return clientOverrideConfigBuilder; } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 1aad1ad2f8..c5e6e09a83 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1055,7 +1055,9 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException { .withRegion(configuredRegion) .withFipsEnabled(fipsEnabled) .withExpressCreateSession( - conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT)); + conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT)) + .withChecksumValidationEnabled( + conf.getBoolean(CHECKSUM_VALIDATION, 
CHECKSUM_VALIDATION_DEFAULT)); S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf); s3Client = clientFactory.createS3Client(getUri(), parameters); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 3d2ecc7737..9f04e11d94 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -1304,6 +1304,17 @@ public IOStatistics getIOStatistics() { return ioStatistics; } + /** + * Get the wrapped stream. + * This is for testing only. + * + * @return the wrapped stream, or null if there is none. + */ + @VisibleForTesting + public ResponseInputStream getWrappedStream() { + return wrappedStream; + } + /** * Callbacks for input stream IO. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index 404a255528..0b01876ae5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -176,6 +176,11 @@ final class S3ClientCreationParameters { */ private boolean expressCreateSession = S3EXPRESS_CREATE_SESSION_DEFAULT; + /** + * Enable checksum validation. + */ + private boolean checksumValidationEnabled; + /** * Is FIPS enabled? */ @@ -451,6 +456,20 @@ public S3ClientCreationParameters withExpressCreateSession(final boolean value) return this; } + /** + * Set builder value. 
+ * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withChecksumValidationEnabled(final boolean value) { + checksumValidationEnabled = value; + return this; + } + + public boolean isChecksumValidationEnabled() { + return checksumValidationEnabled; + } + @Override public String toString() { return "S3ClientCreationParameters{" + @@ -464,6 +483,7 @@ public String toString() { ", multipartCopy=" + multipartCopy + ", region='" + region + '\'' + ", expressCreateSession=" + expressCreateSession + + ", checksumValidationEnabled=" + checksumValidationEnabled + '}'; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 6dc3ca1102..ed1fda316d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -73,13 +73,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.internal.io.ChecksumValidatingInputStream; +import software.amazon.awssdk.services.s3.internal.checksums.S3ChecksumValidatingInputStream; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.io.Closeable; import java.io.File; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; +import java.lang.reflect.Field; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; @@ -1663,6 +1669,54 @@ public static S3AInputStream getS3AInputStream( } } + /** + * Get the inner stream of a FilterInputStream. + * Uses reflection to access a protected field. 
+ * @param fis input stream. + * @return the inner stream. + */ + public static InputStream getInnerStream(FilterInputStream fis) { + try { + final Field field = FilterInputStream.class.getDeclaredField("in"); + field.setAccessible(true); + return (InputStream) field.get(fis); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new AssertionError("Failed to get inner stream: " + e, e); + } + } + + /** + * Get the innermost stream of a chain of FilterInputStreams. + * This allows tests to probe the internals of an AWS SDK stream chain. + * @param fis input stream. + * @return the innermost stream. + */ + public static InputStream getInnermostStream(FilterInputStream fis) { + InputStream inner = fis; + while (inner instanceof FilterInputStream) { + inner = getInnerStream((FilterInputStream) inner); + } + return inner; + } + + /** + * Verify that an s3a stream is not checksummed. + * The inner stream must be active. + */ + public static void assertStreamIsNotChecksummed(final S3AInputStream wrappedS3A) { + final ResponseInputStream wrappedStream = + wrappedS3A.getWrappedStream(); + Assertions.assertThat(wrappedStream) + .describedAs("wrapped stream is not open: call read() on %s", wrappedS3A) + .isNotNull(); + + final InputStream inner = getInnermostStream(wrappedStream); + Assertions.assertThat(inner) + .describedAs("innermost stream of %s", wrappedS3A) + .isNotInstanceOf(ChecksumValidatingInputStream.class) + .isNotInstanceOf(S3ChecksumValidatingInputStream.class); + } + /** * Disable Prefetching streams from S3AFileSystem in tests. * @param conf Configuration to remove the prefetch property from. 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 361c376cff..63b25f9c88 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -20,6 +20,7 @@ import java.io.EOFException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.TimeUnit; @@ -29,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileStatus; @@ -45,8 +47,15 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream; +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static 
org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED; @@ -79,6 +88,16 @@ public ITestS3AOpenCost() { super(true); } + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + removeBaseAndBucketOverrides(conf, + CHECKSUM_VALIDATION); + conf.setBoolean(CHECKSUM_VALIDATION, false); + disableFilesystemCaching(conf); + return conf; + } + /** * Setup creates a test file, saves is status and length * to fields. @@ -139,6 +158,34 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable { assertEquals("bytes read from file", fileLength, readLen); } + @Test + public void testStreamIsNotChecksummed() throws Throwable { + describe("Verify that an opened stream is not checksummed"); + S3AFileSystem fs = getFileSystem(); + // open the file + try (FSDataInputStream in = verifyMetrics(() -> + fs.openFile(testFile) + .must(FS_OPTION_OPENFILE_READ_POLICY, + FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE) + .mustLong(FS_OPTION_OPENFILE_LENGTH, fileLength) + .build() + .get(), + always(NO_HEAD_OR_LIST), + with(STREAM_READ_OPENED, 0))) { + + // if prefetching is enabled, skip this test + final InputStream wrapped = in.getWrappedStream(); + if (!(wrapped instanceof S3AInputStream)) { + skip("Not an S3AInputStream: " + wrapped); + } + + // open the stream. + in.read(); + // now examine the innermost stream and make sure it doesn't have a checksum + assertStreamIsNotChecksummed(getS3AInputStream(in)); + } + } + @Test public void testOpenFileShorterLength() throws Throwable { // do a second read with the length declared as short. 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java index b77ca97c7d..00bae1519f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java @@ -43,6 +43,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD; +import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT; import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE; import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS; @@ -84,6 +85,11 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest { */ public static final int ATTEMPTS = 10; + /** + * Should checksums be enabled? + */ + public static final boolean CHECKSUMS = false; + /** * Test FS with a tiny connection pool and * no recovery. 
@@ -102,6 +108,7 @@ public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); removeBaseAndBucketOverrides(conf, ASYNC_DRAIN_THRESHOLD, + CHECKSUM_VALIDATION, ESTABLISH_TIMEOUT, INPUT_FADVISE, MAX_ERROR_RETRIES, @@ -111,7 +118,7 @@ public Configuration createConfiguration() { REQUEST_TIMEOUT, RETRY_LIMIT, SOCKET_TIMEOUT); - + conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS); return conf; } @@ -132,6 +139,7 @@ public void setup() throws Exception { conf.setInt(MAX_ERROR_RETRIES, 1); conf.setInt(READAHEAD_RANGE, READAHEAD); conf.setInt(RETRY_LIMIT, 1); + conf.setBoolean(CHECKSUM_VALIDATION, CHECKSUMS); setDurationAsSeconds(conf, ESTABLISH_TIMEOUT, Duration.ofSeconds(1)); @@ -221,12 +229,22 @@ private static long lookupCounter( */ private static void assertReadPolicy(final FSDataInputStream in, final S3AInputPolicy policy) { - S3AInputStream inner = (S3AInputStream) in.getWrappedStream(); + S3AInputStream inner = getS3AInputStream(in); Assertions.assertThat(inner.getInputPolicy()) .describedAs("input policy of %s", inner) .isEqualTo(policy); } + /** + * Extract the inner stream from an FSDataInputStream. + * Because prefetching is disabled, this is always an S3AInputStream. + * @param in input stream + * @return the inner stream cast to an S3AInputStream. + */ + private static S3AInputStream getS3AInputStream(final FSDataInputStream in) { + return (S3AInputStream) in.getWrappedStream(); + } + /** * Test stream close performance/behavior with unbuffer * aborting rather than draining.