HADOOP-16823. Large DeleteObject requests are their own Thundering Herd.

Contributed by Steve Loughran.

During S3A rename() and delete() calls, the list of objects to delete is
built up into batches of a thousand and then POSTed in a single large
DeleteObjects request.

But as the IO capacity allowed on an S3 partition may only be 3500 writes
per second *and* each entry in that POST counts as a single write, one
of those POSTs alone can trigger throttling on an already loaded
S3 directory tree. That can trigger backoff and retry, with the same
thousand-entry POST, and so recreate the exact same problem.

Fixes

* Page size for delete object requests is set in
  fs.s3a.bulk.delete.page.size; the default is 250.
* The property fs.s3a.experimental.aws.s3.throttling (default=true)
  can be set to false to disable throttle retry logic in the AWS
  client SDK; it is all handled in the S3A client. This
  gives more visibility into when operations are being throttled
  (see the configuration sketch after this list).
* Bulk delete throttling events are logged to the
  org.apache.hadoop.fs.s3a.throttled log at INFO; if this appears
  often, choose a smaller page size.
* The metric "store_io_throttled" adds the entire count of delete
  requests when a single DeleteObjects request is throttled.
* A new quantile, "store_io_throttle_rate", can track throttling
  load over time.
* DynamoDB metastore throttle resilience issues have also been
  identified and fixed. Note: the fs.s3a.experimental.aws.s3.throttling
  flag does not apply to DDB IO precisely because there may still be
  lurking issues there and it is safest to rely on the DynamoDB client
  SDK.
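
As a rough illustration (an editorial sketch, not part of the patch),
these options can be set through the standard Hadoop Configuration API;
the keys are real, the values below are examples only.

import org.apache.hadoop.conf.Configuration;

public class ThrottleTuning {
  public static Configuration tuned() {
    Configuration conf = new Configuration();
    // smaller pages spread the DeleteObjects load over more requests
    conf.setInt("fs.s3a.bulk.delete.page.size", 100);
    // let S3A, not the AWS SDK, see and count S3 throttle events
    conf.setBoolean("fs.s3a.experimental.aws.s3.throttling", false);
    return conf;
  }
}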

Change-Id: I00f85cdd94fc008864d060533f6bd4870263fd84
Steve Loughran 2020-02-13 19:08:11 +00:00
parent da99ac7e93
commit 56dee66770
27 changed files with 1239 additions and 167 deletions

View File

@ -1698,7 +1698,7 @@
<property>
<name>fs.s3a.retry.throttle.limit</name>
<value>${fs.s3a.attempts.maximum}</value>
<value>20</value>
<description>
Number of times to retry any throttled request.
</description>
@ -1706,9 +1706,12 @@
<property>
<name>fs.s3a.retry.throttle.interval</name>
<value>1000ms</value>
<value>100ms</value>
<description>
Interval between retry attempts on throttled requests.
Initial interval between retry attempts on throttled requests, +/- 50%, chosen at random.
i.e. for an initial value of 3000ms, the initial delay would be in the range 1500ms to 4500ms.
Backoffs are exponential; again, randomness is used to avoid the thundering herd problem.
500ms is the default value used by the AWS S3 Retry policy.
</description>
</property>
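
Editorial sketch: the jittered exponential backoff described above can be
modelled with a few lines of Java. This only illustrates the arithmetic;
it is not the retry policy implementation used by S3A or the AWS SDK.

import java.util.Random;

public class JitteredBackoff {
  private static final Random RANDOM = new Random();

  // delay = base * 2^retryCount, scaled by a random factor in [0.5, 1.5)
  static long delayMillis(long baseMillis, int retryCount) {
    long exponential = baseMillis << retryCount;
    double jitter = 0.5 + RANDOM.nextDouble();
    return (long) (exponential * jitter);
  }

  public static void main(String[] args) {
    for (int retry = 0; retry < 5; retry++) {
      System.out.printf("retry %d: ~%d ms%n", retry, delayMillis(100, retry));
    }
  }
}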

View File

@ -204,9 +204,7 @@ public void testRenameWithNonEmptySubDir() throws Throwable {
assertPathExists("not created in src/sub dir",
new Path(srcSubDir, "subfile.txt"));
boolean rename = fs.rename(srcDir, finalDir);
assertTrue("rename(" + srcDir + ", " + finalDir + ") failed",
rename);
rename(srcDir, finalDir);
// Accept both POSIX rename behavior and CLI rename behavior
if (renameRemoveEmptyDest) {

View File

@ -418,8 +418,9 @@ public static boolean rm(FileSystem fileSystem,
public static void rename(FileSystem fileSystem, Path src, Path dst)
throws IOException {
rejectRootOperation(src, false);
assertTrue(fileSystem.rename(src, dst));
assertPathDoesNotExist(fileSystem, "renamed", src);
assertTrue("rename(" + src + ", " + dst + ") failed",
fileSystem.rename(src, dst));
assertPathDoesNotExist(fileSystem, "renamed source dir", src);
}
/**

View File

@ -174,10 +174,42 @@ private Constants() {
public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain";
public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation";
// number of times we should retry errors
/**
* Number of times the AWS client library should retry errors before
* escalating to the S3A code: {@value}.
*/
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
/**
* Default number of times the AWS client library should retry errors before
* escalating to the S3A code: {@value}.
*/
public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
/**
* Experimental/Unstable feature: should the AWS client library retry
* throttle responses before escalating to the S3A code: {@value}.
*
* When set to false, the S3A connector sees all S3 throttle events,
* and so can update its counters and metrics, and use its own retry
* policy.
* However, this may have adverse effects on some operations where the S3A
* code cannot retry as efficiently as the AWS client library.
*
* This only applies to S3 operations, not to DynamoDB or other services.
*/
@InterfaceStability.Unstable
public static final String EXPERIMENTAL_AWS_INTERNAL_THROTTLING =
"fs.s3a.experimental.aws.s3.throttling";
/**
* Default value of {@link #EXPERIMENTAL_AWS_INTERNAL_THROTTLING},
* value: {@value}.
*/
@InterfaceStability.Unstable
public static final boolean EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT =
true;
// seconds until we give up trying to establish a connection to s3
public static final String ESTABLISH_TIMEOUT =
"fs.s3a.connection.establish.timeout";
@ -225,6 +257,33 @@ private Constants() {
public static final String ENABLE_MULTI_DELETE =
"fs.s3a.multiobjectdelete.enable";
/**
* Number of objects to delete in a single multi-object delete: {@value}.
* Max: 1000.
*
* A bigger value means fewer POST requests when deleting a directory
* tree with many objects.
* However, as you are limited to only a few thousand requests per
* second against a single partition of an S3 bucket,
* a large page size can easily overload the bucket and so trigger
* throttling.
*
* Furthermore, as the reaction to this request being throttled
* is simply to retry it, it can take a while for the situation to go away.
* While a large value may give better numbers on tests and benchmarks
* where only a single operation is being executed, once multiple
* applications start working with the same bucket these large
* deletes can be highly disruptive.
*/
public static final String BULK_DELETE_PAGE_SIZE =
"fs.s3a.bulk.delete.page.size";
/**
* Default number of objects to delete in a single multi-object
* delete: {@value}.
*/
public static final int BULK_DELETE_PAGE_SIZE_DEFAULT = 250;
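Editorial aside: a back-of-the-envelope sketch of why the page size matters,
using the ~3500 writes/second/partition figure from the commit message.
The numbers are illustrative only.

public class DeleteLoadEstimate {
  public static void main(String[] args) {
    final int writesPerSecond = 3500;  // cited per-partition S3 write limit
    for (int pageSize : new int[] {1000, 250}) {
      // every key in a bulk delete counts as one write on the partition
      double share = 100.0 * pageSize / writesPerSecond;
      System.out.printf(
          "page size %4d uses %.0f%% of one second's write capacity%n",
          pageSize, share);
    }
  }
}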
// comma separated list of directories
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
@ -733,8 +792,7 @@ private Constants() {
/**
* Default throttled retry limit: {@value}.
*/
public static final int RETRY_THROTTLE_LIMIT_DEFAULT =
DEFAULT_MAX_ERROR_RETRIES;
public static final int RETRY_THROTTLE_LIMIT_DEFAULT = 20;
/**
* Interval between retry attempts on throttled requests: {@value}.

View File

@ -34,7 +34,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS;
/**
@ -56,7 +58,17 @@ public AmazonS3 createS3Client(URI name,
final String userAgentSuffix) throws IOException {
Configuration conf = getConf();
final ClientConfiguration awsConf = S3AUtils
.createAwsConf(getConf(), bucket, Constants.AWS_SERVICE_IDENTIFIER_S3);
.createAwsConf(conf, bucket, Constants.AWS_SERVICE_IDENTIFIER_S3);
// When EXPERIMENTAL_AWS_INTERNAL_THROTTLING is false
// throttling is explicitly disabled on the S3 client so that
// all failures are collected in S3A instrumentation, and its
// retry policy is the only one used.
// This may cause problems in copy/rename.
awsConf.setUseThrottleRetries(
conf.getBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING,
EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT));
if (!StringUtils.isEmpty(userAgentSuffix)) {
awsConf.setUserAgentSuffix(userAgentSuffix);
}

View File

@ -99,6 +99,7 @@
import org.apache.hadoop.fs.s3a.auth.SignerManager;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
@ -170,6 +171,8 @@
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@ -273,6 +276,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private ITtlTimeProvider ttlTimeProvider;
/**
* Page size for deletions.
*/
private int pageSize;
/**
* Specific operations used by rename and delete operations.
*/
@ -440,6 +448,9 @@ public void initialize(URI name, Configuration originalConf)
}
initMultipartUploads(conf);
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
stopAllServices();
@ -1388,7 +1399,8 @@ private long innerRename(Path source, Path dest)
createStoreContext(),
src, srcKey, p.getLeft(),
dst, dstKey, p.getRight(),
operationCallbacks);
operationCallbacks,
pageSize);
return renameOperation.execute();
}
@ -1648,10 +1660,11 @@ protected void incrementGauge(Statistic statistic, long count) {
* @param ex exception.
*/
public void operationRetried(Exception ex) {
Statistic stat = isThrottleException(ex)
? STORE_IO_THROTTLED
: IGNORED_ERRORS;
incrementStatistic(stat);
if (isThrottleException(ex)) {
operationThrottled(false);
} else {
incrementStatistic(IGNORED_ERRORS);
}
}
/**
@ -1684,11 +1697,28 @@ public void operationRetried(
public void metastoreOperationRetried(Exception ex,
int retries,
boolean idempotent) {
operationRetried(ex);
incrementStatistic(S3GUARD_METADATASTORE_RETRY);
if (isThrottleException(ex)) {
operationThrottled(true);
} else {
incrementStatistic(IGNORED_ERRORS);
}
}
/**
* Note that an operation was throttled; this will update
* specific counters/metrics.
* @param metastore was the throttling observed in the S3Guard metastore?
*/
private void operationThrottled(boolean metastore) {
LOG.debug("Request throttled on {}", metastore ? "S3": "DynamoDB");
if (metastore) {
incrementStatistic(S3GUARD_METADATASTORE_THROTTLED);
instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, 1);
instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
1);
} else {
incrementStatistic(STORE_IO_THROTTLED);
instrumentation.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
}
}
@ -1917,6 +1947,13 @@ private void blockRootDelete(String key) throws InvalidRequestException {
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics.
* Retry policy: retry untranslated; delete considered idempotent.
* If the request is throttled, this is logged in the throttle statistics,
* with the counter set to the number of keys, rather than the number
* of invocations of the delete operation.
* This is because S3 considers each key as one mutating operation on
* the store when updating its load counters on a specific partition
* of an S3 bucket.
* If only the request was measured, this operation would under-report.
* @param deleteRequest keys to delete on the s3-backend
* @return the AWS response
* @throws MultiObjectDeleteException one or more of the keys could not
@ -1927,17 +1964,24 @@ private void blockRootDelete(String key) throws InvalidRequestException {
private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, AmazonClientException, IOException {
incrementWriteOperations();
BulkDeleteRetryHandler retryHandler =
new BulkDeleteRetryHandler(createStoreContext());
try(DurationInfo ignored =
new DurationInfo(LOG, false, "DELETE %d keys",
deleteRequest.getKeys().size())) {
return invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT,
(text, e, r, i) -> {
// handle the failure
retryHandler.bulkDeleteRetried(deleteRequest, e);
},
() -> {
incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
return s3.deleteObjects(deleteRequest);
});
} catch (MultiObjectDeleteException e) {
// one or more of the operations failed.
// one or more of the keys could not be deleted.
// log and rethrow
List<MultiObjectDeleteException.DeleteError> errors = e.getErrors();
LOG.debug("Partial failure of delete, {} errors", errors.size(), e);
for (MultiObjectDeleteException.DeleteError error : errors) {
@ -2254,7 +2298,7 @@ private void noteDeleted(final int count, final boolean deleteFakeDir) {
*/
@VisibleForTesting
@Retries.RetryMixed
void removeKeys(
public void removeKeys(
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
final boolean deleteFakeDir,
final BulkOperationState operationState)
@ -2349,7 +2393,7 @@ public boolean delete(Path f, boolean recursive) throws IOException {
innerGetFileStatus(f, true, StatusProbeEnum.ALL),
recursive,
operationCallbacks,
InternalConstants.MAX_ENTRIES_TO_DELETE);
pageSize);
boolean outcome = deleteOperation.execute();
if (outcome) {
try {
@ -2830,7 +2874,7 @@ S3AFileStatus innerGetFileStatus(final Path f,
S3AFileStatus s3GetFileStatus(final Path path,
final String key,
final Set<StatusProbeEnum> probes,
final Set<Path> tombstones) throws IOException {
@Nullable Set<Path> tombstones) throws IOException {
if (!key.isEmpty()) {
if (probes.contains(StatusProbeEnum.Head) && !key.endsWith("/")) {
try {
@ -3515,7 +3559,14 @@ void finishedWrite(String key, long length, String eTag, String versionId,
key, length, eTag, versionId);
Path p = keyToQualifiedPath(key);
Preconditions.checkArgument(length >= 0, "content length is negative");
deleteUnnecessaryFakeDirectories(p.getParent());
final boolean isDir = objectRepresentsDirectory(key, length);
// kick off an async delete
final CompletableFuture<?> deletion = submit(
unboundedThreadPool,
() -> {
deleteUnnecessaryFakeDirectories(p.getParent());
return null;
});
// this is only set if there is a metastore to update and the
// operationState parameter passed in was null.
BulkOperationState stateToClose = null;
@ -3529,12 +3580,13 @@ void finishedWrite(String key, long length, String eTag, String versionId,
// information gleaned from addAncestors is preserved into the
// subsequent put.
stateToClose = S3Guard.initiateBulkWrite(metadataStore,
BulkOperationState.OperationType.Mkdir,
isDir
? BulkOperationState.OperationType.Mkdir
: BulkOperationState.OperationType.Put,
keyToPath(key));
activeState = stateToClose;
}
S3Guard.addAncestors(metadataStore, p, ttlTimeProvider, activeState);
final boolean isDir = objectRepresentsDirectory(key, length);
S3AFileStatus status = createUploadFileStatus(p,
isDir, length,
getDefaultBlockSize(p), username, eTag, versionId);
@ -3557,6 +3609,8 @@ void finishedWrite(String key, long length, String eTag, String versionId,
activeState);
}
}
// and catch up with any delete operation.
waitForCompletionIgnoringExceptions(deletion);
} catch (IOException e) {
if (failOnMetadataWriteError) {
throw new MetadataPersistenceException(p.toString(), e);

View File

@ -123,6 +123,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
private final MutableCounterLong ignoredErrors;
private final MutableQuantiles putLatencyQuantile;
private final MutableQuantiles throttleRateQuantile;
private final MutableQuantiles s3GuardThrottleRateQuantile;
private final MutableCounterLong numberOfFilesCreated;
private final MutableCounterLong numberOfFilesCopied;
private final MutableCounterLong bytesOfFilesCopied;
@ -248,7 +249,9 @@ public S3AInstrumentation(URI name) {
int interval = 1;
putLatencyQuantile = quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
"ops", "latency", interval);
throttleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
s3GuardThrottleRateQuantile = quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
"events", "frequency (Hz)", interval);
throttleRateQuantile = quantiles(STORE_IO_THROTTLE_RATE,
"events", "frequency (Hz)", interval);
registerAsMetricsSource(name);
@ -617,6 +620,7 @@ public void close() {
// task in a shared thread pool.
putLatencyQuantile.stop();
throttleRateQuantile.stop();
s3GuardThrottleRateQuantile.stop();
metricsSystem.unregisterSource(metricsSourceName);
int activeSources = --metricsSourceActiveCounter;
if (activeSources == 0) {

View File

@ -230,6 +230,8 @@ public enum Statistic {
"S3Guard metadata store authoritative directories updated from S3"),
STORE_IO_THROTTLED("store_io_throttled", "Requests throttled and retried"),
STORE_IO_THROTTLE_RATE("store_io_throttle_rate",
"Rate of S3 request throttling"),
DELEGATION_TOKENS_ISSUED("delegation_tokens_issued",
"Number of delegation tokens issued");

View File

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.util.List;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.s3a.AWSClientIOException;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.Statistic;
import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
import static org.apache.hadoop.fs.s3a.Statistic.IGNORED_ERRORS;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLED;
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.THROTTLE_LOG_NAME;
/**
* Handler for bulk delete retry events.
*/
public class BulkDeleteRetryHandler extends AbstractStoreOperation {
private static final Logger LOG = LoggerFactory.getLogger(
BulkDeleteRetryHandler.class);
private static final Logger THROTTLE_LOG = LoggerFactory.getLogger(
THROTTLE_LOG_NAME);
/**
* This is an error string we see in exceptions when the XML parser
* failed: {@value}.
*/
public static final String XML_PARSE_BROKEN = "Failed to parse XML document";
private final S3AInstrumentation instrumentation;
private final S3AStorageStatistics storageStatistics;
/**
* Constructor.
* @param storeContext context
*/
public BulkDeleteRetryHandler(final StoreContext storeContext) {
super(storeContext);
instrumentation = storeContext.getInstrumentation();
storageStatistics = storeContext.getStorageStatistics();
}
/**
* Increment a statistic by 1.
* This increments both the instrumentation and storage statistics.
* @param statistic The operation to increment
*/
protected void incrementStatistic(Statistic statistic) {
incrementStatistic(statistic, 1);
}
/**
* Increment a statistic by a specific value.
* This increments both the instrumentation and storage statistics.
* @param statistic The operation to increment
* @param count the count to increment
*/
protected void incrementStatistic(Statistic statistic, long count) {
instrumentation.incrementCounter(statistic, count);
storageStatistics.incrementCounter(statistic, count);
}
/**
* Handler for failure of bulk delete requests.
* @param deleteRequest request which was retried.
* @param ex exception
*/
public void bulkDeleteRetried(
DeleteObjectsRequest deleteRequest,
Exception ex) {
LOG.debug("Retrying on error during bulk delete", ex);
if (isThrottleException(ex)) {
onDeleteThrottled(deleteRequest);
} else if (isSymptomOfBrokenConnection(ex)) {
// this is one which surfaces when an HTTPS connection is broken while
// the service is reading the result.
// it is treated as a throttle event for statistics
LOG.warn("Bulk delete operation interrupted: {}", ex.getMessage());
onDeleteThrottled(deleteRequest);
} else {
incrementStatistic(IGNORED_ERRORS);
}
}
/**
* Handle a delete throttling event.
* @param deleteRequest request which failed.
*/
private void onDeleteThrottled(final DeleteObjectsRequest deleteRequest) {
final List<DeleteObjectsRequest.KeyVersion> keys = deleteRequest.getKeys();
final int size = keys.size();
incrementStatistic(STORE_IO_THROTTLED, size);
instrumentation.addValueToQuantiles(STORE_IO_THROTTLE_RATE, size);
THROTTLE_LOG.info(
"Bulk delete {} keys throttled -first key = {}; last = {}",
size,
keys.get(0).getKey(),
keys.get(size - 1).getKey());
}
/**
* Does this error indicate that the connection was ultimately broken while
* the XML response was parsed? This seems to be a symptom of the far end
* blocking the response (i.e. server-side throttling) until
* the client eventually times out.
* @param ex exception received.
* @return true if this exception is considered a sign of a broken connection.
*/
private boolean isSymptomOfBrokenConnection(final Exception ex) {
return ex instanceof AWSClientIOException
&& ex.getCause() instanceof SdkClientException
&& ex.getMessage().contains(XML_PARSE_BROKEN);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a.impl;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
@ -123,4 +124,34 @@ public static <T> void waitForCompletion(
}
}
/**
* Wait for a single future to complete, ignoring any exceptions raised.
* @param future future to wait for.
*/
public static <T> void waitForCompletionIgnoringExceptions(
@Nullable final CompletableFuture<T> future) {
if (future != null) {
try (DurationInfo ignore =
new DurationInfo(LOG, false, "Waiting for task completion")) {
future.join();
} catch (Exception e) {
LOG.debug("Ignoring exception raised in task completion: ");
}
}
}
/**
* Block awaiting completion for any non-null future passed in;
* no-op if a null arg was supplied.
* @param future future
* @throws IOException if one of the called futures raised an IOE.
* @throws RuntimeException if one of the futures raised one.
*/
public static void maybeAwaitCompletion(
@Nullable final CompletableFuture<Void> future)
throws IOException {
if (future != null) {
waitForCompletion(future);
}
}
}
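
Editorial sketch of the pattern these helpers enable in finishedWrite():
start cleanup asynchronously, do the main work, then catch up with the
cleanup while tolerating its failure. Plain CompletableFuture is used here
instead of the class's own submit() helper.

import java.util.concurrent.CompletableFuture;

public class AsyncCleanupPattern {
  public static void main(String[] args) {
    // kick off an async delete, as finishedWrite() now does for fake directories
    CompletableFuture<Void> deletion = CompletableFuture.runAsync(
        () -> System.out.println("deleting unnecessary fake directories"));

    System.out.println("main write path continues in parallel");

    // catch up with the delete; ignore failures, since the write itself succeeded
    try {
      deletion.join();
    } catch (RuntimeException e) {
      System.out.println("ignoring cleanup failure: " + e);
    }
  }
}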

View File

@ -45,8 +45,8 @@
import org.apache.hadoop.util.DurationInfo;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.maybeAwaitCompletion;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
/**
* Implementation of the delete() operation.
@ -186,7 +186,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
* @param status pre-fetched source status
* @param recursive recursive delete?
* @param callbacks callback provider
* @param pageSize number of entries in a page
* @param pageSize size of delete pages
*/
public DeleteOperation(final StoreContext context,
final S3AFileStatus status,
@ -200,7 +200,7 @@ public DeleteOperation(final StoreContext context,
this.callbacks = callbacks;
checkArgument(pageSize > 0
&& pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
"page size out of range: %d", pageSize);
"page size out of range: %s", pageSize);
this.pageSize = pageSize;
metadataStore = context.getMetadataStore();
executor = context.createThrottledExecutor(1);
@ -557,22 +557,5 @@ private void asyncDeleteAction(
}
}
/**
* Block awaiting completion for any non-null future passed in;
* No-op if a null arg was supplied.
* @param future future
* @throws IOException if one of the called futures raised an IOE.
* @throws RuntimeException if one of the futures raised one.
*/
private void maybeAwaitCompletion(
@Nullable final CompletableFuture<Void> future)
throws IOException {
if (future != null) {
try (DurationInfo ignored =
new DurationInfo(LOG, false, "delete completion")) {
waitForCompletion(future);
}
}
}
}

View File

@ -85,4 +85,12 @@ private InternalConstants() {
/** 404 error code. */
public static final int SC_404 = 404;
/** Name of the log for throttling events. Value: {@value}. */
public static final String THROTTLE_LOG_NAME =
"org.apache.hadoop.fs.s3a.throttled";
/** Directory marker attribute: see HADOOP-16613. Value: {@value}. */
public static final String X_DIRECTORY =
"application/x-directory";
}

View File

@ -50,7 +50,6 @@
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_BLOCKSIZE;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.MAX_ENTRIES_TO_DELETE;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.RENAME_PARALLEL_LIMIT;
/**
@ -99,8 +98,14 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
/**
* Counter of bytes copied.
*/
private final AtomicLong bytesCopied = new AtomicLong();
/**
* Page size for bulk deletes.
*/
private final int pageSize;
/**
* Rename tracker.
*/
@ -137,6 +142,7 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
* @param destKey destination key
* @param destStatus destination status.
* @param callbacks callback provider
* @param pageSize size of delete requests
*/
public RenameOperation(
final StoreContext storeContext,
@ -146,7 +152,8 @@ public RenameOperation(
final Path destPath,
final String destKey,
final S3AFileStatus destStatus,
final OperationCallbacks callbacks) {
final OperationCallbacks callbacks,
final int pageSize) {
super(storeContext);
this.sourcePath = sourcePath;
this.sourceKey = sourceKey;
@ -157,6 +164,7 @@ public RenameOperation(
this.callbacks = callbacks;
blocksize = storeContext.getConfiguration()
.getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
this.pageSize = pageSize;
}
/**
@ -360,7 +368,7 @@ protected void recursiveDirectoryRename() throws IOException {
LOG.debug("Waiting for active copies to complete");
completeActiveCopies("batch threshold reached");
}
if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
if (keysToDelete.size() == pageSize) {
// finish ongoing copies then delete all queued keys.
// provided the parallel limit is a factor of the max entry
// constant, this will not need to block for the copy, and

View File

@ -439,8 +439,8 @@ private void dumpEntry(CsvFile csv, DDBPathMetadata md) {
private Pair<Long, Long> scanMetastore(CsvFile csv) {
S3GuardTableAccess tableAccess = new S3GuardTableAccess(getStore());
ExpressionSpecBuilder builder = new ExpressionSpecBuilder();
Iterable<DDBPathMetadata> results = tableAccess.scanMetadata(
builder);
Iterable<DDBPathMetadata> results =
getStore().wrapWithRetries(tableAccess.scanMetadata(builder));
long live = 0;
long tombstone = 0;
for (DDBPathMetadata md : results) {

View File

@ -80,6 +80,7 @@
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.impl.FunctionsRaisingIOE;
import org.apache.hadoop.fs.impl.WrappedIOException;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
import org.apache.hadoop.fs.s3a.Constants;
@ -324,8 +325,12 @@ public class DynamoDBMetadataStore implements MetadataStore,
/** Invoker for write operations. */
private Invoker writeOp;
/** Invoker for scan operations. */
private Invoker scanOp;
private final AtomicLong readThrottleEvents = new AtomicLong(0);
private final AtomicLong writeThrottleEvents = new AtomicLong(0);
private final AtomicLong scanThrottleEvents = new AtomicLong(0);
private final AtomicLong batchWriteCapacityExceededEvents = new AtomicLong(0);
/**
@ -424,11 +429,6 @@ public void initialize(FileSystem fs, ITtlTimeProvider ttlTp)
tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
initDataAccessRetries(conf);
// set up a full retry policy
invoker = new Invoker(new S3GuardDataAccessRetryPolicy(conf),
this::retryEvent
);
this.ttlTimeProvider = ttlTp;
tableHandler = new DynamoDBMetadataStoreTableManager(
@ -543,6 +543,7 @@ private void initDataAccessRetries(Configuration config) {
= new S3GuardDataAccessRetryPolicy(config);
readOp = new Invoker(throttledRetryRetryPolicy, this::readRetryEvent);
writeOp = new Invoker(throttledRetryRetryPolicy, this::writeRetryEvent);
scanOp = new Invoker(throttledRetryRetryPolicy, this::scanRetryEvent);
}
@Override
@ -810,33 +811,31 @@ public DirListingMetadata listChildren(final Path path) throws IOException {
checkPath(path);
LOG.debug("Listing table {} in region {}: {}", tableName, region, path);
final QuerySpec spec = new QuerySpec()
.withHashKey(pathToParentKeyAttribute(path))
.withConsistentRead(true); // strictly consistent read
final List<PathMetadata> metas = new ArrayList<>();
// find the children in the table
return readOp.retry(
final ItemCollection<QueryOutcome> items = scanOp.retry(
"listChildren",
path.toString(),
true,
() -> {
final QuerySpec spec = new QuerySpec()
.withHashKey(pathToParentKeyAttribute(path))
.withConsistentRead(true); // strictly consistent read
final ItemCollection<QueryOutcome> items = table.query(spec);
() -> table.query(spec));
// now wrap the result with retry logic
try {
for (Item item : wrapWithRetries(items)) {
metas.add(itemToPathMetadata(item, username));
}
} catch (WrappedIOException e) {
// failure in the iterators; unwrap.
throw e.getCause();
}
final List<PathMetadata> metas = new ArrayList<>();
for (Item item : items) {
DDBPathMetadata meta = itemToPathMetadata(item, username);
metas.add(meta);
}
// Minor race condition here - if the path is deleted between
// getting the list of items and the directory metadata we might
// get a null in DDBPathMetadata.
DDBPathMetadata dirPathMeta = get(path);
final DirListingMetadata dirListing =
getDirListingMetadataFromDirMetaAndList(path, metas,
dirPathMeta);
return dirListing;
});
// Minor race condition here - if the path is deleted between
// getting the list of items and the directory metadata we might
// get a null in DDBPathMetadata.
return getDirListingMetadataFromDirMetaAndList(path, metas,
get(path));
}
DirListingMetadata getDirListingMetadataFromDirMetaAndList(Path path,
@ -1992,6 +1991,22 @@ void writeRetryEvent(
retryEvent(text, ex, attempts, idempotent);
}
/**
* Callback on a scan operation retried.
* @param text text of the operation
* @param ex exception
* @param attempts number of attempts
* @param idempotent is the method idempotent (this is assumed to be true)
*/
void scanRetryEvent(
String text,
IOException ex,
int attempts,
boolean idempotent) {
scanThrottleEvents.incrementAndGet();
retryEvent(text, ex, attempts, idempotent);
}
/**
* Callback from {@link Invoker} when an operation is retried.
* @param text text of the operation
@ -2048,14 +2063,38 @@ public long getWriteThrottleEventCount() {
return writeThrottleEvents.get();
}
/**
* Get the count of scan throttle events.
* @return the current count of scan throttle events.
*/
@VisibleForTesting
public long getScanThrottleEventCount() {
return scanThrottleEvents.get();
}
@VisibleForTesting
public long getBatchWriteCapacityExceededCount() {
return batchWriteCapacityExceededEvents.get();
}
@VisibleForTesting
/**
* Get the operation invoker for write operations.
* @return an invoker for retrying mutating operations on a store.
*/
public Invoker getInvoker() {
return invoker;
return writeOp;
}
/**
* Wrap an iterator returned from any scan with a retrying one.
* This includes throttle handling.
* Retries will update the relevant counters/metrics for scan operations.
* @param source source iterator
* @return a retrying iterator.
*/
public <T> Iterable<T> wrapWithRetries(
final Iterable<T> source) {
return new RetryingCollection<>("scan dynamoDB table", scanOp, source);
}
/**

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.s3guard;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
@ -128,9 +129,10 @@ protected void serviceStart() throws Exception {
* delete all entries from that bucket.
* @return the exit code.
* @throws ServiceLaunchException on failure.
* @throws IOException IO failure.
*/
@Override
public int execute() throws ServiceLaunchException {
public int execute() throws ServiceLaunchException, IOException {
URI uri = getUri();
String host = uri.getHost();
@ -144,7 +146,8 @@ public int execute() throws ServiceLaunchException {
LOG.info("Scanning for entries with prefix {} to delete from {}",
prefix, ddbms);
Iterable<DDBPathMetadata> entries = tableAccess.scanMetadata(builder);
Iterable<DDBPathMetadata> entries =
ddbms.wrapWithRetries(tableAccess.scanMetadata(builder));
List<Path> list = new ArrayList<>();
entries.iterator().forEachRemaining(e -> {
if (!(e instanceof S3GuardTableAccess.VersionMarker)) {
@ -169,7 +172,14 @@ public int execute() throws ServiceLaunchException {
new DurationInfo(LOG,
"deleting %s entries from %s",
count, ddbms.toString());
tableAccess.delete(list);
// send these one by one for more efficient retries
for (Path path: list) {
ddbms.getInvoker()
.retry("delete",
prefix,
true,
() -> tableAccess.delete(path));
}
duration.close();
long durationMillis = duration.value();
long timePerEntry = durationMillis / count;

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.impl.WrappedIOException;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
/**
* A collection which wraps the result of a query or scan
* with retries.
* Important: iterate through this only once; the outcome
* of repeating an iteration is "undefined".
* @param <T> type of outcome.
*/
class RetryingCollection<T> implements Iterable<T> {
/**
* Source iterable.
*/
private final Iterable<T> source;
/**
* Invoker for retries.
*/
private final Invoker invoker;
/**
* Operation name for invoker.retry messages.
*/
private final String operation;
/**
* Constructor.
* @param operation Operation name for invoker.retry messages.
* @param invoker Invoker for retries.
* @param source Source iterable.
*/
RetryingCollection(
final String operation,
final Invoker invoker,
final Iterable<T> source) {
this.operation = operation;
this.source = source;
this.invoker = invoker;
}
/**
* Demand creates a new iterator which will retry all hasNext/next
* operations through the invoker supplied in the constructor.
* @return a new iterator.
*/
@Override
public Iterator<T> iterator() {
return new RetryingIterator(source.iterator());
}
/**
* An iterator which wraps a non-retrying iterator of scan results
* (i.e. {@code S3GuardTableAccess.DDBPathMetadataIterator}).
*/
private final class RetryingIterator implements Iterator<T> {
private final Iterator<T> iterator;
private RetryingIterator(final Iterator<T> iterator) {
this.iterator = iterator;
}
/**
* {@inheritDoc}.
* @throws WrappedIOException for IO failure, including throttling.
*/
@Override
@Retries.RetryTranslated
public boolean hasNext() {
try {
return invoker.retry(
operation,
null,
true,
iterator::hasNext);
} catch (IOException e) {
throw new WrappedIOException(e);
}
}
/**
* {@inheritDoc}.
* @throws WrappedIOException for IO failure, including throttling.
*/
@Override
@Retries.RetryTranslated
public T next() {
try {
return invoker.retry(
"Scan Dynamo",
null,
true,
iterator::next);
} catch (IOException e) {
throw new WrappedIOException(e);
}
}
}
}
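
Editorial sketch of how a caller consumes such a retrying iterable:
iterate once and unwrap WrappedIOException back to IOException, mirroring
the listChildren() change earlier in this commit. The data source here is
a hypothetical stand-in.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.fs.impl.WrappedIOException;

public class RetryingScanExample {
  // hypothetical stand-in for ddbms.wrapWithRetries(tableAccess.scanMetadata(spec))
  static Iterable<String> retryingScan() {
    return Arrays.asList("entry-1", "entry-2");
  }

  public static void main(String[] args) throws IOException {
    try {
      for (String entry : retryingScan()) {
        System.out.println(entry);  // hasNext()/next() are retried inside the wrapper
      }
    } catch (WrappedIOException e) {
      throw e.getCause();           // unwrap back to the original IOException
    }
  }
}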

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import static com.google.common.base.Preconditions.checkNotNull;
@ -44,6 +45,7 @@
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.TABLE_VERSION;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.itemToPathMetadata;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.pathToKey;
/**
* Package-scoped accessor to table state in S3Guard.
@ -70,6 +72,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@Retries.OnceRaw
class S3GuardTableAccess {
private static final Logger LOG =
@ -107,6 +110,7 @@ private String getUsername() {
* @param spec query spec.
* @return the outcome.
*/
@Retries.OnceRaw
ItemCollection<QueryOutcome> query(QuerySpec spec) {
return table.query(spec);
}
@ -118,24 +122,33 @@ ItemCollection<QueryOutcome> query(QuerySpec spec) {
* @param spec query spec.
* @return an iterator over path entries.
*/
@Retries.OnceRaw
Iterable<DDBPathMetadata> queryMetadata(QuerySpec spec) {
return new DDBPathMetadataCollection<>(query(spec));
}
@Retries.OnceRaw
ItemCollection<ScanOutcome> scan(ExpressionSpecBuilder spec) {
return table.scan(spec.buildForScan());
}
@Retries.OnceRaw
Iterable<DDBPathMetadata> scanMetadata(ExpressionSpecBuilder spec) {
return new DDBPathMetadataCollection<>(scan(spec));
}
@Retries.OnceRaw
void delete(Collection<Path> paths) {
paths.stream()
.map(PathMetadataDynamoDBTranslation::pathToKey)
.forEach(table::deleteItem);
}
@Retries.OnceRaw
void delete(Path path) {
table.deleteItem(pathToKey(path));
}
/**
* A collection which wraps the result of a query or scan.
* Important: iterate through this only once; the outcome
@ -191,11 +204,13 @@ private DDBPathMetadataIterator(final IteratorSupport<Item, T> it) {
}
@Override
@Retries.OnceRaw
public boolean hasNext() {
return it.hasNext();
}
@Override
@Retries.OnceRaw
public DDBPathMetadata next() {
Item item = it.next();
Pair<String, String> key = primaryKey(item);

View File

@ -464,6 +464,22 @@ Otherwise, set a large timeout in `fs.s3a.scale.test.timeout`
The tests are executed in an order to only clean up created files after
the end of all the tests. If the tests are interrupted, the test data will remain.
## <a name="load"></a> Load tests.
Some tests are designed to overload AWS services with more
requests per second than an AWS account is permitted.
The operation of these tests may be observable to other users of the same
account, especially if they are working in the AWS region to which the
tests are targeted.
They may also run up larger bills.
These tests all have the prefix `ILoadTest`.
They do not run automatically: they must be explicitly run from the command line or an IDE.
Look in the source for these and read the Javadocs before executing.
## <a name="alternate_s3"></a> Testing against non AWS S3 endpoints.
@ -1399,6 +1415,9 @@ as it may take a couple of SDK updates before it is ready.
in `fs.s3a.assumed.role.arn` for testing assumed roles,
and `fs.s3a.server-side-encryption.key` for encryption, for full coverage.
If you can, scale up the scale tests.
1. Run the `ILoadTest*` load tests from your IDE or via maven through
`mvn verify -Dtest=skip -Dit.test=ILoadTest\*`; look for regressions in performance
as much as failures.
1. Create the site with `mvn site -DskipTests`; look in `target/site` for the report.
1. Review *every single `-output.txt` file in `hadoop-tools/hadoop-aws/target/failsafe-reports`,
paying particular attention to
Then see if they complete successfully in roughly the same time once the upgrade is applied.
to AWS services.
* Try and get other people, especially anyone with their own endpoints,
apps or different deployment environments, to run their own tests.
* Run the load tests, especially `ILoadTestS3ABulkDeleteThrottling`.
### Dealing with Deprecated APIs and New Features

View File

@ -156,7 +156,7 @@ public void testRenamePopulatesFileAncestors2() throws Exception {
S3ATestUtils.MetricDiff fileCopyBytes = new S3ATestUtils.MetricDiff(fs,
Statistic.FILES_COPIED_BYTES);
fs.rename(src, dest);
rename(src, dest);
describe("Rename has completed, examining data under " + base);
fileCopyDiff.assertDiffEquals("Number of files copied", 1);

View File

@ -525,6 +525,7 @@ public void testRevertMissingCommit() throws Throwable {
commit.setDestinationKey(fs.pathToKey(destFile));
fullThrottle();
actions.revertCommit(commit, null);
resetFailures();
assertPathExists("parent of reverted (nonexistent) commit",
destFile.getParent());
}

View File

@ -818,16 +818,25 @@ private static void buildPaths(
}
// create the file paths
for (int i = 0; i < fileCount; i++) {
String name = PREFIX + i;
String name = filenameOfIndex(i);
Path p = new Path(destDir, name);
filePaths.add(p);
}
for (int i = 0; i < dirCount; i++) {
String name = "dir-" + i;
String name = String.format("dir-%03d", i);
Path p = new Path(destDir, name);
dirPaths.add(p);
buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
}
}
/**
* Given an index, return a string to use as the filename.
* @param i index
* @return name
*/
public static String filenameOfIndex(final int i) {
return String.format("%s%03d", PREFIX, i);
}
}

View File

@ -130,8 +130,10 @@ public ITestDynamoDBMetadataStore() {
private String bucket;
@SuppressWarnings("StaticNonFinalField")
private static DynamoDBMetadataStore ddbmsStatic;
@SuppressWarnings("StaticNonFinalField")
private static String testDynamoDBTableName;
private static final List<Path> UNCHANGED_ENTRIES = Collections.emptyList();
@ -166,13 +168,17 @@ public void setUp() throws Exception {
try{
super.setUp();
tableHandler = getDynamoMetadataStore().getTableHandler();
} catch (FileNotFoundException e){
LOG.warn("MetadataStoreTestBase setup failed. Waiting for table to be "
+ "deleted before trying again.");
ddbmsStatic.getTable().waitForDelete();
+ "deleted before trying again.", e);
try {
ddbmsStatic.getTable().waitForDelete();
} catch (IllegalArgumentException | InterruptedException ex) {
LOG.warn("When awaiting a table to be cleaned up", e);
}
super.setUp();
}
tableHandler = getDynamoMetadataStore().getTableHandler();
}
@BeforeClass
@ -780,10 +786,16 @@ private void removeVersionMarkerTag(Table table, AmazonDynamoDB addb) {
.withTagKeys(VERSION_MARKER_TAG_NAME));
}
private void deleteVersionMarkerItem(Table table) {
/**
* Deletes a version marker; spins briefly to await it disappearing.
* @param table table to delete the key
* @throws Exception failure
*/
private void deleteVersionMarkerItem(Table table) throws Exception {
table.deleteItem(VERSION_MARKER_PRIMARY_KEY);
assertNull("Version marker should be null after deleting it " +
"from the table.", table.getItem(VERSION_MARKER_PRIMARY_KEY));
eventually(30_000, 1_0, () ->
assertNull("Version marker should be null after deleting it " +
"from the table.", table.getItem(VERSION_MARKER_PRIMARY_KEY)));
}
/**
@ -1003,7 +1015,8 @@ public void testDeleteTable() throws Exception {
final String tableName = getTestTableName("testDeleteTable");
Path testPath = new Path(new Path(fsUri), "/" + tableName);
final S3AFileSystem s3afs = getFileSystem();
final Configuration conf = getTableCreationConfig();
// patch the filesystem config as this is one read in initialize()
final Configuration conf = s3afs.getConf();
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
enableOnDemand(conf);
DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore();

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.s3guard;
import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -34,6 +35,7 @@
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
import com.amazonaws.services.dynamodbv2.xspec.ExpressionSpecBuilder;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.FixMethodOrder;
import org.junit.Test;
@ -47,14 +49,15 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.impl.WrappedIOException;
import org.apache.hadoop.fs.s3a.AWSServiceThrottledException;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.scale.AbstractITestS3AMetadataStoreScale;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.DurationInfo;
@ -62,7 +65,7 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.s3guard.MetadataStoreTestBase.basicFileStatus;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.PARENT;
import static org.junit.Assume.*;
import static org.junit.Assume.assumeTrue;
/**
* Scale test for DynamoDBMetadataStore.
@ -91,6 +94,20 @@ public class ITestDynamoDBMetadataStoreScale
private static final long MAXIMUM_READ_CAPACITY = 10;
private static final long MAXIMUM_WRITE_CAPACITY = 15;
/**
* Time in milliseconds to sleep after a test was throttled:
* {@value}.
* This is to help isolate throttling to the test which failed,
* rather than have it surface in a followup test.
* Also the test reports will record durations more accurately,
* as JUnit doesn't include setup/teardown times in its reports.
* There's a cost: single test runs will sleep, and the last test
* run may throttle when it doesn't need to.
* The last test {@link #test_999_delete_all_entries()}
* doesn't do the sleep so a full batch run should not suffer here.
*/
public static final int THROTTLE_RECOVER_TIME_MILLIS = 5_000;
private DynamoDBMetadataStore ddbms;
private DynamoDBMetadataStoreTableManager tableHandler;
@ -119,7 +136,7 @@ public class ITestDynamoDBMetadataStoreScale
* @throws AssumptionViolatedException if the FS isn't running S3Guard + DDB/
*/
@Override
public MetadataStore createMetadataStore() throws IOException {
public DynamoDBMetadataStore createMetadataStore() throws IOException {
S3AFileSystem fs = getFileSystem();
assumeTrue("S3Guard is disabled for " + fs.getUri(),
fs.hasMetadataStore());
@ -145,12 +162,16 @@ public MetadataStore createMetadataStore() throws IOException {
conf.set(S3GUARD_DDB_TABLE_NAME_KEY, tableName);
conf.set(S3GUARD_DDB_REGION_KEY, region);
conf.set(S3GUARD_DDB_THROTTLE_RETRY_INTERVAL, "50ms");
conf.set(S3GUARD_DDB_MAX_RETRIES, "2");
conf.set(S3GUARD_DDB_MAX_RETRIES, "1");
conf.set(MAX_ERROR_RETRIES, "1");
conf.set(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, "5ms");
DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
ms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
// init the metastore in a bigger retry loop than the test setup
// in case the previous test case overloaded things
final Invoker fsInvoker = fs.createStoreContext().getInvoker();
fsInvoker.retry("init metastore", null, true,
() -> ms.initialize(conf, new S3Guard.TtlTimeProvider(conf)));
// wire up the owner FS so that we can make assertions about throttle
// events
ms.bindToOwnerFilesystem(fs);
@ -168,8 +189,7 @@ public void setup() throws Exception {
table = ddb.getTable(tableName);
originalCapacity = table.describe().getProvisionedThroughput();
// If you set the same provisioned I/O as already set it throws an
// exception, avoid that.
// is this table too big for throttling to surface?
isOverProvisionedForTest = (
originalCapacity.getReadCapacityUnits() > MAXIMUM_READ_CAPACITY
|| originalCapacity.getWriteCapacityUnits() > MAXIMUM_WRITE_CAPACITY);
@ -177,29 +197,47 @@ public void setup() throws Exception {
@Override
public void teardown() throws Exception {
if (ddbms != null) {
S3GuardTableAccess tableAccess = new S3GuardTableAccess(ddbms);
ExpressionSpecBuilder builder = new ExpressionSpecBuilder();
builder.withCondition(
ExpressionSpecBuilder.S(PARENT).beginsWith("/test/"));
Iterable<DDBPathMetadata> entries = tableAccess.scanMetadata(builder);
List<Path> list = new ArrayList<>();
entries.iterator().forEachRemaining(e -> {
Path p = e.getFileStatus().getPath();
LOG.info("Deleting {}", p);
list.add(p);
});
tableAccess.delete(list);
}
IOUtils.cleanupWithLogger(LOG, ddbms);
super.teardown();
}
/**
* Is throttling likely?
* @return true if the DDB table has prepaid IO and is small enough
* to throttle.
*/
private boolean expectThrottling() {
return !isOverProvisionedForTest && !isOnDemandTable;
}
/**
* Recover from throttling by sleeping briefly.
*/
private void recoverFromThrottling() throws InterruptedException {
LOG.info("Sleeping to recover from throttling for {} ms",
THROTTLE_RECOVER_TIME_MILLIS);
Thread.sleep(THROTTLE_RECOVER_TIME_MILLIS);
}
/**
* The subclass expects the superclass to be throttled; sometimes it is.
*/
@Test
@Override
public void test_010_Put() throws Throwable {
ThrottleTracker tracker = new ThrottleTracker(ddbms);
try {
// if this doesn't throttle, all is well.
super.test_010_Put();
} catch (AWSServiceThrottledException ex) {
// if the service was throttled, all is good.
// log and continue
LOG.warn("DDB connection was throttled", ex);
} finally {
LOG.info("Statistics {}", tracker);
}
}
/**
* The subclass expects the superclass to be throttled; sometimes it is.
*/
@ -283,9 +321,8 @@ public void test_030_BatchedWrite() throws Exception {
}
}
});
if (expectThrottling()) {
assertNotEquals("No batch retries in " + result,
0, result.getBatchThrottles());
if (expectThrottling() && result.probeThrottlingDetected()) {
recoverFromThrottling();
}
} finally {
describe("Cleaning up table %s", tableName);
@ -326,7 +363,12 @@ public void test_050_getVersionMarkerItem() throws Throwable {
execute("get",
OPERATIONS_PER_THREAD * 2,
expectThrottling(),
() -> tableHandler.getVersionMarkerItem()
() -> {
try {
tableHandler.getVersionMarkerItem();
} catch (FileNotFoundException ignored) {
}
}
);
}
@ -473,6 +515,39 @@ public void test_900_instrumentation() throws Throwable {
statistics.getLong(throttledKey) > 0);
}
@Test
public void test_999_delete_all_entries() throws Throwable {
describe("Delete all entries from the table");
S3GuardTableAccess tableAccess = new S3GuardTableAccess(ddbms);
ExpressionSpecBuilder builder = new ExpressionSpecBuilder();
final String path = "/test/";
builder.withCondition(
ExpressionSpecBuilder.S(PARENT).beginsWith(path));
Iterable<DDBPathMetadata> entries =
ddbms.wrapWithRetries(tableAccess.scanMetadata(builder));
List<Path> list = new ArrayList<>();
try {
entries.iterator().forEachRemaining(e -> {
Path p = e.getFileStatus().getPath();
LOG.info("Deleting {}", p);
list.add(p);
});
} catch (WrappedIOException e) {
// the iterator may have overloaded; swallow if so.
if (!(e.getCause() instanceof AWSServiceThrottledException)) {
throw e;
}
}
// send these one by one for more efficient retries
for (Path p : list) {
ddbms.getInvoker()
.retry("delete",
path,
true,
() -> tableAccess.delete(p));
}
}
/**
* Execute a set of operations in parallel, collect throttling statistics
* and return them.
@ -504,7 +579,8 @@ public ThrottleTracker execute(String operation,
final ContractTestUtils.NanoTimer t
= new ContractTestUtils.NanoTimer();
for (int j = 0; j < operationsPerThread; j++) {
if (tracker.isThrottlingDetected()) {
if (tracker.isThrottlingDetected()
|| throttleExceptions.get() > 0) {
outcome.skipped = true;
return outcome;
}
@ -539,13 +615,12 @@ public ThrottleTracker execute(String operation,
LOG.info("Completed {} with {}", operation, tracker);
LOG.info("time to execute: {} millis", elapsedMs);
for (Future<ExecutionOutcome> future : futures) {
assertTrue("Future timed out", future.isDone());
}
Assertions.assertThat(futures)
.describedAs("Futures of all tasks")
.allMatch(Future::isDone);
tracker.probe();
if (expectThrottling) {
tracker.assertThrottlingDetected();
if (expectThrottling() && tracker.probeThrottlingDetected()) {
recoverFromThrottling();
}
for (Future<ExecutionOutcome> future : futures) {

View File

@ -18,7 +18,8 @@
package org.apache.hadoop.fs.s3a.s3guard;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Something to track throttles in DynamoDB metastores.
@ -34,13 +35,17 @@
*/
class ThrottleTracker {
private static final Logger LOG = LoggerFactory.getLogger(
ThrottleTracker.class);
private final DynamoDBMetadataStore ddbms;
private long writeThrottleEventOrig = 0;
private long writeThrottleEventOrig;
private long readThrottleEventOrig = 0;
private long readThrottleEventOrig;
private long batchWriteThrottleCountOrig = 0;
private long batchWriteThrottleCountOrig;
private long scanThrottleCountOrig;
private long readThrottles;
@ -48,6 +53,8 @@ class ThrottleTracker {
private long batchThrottles;
private long scanThrottles;
ThrottleTracker(final DynamoDBMetadataStore ddbms) {
this.ddbms = ddbms;
reset();
@ -65,6 +72,9 @@ public synchronized void reset() {
batchWriteThrottleCountOrig
= ddbms.getBatchWriteCapacityExceededCount();
scanThrottleCountOrig
= ddbms.getScanThrottleEventCount();
}
/**
@ -78,6 +88,8 @@ public synchronized boolean probe() {
- writeThrottleEventOrig);
setBatchThrottles(ddbms.getBatchWriteCapacityExceededCount()
- batchWriteThrottleCountOrig);
setScanThrottles(ddbms.getScanThrottleEventCount()
- scanThrottleCountOrig);
return isThrottlingDetected();
}
@ -85,27 +97,35 @@ public synchronized boolean probe() {
public String toString() {
return String.format(
"Tracker with read throttle events = %d;"
+ " write events = %d;"
+ " batch throttles = %d",
getReadThrottles(), getWriteThrottles(), getBatchThrottles());
+ " write throttles = %d;"
+ " batch throttles = %d;"
+ " scan throttles = %d",
getReadThrottles(), getWriteThrottles(), getBatchThrottles(),
getScanThrottles());
}
/**
* Assert that throttling has been detected.
* Check that throttling was detected; warn if not.
* @return true if throttling took place.
*/
public void assertThrottlingDetected() {
Assert.assertTrue("No throttling detected in " + this +
" against " + ddbms.toString(),
isThrottlingDetected());
public boolean probeThrottlingDetected() {
if (!isThrottlingDetected()) {
LOG.warn("No throttling detected in {} against {}",
this, ddbms);
return false;
}
return true;
}
/**
* Has there been any throttling on an operation?
* @return true iff read, write or batch operations were throttled.
* @return true if any operations were throttled.
*/
public boolean isThrottlingDetected() {
return getReadThrottles() > 0 || getWriteThrottles()
> 0 || getBatchThrottles() > 0;
return getReadThrottles() > 0
|| getWriteThrottles() > 0
|| getBatchThrottles() > 0
|| getScanThrottles() > 0;
}
public long getReadThrottles() {
@ -131,4 +151,12 @@ public long getBatchThrottles() {
public void setBatchThrottles(long batchThrottles) {
this.batchThrottles = batchThrottles;
}
public long getScanThrottles() {
return scanThrottles;
}
public void setScanThrottles(final long scanThrottles) {
this.scanThrottles = scanThrottles;
}
}

View File

@ -0,0 +1,380 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.scale;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.impl.FunctionsRaisingIOE;
import org.apache.hadoop.fs.impl.WrappedIOException;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.auth.delegation.Csvout;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.MAX_ENTRIES_TO_DELETE;
/**
* Test some scalable operations related to file renaming and deletion.
* Much of the setup code is lifted from ILoadTestSessionCredentials;
* whereas that was designed to overload an STS endpoint, this just
* tries to overload a single S3 shard with too many bulk IO requests
* -and so see what happens.
* Note: UA field includes the configuration tested for the benefit
* of anyone looking through the server logs.
*/
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(Parameterized.class)
public class ILoadTestS3ABulkDeleteThrottling extends S3AScaleTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(ILoadTestS3ABulkDeleteThrottling.class);
protected static final int THREADS = 20;
public static final int TOTAL_KEYS = 25000;
public static final int SMALL = BULK_DELETE_PAGE_SIZE_DEFAULT;
public static final int SMALL_REQS = TOTAL_KEYS / SMALL;
public static final int MAXIMUM = MAX_ENTRIES_TO_DELETE;
public static final int MAXIMUM_REQS = TOTAL_KEYS / MAXIMUM;
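// Request-count arithmetic: 25000 keys / 250 (default page size) = 100
// requests per run; 25000 / 1000 = 25 requests when using the S3
// DeleteObjects hard cap (assumed to be MAX_ENTRIES_TO_DELETE's value).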
// shared across test cases.
@SuppressWarnings("StaticNonFinalField")
private static boolean testWasThrottled;
private final ExecutorService executor =
HadoopExecutors.newFixedThreadPool(
THREADS,
new ThreadFactoryBuilder()
.setNameFormat("#%d")
.build());
private final CompletionService<Outcome>
completionService = new ExecutorCompletionService<>(executor);
private File dataDir;
private final boolean throttle;
private final int pageSize;
private final int requests;
/**
* Test array for parameterized test runs.
* <ul>
* <li>AWS client throttle on/off</li>
* <li>Page size</li>
* </ul>
*
* @return a list of parameter tuples.
*/
@Parameterized.Parameters(
name = "bulk-delete-aws-retry={0}-requests={2}-size={1}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{false, SMALL, SMALL_REQS},
{false, MAXIMUM, MAXIMUM_REQS},
{true, SMALL, SMALL_REQS},
{true, MAXIMUM, MAXIMUM_REQS},
});
}
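// For example, the first tuple above shows up in the test report as
// "bulk-delete-aws-retry=false-requests=100-size=250", so each run's
// configuration is visible in the results.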
/**
* Parameterized constructor.
* @param throttle AWS client throttle on/off
* @param pageSize Page size
* @param requests request count.
*/
public ILoadTestS3ABulkDeleteThrottling(
final boolean throttle,
final int pageSize,
final int requests) {
this.throttle = throttle;
Preconditions.checkArgument(pageSize > 0,
"page size too low %s", pageSize);
this.pageSize = pageSize;
this.requests = requests;
}
@Override
protected Configuration createScaleConfiguration() {
Configuration conf = super.createScaleConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
return conf;
}
@Override
public void setup() throws Exception {
final Configuration conf = getConf();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
EXPERIMENTAL_AWS_INTERNAL_THROTTLING,
BULK_DELETE_PAGE_SIZE,
USER_AGENT_PREFIX);
conf.setBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING, throttle);
Assertions.assertThat(pageSize)
.describedAs("page size")
.isGreaterThan(0);
conf.setInt(BULK_DELETE_PAGE_SIZE, pageSize);
conf.set(USER_AGENT_PREFIX,
String.format("ILoadTestS3ABulkDeleteThrottling-%s-%04d",
throttle, pageSize));
super.setup();
Assume.assumeTrue("multipart delete disabled",
conf.getBoolean(ENABLE_MULTI_DELETE, true));
dataDir = GenericTestUtils.getTestDir("throttling");
dataDir.mkdirs();
final String size = getFileSystem().getConf().get(BULK_DELETE_PAGE_SIZE);
Assertions.assertThat(size)
.describedAs("page size")
.isNotEmpty();
Assertions.assertThat(getFileSystem().getConf()
.getInt(BULK_DELETE_PAGE_SIZE, -1))
.isEqualTo(pageSize);
}
@Test
public void test_010_Reset() throws Throwable {
testWasThrottled = false;
}
@Test
public void test_020_DeleteThrottling() throws Throwable {
describe("test how S3 reacts to massive multipart deletion requests");
final File results = deleteFiles(requests, pageSize);
LOG.info("Test run completed against {}:\n see {}", getFileSystem(),
results);
if (testWasThrottled) {
LOG.warn("Test was throttled");
} else {
LOG.info("No throttling recorded in filesystem");
}
}
@Test
public void test_030_Sleep() throws Throwable {
maybeSleep();
}
private void maybeSleep() throws InterruptedException, IOException {
if (testWasThrottled) {
LOG.info("Sleeping briefly to let store recover");
Thread.sleep(30_000);
getFileSystem().delete(path("recovery"), true);
testWasThrottled = false;
}
}
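// The post-sleep delete of a scratch path presumably doubles as a probe
// that the store is serving requests again before the next run starts.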
/**
* Delete the generated keys in a series of bulk requests.
* @param requestCount number of requests to issue.
* @param entries number of keys per request.
* @return the CSV file logging the outcome of every request.
* @throws Exception failure
*/
private File deleteFiles(final int requestCount,
final int entries)
throws Exception {
File csvFile = new File(dataDir,
String.format("delete-%03d-%04d-%s.csv",
requestCount, entries, throttle));
describe("Issuing %d requests of size %d, saving log to %s",
requestCount, entries, csvFile);
Path basePath = path("testDeleteObjectThrottling");
final S3AFileSystem fs = getFileSystem();
final String base = fs.pathToKey(basePath);
final List<DeleteObjectsRequest.KeyVersion> fileList
= buildDeleteRequest(base, entries);
final FileWriter out = new FileWriter(csvFile);
Csvout csvout = new Csvout(out, "\t", "\n");
Outcome.writeSchema(csvout);
final ContractTestUtils.NanoTimer jobTimer =
new ContractTestUtils.NanoTimer();
for (int i = 0; i < requestCount; i++) {
final int id = i;
completionService.submit(() -> {
final long startTime = System.currentTimeMillis();
Thread.currentThread().setName("#" + id);
LOG.info("Issuing request {}", id);
final ContractTestUtils.NanoTimer timer =
new ContractTestUtils.NanoTimer();
Exception ex = null;
try {
fs.removeKeys(fileList, false, null);
} catch (IOException e) {
ex = e;
}
timer.end("Request " + id);
return new Outcome(id, startTime, timer,
ex);
});
}
NanoTimerStats stats = new NanoTimerStats("Overall");
NanoTimerStats success = new NanoTimerStats("Successful");
NanoTimerStats throttled = new NanoTimerStats("Throttled");
List<Outcome> throttledEvents = new ArrayList<>();
for (int i = 0; i < requestCount; i++) {
Outcome outcome = completionService.take().get();
ContractTestUtils.NanoTimer timer = outcome.timer;
Exception ex = outcome.exception;
outcome.writeln(csvout);
stats.add(timer);
if (ex != null) {
// throttling event occurred.
LOG.info("Throttled at event {}", i, ex);
throttled.add(timer);
throttledEvents.add(outcome);
} else {
success.add(timer);
}
}
csvout.close();
jobTimer.end("Execution of operations");
// now print the stats
LOG.info("Summary file is " + csvFile);
LOG.info("Made {} requests with {} throttle events\n: {}\n{}\n{}",
requestCount,
throttled.getCount(),
stats,
throttled,
success);
double duration = jobTimer.duration();
double iops = requestCount * entries * 1.0e9 / duration;
LOG.info(String.format("TPS %3f operations/second",
iops));
// log at debug
if (LOG.isDebugEnabled()) {
throttledEvents.forEach((outcome -> {
LOG.debug("{}: duration: {}",
outcome.id, outcome.timer.elapsedTimeMs());
}));
}
return csvFile;
}
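// Worked example for the rate calculation above, with illustrative numbers:
// 100 requests of 250 entries completing in 20 seconds (2.0e10 ns) is
// 100 * 250 * 1.0e9 / 2.0e10 = 1250 deletes/second, well below the ~3500
// writes/second/partition at which S3 starts throttling.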
private List<DeleteObjectsRequest.KeyVersion> buildDeleteRequest(
String base, int count) {
List<DeleteObjectsRequest.KeyVersion> request = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
request.add(new DeleteObjectsRequest.KeyVersion(
String.format("%s/file-%04d", base, i)));
}
return request;
}
private <R> R wrap(FunctionsRaisingIOE.CallableRaisingIOE<R> callable) {
try {
return callable.apply();
} catch (IOException e) {
throw new WrappedIOException(e);
}
}
/**
* Outcome of one of the load operations.
*/
private static class Outcome {
private final int id;
private final long startTime;
private final ContractTestUtils.NanoTimer timer;
private final Exception exception;
Outcome(final int id,
final long startTime,
final ContractTestUtils.NanoTimer timer,
final Exception exception) {
this.id = id;
this.startTime = startTime;
this.timer = timer;
this.exception = exception;
}
/**
* Write this record.
* @param out the csvout to write through.
* @return the csvout instance
* @throws IOException IO failure.
*/
public Csvout writeln(Csvout out) throws IOException {
return out.write(
id,
startTime,
exception == null ? 1 : 0,
timer.getStartTime(),
timer.getEndTime(),
timer.duration(),
'"' + (exception == null ? "" : exception.getMessage()) + '"')
.newline();
}
/**
* Write the schema of the outcome records.
* @param out CSV destination.
* @throws IOException IO failure.
*/
public static void writeSchema(Csvout out) throws IOException {
out.write("id", "starttime", "success", "started", "ended",
"duration", "error").newline();
}
}
}

View File

@ -18,27 +18,58 @@
package org.apache.hadoop.fs.s3a.scale;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.util.DurationInfo;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles;
import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.filenameOfIndex;
/**
* Test some scalable operations related to file renaming and deletion.
* We set the bulk delete page size low enough that even the default test
* scale issues multiple delete requests during a delete sequence, so the
* paging logic is exercised without needing a larger-scale run.
*/
public class ITestS3ADeleteManyFiles extends S3AScaleTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3ADeleteManyFiles.class);
public static final String PREFIX = ITestPartialRenamesDeletes.PREFIX;
/**
* Delete Page size: {@value}.
*/
static final int DELETE_PAGE_SIZE = 50;
@Override
protected Configuration createScaleConfiguration() {
Configuration conf = super.createScaleConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
S3ATestUtils.removeBaseAndBucketOverrides(conf,
EXPERIMENTAL_AWS_INTERNAL_THROTTLING,
BULK_DELETE_PAGE_SIZE,
USER_AGENT_PREFIX);
conf.setBoolean(EXPERIMENTAL_AWS_INTERNAL_THROTTLING, false);
conf.setInt(BULK_DELETE_PAGE_SIZE, DELETE_PAGE_SIZE);
return conf;
}
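// Illustrative arithmetic: with a file count of 1000 and DELETE_PAGE_SIZE
// of 50, a directory delete splits into 1000 / 50 = 20 DeleteObjects
// requests, so the paging logic runs even at the default test scale.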
/**
* CAUTION: If this test starts failing, please make sure that the
@ -46,33 +77,37 @@ public class ITestS3ADeleteManyFiles extends S3AScaleTestBase {
* set too low. Alternatively, consider reducing the
* <code>scale.test.operation.count</code> parameter in
* <code>getOperationCount()</code>.
*
* If it is slow, look at the size of any S3Guard table used.
* @see #getOperationCount()
*/
@Test
public void testBulkRenameAndDelete() throws Throwable {
final Path scaleTestDir = path("testBulkRenameAndDelete");
final Path srcDir = new Path(scaleTestDir, "src");
final Path finalDir = new Path(scaleTestDir, "final");
final int count = getConf().getInt(KEY_FILE_COUNT,
DEFAULT_FILE_COUNT);
describe("Testing bulk rename and delete of %d files", count);
final Path scaleTestDir = path("testBulkRenameAndDelete");
final Path srcParentDir = new Path(scaleTestDir, "srcParent");
final Path srcDir = new Path(srcParentDir, "src");
final Path finalParentDir = new Path(scaleTestDir, "finalParent");
final Path finalDir = new Path(finalParentDir, "final");
final S3AFileSystem fs = getFileSystem();
ContractTestUtils.rm(fs, scaleTestDir, true, false);
rm(fs, scaleTestDir, true, false);
fs.mkdirs(srcDir);
fs.mkdirs(finalParentDir);
createFiles(fs, srcDir, 1, count, 0);
FileStatus[] statuses = fs.listStatus(srcDir);
int nSrcFiles = statuses.length;
long sourceSize = 0;
for (FileStatus status : statuses) {
sourceSize += status.getLen();
}
long sourceSize = Arrays.stream(statuses)
.mapToLong(FileStatus::getLen)
.sum();
assertEquals("Source file Count", count, nSrcFiles);
ContractTestUtils.NanoTimer renameTimer = new ContractTestUtils.NanoTimer();
try (DurationInfo ignored = new DurationInfo(LOG,
"Rename %s to %s", srcDir, finalDir)) {
assertTrue("Rename failed", fs.rename(srcDir, finalDir));
rename(srcDir, finalDir);
}
renameTimer.end();
LOG.info("Effective rename bandwidth {} MB/s",
@ -80,19 +115,31 @@ public void testBulkRenameAndDelete() throws Throwable {
LOG.info(String.format(
"Time to rename a file: %,03f milliseconds",
(renameTimer.nanosPerOperation(count) * 1.0f) / 1.0e6));
assertEquals(nSrcFiles, fs.listStatus(finalDir).length);
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
new Path(srcDir, PREFIX + 0));
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
new Path(srcDir, PREFIX + count / 2));
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename",
new Path(srcDir, PREFIX + (count - 1)));
ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
new Path(finalDir, PREFIX + 0));
ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
new Path(finalDir, PREFIX + count/2));
ContractTestUtils.assertPathExists(fs, "not renamed to dest dir",
new Path(finalDir, PREFIX + (count-1)));
Assertions.assertThat(lsR(fs, srcParentDir, true))
.describedAs("Recursive listing of source dir %s", srcParentDir)
.isEqualTo(0);
assertPathDoesNotExist("not deleted after rename",
new Path(srcDir, filenameOfIndex(0)));
assertPathDoesNotExist("not deleted after rename",
new Path(srcDir, filenameOfIndex(count / 2)));
assertPathDoesNotExist("not deleted after rename",
new Path(srcDir, filenameOfIndex(count - 1)));
// audit destination
Assertions.assertThat(lsR(fs, finalDir, true))
.describedAs("size of recursive destination listFiles(%s)", finalDir)
.isEqualTo(count);
Assertions.assertThat(fs.listStatus(finalDir))
.describedAs("size of destination listStatus(%s)", finalDir)
.hasSize(count);
assertPathExists("not renamed to dest dir",
new Path(finalDir, filenameOfIndex(0)));
assertPathExists("not renamed to dest dir",
new Path(finalDir, filenameOfIndex(count / 2)));
assertPathExists("not renamed to dest dir",
new Path(finalDir, filenameOfIndex(count - 1)));
ContractTestUtils.NanoTimer deleteTimer =
new ContractTestUtils.NanoTimer();
@ -104,6 +151,11 @@ public void testBulkRenameAndDelete() throws Throwable {
LOG.info(String.format(
"Time to delete an object %,03f milliseconds",
(deleteTimer.nanosPerOperation(count) * 1.0f) / 1.0e6));
Assertions.assertThat(lsR(fs, finalParentDir, true))
.describedAs("Recursive listing of deleted rename destination %s",
finalParentDir)
.isEqualTo(0);
}
}