diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 7282cdab2f..67854e6572 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -398,6 +398,21 @@ private Constants() {
   public static final Duration DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION =
       Duration.ofSeconds(60);
 
+  /**
+   * Timeout for uploading all of a small object or a single part
+   * of a larger one.
+   * {@value}.
+   * Default unit is milliseconds for consistency with other options.
+   */
+  public static final String PART_UPLOAD_TIMEOUT =
+      "fs.s3a.connection.part.upload.timeout";
+
+  /**
+   * Default part upload timeout: 15 minutes.
+   */
+  public static final Duration DEFAULT_PART_UPLOAD_TIMEOUT =
+      Duration.ofMinutes(15);
+
   /**
    * Should TCP Keepalive be enabled on the socket?
    * This adds some network IO, but finds failures faster.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 366da8392b..3ab19dfa19 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1286,6 +1286,13 @@ protected RequestFactory createRequestFactory() {
           STORAGE_CLASS);
     }
 
+    // optional custom timeout for bulk uploads
+    Duration partUploadTimeout = ConfigurationHelper.getDuration(getConf(),
+        PART_UPLOAD_TIMEOUT,
+        DEFAULT_PART_UPLOAD_TIMEOUT,
+        TimeUnit.MILLISECONDS,
+        Duration.ZERO);
+
     return RequestFactoryImpl.builder()
         .withBucket(requireNonNull(bucket))
         .withCannedACL(getCannedACL())
@@ -1295,6 +1302,7 @@ protected RequestFactory createRequestFactory() {
         .withContentEncoding(contentEncoding)
         .withStorageClass(storageClass)
         .withMultipartUploadEnabled(isMultipartUploadEnabled)
+        .withPartUploadTimeout(partUploadTimeout)
         .build();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java
index 60729ac308..afd3ed7ff3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java
@@ -26,6 +26,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.retry.RetryMode;
@@ -623,4 +625,24 @@ static ConnectionSettings createConnectionSettings(Configuration conf) {
         socketTimeout);
   }
 
+  /**
+   * Set a custom ApiCallTimeout for a single request.
+   * This allows for a longer timeout to be used in data upload
+   * requests than that for all other S3 interactions;
+   * this does not happen by default in the V2 SDK
+   * (see HADOOP-19295).
+   * <p>
+   * If the timeout is zero, the request is not patched.
+   * @param builder builder to patch.
+   * @param timeout timeout
+   */
+  public static void setRequestTimeout(AwsRequest.Builder builder, Duration timeout) {
+    if (!timeout.isZero()) {
+      builder.overrideConfiguration(
+          AwsRequestOverrideConfiguration.builder()
+              .apiCallTimeout(timeout)
+              .apiCallAttemptTimeout(timeout)
+              .build());
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index df2a6567db..e9f7d70728 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.s3a.impl;
 
+import java.time.Duration;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
@@ -59,7 +60,9 @@
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -128,6 +131,12 @@ public class RequestFactoryImpl implements RequestFactory {
    */
   private final boolean isMultipartUploadEnabled;
 
+  /**
+   * Timeout for uploading objects/parts.
+   * This will be set on data put/post operations only.
+   */
+  private final Duration partUploadTimeout;
+
   /**
    * Constructor.
    * @param builder builder with all the configuration.
@@ -142,6 +151,7 @@ protected RequestFactoryImpl(
     this.contentEncoding = builder.contentEncoding;
     this.storageClass = builder.storageClass;
     this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
+    this.partUploadTimeout = builder.partUploadTimeout;
   }
 
   /**
@@ -344,6 +354,11 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
       putObjectRequestBuilder.storageClass(storageClass);
     }
 
+    // Set the timeout for object uploads but not directory markers.
+    if (!isDirectoryMarker) {
+      setRequestTimeout(putObjectRequestBuilder, partUploadTimeout);
+    }
+
     return prepareRequest(putObjectRequestBuilder);
   }
 
@@ -595,6 +610,9 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder(
         .partNumber(partNumber)
         .contentLength(size);
     uploadPartEncryptionParameters(builder);
+
+    // Set the request timeout for the part upload
+    setRequestTimeout(builder, partUploadTimeout);
     return prepareRequest(builder);
   }
 
@@ -702,6 +720,13 @@ public static final class RequestFactoryBuilder {
      */
     private boolean isMultipartUploadEnabled = true;
 
+    /**
+     * Timeout for uploading objects/parts.
+     * This will be set on data put/post operations only.
+     * A zero value means "no custom timeout".
+     */
+    private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
+
     private RequestFactoryBuilder() {
     }
 
@@ -799,6 +824,18 @@ public RequestFactoryBuilder withMultipartUploadEnabled(
       this.isMultipartUploadEnabled = value;
       return this;
     }
+
+    /**
+     * Timeout for uploading objects/parts.
+     * This will be set on data put/post operations only.
+     * A zero value means "no custom timeout".
+     * @param value new value
+     * @return the builder
+     */
+    public RequestFactoryBuilder withPartUploadTimeout(final Duration value) {
+      partUploadTimeout = value;
+      return this;
+    }
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java
index 5676e67cde..d1fb28257f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java
@@ -26,6 +26,7 @@
 import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
 import java.util.function.Supplier;
 
 import javax.annotation.Nullable;
@@ -224,6 +225,12 @@ public static abstract class BaseContentProvider
      */
     private T currentStream;
 
+    /**
+     * When did this upload start?
+     * Used in error messages.
+     */
+    private final LocalDateTime startTime;
+
     /**
      * Constructor.
      * @param size size of the data. Must be non-negative.
@@ -241,6 +248,7 @@ protected BaseContentProvider(int size, @Nullable Supplier<Boolean> isOpen) {
       checkArgument(size >= 0, "size is negative: %s", size);
       this.size = size;
       this.isOpen = isOpen;
+      this.startTime = LocalDateTime.now();
     }
 
     /**
@@ -274,8 +282,11 @@ public final InputStream newStream() {
       close();
       checkOpen();
       streamCreationCount++;
-      if (streamCreationCount > 1) {
-        LOG.info("Stream created more than once: {}", this);
+      if (streamCreationCount == 2) {
+        // the stream has been recreated for the first time.
+        // notify only once for this stream, so as not to flood
+        // the logs.
+        LOG.info("Stream recreated: {}", this);
       }
       return setCurrentStream(createNewStream());
     }
@@ -302,6 +313,14 @@ public int getSize() {
       return size;
     }
 
+    /**
+     * When did this upload start?
+     * @return start time
+     */
+    public LocalDateTime getStartTime() {
+      return startTime;
+    }
+
     /**
      * Current stream.
      * When {@link #newStream()} is called, this is set to the new value,
@@ -330,6 +349,7 @@ protected T setCurrentStream(T stream) {
     public String toString() {
       return "BaseContentProvider{" +
           "size=" + size +
+          ", initiated at " + startTime +
           ", streamCreationCount=" + streamCreationCount +
           ", currentStream=" + currentStream +
           '}';
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
index 5e127050fe..a4eba8a964 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java
@@ -41,6 +41,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -100,7 +101,10 @@ public void testCreateNonRecursiveSuccess() throws IOException {
   public void testPutObjectDirect() throws Throwable {
     final S3AFileSystem fs = getFileSystem();
     try (AuditSpan span = span()) {
-      RequestFactory factory = RequestFactoryImpl.builder().withBucket(fs.getBucket()).build();
+      RequestFactory factory = RequestFactoryImpl.builder()
+          .withBucket(fs.getBucket())
+          .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
+          .build();
       Path path = path("putDirect");
       PutObjectRequest.Builder putObjectRequestBuilder =
           factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
index 3c53fd6081..44864cd670 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
@@ -54,6 +54,7 @@
 
 import org.apache.hadoop.util.Progressable;
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
 import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -99,6 +100,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
       .withRequestPreparer(MockS3AFileSystem::prepareRequest)
       .withBucket(BUCKET)
       .withEncryptionSecrets(new EncryptionSecrets())
+      .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
       .build();
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java
index 1abece4bfe..b16ad4f623 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java
@@ -153,7 +153,7 @@ public Configuration createConfiguration() {
    */
   @Override
   public void setup() throws Exception {
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
     super.setup();
   }
 
@@ -161,7 +161,7 @@ public void setup() throws Exception {
   public void teardown() throws Exception {
     // safety check in case the evaluation is failing any
     // request needed in cleanup.
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
     super.teardown();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
index dc8270c9ff..9ca12e4f31 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
@@ -21,7 +21,9 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,8 +35,12 @@
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.api.PerformanceFlagEnum;
+import org.apache.hadoop.fs.s3a.test.SdkFaultInjector;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.OperationDuration;
 
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
@@ -42,16 +48,19 @@
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_ACQUISITION_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_IDLE_TIME;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_TTL;
+import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
 import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
 import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
 import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
+import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
 import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsMillis;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -63,7 +72,7 @@
  * The likely cause is actually -Dprefetch test runs as these return connections to
  * the pool.
  * However, it is also important to have a non-brittle FS for creating the test file
- * and teardow, again, this makes for a flaky test..
+ * and teardown; again, this makes for a flaky test.
  */
 public class ITestConnectionTimeouts extends AbstractS3ATestBase {
 
@@ -72,6 +81,23 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
    */
   public static final int FILE_SIZE = 1024;
 
+  public static final byte[] DATASET = dataset(FILE_SIZE, '0', 10);
+
+  public static final Duration UPLOAD_DURATION = Duration.ofSeconds(15);
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        DIRECTORY_OPERATIONS_PURGE_UPLOADS,
+        PART_UPLOAD_TIMEOUT);
+    setDurationAsMillis(conf, PART_UPLOAD_TIMEOUT, UPLOAD_DURATION);
+
+    // set this so teardown will clean pending uploads.
+    conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true);
+    return conf;
+  }
+
   /**
    * Create a configuration for an FS which has timeouts set to very low values
    * and no retries.
@@ -86,6 +112,7 @@ private Configuration timingOutConfiguration() {
         ESTABLISH_TIMEOUT,
         MAX_ERROR_RETRIES,
         MAXIMUM_CONNECTIONS,
+        PART_UPLOAD_TIMEOUT,
         PREFETCH_ENABLED_KEY,
         REQUEST_TIMEOUT,
         SOCKET_TIMEOUT,
@@ -118,7 +145,6 @@ public void teardown() throws Exception {
    */
   @Test
   public void testGeneratePoolTimeouts() throws Throwable {
-    byte[] data = dataset(FILE_SIZE, '0', 10);
     AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
     Configuration conf = timingOutConfiguration();
     Path path = methodPath();
@@ -127,7 +153,7 @@ public void testGeneratePoolTimeouts() throws Throwable {
     final S3AFileSystem fs = getFileSystem();
     // create the test file using the good fs, to avoid connection timeouts
     // during setup.
-    ContractTestUtils.createFile(fs, path, true, data);
+    ContractTestUtils.createFile(fs, path, true, DATASET);
     final FileStatus st = fs.getFileStatus(path);
     try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
       intercept(ConnectTimeoutException.class, () -> {
@@ -148,4 +174,102 @@ public void testGeneratePoolTimeouts() throws Throwable {
       });
     }
   }
+
+  /**
+   * Verify that different timeouts are used for object upload operations.
+   * The PUT operation can take longer than the value set as the
+   * connection.request.timeout, but other operations (GET) will
+   * fail.
+   * <p>
+   * This test tries to balance "being fast" with "not failing assertions
+   * in parallel test runs".
+   */
+  @Test
+  public void testObjectUploadTimeouts() throws Throwable {
+    AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
+    final Path dir = methodPath();
+    Path file = new Path(dir, "file");
+    Configuration conf = new Configuration(getConfiguration());
+    removeBaseAndBucketOverrides(conf,
+        PART_UPLOAD_TIMEOUT,
+        REQUEST_TIMEOUT,
+        FS_S3A_PERFORMANCE_FLAGS
+    );
+
+    // skip all checks
+    conf.set(FS_S3A_PERFORMANCE_FLAGS, PerformanceFlagEnum.Create.name());
+    final int uploadTimeout = 10;
+    // uploads have a long timeout
+    final Duration uploadDuration = Duration.ofSeconds(uploadTimeout);
+    setDurationAsMillis(conf, PART_UPLOAD_TIMEOUT, uploadDuration);
+
+    // other requests get a short one
+    final Duration shortTimeout = Duration.ofSeconds(5);
+    setDurationAsMillis(conf, REQUEST_TIMEOUT, shortTimeout);
+    setDurationAsMillis(conf, CONNECTION_ACQUISITION_TIMEOUT, shortTimeout);
+    conf.setInt(RETRY_LIMIT, 0);
+
+    SdkFaultInjector.resetFaultInjector();
+    // total sleep time is tracked for extra assertions
+    final AtomicLong totalSleepTime = new AtomicLong(0);
+    // fault injector is set to sleep for a bit less than the upload timeout.
+    final long sleepTime = uploadDuration.toMillis() - 2000;
+    SdkFaultInjector.setAction((req, resp) -> {
+      totalSleepTime.addAndGet(sleepTime);
+      LOG.info("sleeping {} millis", sleepTime);
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException ignored) {
+      }
+      return resp;
+    });
+    SdkFaultInjector.setRequestFailureConditions(999,
+        SdkFaultInjector::isPutRequest);
+    SdkFaultInjector.addFaultInjection(conf);
+    final S3AFileSystem fs = getFileSystem();
+    try (FileSystem brittleFS = FileSystem.newInstance(fs.getUri(), conf)) {
+      OperationDuration dur = new DurationInfo(LOG, "Creating File");
+      ContractTestUtils.createFile(brittleFS, file, true, DATASET);
+      dur.finished();
+      Assertions.assertThat(totalSleepTime.get())
+          .describedAs("total sleep time of PUT")
+          .isGreaterThan(0);
+      Assertions.assertThat(dur.asDuration())
+          .describedAs("Duration of write")
+          .isGreaterThan(shortTimeout)
+          .isLessThan(uploadDuration);
+
+      // reading the file will fail because of the injected sleep
+      totalSleepTime.set(0);
+      LOG.debug("attempting read");
+      SdkFaultInjector.setRequestFailureConditions(999,
+          SdkFaultInjector::isGetRequest);
+      // the exact IOE depends on what failed; if it is in the http read it will be a
+      // software.amazon.awssdk.thirdparty.org.apache.http.ConnectionClosedException
+      // which is too low level to safely assert about.
+      // it can also surface as an UncheckedIOException wrapping the inner cause.
+      intercept(Exception.class, () ->
+          ContractTestUtils.readUTF8(brittleFS, file, DATASET.length));
+      Assertions.assertThat(totalSleepTime.get())
+          .describedAs("total sleep time of read")
+          .isGreaterThan(0);
+
+      // and try a multipart upload to verify that its requests also outlast
+      // the short requests
+      SdkFaultInjector.setRequestFailureConditions(999,
+          SdkFaultInjector::isPartUpload);
+      Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "0001/__base/file2");
+      totalSleepTime.set(0);
+      OperationDuration dur2 = new DurationInfo(LOG, "Creating File");
+      ContractTestUtils.createFile(brittleFS, magicFile, true, DATASET);
+      dur2.finished();
+      Assertions.assertThat(totalSleepTime.get())
+          .describedAs("total sleep time of magic write")
+          .isGreaterThan(0);
+      Assertions.assertThat(dur2.asDuration())
+          .describedAs("Duration of magic write")
+          .isGreaterThan(shortTimeout);
+      brittleFS.delete(dir, true);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
index 9fee2fd63a..b864cd3b63 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
@@ -19,16 +19,21 @@
 package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
 import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.SdkRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Request;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
 
 import org.apache.hadoop.fs.PathIOException;
@@ -38,6 +43,7 @@
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -109,8 +115,6 @@ public void testRequestFactoryWithCannedACL() throws Throwable {
         .isEqualTo(acl);
   }
 
-
-
   /**
    * Now add a processor and verify that it was invoked for
    * exactly as many requests as were analyzed.
@@ -207,4 +211,64 @@ public void testMultipartUploadRequest() throws Throwable {
         .isEqualTo(requestsAnalyzed);
   }
 
+  /**
+   * Assertion for Request timeouts.
+   * @param duration expected duration.
+   * @param request request.
+   */
+  private void assertApiTimeouts(Duration duration, S3Request request) {
+    Assertions.assertThat(request.overrideConfiguration())
+        .describedAs("request %s", request)
+        .isNotEmpty();
+    final AwsRequestOverrideConfiguration override =
+        request.overrideConfiguration().get();
+    Assertions.assertThat(override.apiCallAttemptTimeout())
+        .describedAs("apiCallAttemptTimeout")
+        .hasValue(duration);
+    Assertions.assertThat(override.apiCallTimeout())
+        .describedAs("apiCallTimeout")
+        .hasValue(duration);
+  }
+
+  /**
+   * If not overridden, timeouts are set to the default part upload timeout.
+   */
+  @Test
+  public void testDefaultUploadTimeouts() throws Throwable {
+
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withMultipartPartCountLimit(2)
+        .build();
+    final UploadPartRequest upload =
+        factory.newUploadPartRequestBuilder("path", "id", 2, 128_000_000).build();
+    assertApiTimeouts(DEFAULT_PART_UPLOAD_TIMEOUT, upload);
+  }
+
+  /**
+   * Verify that when upload request timeouts are set,
+   * they are passed down.
+   */
+  @Test
+  public void testUploadTimeouts() throws Throwable {
+    Duration partDuration = Duration.ofDays(1);
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withPartUploadTimeout(partDuration)
+        .build();
+
+    String path = "path";
+
+    // A simple PUT
+    final PutObjectRequest put = factory.newPutObjectRequestBuilder(path,
+        PutObjectOptions.deletingDirs(), 1024, false).build();
+    assertApiTimeouts(partDuration, put);
+
+    // multipart part
+    final UploadPartRequest upload = factory.newUploadPartRequestBuilder(path,
+        "1", 3, 128_000_000)
+        .build();
+    assertApiTimeouts(partDuration, upload);
+
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java
index d5df8c42d5..24ba519adf 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ABlockOutputStreamInterruption.java
@@ -166,7 +166,7 @@ protected Configuration createScaleConfiguration() {
    */
   @Override
   public void setup() throws Exception {
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
     super.setup();
   }
 
@@ -174,7 +174,7 @@ public void setup() throws Exception {
   public void teardown() throws Exception {
     // safety check in case the evaluation is failing any
     // request needed in cleanup.
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
     super.teardown();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
index e154ab5676..aa702f158e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesNoMultipart.java
@@ -28,6 +28,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 
@@ -78,12 +79,13 @@ protected Configuration createScaleConfiguration() {
         MIN_MULTIPART_THRESHOLD,
         MULTIPART_UPLOADS_ENABLED,
         MULTIPART_SIZE,
+        PART_UPLOAD_TIMEOUT,
         REQUEST_TIMEOUT);
     conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
     conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
     conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
     conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
-    conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
+    conf.set(PART_UPLOAD_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/SdkFaultInjector.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/SdkFaultInjector.java
index 3af31b3f89..cec7435ba2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/SdkFaultInjector.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/SdkFaultInjector.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a.test;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 
 import org.slf4j.Logger;
@@ -35,6 +36,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.enableLoggingAuditor;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.resetAuditOptions;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_EXECUTION_INTERCEPTORS;
@@ -77,6 +79,13 @@ public final class SdkFaultInjector implements ExecutionInterceptor {
    */
   private static Function<Context.ModifyHttpResponse, Boolean> evaluator = ALWAYS_ALLOW;
 
+
+  /**
+   * Action to take on failure.
+   */
+  private static BiFunction<SdkRequest, SdkHttpResponse, SdkHttpResponse>
+      action = SdkFaultInjector::patchStatusCode;
+
   /**
    * Update the value of {@link #FAILURE_STATUS_CODE}.
    * @param value new value
@@ -97,10 +106,14 @@ public static void setEvaluator(Function<Context.ModifyHttpResponse, Boolean> va
 
   /**
-   * Reset the evaluator to enable everything.
+   * Reset fault injection.
+   * The evaluator will enable everything;
+   * the failure action is set to
+   * {@link #patchStatusCode(SdkRequest, SdkHttpResponse)}.
    */
-  public static void resetEvaluator() {
+  public static void resetFaultInjector() {
     setEvaluator(ALWAYS_ALLOW);
+    setAction(SdkFaultInjector::patchStatusCode);
   }
 
   /**
@@ -123,6 +136,23 @@ public static void setRequestFailureConditions(final int attempts,
     setEvaluator(condition);
   }
 
+  /**
+   * Set the action to invoke.
+   * @param action new action.
+   */
+  public static void setAction(BiFunction<SdkRequest, SdkHttpResponse, SdkHttpResponse> action) {
+    SdkFaultInjector.action = requireNonNull(action);
+  }
+
+  /**
+   * Is the response being processed from a GET request?
+   * @param context request context.
+   * @return true if the request is of the right type.
+   */
+  public static boolean isGetRequest(final Context.ModifyHttpResponse context) {
+    return context.httpRequest().method().equals(SdkHttpMethod.GET);
+  }
+
   /**
    * Is the response being processed from a PUT request?
    * @param context request context.
@@ -168,6 +198,8 @@ public static boolean isMultipartAbort(final Context.ModifyHttpResponse context)
     return context.request() instanceof AbortMultipartUploadRequest;
   }
+
+
   /**
    * Review response from S3 and optionally modify its status code.
    * @return the original response or a copy with a different status code.
    */
@@ -179,14 +211,7 @@ public SdkHttpResponse modifyHttpResponse(final Context.ModifyHttpResponse conte
     SdkHttpResponse httpResponse = context.httpResponse();
 
     if (evaluator.apply(context) && shouldFail()) {
-      // fail the request
-      final int code = FAILURE_STATUS_CODE.get();
-      LOG.info("Fault Injector returning {} error code for request {}",
-          code, request);
-
-      return httpResponse.copy(b -> {
-        b.statusCode(code);
-      });
+      return action.apply(request, httpResponse);
     } else {
       // pass unchanged
@@ -194,6 +219,25 @@ public SdkHttpResponse modifyHttpResponse(final Context.ModifyHttpResponse conte
     }
   }
 
+  /**
+   * The default fault injector: patch the status code with the value in
+   * {@link #FAILURE_STATUS_CODE}.
+   * @param request original request
+   * @param httpResponse ongoing response
+   * @return modified response.
+   */
+  public static SdkHttpResponse patchStatusCode(final SdkRequest request,
+      final SdkHttpResponse httpResponse) {
+    // fail the request
+    final int code = FAILURE_STATUS_CODE.get();
+    LOG.info("Fault Injector returning {} error code for request {}",
+        code, request);
+
+    return httpResponse.copy(b -> {
+      b.statusCode(code);
+    });
+  }
+
   /**
    * Should the request fail based on the failure count?
    * @return true if the request count means a request must fail
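
Reviewer note, not part of the patch: a minimal sketch of how a client would raise the new timeout before creating an S3A filesystem instance. The bucket name and class name below are hypothetical. Because the option is read through ConfigurationHelper.getDuration() with a millisecond default unit, it should accept either a plain millisecond count or a Hadoop time suffix such as "30m".

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class PartUploadTimeoutDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hypothetical value: allow each object/part upload 30 minutes,
    // independently of the shorter fs.s3a.connection.request.timeout
    // used for all other S3 requests.
    conf.set("fs.s3a.connection.part.upload.timeout", "30m");
    try (FileSystem fs = FileSystem.newInstance(
        new URI("s3a://example-bucket/"), conf)) {
      // uploads through this instance get the longer ApiCallTimeout
      // on PUT/POST data requests only.
    }
  }
}

Setting the option to 0 skips the per-request override entirely, so uploads fall back to the SDK client's defaults.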