diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 07f3e8ab1e..fe0f8cc540 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1698,7 +1698,7 @@ fs.s3a.retry.throttle.limit - ${fs.s3a.attempts.maximum} + 20 Number of times to retry any throttled request. @@ -1706,9 +1706,12 @@ fs.s3a.retry.throttle.interval - 1000ms + 100ms - Interval between retry attempts on throttled requests. + Initial interval between retry attempts on throttled requests, +/- 50%, chosen at random. + i.e. for an initial value of 3000ms, the initial delay would be in the range 1500ms to 4500ms. + Backoffs are exponential; again, randomness is used to avoid the thundering herd problem. + 500ms is the default value used by the AWS S3 Retry policy. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java index b509f32b64..b8a5d5cc7d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java @@ -204,9 +204,7 @@ public void testRenameWithNonEmptySubDir() throws Throwable { assertPathExists("not created in src/sub dir", new Path(srcSubDir, "subfile.txt")); - boolean rename = fs.rename(srcDir, finalDir); - assertTrue("rename(" + srcDir + ", " + finalDir + ") failed", - rename); + rename(srcDir, finalDir); // Accept both POSIX rename behavior and CLI rename behavior if (renameRemoveEmptyDest) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index f61634943b..4789630f95 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -418,8 +418,9 @@ public static boolean rm(FileSystem fileSystem, public static void rename(FileSystem fileSystem, Path src, Path dst) throws IOException { rejectRootOperation(src, false); - assertTrue(fileSystem.rename(src, dst)); - assertPathDoesNotExist(fileSystem, "renamed", src); + assertTrue("rename(" + src + ", " + dst + ") failed", + fileSystem.rename(src, dst)); + assertPathDoesNotExist(fileSystem, "renamed source dir", src); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index e107d4987f..561ab4a84a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -174,10 +174,42 @@ private Constants() { public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain"; public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation"; - // number of times we should retry errors + /** + * Number of times the AWS client library should retry errors before + * escalating to the S3A code: {@value}. 
+ */ public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum"; + + /** + * Default number of times the AWS client library should retry errors before + * escalating to the S3A code: {@value}. + */ public static final int DEFAULT_MAX_ERROR_RETRIES = 10; + /** + * Experimental/Unstable feature: should the AWS client library retry + * throttle responses before escalating to the S3A code: {@value}. + * + * When set to false, the S3A connector sees all S3 throttle events, + * and so can update its counters and metrics, and use its own retry + * policy. + * However, this may have adverse effects on some operations where the S3A + * code cannot retry as efficiently as the AWS client library. + * + * This only applies to S3 operations, not to DynamoDB or other services. + */ + @InterfaceStability.Unstable + public static final String EXPERIMENTAL_AWS_INTERNAL_THROTTLING = + "fs.s3a.experimental.aws.s3.throttling"; + + /** + * Default value of {@link #EXPERIMENTAL_AWS_INTERNAL_THROTTLING}, + * value: {@value}. + */ + @InterfaceStability.Unstable + public static final boolean EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT = + true; + // seconds until we give up trying to establish a connection to s3 public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout"; @@ -225,6 +257,33 @@ private Constants() { public static final String ENABLE_MULTI_DELETE = "fs.s3a.multiobjectdelete.enable"; + /** + * Number of objects to delete in a single multi-object delete: {@value}. + * Max: 1000. + * + * A bigger value means fewer POST requests when deleting a directory + * tree with many objects. + * However, as you are limited to only a few thousand requests per + * second against a single partition of an S3 bucket, + * a large page size can easily overload the bucket and so trigger + * throttling. + * + * Furthermore, as the reaction to a throttled request + * is simply to retry it, it can take a while for the situation to go away. + * While a large value may give better numbers on tests and benchmarks + * where only a single operation is being executed, once multiple + * applications start working with the same bucket these large + * deletes can be highly disruptive. + */ + public static final String BULK_DELETE_PAGE_SIZE = + "fs.s3a.bulk.delete.page.size"; + + /** + * Default number of objects to delete in a single multi-object + * delete: {@value}. + */ + public static final int BULK_DELETE_PAGE_SIZE_DEFAULT = 250; + // comma separated list of directories public static final String BUFFER_DIR = "fs.s3a.buffer.dir"; @@ -733,8 +792,7 @@ private Constants() { /** * Default throttled retry limit: {@value}. */ - public static final int RETRY_THROTTLE_LIMIT_DEFAULT = - DEFAULT_MAX_ERROR_RETRIES; + public static final int RETRY_THROTTLE_LIMIT_DEFAULT = 20; /** * Interval between retry attempts on throttled requests: {@value}. 
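For reference, the throttling and bulk-delete options defined above are set in a Hadoop configuration file. A minimal, illustrative core-site.xml sketch, which simply restates the default values introduced by this patch (not a tuning recommendation), would be:

<property>
  <name>fs.s3a.experimental.aws.s3.throttling</name>
  <value>true</value>
</property>
<property>
  <name>fs.s3a.bulk.delete.page.size</name>
  <value>250</value>
</property>
<property>
  <name>fs.s3a.retry.throttle.limit</name>
  <value>20</value>
</property>
<property>
  <name>fs.s3a.retry.throttle.interval</name>
  <value>100ms</value>
</property>

If bulk deletes are being throttled, shrinking fs.s3a.bulk.delete.page.size is the gentlest first step; setting fs.s3a.experimental.aws.s3.throttling to false hands throttle handling to the S3A retry policy so the events show up in its counters and metrics, at the possible cost of less efficient retries during copy/rename.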
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index ff8ba1d6d5..276961bf8b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -34,7 +34,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING; import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT; import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS; /** @@ -56,7 +58,17 @@ public AmazonS3 createS3Client(URI name, final String userAgentSuffix) throws IOException { Configuration conf = getConf(); final ClientConfiguration awsConf = S3AUtils - .createAwsConf(getConf(), bucket, Constants.AWS_SERVICE_IDENTIFIER_S3); + .createAwsConf(conf, bucket, Constants.AWS_SERVICE_IDENTIFIER_S3); + + // When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false + // throttling is explicitly disabled on the S3 client so that + // all failures are collected in S3A instrumentation, and its + // retry policy is the only one used. + // This may cause problems in copy/rename. + awsConf.setUseThrottleRetries( + conf.getBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING, + EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT)); + if (!StringUtils.isEmpty(userAgentSuffix)) { awsConf.setUserAgentSuffix(userAgentSuffix); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index cc12848df9..ce7729fa39 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -99,6 +99,7 @@ import org.apache.hadoop.fs.s3a.auth.SignerManager; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; +import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ContextAccessors; import org.apache.hadoop.fs.s3a.impl.CopyOutcome; @@ -170,6 +171,8 @@ import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations; import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable; import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding; +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; @@ -273,6 +276,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private ITtlTimeProvider ttlTimeProvider; + /** + * Page size for deletions. + */ + private int pageSize; + /** * Specific operations used by rename and delete operations. 
*/ @@ -440,6 +448,9 @@ public void initialize(URI name, Configuration originalConf) } initMultipartUploads(conf); + + pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE, + BULK_DELETE_PAGE_SIZE_DEFAULT, 0); } catch (AmazonClientException e) { // amazon client exception: stop all services then throw the translation stopAllServices(); @@ -1388,7 +1399,8 @@ private long innerRename(Path source, Path dest) createStoreContext(), src, srcKey, p.getLeft(), dst, dstKey, p.getRight(), - operationCallbacks); + operationCallbacks, + pageSize); return renameOperation.execute(); } @@ -1648,10 +1660,11 @@ protected void incrementGauge(Statistic statistic, long count) { * @param ex exception. */ public void operationRetried(Exception ex) { - Statistic stat = isThrottleException(ex) - ? STORE_IO_THROTTLED - : IGNORED_ERRORS; - incrementStatistic(stat); + if (isThrottleException(ex)) { + operationThrottled(false); + } else { + incrementStatistic(IGNORED_ERRORS); + } } /** @@ -1684,11 +1697,28 @@ public void operationRetried( public void metastoreOperationRetried(Exception ex, int retries, boolean idempotent) { - operationRetried(ex); incrementStatistic(S3GUARD_METADATASTORE_RETRY); if (isThrottleException(ex)) { + operationThrottled(true); + } else { + incrementStatistic(IGNORED_ERRORS); + } + } + + /** + * Note that an operation was throttled - this will update + * specific counters/metrics. + * @param metastore was the throttling observed in the S3Guard metastore? + */ + private void operationThrottled(boolean metastore) { + LOG.debug("Request throttled on {}", metastore ? "DynamoDB" : "S3"); + if (metastore) { incrementStatistic(S3GUARD_METADATASTORE_THROTTLED); - instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1); + instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, + 1); + } else { + incrementStatistic(STORE_IO_THROTTLED); + instrumentation.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1); } } @@ -1917,6 +1947,13 @@ private void blockRootDelete(String key) throws InvalidRequestException { * Increments the {@code OBJECT_DELETE_REQUESTS} and write * operation statistics. * Retry policy: retry untranslated; delete considered idempotent. + * If the request is throttled, this is logged in the throttle statistics, + * with the counter set to the number of keys, rather than the number + * of invocations of the delete operation. + * This is because S3 considers each key as one mutating operation on + * the store when updating its load counters on a specific partition + * of an S3 bucket. + * If only the request was measured, this operation would under-report. 
* @param deleteRequest keys to delete on the s3-backend * @return the AWS response * @throws MultiObjectDeleteException one or more of the keys could not @@ -1927,17 +1964,24 @@ private void blockRootDelete(String key) throws InvalidRequestException { private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest) throws MultiObjectDeleteException, AmazonClientException, IOException { incrementWriteOperations(); + BulkDeleteRetryHandler retryHandler = + new BulkDeleteRetryHandler(createStoreContext()); try(DurationInfo ignored = new DurationInfo(LOG, false, "DELETE %d keys", deleteRequest.getKeys().size())) { return invoker.retryUntranslated("delete", DELETE_CONSIDERED_IDEMPOTENT, + (text, e, r, i) -> { + // handle the failure + retryHandler.bulkDeleteRetried(deleteRequest, e); + }, () -> { incrementStatistic(OBJECT_DELETE_REQUESTS, 1); return s3.deleteObjects(deleteRequest); }); } catch (MultiObjectDeleteException e) { - // one or more of the operations failed. + // one or more of the keys could not be deleted. + // log and rethrow List errors = e.getErrors(); LOG.debug("Partial failure of delete, {} errors", errors.size(), e); for (MultiObjectDeleteException.DeleteError error : errors) { @@ -2254,7 +2298,7 @@ private void noteDeleted(final int count, final boolean deleteFakeDir) { */ @VisibleForTesting @Retries.RetryMixed - void removeKeys( + public void removeKeys( final List keysToDelete, final boolean deleteFakeDir, final BulkOperationState operationState) @@ -2349,7 +2393,7 @@ public boolean delete(Path f, boolean recursive) throws IOException { innerGetFileStatus(f, true, StatusProbeEnum.ALL), recursive, operationCallbacks, - InternalConstants.MAX_ENTRIES_TO_DELETE); + pageSize); boolean outcome = deleteOperation.execute(); if (outcome) { try { @@ -2830,7 +2874,7 @@ S3AFileStatus innerGetFileStatus(final Path f, S3AFileStatus s3GetFileStatus(final Path path, final String key, final Set probes, - final Set tombstones) throws IOException { + @Nullable Set tombstones) throws IOException { if (!key.isEmpty()) { if (probes.contains(StatusProbeEnum.Head) && !key.endsWith("/")) { try { @@ -3515,7 +3559,14 @@ void finishedWrite(String key, long length, String eTag, String versionId, key, length, eTag, versionId); Path p = keyToQualifiedPath(key); Preconditions.checkArgument(length >= 0, "content length is negative"); - deleteUnnecessaryFakeDirectories(p.getParent()); + final boolean isDir = objectRepresentsDirectory(key, length); + // kick off an async delete + final CompletableFuture deletion = submit( + unboundedThreadPool, + () -> { + deleteUnnecessaryFakeDirectories(p.getParent()); + return null; + }); // this is only set if there is a metastore to update and the // operationState parameter passed in was null. BulkOperationState stateToClose = null; @@ -3529,12 +3580,13 @@ void finishedWrite(String key, long length, String eTag, String versionId, // information gleaned from addAncestors is preserved into the // subsequent put. stateToClose = S3Guard.initiateBulkWrite(metadataStore, - BulkOperationState.OperationType.Mkdir, + isDir + ? 
BulkOperationState.OperationType.Mkdir + : BulkOperationState.OperationType.Put, keyToPath(key)); activeState = stateToClose; } S3Guard.addAncestors(metadataStore, p, ttlTimeProvider, activeState); - final boolean isDir = objectRepresentsDirectory(key, length); S3AFileStatus status = createUploadFileStatus(p, isDir, length, getDefaultBlockSize(p), username, eTag, versionId); @@ -3557,6 +3609,8 @@ void finishedWrite(String key, long length, String eTag, String versionId, activeState); } } + // and catch up with any delete operation. + waitForCompletionIgnoringExceptions(deletion); } catch (IOException e) { if (failOnMetadataWriteError) { throw new MetadataPersistenceException(p.toString(), e); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 4e1de370a6..b9918b5098 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -123,6 +123,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource { private final MutableCounterLong ignoredErrors; private final MutableQuantiles putLatencyQuantile; private final MutableQuantiles throttleRateQuantile; + private final MutableQuantiles s3GuardThrottleRateQuantile; private final MutableCounterLong numberOfFilesCreated; private final MutableCounterLong numberOfFilesCopied; private final MutableCounterLong bytesOfFilesCopied; @@ -248,7 +249,9 @@ public S3AInstrumentation(URI name) { int interval = 1; putLatencyQuantile = quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY, "ops", "latency", interval); - throttleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, + s3GuardThrottleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, + "events", "frequency (Hz)", interval); + throttleRateQuantile = quantiles(STORE_IO_THROTTLE_RATE, "events", "frequency (Hz)", interval); registerAsMetricsSource(name); @@ -617,6 +620,7 @@ public void close() { // task in a shared thread pool. 
putLatencyQuantile.stop(); throttleRateQuantile.stop(); + s3GuardThrottleRateQuantile.stop(); metricsSystem.unregisterSource(metricsSourceName); int activeSources = --metricsSourceActiveCounter; if (activeSources == 0) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 06c60a46f5..1d3d475802 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -230,6 +230,8 @@ public enum Statistic { "S3Guard metadata store authoritative directories updated from S3"), STORE_IO_THROTTLED("store_io_throttled", "Requests throttled and retried"), + STORE_IO_THROTTLE_RATE("store_io_throttle_rate", + "Rate of S3 request throttling"), DELEGATION_TOKENS_ISSUED("delegation_tokens_issued", "Number of delegation tokens issued"); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteRetryHandler.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteRetryHandler.java new file mode 100644 index 0000000000..b2c1cc6271 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteRetryHandler.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.util.List; + +import com.amazonaws.SdkClientException; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.s3a.AWSClientIOException; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3AStorageStatistics; +import org.apache.hadoop.fs.s3a.Statistic; + +import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException; +import static org.apache.hadoop.fs.s3a.Statistic.IGNORED_ERRORS; +import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLED; +import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.THROTTLE_LOG_NAME; + +/** + * Handler for bulk delete retry events. + */ +public class BulkDeleteRetryHandler extends AbstractStoreOperation { + + private static final Logger LOG = LoggerFactory.getLogger( + BulkDeleteRetryHandler.class); + + private static final Logger THROTTLE_LOG = LoggerFactory.getLogger( + THROTTLE_LOG_NAME); + + /** + * This is an error string we see in exceptions when the XML parser + * failed: {@value}. 
+ */ + public static final String XML_PARSE_BROKEN = "Failed to parse XML document"; + + private final S3AInstrumentation instrumentation; + + private final S3AStorageStatistics storageStatistics; + + /** + * Constructor. + * @param storeContext context + */ + public BulkDeleteRetryHandler(final StoreContext storeContext) { + super(storeContext); + instrumentation = storeContext.getInstrumentation(); + storageStatistics = storeContext.getStorageStatistics(); + } + + /** + * Increment a statistic by 1. + * This increments both the instrumentation and storage statistics. + * @param statistic The operation to increment + */ + protected void incrementStatistic(Statistic statistic) { + incrementStatistic(statistic, 1); + } + + /** + * Increment a statistic by a specific value. + * This increments both the instrumentation and storage statistics. + * @param statistic The operation to increment + * @param count the count to increment + */ + protected void incrementStatistic(Statistic statistic, long count) { + instrumentation.incrementCounter(statistic, count); + storageStatistics.incrementCounter(statistic, count); + } + + /** + * Handler for failure of bulk delete requests. + * @param deleteRequest request which was retried. + * @param ex exception + */ + public void bulkDeleteRetried( + DeleteObjectsRequest deleteRequest, + Exception ex) { + LOG.debug("Retrying on error during bulk delete", ex); + if (isThrottleException(ex)) { + onDeleteThrottled(deleteRequest); + } else if (isSymptomOfBrokenConnection(ex)) { + // this is one which surfaces when an HTTPS connection is broken while + // the service is reading the result. + // it is treated as a throttle event for statistics + LOG.warn("Bulk delete operation interrupted: {}", ex.getMessage()); + onDeleteThrottled(deleteRequest); + } else { + incrementStatistic(IGNORED_ERRORS); + } + } + + /** + * Handle a delete throttling event. + * @param deleteRequest request which failed. + */ + private void onDeleteThrottled(final DeleteObjectsRequest deleteRequest) { + final List keys = deleteRequest.getKeys(); + final int size = keys.size(); + incrementStatistic(STORE_IO_THROTTLED, size); + instrumentation.addValueToQuantiles(STORE_IO_THROTTLE_RATE, size); + THROTTLE_LOG.info( + "Bulk delete {} keys throttled -first key = {}; last = {}", + size, + keys.get(0).getKey(), + keys.get(size - 1).getKey()); + } + + /** + * Does this error indicate that the connection was ultimately broken while + * the XML Response was parsed? As this seems a symptom of the far end + * blocking the response (i.e. server-side throttling) while + * the client eventually times out. + * @param ex exception received. + * @return true if this exception is considered a sign of a broken connection. 
*/ + private boolean isSymptomOfBrokenConnection(final Exception ex) { + return ex instanceof AWSClientIOException + && ex.getCause() instanceof SdkClientException + && ex.getMessage().contains(XML_PARSE_BROKEN); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java index 609eecee64..1c61d30b08 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.s3a.impl; +import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.concurrent.Callable; @@ -123,4 +124,34 @@ public static void waitForCompletion( } } + /** + * Wait for a single future to complete, ignoring any exceptions raised. + * @param future future to wait for. + */ + public static void waitForCompletionIgnoringExceptions( + @Nullable final CompletableFuture future) { + if (future != null) { + try (DurationInfo ignore = + new DurationInfo(LOG, false, "Waiting for task completion")) { + future.join(); + } catch (Exception e) { + LOG.debug("Ignoring exception raised in task completion: ", e); + } + } + } + + /** + * Block awaiting completion for any non-null future passed in; + * No-op if a null arg was supplied. + * @param future future + * @throws IOException if one of the called futures raised an IOE. + * @throws RuntimeException if one of the futures raised one. + */ + public static void maybeAwaitCompletion( + @Nullable final CompletableFuture future) + throws IOException { + if (future != null) { + waitForCompletion(future); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java index 4ff1f8223b..daf93d99bd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java @@ -45,8 +45,8 @@ import org.apache.hadoop.util.DurationInfo; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.maybeAwaitCompletion; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; -import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; /** * Implementation of the delete() operation. @@ -186,7 +186,7 @@ public class DeleteOperation extends ExecutingStoreOperation { * @param status pre-fetched source status * @param recursive recursive delete? * @param callbacks callback provider - * @param pageSize number of entries in a page + * @param pageSize size of delete pages */ public DeleteOperation(final StoreContext context, final S3AFileStatus status, @@ -200,7 +200,7 @@ public DeleteOperation(final StoreContext context, this.callbacks = callbacks; checkArgument(pageSize > 0 && pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE, - "page size out of range: %d", pageSize); + "page size out of range: %s", pageSize); this.pageSize = pageSize; metadataStore = context.getMetadataStore(); executor = context.createThrottledExecutor(1); @@ -557,22 +557,5 @@ private void asyncDeleteAction( } } - /** - * Block awaiting completion for any non-null future passed in; - * No-op if a null arg was supplied. 
- * @param future future - * @throws IOException if one of the called futures raised an IOE. - * @throws RuntimeException if one of the futures raised one. - */ - private void maybeAwaitCompletion( - @Nullable final CompletableFuture future) - throws IOException { - if (future != null) { - try (DurationInfo ignored = - new DurationInfo(LOG, false, "delete completion")) { - waitForCompletion(future); - } - } - } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index 82250afc05..c73580d19f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -85,4 +85,12 @@ private InternalConstants() { /** 404 error code. */ public static final int SC_404 = 404; + + /** Name of the log for throttling events. Value: {@value}. */ + public static final String THROTTLE_LOG_NAME = + "org.apache.hadoop.fs.s3a.throttled"; + + /** Directory marker attribute: see HADOOP-16613. Value: {@value}. */ + public static final String X_DIRECTORY = + "application/x-directory"; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java index a3561072c1..750aebf500 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java @@ -50,7 +50,6 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_BLOCKSIZE; -import static org.apache.hadoop.fs.s3a.impl.InternalConstants.MAX_ENTRIES_TO_DELETE; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.RENAME_PARALLEL_LIMIT; /** @@ -99,8 +98,14 @@ public class RenameOperation extends ExecutingStoreOperation { /** * Counter of bytes copied. */ + private final AtomicLong bytesCopied = new AtomicLong(); + /** + * Page size for bulk deletes. + */ + private final int pageSize; + /** * Rename tracker. */ @@ -137,6 +142,7 @@ public class RenameOperation extends ExecutingStoreOperation { * @param destKey destination key * @param destStatus destination status. * @param callbacks callback provider + * @param pageSize size of delete requests */ public RenameOperation( final StoreContext storeContext, @@ -146,7 +152,8 @@ public RenameOperation( final Path destPath, final String destKey, final S3AFileStatus destStatus, - final OperationCallbacks callbacks) { + final OperationCallbacks callbacks, + final int pageSize) { super(storeContext); this.sourcePath = sourcePath; this.sourceKey = sourceKey; @@ -157,6 +164,7 @@ public RenameOperation( this.callbacks = callbacks; blocksize = storeContext.getConfiguration() .getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE); + this.pageSize = pageSize; } /** @@ -360,7 +368,7 @@ protected void recursiveDirectoryRename() throws IOException { LOG.debug("Waiting for active copies to complete"); completeActiveCopies("batch threshold reached"); } - if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { + if (keysToDelete.size() == pageSize) { // finish ongoing copies then delete all queued keys. 
// provided the parallel limit is a factor of the max entry // constant, this will not need to block for the copy, and diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java index 7a273a66c2..536481ac23 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java @@ -439,8 +439,8 @@ private void dumpEntry(CsvFile csv, DDBPathMetadata md) { private Pair scanMetastore(CsvFile csv) { S3GuardTableAccess tableAccess = new S3GuardTableAccess(getStore()); ExpressionSpecBuilder builder = new ExpressionSpecBuilder(); - Iterable results = tableAccess.scanMetadata( - builder); + Iterable results = + getStore().wrapWithRetries(tableAccess.scanMetadata(builder)); long live = 0; long tombstone = 0; for (DDBPathMetadata md : results) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 143e276d1f..38b38fb7f9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -80,6 +80,7 @@ import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.impl.FunctionsRaisingIOE; +import org.apache.hadoop.fs.impl.WrappedIOException; import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; import org.apache.hadoop.fs.s3a.AWSServiceThrottledException; import org.apache.hadoop.fs.s3a.Constants; @@ -324,8 +325,12 @@ public class DynamoDBMetadataStore implements MetadataStore, /** Invoker for write operations. */ private Invoker writeOp; + /** Invoker for scan operations. 
*/ + private Invoker scanOp; + private final AtomicLong readThrottleEvents = new AtomicLong(0); private final AtomicLong writeThrottleEvents = new AtomicLong(0); + private final AtomicLong scanThrottleEvents = new AtomicLong(0); private final AtomicLong batchWriteCapacityExceededEvents = new AtomicLong(0); /** @@ -424,11 +429,6 @@ public void initialize(FileSystem fs, ITtlTimeProvider ttlTp) tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket); initDataAccessRetries(conf); - // set up a full retry policy - invoker = new Invoker(new S3GuardDataAccessRetryPolicy(conf), - this::retryEvent - ); - this.ttlTimeProvider = ttlTp; tableHandler = new DynamoDBMetadataStoreTableManager( @@ -543,6 +543,7 @@ private void initDataAccessRetries(Configuration config) { = new S3GuardDataAccessRetryPolicy(config); readOp = new Invoker(throttledRetryRetryPolicy, this::readRetryEvent); writeOp = new Invoker(throttledRetryRetryPolicy, this::writeRetryEvent); + scanOp = new Invoker(throttledRetryRetryPolicy, this::scanRetryEvent); } @Override @@ -810,33 +811,31 @@ public DirListingMetadata listChildren(final Path path) throws IOException { checkPath(path); LOG.debug("Listing table {} in region {}: {}", tableName, region, path); + final QuerySpec spec = new QuerySpec() + .withHashKey(pathToParentKeyAttribute(path)) + .withConsistentRead(true); // strictly consistent read + final List metas = new ArrayList<>(); // find the children in the table - return readOp.retry( + final ItemCollection items = scanOp.retry( "listChildren", path.toString(), true, - () -> { - final QuerySpec spec = new QuerySpec() - .withHashKey(pathToParentKeyAttribute(path)) - .withConsistentRead(true); // strictly consistent read - final ItemCollection items = table.query(spec); + () -> table.query(spec)); + // now wrap the result with retry logic + try { + for (Item item : wrapWithRetries(items)) { + metas.add(itemToPathMetadata(item, username)); + } + } catch (WrappedIOException e) { + // failure in the iterators; unwrap. + throw e.getCause(); + } - final List metas = new ArrayList<>(); - for (Item item : items) { - DDBPathMetadata meta = itemToPathMetadata(item, username); - metas.add(meta); - } - - // Minor race condition here - if the path is deleted between - // getting the list of items and the directory metadata we might - // get a null in DDBPathMetadata. - DDBPathMetadata dirPathMeta = get(path); - - final DirListingMetadata dirListing = - getDirListingMetadataFromDirMetaAndList(path, metas, - dirPathMeta); - return dirListing; - }); + // Minor race condition here - if the path is deleted between + // getting the list of items and the directory metadata we might + // get a null in DDBPathMetadata. + return getDirListingMetadataFromDirMetaAndList(path, metas, + get(path)); } DirListingMetadata getDirListingMetadataFromDirMetaAndList(Path path, @@ -1992,6 +1991,22 @@ void writeRetryEvent( retryEvent(text, ex, attempts, idempotent); } + /** + * Callback on a scan operation retried. + * @param text text of the operation + * @param ex exception + * @param attempts number of attempts + * @param idempotent is the method idempotent (this is assumed to be true) + */ + void scanRetryEvent( + String text, + IOException ex, + int attempts, + boolean idempotent) { + scanThrottleEvents.incrementAndGet(); + retryEvent(text, ex, attempts, idempotent); + } + /** * Callback from {@link Invoker} when an operation is retried. 
* @param text text of the operation @@ -2048,14 +2063,38 @@ public long getWriteThrottleEventCount() { return writeThrottleEvents.get(); } + /** + * Get the count of scan throttle events. + * @return the current count of scan throttle events. + */ + @VisibleForTesting + public long getScanThrottleEventCount() { + return scanThrottleEvents.get(); + } + @VisibleForTesting public long getBatchWriteCapacityExceededCount() { return batchWriteCapacityExceededEvents.get(); } - @VisibleForTesting + /** + * Get the operation invoker for write operations. + * @return an invoker for retrying mutating operations on a store. + */ public Invoker getInvoker() { - return invoker; + return writeOp; + } + + /** + * Wrap an iterator returned from any scan with a retrying one. + * This includes throttle handling. + * Retries will update the relevant counters/metrics for scan operations. + * @param source source iterator + * @return a retrying iterator. + */ + public Iterable wrapWithRetries( + final Iterable source) { + return new RetryingCollection<>("scan dynamoDB table", scanOp, source); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java index 244779abb9..ad298c222a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PurgeS3GuardDynamoTable.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.s3guard; import javax.annotation.Nullable; +import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; @@ -128,9 +129,10 @@ protected void serviceStart() throws Exception { * delete all entries from that bucket. * @return the exit code. * @throws ServiceLaunchException on failure. + * @throws IOException IO failure. */ @Override - public int execute() throws ServiceLaunchException { + public int execute() throws ServiceLaunchException, IOException { URI uri = getUri(); String host = uri.getHost(); @@ -144,7 +146,8 @@ public int execute() throws ServiceLaunchException { LOG.info("Scanning for entries with prefix {} to delete from {}", prefix, ddbms); - Iterable entries = tableAccess.scanMetadata(builder); + Iterable entries = + ddbms.wrapWithRetries(tableAccess.scanMetadata(builder)); List list = new ArrayList<>(); entries.iterator().forEachRemaining(e -> { if (!(e instanceof S3GuardTableAccess.VersionMarker)) { @@ -169,7 +172,14 @@ public int execute() throws ServiceLaunchException { new DurationInfo(LOG, "deleting %s entries from %s", count, ddbms.toString()); - tableAccess.delete(list); + // sending this in one by one for more efficient retries + for (Path path: list) { + ddbms.getInvoker() + .retry("delete", + prefix, + true, + () -> tableAccess.delete(path)); + } duration.close(); long durationMillis = duration.value(); long timePerEntry = durationMillis / count; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/RetryingCollection.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/RetryingCollection.java new file mode 100644 index 0000000000..394f393c57 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/RetryingCollection.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.s3guard; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.fs.impl.WrappedIOException; +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.Retries; + +/** + * A collection which wraps the result of a query or scan + * with retries. + * Important: iterate through this only once; the outcome + * of repeating an iteration is "undefined" + * @param type of outcome. + */ +class RetryingCollection implements Iterable { + + /** + * Source iterable. + */ + private final Iterable source; + + /** + * Invoker for retries. + */ + private final Invoker invoker; + + /** + * Operation name for invoker.retry messages. + */ + private final String operation; + + /** + * Constructor. + * @param operation Operation name for invoker.retry messages. + * @param invoker Invoker for retries. + * @param source Source iterable. + */ + RetryingCollection( + final String operation, + final Invoker invoker, + final Iterable source) { + this.operation = operation; + this.source = source; + this.invoker = invoker; + } + + /** + * Demand creates a new iterator which will retry all hasNext/next + * operations through the invoker supplied in the constructor. + * @return a new iterator. + */ + @Override + public Iterator iterator() { + return new RetryingIterator(source.iterator()); + } + + /** + * An iterator which wraps a non-retrying iterator of scan results + * (i.e {@code S3GuardTableAccess.DDBPathMetadataIterator}. + */ + private final class RetryingIterator implements Iterator { + + private final Iterator iterator; + + private RetryingIterator(final Iterator iterator) { + this.iterator = iterator; + } + + /** + * {@inheritDoc}. + * @throws WrappedIOException for IO failure, including throttling. + */ + @Override + @Retries.RetryTranslated + public boolean hasNext() { + try { + return invoker.retry( + operation, + null, + true, + iterator::hasNext); + } catch (IOException e) { + throw new WrappedIOException(e); + } + } + + /** + * {@inheritDoc}. + * @throws WrappedIOException for IO failure, including throttling. 
+ */ + @Override + @Retries.RetryTranslated + public T next() { + try { + return invoker.retry( + "Scan Dynamo", + null, + true, + iterator::next); + } catch (IOException e) { + throw new WrappedIOException(e); + } + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java index 19ef90e455..7e8413b3de 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTableAccess.java @@ -36,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Retries; import org.apache.hadoop.fs.s3a.S3AFileStatus; import static com.google.common.base.Preconditions.checkNotNull; @@ -44,6 +45,7 @@ import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.TABLE_VERSION; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.itemToPathMetadata; +import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.pathToKey; /** * Package-scoped accessor to table state in S3Guard. @@ -70,6 +72,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Unstable +@Retries.OnceRaw class S3GuardTableAccess { private static final Logger LOG = @@ -107,6 +110,7 @@ private String getUsername() { * @param spec query spec. * @return the outcome. */ + @Retries.OnceRaw ItemCollection query(QuerySpec spec) { return table.query(spec); } @@ -118,24 +122,33 @@ ItemCollection query(QuerySpec spec) { * @param spec query spec. * @return an iterator over path entries. */ + @Retries.OnceRaw Iterable queryMetadata(QuerySpec spec) { return new DDBPathMetadataCollection<>(query(spec)); } + @Retries.OnceRaw ItemCollection scan(ExpressionSpecBuilder spec) { return table.scan(spec.buildForScan()); } + @Retries.OnceRaw Iterable scanMetadata(ExpressionSpecBuilder spec) { return new DDBPathMetadataCollection<>(scan(spec)); } + @Retries.OnceRaw void delete(Collection paths) { paths.stream() .map(PathMetadataDynamoDBTranslation::pathToKey) .forEach(table::deleteItem); } + @Retries.OnceRaw + void delete(Path path) { + table.deleteItem(pathToKey(path)); + } + /** * A collection which wraps the result of a query or scan. * Important: iterate through this only once; the outcome @@ -191,11 +204,13 @@ private DDBPathMetadataIterator(final IteratorSupport it) { } @Override + @Retries.OnceRaw public boolean hasNext() { return it.hasNext(); } @Override + @Retries.OnceRaw public DDBPathMetadata next() { Item item = it.next(); Pair key = primaryKey(item); diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md index 3c201e106a..1d1b9fdbe2 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md @@ -464,6 +464,22 @@ Otherwise, set a large timeout in `fs.s3a.scale.test.timeout` The tests are executed in an order to only clean up created files after the end of all the tests. If the tests are interrupted, the test data will remain. +## Load tests. 
+ +Some tests are designed to overload AWS services with more +requests per second than an AWS account is permitted to make. + +The operation of these tests may be observable to other users of the same +account, especially if they are working in the AWS region to which the +tests are targeted. + +They may also run up larger bills. + +These tests all have the prefix `ILoadTest`. + +They do not run automatically: they must be explicitly run from the command line or an IDE. + +Look in the source for these tests and read the Javadocs before executing them. ## Testing against non AWS S3 endpoints. @@ -1399,6 +1415,9 @@ as it may take a couple of SDK updates before it is ready. in `fs.s3a.assumed.role.arn` for testing assumed roles, and `fs.s3a.server-side-encryption.key` for encryption, for full coverage. If you can, scale up the scale tests. +1. Run the `ILoadTest*` load tests from your IDE or via maven through + `mvn verify -Dtest=skip -Dit.test=ILoadTest\*` ; look for regressions in performance + as much as failures. 1. Create the site with `mvn site -DskipTests`; look in `target/site` for the report. 1. Review *every single `-output.txt` file in `hadoop-tools/hadoop-aws/target/failsafe-reports`, paying particular attention to @@ -1492,6 +1511,7 @@ Then see if complete successfully in roughly the same time once the upgrade is a to AWS services. * Try and get other people, especially anyone with their own endpoints, apps or different deployment environments, to run their own tests. +* Run the load tests, especially `ILoadTestS3ABulkDeleteThrottling`. ### Dealing with Deprecated APIs and New Features diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java index a631410d8b..e623d5d279 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java @@ -156,7 +156,7 @@ public void testRenamePopulatesFileAncestors2() throws Exception { S3ATestUtils.MetricDiff fileCopyBytes = new S3ATestUtils.MetricDiff(fs, Statistic.FILES_COPIED_BYTES); - fs.rename(src, dest); + rename(src, dest); describe("Rename has completed, examining data under " + base); fileCopyDiff.assertDiffEquals("Number of files copied", 1); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java index 455a8a3ebd..74fe45d72d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java @@ -525,6 +525,7 @@ public void testRevertMissingCommit() throws Throwable { commit.setDestinationKey(fs.pathToKey(destFile)); fullThrottle(); actions.revertCommit(commit, null); + resetFailures(); assertPathExists("parent of reverted (nonexistent) commit", destFile.getParent()); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java index 69ca553b9c..1b53ef5a91 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java +++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java @@ -818,16 +818,25 @@ private static void buildPaths( } // create the file paths for (int i = 0; i < fileCount; i++) { - String name = PREFIX + i; + String name = filenameOfIndex(i); Path p = new Path(destDir, name); filePaths.add(p); } for (int i = 0; i < dirCount; i++) { - String name = "dir-" + i; + String name = String.format("dir-%03d", i); Path p = new Path(destDir, name); dirPaths.add(p); buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount); } } + + /** + * Given an index, return a string to use as the filename. + * @param i index + * @return name + */ + public static String filenameOfIndex(final int i) { + return String.format("%s%03d", PREFIX, i); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java index 4ad789965a..fc81c8d854 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStore.java @@ -130,8 +130,10 @@ public ITestDynamoDBMetadataStore() { private String bucket; + @SuppressWarnings("StaticNonFinalField") private static DynamoDBMetadataStore ddbmsStatic; + @SuppressWarnings("StaticNonFinalField") private static String testDynamoDBTableName; private static final List UNCHANGED_ENTRIES = Collections.emptyList(); @@ -166,13 +168,17 @@ public void setUp() throws Exception { try{ super.setUp(); - tableHandler = getDynamoMetadataStore().getTableHandler(); } catch (FileNotFoundException e){ LOG.warn("MetadataStoreTestBase setup failed. Waiting for table to be " - + "deleted before trying again."); - ddbmsStatic.getTable().waitForDelete(); + + "deleted before trying again.", e); + try { + ddbmsStatic.getTable().waitForDelete(); + } catch (IllegalArgumentException | InterruptedException ex) { + LOG.warn("When awaiting a table to be cleaned up", e); + } super.setUp(); } + tableHandler = getDynamoMetadataStore().getTableHandler(); } @BeforeClass @@ -780,10 +786,16 @@ private void removeVersionMarkerTag(Table table, AmazonDynamoDB addb) { .withTagKeys(VERSION_MARKER_TAG_NAME)); } - private void deleteVersionMarkerItem(Table table) { + /** + * Deletes a version marker; spins briefly to await it disappearing. 
+ * @param table table to delete the key + * @throws Exception failure + */ + private void deleteVersionMarkerItem(Table table) throws Exception { table.deleteItem(VERSION_MARKER_PRIMARY_KEY); - assertNull("Version marker should be null after deleting it " + - "from the table.", table.getItem(VERSION_MARKER_PRIMARY_KEY)); + eventually(30_000, 1_0, () -> + assertNull("Version marker should be null after deleting it " + + "from the table.", table.getItem(VERSION_MARKER_PRIMARY_KEY))); } /** @@ -1003,7 +1015,8 @@ public void testDeleteTable() throws Exception { final String tableName = getTestTableName("testDeleteTable"); Path testPath = new Path(new Path(fsUri), "/" + tableName); final S3AFileSystem s3afs = getFileSystem(); - final Configuration conf = getTableCreationConfig(); + // patch the filesystem config as this is one read in initialize() + final Configuration conf = s3afs.getConf(); conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName); enableOnDemand(conf); DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java index f2f37f21ea..75b630ae22 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestDynamoDBMetadataStoreScale.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.s3a.s3guard; import javax.annotation.Nullable; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -34,6 +35,7 @@ import com.amazonaws.services.dynamodbv2.document.Table; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder; +import org.assertj.core.api.Assertions; import org.junit.Assume; import org.junit.FixMethodOrder; import org.junit.Test; @@ -47,14 +49,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.impl.WrappedIOException; import org.apache.hadoop.fs.s3a.AWSServiceThrottledException; +import org.apache.hadoop.fs.s3a.Invoker; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AStorageStatistics; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.scale.AbstractITestS3AMetadataStoreScale; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.DurationInfo; @@ -62,7 +65,7 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus; import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT; -import static org.junit.Assume.*; +import static org.junit.Assume.assumeTrue; /** * Scale test for DynamoDBMetadataStore. @@ -91,6 +94,20 @@ public class ITestDynamoDBMetadataStoreScale private static final long MAXIMUM_READ_CAPACITY = 10; private static final long MAXIMUM_WRITE_CAPACITY = 15; + /** + * Time in milliseconds to sleep after a test throttled: + * {@value}. 
* This is to help isolate throttling to the test which failed, + * rather than have it surface in a followup test. + * Also the test reports will record durations more accurately, + * as JUnit doesn't include setup/teardown times in its reports. + * There's a cost: single test runs will sleep, and the last test + * run may throttle when it doesn't need to. + * The last test, {@link #test_999_delete_all_entries()}, + * doesn't do the sleep so a full batch run should not suffer here. + */ + public static final int THROTTLE_RECOVER_TIME_MILLIS = 5_000; + private DynamoDBMetadataStore ddbms; private DynamoDBMetadataStoreTableManager tableHandler; @@ -119,7 +136,7 @@ public class ITestDynamoDBMetadataStoreScale * @throws AssumptionViolatedException if the FS isn't running S3Guard + DDB/ */ @Override - public MetadataStore createMetadataStore() throws IOException { + public DynamoDBMetadataStore createMetadataStore() throws IOException { S3AFileSystem fs = getFileSystem(); assumeTrue("S3Guard is disabled for " + fs.getUri(), fs.hasMetadataStore()); @@ -145,12 +162,16 @@ public MetadataStore createMetadataStore() throws IOException { conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName); conf.set(S3GUARD_DDB_REGION_KEY, region); conf.set(S3GUARD_DDB_THROTTLE_RETRY_INTERVAL, "50ms"); - conf.set(S3GUARD_DDB_MAX_RETRIES, "2"); + conf.set(S3GUARD_DDB_MAX_RETRIES, "1"); conf.set(MAX_ERROR_RETRIES, "1"); conf.set(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, "5ms"); DynamoDBMetadataStore ms = new DynamoDBMetadataStore(); - ms.initialize(conf, new S3Guard.TtlTimeProvider(conf)); + // init the metastore in a bigger retry loop than the test setup + // in case the previous test case overloaded things + final Invoker fsInvoker = fs.createStoreContext().getInvoker(); + fsInvoker.retry("init metastore", null, true, + () -> ms.initialize(conf, new S3Guard.TtlTimeProvider(conf))); // wire up the owner FS so that we can make assertions about throttle // events ms.bindToOwnerFilesystem(fs); @@ -168,8 +189,7 @@ public void setup() throws Exception { table = ddb.getTable(tableName); originalCapacity = table.describe().getProvisionedThroughput(); - // If you set the same provisioned I/O as already set it throws an - // exception, avoid that. + // is this table too big for throttling to surface? isOverProvisionedForTest = ( originalCapacity.getReadCapacityUnits() > MAXIMUM_READ_CAPACITY || originalCapacity.getWriteCapacityUnits() > MAXIMUM_WRITE_CAPACITY); } @Override public void teardown() throws Exception { - if (ddbms != null) { - S3GuardTableAccess tableAccess = new S3GuardTableAccess(ddbms); - ExpressionSpecBuilder builder = new ExpressionSpecBuilder(); - builder.withCondition( - ExpressionSpecBuilder.S(PARENT).beginsWith("/test/")); - - Iterable entries = tableAccess.scanMetadata(builder); - List list = new ArrayList<>(); - entries.iterator().forEachRemaining(e -> { - Path p = e.getFileStatus().getPath(); - LOG.info("Deleting {}", p); - list.add(p); - }); - tableAccess.delete(list); - } IOUtils.cleanupWithLogger(LOG, ddbms); super.teardown(); } + /** + * Is throttling likely? + * @return true if the DDB table has prepaid IO and is small enough + * to throttle. + */ private boolean expectThrottling() { return !isOverProvisionedForTest && !isOnDemandTable; } + /** + * Recover from throttling by sleeping briefly. 
+ */ + private void recoverFromThrottling() throws InterruptedException { + LOG.info("Sleeping to recover from throttling for {} ms", + THROTTLE_RECOVER_TIME_MILLIS); + Thread.sleep(THROTTLE_RECOVER_TIME_MILLIS); + } + + /** + * The subclass expects the superclass to be throttled; sometimes it is. + */ + @Test + @Override + public void test_010_Put() throws Throwable { + ThrottleTracker tracker = new ThrottleTracker(ddbms); + try { + // if this doesn't throttle, all is well. + super.test_010_Put(); + } catch (AWSServiceThrottledException ex) { + // if the service was throttled, all is good. + // log and continue + LOG.warn("DDB connection was throttled", ex); + } finally { + LOG.info("Statistics {}", tracker); + } + } + /** * The subclass expects the superclass to be throttled; sometimes it is. */ @@ -283,9 +321,8 @@ public void test_030_BatchedWrite() throws Exception { } } }); - if (expectThrottling()) { - assertNotEquals("No batch retries in " + result, - 0, result.getBatchThrottles()); + if (expectThrottling() && result.probeThrottlingDetected()) { + recoverFromThrottling(); } } finally { describe("Cleaning up table %s", tableName); @@ -326,7 +363,12 @@ public void test_050_getVersionMarkerItem() throws Throwable { execute("get", OPERATIONS_PER_THREAD * 2, expectThrottling(), - () -> tableHandler.getVersionMarkerItem() + () -> { + try { + tableHandler.getVersionMarkerItem(); + } catch (FileNotFoundException ignored) { + } + } ); } @@ -473,6 +515,39 @@ public void test_900_instrumentation() throws Throwable { statistics.getLong(throttledKey) > 0); } + @Test + public void test_999_delete_all_entries() throws Throwable { + describe("Delete all entries from the table"); + S3GuardTableAccess tableAccess = new S3GuardTableAccess(ddbms); + ExpressionSpecBuilder builder = new ExpressionSpecBuilder(); + final String path = "/test/"; + builder.withCondition( + ExpressionSpecBuilder.S(PARENT).beginsWith(path)); + Iterable<DDBPathMetadata> entries = + ddbms.wrapWithRetries(tableAccess.scanMetadata(builder)); + List<Path> list = new ArrayList<>(); + try { + entries.iterator().forEachRemaining(e -> { + Path p = e.getFileStatus().getPath(); + LOG.info("Deleting {}", p); + list.add(p); + }); + } catch (WrappedIOException e) { + // the iterator itself may have been throttled; swallow the exception if so. + if (!(e.getCause() instanceof AWSServiceThrottledException)) { + throw e; + } + } + // send the deletes one by one for more efficient retries + for (Path p : list) { + ddbms.getInvoker() + .retry("delete", + path, + true, + () -> tableAccess.delete(p)); + } + } + /** * Execute a set of operations in parallel, collect throttling statistics * and return them.
@@ -504,7 +579,8 @@ public ThrottleTracker execute(String operation, final ContractTestUtils.NanoTimer t = new ContractTestUtils.NanoTimer(); for (int j = 0; j < operationsPerThread; j++) { - if (tracker.isThrottlingDetected()) { + if (tracker.isThrottlingDetected() + || throttleExceptions.get() > 0) { outcome.skipped = true; return outcome; } @@ -539,13 +615,12 @@ public ThrottleTracker execute(String operation, LOG.info("Completed {} with {}", operation, tracker); LOG.info("time to execute: {} millis", elapsedMs); - for (Future future : futures) { - assertTrue("Future timed out", future.isDone()); - } + Assertions.assertThat(futures) + .describedAs("Futures of all tasks") + .allMatch(Future::isDone); tracker.probe(); - - if (expectThrottling) { - tracker.assertThrottlingDetected(); + if (expectThrottling() && tracker.probeThrottlingDetected()) { + recoverFromThrottling(); } for (Future future : futures) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ThrottleTracker.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ThrottleTracker.java index 5e33be8367..0dad1bf03d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ThrottleTracker.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ThrottleTracker.java @@ -18,7 +18,8 @@ package org.apache.hadoop.fs.s3a.s3guard; -import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Something to track throttles in DynamoDB metastores. @@ -34,13 +35,17 @@ */ class ThrottleTracker { + private static final Logger LOG = LoggerFactory.getLogger( + ThrottleTracker.class); private final DynamoDBMetadataStore ddbms; - private long writeThrottleEventOrig = 0; + private long writeThrottleEventOrig; - private long readThrottleEventOrig = 0; + private long readThrottleEventOrig; - private long batchWriteThrottleCountOrig = 0; + private long batchWriteThrottleCountOrig; + + private long scanThrottleCountOrig; private long readThrottles; @@ -48,6 +53,8 @@ class ThrottleTracker { private long batchThrottles; + private long scanThrottles; + ThrottleTracker(final DynamoDBMetadataStore ddbms) { this.ddbms = ddbms; reset(); @@ -65,6 +72,9 @@ public synchronized void reset() { batchWriteThrottleCountOrig = ddbms.getBatchWriteCapacityExceededCount(); + + scanThrottleCountOrig + = ddbms.getScanThrottleEventCount(); } /** @@ -78,6 +88,8 @@ public synchronized boolean probe() { - writeThrottleEventOrig); setBatchThrottles(ddbms.getBatchWriteCapacityExceededCount() - batchWriteThrottleCountOrig); + setScanThrottles(ddbms.getScanThrottleEventCount() + - scanThrottleCountOrig); return isThrottlingDetected(); } @@ -85,27 +97,35 @@ public synchronized boolean probe() { public String toString() { return String.format( "Tracker with read throttle events = %d;" - + " write events = %d;" - + " batch throttles = %d", - getReadThrottles(), getWriteThrottles(), getBatchThrottles()); + + " write throttles = %d;" + + " batch throttles = %d;" + + " scan throttles = %d", + getReadThrottles(), getWriteThrottles(), getBatchThrottles(), + getScanThrottles()); } /** - * Assert that throttling has been detected. + * Check that throttling was detected; Warn if not. + * @return true if throttling took place. 
*/ - public void assertThrottlingDetected() { - Assert.assertTrue("No throttling detected in " + this + - " against " + ddbms.toString(), - isThrottlingDetected()); + public boolean probeThrottlingDetected() { + if (!isThrottlingDetected()) { + LOG.warn("No throttling detected in {} against {}", + this, ddbms); + return false; + } + return true; } /** * Has there been any throttling on an operation? - * @return true iff read, write or batch operations were throttled. + * @return true if any operations were throttled. */ public boolean isThrottlingDetected() { - return getReadThrottles() > 0 || getWriteThrottles() - > 0 || getBatchThrottles() > 0; + return getReadThrottles() > 0 + || getWriteThrottles() > 0 + || getBatchThrottles() > 0 + || getScanThrottles() > 0; } public long getReadThrottles() { @@ -131,4 +151,12 @@ public long getBatchThrottles() { public void setBatchThrottles(long batchThrottles) { this.batchThrottles = batchThrottles; } + + public long getScanThrottles() { + return scanThrottles; + } + + public void setScanThrottles(final long scanThrottles) { + this.scanThrottles = scanThrottles; + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ILoadTestS3ABulkDeleteThrottling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ILoadTestS3ABulkDeleteThrottling.java new file mode 100644 index 0000000000..a1d5c46159 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ILoadTestS3ABulkDeleteThrottling.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.scale; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; + +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.assertj.core.api.Assertions; +import org.junit.Assume; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.impl.FunctionsRaisingIOE; +import org.apache.hadoop.fs.impl.WrappedIOException; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.auth.delegation.Csvout; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING; +import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE_DEFAULT; +import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; +import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.MAX_ENTRIES_TO_DELETE; + +/** + * Test some scalable operations related to file renaming and deletion. + * Much of the setup code is lifted from ILoadTestSessionCredentials; + * whereas that was designed to overload an STS endpoint, this just + * tries to overload a single S3 shard with too many bulk IO requests + * -and so see what happens. + * Note: UA field includes the configuration tested for the benefit + * of anyone looking through the server logs. + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +@RunWith(Parameterized.class) +public class ILoadTestS3ABulkDeleteThrottling extends S3AScaleTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(ILoadTestS3ABulkDeleteThrottling.class); + + protected static final int THREADS = 20; + public static final int TOTAL_KEYS = 25000; + + public static final int SMALL = BULK_DELETE_PAGE_SIZE_DEFAULT; + public static final int SMALL_REQS = TOTAL_KEYS / SMALL; + + public static final int MAXIMUM = MAX_ENTRIES_TO_DELETE; + public static final int MAXIMUM_REQS = TOTAL_KEYS / MAXIMUM; + + // shared across test cases. + @SuppressWarnings("StaticNonFinalField") + private static boolean testWasThrottled; + + private final ExecutorService executor = + HadoopExecutors.newFixedThreadPool( + THREADS, + new ThreadFactoryBuilder() + .setNameFormat("#%d") + .build()); + + private final CompletionService + completionService = new ExecutorCompletionService<>(executor); + + private File dataDir; + + private final boolean throttle; + private final int pageSize; + private final int requests; + + /** + * Test array for parameterized test runs. + *
+ * <ul>
+ *   <li>AWS client throttle on/off</li>
+ *   <li>Page size</li>
+ * </ul>
+ * + * @return a list of parameter tuples. + */ + @Parameterized.Parameters( + name = "bulk-delete-aws-retry={0}-requests={2}-size={1}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {false, SMALL, SMALL_REQS}, + {false, MAXIMUM, MAXIMUM_REQS}, + {true, SMALL, SMALL_REQS}, + {true, MAXIMUM, MAXIMUM_REQS}, + }); + } + + /** + * Parameterized constructor. + * @param throttle AWS client throttle on/off + * @param pageSize Page size + * @param requests request count; + */ + public ILoadTestS3ABulkDeleteThrottling( + final boolean throttle, + final int pageSize, + final int requests) { + this.throttle = throttle; + Preconditions.checkArgument(pageSize > 0, + "page size too low %s", pageSize); + + this.pageSize = pageSize; + this.requests = requests; + } + + @Override + protected Configuration createScaleConfiguration() { + Configuration conf = super.createScaleConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + return conf; + } + + @Override + public void setup() throws Exception { + final Configuration conf = getConf(); + S3ATestUtils.removeBaseAndBucketOverrides(conf, + EXPERIMENTAL_AWS_INTERNAL_THROTTLING, + BULK_DELETE_PAGE_SIZE, + USER_AGENT_PREFIX); + conf.setBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING, throttle); + Assertions.assertThat(pageSize) + .describedAs("page size") + .isGreaterThan(0); + conf.setInt(BULK_DELETE_PAGE_SIZE, pageSize); + conf.set(USER_AGENT_PREFIX, + String.format("ILoadTestS3ABulkDeleteThrottling-%s-%04d", + throttle, pageSize)); + + super.setup(); + Assume.assumeTrue("multipart delete disabled", + conf.getBoolean(ENABLE_MULTI_DELETE, true)); + dataDir = GenericTestUtils.getTestDir("throttling"); + dataDir.mkdirs(); + final String size = getFileSystem().getConf().get(BULK_DELETE_PAGE_SIZE); + Assertions.assertThat(size) + .describedAs("page size") + .isNotEmpty(); + Assertions.assertThat(getFileSystem().getConf() + .getInt(BULK_DELETE_PAGE_SIZE, -1)) + .isEqualTo(pageSize); + + } + + @Test + public void test_010_Reset() throws Throwable { + testWasThrottled = false; + } + + @Test + public void test_020_DeleteThrottling() throws Throwable { + describe("test how S3 reacts to massive multipart deletion requests"); + final File results = deleteFiles(requests, pageSize); + LOG.info("Test run completed against {}:\n see {}", getFileSystem(), + results); + if (testWasThrottled) { + LOG.warn("Test was throttled"); + } else { + LOG.info("No throttling recorded in filesystem"); + } + } + + @Test + public void test_030_Sleep() throws Throwable { + maybeSleep(); + } + + private void maybeSleep() throws InterruptedException, IOException { + if (testWasThrottled) { + LOG.info("Sleeping briefly to let store recover"); + Thread.sleep(30_000); + getFileSystem().delete(path("recovery"), true); + testWasThrottled = false; + } + } + + /** + * delete files. + * @param requestCount number of requests. 
+ * @throws Exception failure + * @return CSV filename + */ + private File deleteFiles(final int requestCount, + final int entries) + throws Exception { + File csvFile = new File(dataDir, + String.format("delete-%03d-%04d-%s.csv", + requestCount, entries, throttle)); + describe("Issuing %d requests of size %d, saving log to %s", + requestCount, entries, csvFile); + Path basePath = path("testDeleteObjectThrottling"); + final S3AFileSystem fs = getFileSystem(); + final String base = fs.pathToKey(basePath); + final List fileList + = buildDeleteRequest(base, entries); + final FileWriter out = new FileWriter(csvFile); + Csvout csvout = new Csvout(out, "\t", "\n"); + Outcome.writeSchema(csvout); + + final ContractTestUtils.NanoTimer jobTimer = + new ContractTestUtils.NanoTimer(); + + for (int i = 0; i < requestCount; i++) { + final int id = i; + completionService.submit(() -> { + final long startTime = System.currentTimeMillis(); + Thread.currentThread().setName("#" + id); + LOG.info("Issuing request {}", id); + final ContractTestUtils.NanoTimer timer = + new ContractTestUtils.NanoTimer(); + Exception ex = null; + try { + fs.removeKeys(fileList, false, null); + } catch (IOException e) { + ex = e; + } + timer.end("Request " + id); + return new Outcome(id, startTime, timer, + ex); + }); + } + + NanoTimerStats stats = new NanoTimerStats("Overall"); + NanoTimerStats success = new NanoTimerStats("Successful"); + NanoTimerStats throttled = new NanoTimerStats("Throttled"); + List throttledEvents = new ArrayList<>(); + for (int i = 0; i < requestCount; i++) { + Outcome outcome = completionService.take().get(); + ContractTestUtils.NanoTimer timer = outcome.timer; + Exception ex = outcome.exception; + outcome.writeln(csvout); + stats.add(timer); + if (ex != null) { + // throttling event occurred. + LOG.info("Throttled at event {}", i, ex); + throttled.add(timer); + throttledEvents.add(outcome); + } else { + success.add(timer); + } + } + + csvout.close(); + + jobTimer.end("Execution of operations"); + // now print the stats + LOG.info("Summary file is " + csvFile); + LOG.info("Made {} requests with {} throttle events\n: {}\n{}\n{}", + requestCount, + throttled.getCount(), + stats, + throttled, + success); + + double duration = jobTimer.duration(); + double iops = requestCount * entries * 1.0e9 / duration; + LOG.info(String.format("TPS %3f operations/second", + iops)); + // log at debug + if (LOG.isDebugEnabled()) { + throttledEvents.forEach((outcome -> { + LOG.debug("{}: duration: {}", + outcome.id, outcome.timer.elapsedTimeMs()); + })); + } + return csvFile; + } + + + private List buildDeleteRequest( + String base, int count) { + List request = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + request.add(new DeleteObjectsRequest.KeyVersion( + String.format("%s/file-%04d", base, i))); + } + return request; + } + + + private R wrap(FunctionsRaisingIOE.CallableRaisingIOE callable) { + try { + return callable.apply(); + } catch (IOException e) { + throw new WrappedIOException(e); + } + } + + /** + * Outcome of one of the load operations. + */ + private static class Outcome { + + private final int id; + + private final long startTime; + + private final ContractTestUtils.NanoTimer timer; + + private final Exception exception; + + Outcome(final int id, + final long startTime, + final ContractTestUtils.NanoTimer timer, + final Exception exception) { + this.id = id; + this.startTime = startTime; + this.timer = timer; + this.exception = exception; + } + + /** + * Write this record. 
+ * @param out the csvout to write through. + * @return the csvout instance + * @throws IOException IO failure. + */ + public Csvout writeln(Csvout out) throws IOException { + return out.write( + id, + startTime, + exception == null ? 1 : 0, + timer.getStartTime(), + timer.getEndTime(), + timer.duration(), + '"' + (exception == null ? "" : exception.getMessage()) + '"') + .newline(); + } + + /** + * Write the schema of the outcome records. + * @param out CSV destination + * @throws IOException IO failure. + */ + public static void writeSchema(Csvout out) throws IOException { + out.write("id", "starttime", "success", "started", "ended", + "duration", "error").newline(); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java index bc3224a6c2..efaec5f4fa 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java @@ -18,27 +18,58 @@ package org.apache.hadoop.fs.s3a.scale; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes; +import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.util.DurationInfo; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.contract.ContractTestUtils.rm; +import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING; +import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles; +import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.filenameOfIndex; /** * Test some scalable operations related to file renaming and deletion. + * We set a bulk page size low enough that even the default test scale will + * issue multiple delete requests during a delete sequence, and so tests that + * operation more efficiently. */ public class ITestS3ADeleteManyFiles extends S3AScaleTestBase { + private static final Logger LOG = LoggerFactory.getLogger(ITestS3ADeleteManyFiles.class); - public static final String PREFIX = ITestPartialRenamesDeletes.PREFIX; + /** + * Delete page size: {@value}. + */ + static final int DELETE_PAGE_SIZE = 50; + + + @Override + protected Configuration createScaleConfiguration() { Configuration conf = super.createScaleConfiguration(); S3ATestUtils.disableFilesystemCaching(conf); S3ATestUtils.removeBaseAndBucketOverrides(conf, + EXPERIMENTAL_AWS_INTERNAL_THROTTLING, + BULK_DELETE_PAGE_SIZE, + USER_AGENT_PREFIX); + conf.setBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING, false); + conf.setInt(BULK_DELETE_PAGE_SIZE, DELETE_PAGE_SIZE); + return conf; + } /** * CAUTION: If this test starts failing, please make sure that the
Alternatively, consider reducing the * scale.test.operation.count parameter in * getOperationCount(). - * + * If it is slow: look at the size of any S3Guard Table used. * @see #getOperationCount() */ @Test public void testBulkRenameAndDelete() throws Throwable { - final Path scaleTestDir = path("testBulkRenameAndDelete"); - final Path srcDir = new Path(scaleTestDir, "src"); - final Path finalDir = new Path(scaleTestDir, "final"); final int count = getConf().getInt(KEY_FILE_COUNT, DEFAULT_FILE_COUNT); + describe("Testing bulk rename and delete of %d files", count); + + final Path scaleTestDir = path("testBulkRenameAndDelete"); + final Path srcParentDir = new Path(scaleTestDir, "srcParent"); + final Path srcDir = new Path(srcParentDir, "src"); + final Path finalParentDir = new Path(scaleTestDir, "finalParent"); + final Path finalDir = new Path(finalParentDir, "final"); final S3AFileSystem fs = getFileSystem(); - ContractTestUtils.rm(fs, scaleTestDir, true, false); + rm(fs, scaleTestDir, true, false); fs.mkdirs(srcDir); + fs.mkdirs(finalParentDir); createFiles(fs, srcDir, 1, count, 0); FileStatus[] statuses = fs.listStatus(srcDir); int nSrcFiles = statuses.length; - long sourceSize = 0; - for (FileStatus status : statuses) { - sourceSize += status.getLen(); - } + long sourceSize = Arrays.stream(statuses) + .mapToLong(FileStatus::getLen) + .sum(); assertEquals("Source file Count", count, nSrcFiles); ContractTestUtils.NanoTimer renameTimer = new ContractTestUtils.NanoTimer(); try (DurationInfo ignored = new DurationInfo(LOG, "Rename %s to %s", srcDir, finalDir)) { - assertTrue("Rename failed", fs.rename(srcDir, finalDir)); + rename(srcDir, finalDir); } renameTimer.end(); LOG.info("Effective rename bandwidth {} MB/s", @@ -80,19 +115,31 @@ public void testBulkRenameAndDelete() throws Throwable { LOG.info(String.format( "Time to rename a file: %,03f milliseconds", (renameTimer.nanosPerOperation(count) * 1.0f) / 1.0e6)); - assertEquals(nSrcFiles, fs.listStatus(finalDir).length); - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", - new Path(srcDir, PREFIX + 0)); - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", - new Path(srcDir, PREFIX + count / 2)); - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", - new Path(srcDir, PREFIX + (count - 1))); - ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", - new Path(finalDir, PREFIX + 0)); - ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", - new Path(finalDir, PREFIX + count/2)); - ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", - new Path(finalDir, PREFIX + (count-1))); + Assertions.assertThat(lsR(fs, srcParentDir, true)) + .describedAs("Recursive listing of source dir %s", srcParentDir) + .isEqualTo(0); + + assertPathDoesNotExist("not deleted after rename", + new Path(srcDir, filenameOfIndex(0))); + assertPathDoesNotExist("not deleted after rename", + new Path(srcDir, filenameOfIndex(count / 2))); + assertPathDoesNotExist("not deleted after rename", + new Path(srcDir, filenameOfIndex(count - 1))); + + // audit destination + Assertions.assertThat(lsR(fs, finalDir, true)) + .describedAs("size of recursive destination listFiles(%s)", finalDir) + .isEqualTo(count); + Assertions.assertThat(fs.listStatus(finalDir)) + .describedAs("size of destination listStatus(%s)", finalDir) + .hasSize(count); + + assertPathExists("not renamed to dest dir", + new Path(finalDir, filenameOfIndex(0))); + assertPathExists("not renamed to dest dir", + new 
Path(finalDir, filenameOfIndex(count / 2))); + assertPathExists("not renamed to dest dir", + new Path(finalDir, filenameOfIndex(count - 1))); ContractTestUtils.NanoTimer deleteTimer = new ContractTestUtils.NanoTimer(); @@ -104,6 +151,11 @@ public void testBulkRenameAndDelete() throws Throwable { LOG.info(String.format( "Time to delete an object %,03f milliseconds", (deleteTimer.nanosPerOperation(count) * 1.0f) / 1.0e6)); + Assertions.assertThat(lsR(fs, finalParentDir, true)) + .describedAs("Recursive listing of deleted rename destination %s", + finalParentDir) + .isEqualTo(0); + } }