diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java index c04c1bb47f..19ee9d1414 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java @@ -244,6 +244,13 @@ public final class StoreStatisticNames { public static final String OBJECT_MULTIPART_UPLOAD_ABORTED = "object_multipart_aborted"; + /** + * Object multipart list request. + * Value :{@value}. + */ + public static final String OBJECT_MULTIPART_UPLOAD_LIST = + "object_multipart_list"; + /** * Object put/multipart upload count. * Value :{@value}. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index d69d01f994..8b174e92b2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1318,4 +1318,19 @@ private Constants() { * The bucket region header. */ public static final String BUCKET_REGION_HEADER = "x-amz-bucket-region"; + + /** + * Should directory operations purge uploads? + * This adds at least one parallelized list operation to the call, + * plus the overhead of deletions. + * Value: {@value}. + */ + public static final String DIRECTORY_OPERATIONS_PURGE_UPLOADS = + "fs.s3a.directory.operations.purge.uploads"; + + /** + * Default value of {@link #DIRECTORY_OPERATIONS_PURGE_UPLOADS}: {@value}. 
+ */ + public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java index efca093204..b2057c211d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java @@ -36,7 +36,7 @@ import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.store.audit.AuditSpan; -import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; @@ -66,7 +66,7 @@ private MultipartUtils() { } * @param maxKeys maximum batch size to request at a time from S3. * @return an iterator of matching uploads */ - static MultipartUtils.UploadIterator listMultipartUploads( + static RemoteIterator listMultipartUploads( final StoreContext storeContext, S3Client s3, @Nullable String prefix, @@ -196,7 +196,7 @@ private void requestNextBatch() throws IOException { listing = invoker.retry("listMultipartUploads", prefix, true, trackDurationOfOperation(storeContext.getInstrumentation(), - MULTIPART_UPLOAD_LIST.getSymbol(), + OBJECT_MULTIPART_UPLOAD_LIST.getSymbol(), () -> s3.listMultipartUploads(requestBuilder.build()))); LOG.debug("Listing found {} upload(s)", listing.uploads().size()); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index d7149d7dea..defbcd94a5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -258,6 +258,7 
@@ import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; import static org.apache.hadoop.util.Preconditions.checkArgument; +import static org.apache.hadoop.util.functional.RemoteIterators.foreach; import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator; /** @@ -384,6 +385,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private SignerManager signerManager; private S3AInternals s3aInternals; + /** + * Do directory operations purge pending uploads? + */ + private boolean dirOperationsPurgeUploads; + /** * Page size for deletions. */ @@ -565,6 +571,9 @@ public void initialize(URI name, Configuration originalConf) //check but do not store the block size longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); + // should the delete also purge uploads. + dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, + DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT); this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT); long prefetchBlockSizeLong = @@ -1230,7 +1239,7 @@ public void abortOutstandingMultipartUploads(long seconds) purgeBefore); invoker.retry("Purging multipart uploads", bucket, true, () -> { - MultipartUtils.UploadIterator uploadIterator = + RemoteIterator uploadIterator = MultipartUtils.listMultipartUploads(createStoreContext(), s3Client, null, maxKeys); while (uploadIterator.hasNext()) { @@ -2283,12 +2292,14 @@ private long innerRename(Path source, Path dest) // Initiate the rename. 
// this will call back into this class via the rename callbacks + final StoreContext storeContext = createStoreContext(); RenameOperation renameOperation = new RenameOperation( - createStoreContext(), + storeContext, src, srcKey, p.getLeft(), dst, dstKey, p.getRight(), - new OperationCallbacksImpl(), - pageSize); + new OperationCallbacksImpl(storeContext), + pageSize, + dirOperationsPurgeUploads); return renameOperation.execute(); } @@ -2309,8 +2320,19 @@ private final class OperationCallbacksImpl implements OperationCallbacks { /** Audit Span at time of creation. */ private final AuditSpan auditSpan; - private OperationCallbacksImpl() { - auditSpan = getActiveAuditSpan(); + private final StoreContext storeContext; + + private OperationCallbacksImpl(final StoreContext storeContext) { + this.storeContext = requireNonNull(storeContext); + this.auditSpan = storeContext.getActiveAuditSpan(); + } + + /** + * Get the audit span. + * @return the span + */ + private AuditSpan getAuditSpan() { + return auditSpan; } @Override @@ -2410,7 +2432,29 @@ public RemoteIterator listObjects( Listing.ACCEPT_ALL_BUT_S3N, auditSpan)); } - } + + /** + * Abort multipart uploads under a path. + * @param prefix prefix for uploads to abort + * @return a count of aborts + * @throws IOException trouble; FileNotFoundExceptions are swallowed. + */ + @Override + @Retries.RetryTranslated + public long abortMultipartUploadsUnderPrefix(String prefix) + throws IOException { + getAuditSpan().activate(); + // this deactivates the audit span somehow + final RemoteIterator uploads = + S3AFileSystem.this.listUploadsUnderPrefix(storeContext, prefix); + // so reactivate it. + getAuditSpan().activate(); + return foreach(uploads, upload -> + invoker.retry("Aborting multipart commit", upload.key(), true, () -> + S3AFileSystem.this.abortMultipartUpload(upload))); + } + + } // end OperationCallbacksImpl /** * Callbacks from {@link Listing}. 
@@ -3371,14 +3415,17 @@ protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOEx // span covers delete, getFileStatus, fake directory operations. try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(), path.toString(), null)) { + // SC will include active span + final StoreContext storeContext = createStoreContext(); boolean outcome = trackDuration(getDurationTrackerFactory(), INVOCATION_DELETE.getSymbol(), new DeleteOperation( - createStoreContext(), + storeContext, innerGetFileStatus(path, true, StatusProbeEnum.ALL), recursive, - new OperationCallbacksImpl(), - pageSize)); + new OperationCallbacksImpl(storeContext), + pageSize, + dirOperationsPurgeUploads)); if (outcome) { try { maybeCreateFakeParentDirectory(path); @@ -5151,13 +5198,39 @@ S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status) @InterfaceAudience.Private @Retries.RetryTranslated @AuditEntryPoint - public MultipartUtils.UploadIterator listUploads(@Nullable String prefix) + public RemoteIterator listUploads(@Nullable String prefix) throws IOException { // span is picked up retained in the listing. - return trackDurationAndSpan(MULTIPART_UPLOAD_LIST, prefix, null, () -> - MultipartUtils.listMultipartUploads( - createStoreContext(), s3Client, prefix, maxKeys - )); + checkNotClosed(); + try (AuditSpan span = createSpan(MULTIPART_UPLOAD_LIST.getSymbol(), + prefix, null)) { + return listUploadsUnderPrefix(createStoreContext(), prefix); + } + } + + /** + * List any pending multipart uploads whose keys begin with prefix, using + * an iterator that can handle an unlimited number of entries. + * See {@link #listMultipartUploads(String)} for a non-iterator version of + * this. + * @param storeContext store context. + * @param prefix optional key prefix to search + * @return Iterator over multipart uploads.
+ * @throws IOException on failure + */ + @InterfaceAudience.Private + @Retries.RetryTranslated + public RemoteIterator listUploadsUnderPrefix( + final StoreContext storeContext, + final @Nullable String prefix) + throws IOException { + // span is picked up retained in the listing. + String p = prefix; + if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) { + p = prefix + "/"; + } + // duration tracking is done in iterator. + return MultipartUtils.listMultipartUploads(storeContext, s3Client, p, maxKeys); } /** @@ -5179,9 +5252,10 @@ public List listMultipartUploads(String prefix) } String p = prefix; return invoker.retry("listMultipartUploads", p, true, () -> { - ListMultipartUploadsRequest.Builder requestBuilder = getRequestFactory() - .newListMultipartUploadsRequestBuilder(p); - return s3Client.listMultipartUploads(requestBuilder.build()).uploads(); + final ListMultipartUploadsRequest request = getRequestFactory() + .newListMultipartUploadsRequestBuilder(p).build(); + return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () -> + s3Client.listMultipartUploads(request).uploads()); }); } @@ -5190,37 +5264,35 @@ public List listMultipartUploads(String prefix) * Retry policy: none. 
* @param destKey destination key * @param uploadId Upload ID + * @throws IOException IO failure, including any uprated SdkException */ - @Retries.OnceRaw - void abortMultipartUpload(String destKey, String uploadId) { - LOG.info("Aborting multipart upload {} to {}", uploadId, destKey); - s3Client.abortMultipartUpload( - getRequestFactory().newAbortMultipartUploadRequestBuilder( - destKey, - uploadId).build()); + @Retries.OnceTranslated + public void abortMultipartUpload(String destKey, String uploadId) throws IOException { + LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey); + trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> + s3Client.abortMultipartUpload( + getRequestFactory().newAbortMultipartUploadRequestBuilder( + destKey, + uploadId).build())); } /** * Abort a multipart upload. * Retry policy: none. * @param upload the listed upload to abort. + * @throws IOException IO failure, including any uprated SdkException */ - @Retries.OnceRaw - void abortMultipartUpload(MultipartUpload upload) { - String destKey; - String uploadId; - destKey = upload.key(); - uploadId = upload.uploadId(); - if (LOG.isInfoEnabled()) { + @Retries.OnceTranslated + public void abortMultipartUpload(MultipartUpload upload) throws IOException { + String destKey = upload.key(); + String uploadId = upload.uploadId(); + if (LOG.isDebugEnabled()) { DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}", uploadId, destKey, upload.initiator(), df.format(Date.from(upload.initiated()))); } - s3Client.abortMultipartUpload( - getRequestFactory().newAbortMultipartUploadRequestBuilder( - destKey, - uploadId).build()); + abortMultipartUpload(destKey, uploadId); } /** @@ -5266,13 +5338,17 @@ public boolean hasPathCapability(final Path path, final String capability) case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE: return true; + // Do directory operations purge uploads. 
+ case DIRECTORY_OPERATIONS_PURGE_UPLOADS: + return dirOperationsPurgeUploads; + // etags are avaialable in listings, but they // are not consistent across renames. // therefore, only availability is declared case CommonPathCapabilities.ETAGS_AVAILABLE: return true; - /* + /* * Marker policy capabilities are handed off. */ case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP: @@ -5545,7 +5621,7 @@ public MarkerToolOperations createMarkerToolOperations(final String target) throws IOException { createSpan("marker-tool-scan", target, null); - return new MarkerToolOperationsImpl(new OperationCallbacksImpl()); + return new MarkerToolOperationsImpl(new OperationCallbacksImpl(createStoreContext())); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index f4e28aa627..72fc75b642 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -242,7 +242,10 @@ public enum Statistic { StoreStatisticNames.OBJECT_MULTIPART_UPLOAD_ABORTED, "Object multipart upload aborted", TYPE_DURATION), - OBJECT_PUT_REQUESTS( + OBJECT_MULTIPART_UPLOAD_LIST( + StoreStatisticNames.OBJECT_MULTIPART_UPLOAD_LIST, + "Object multipart list request issued", + TYPE_DURATION), OBJECT_PUT_REQUESTS( StoreStatisticNames.OBJECT_PUT_REQUEST, "Object put/multipart upload count", TYPE_DURATION), diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java index 0156207419..e0580df08a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CallableSupplier.java @@ -22,6 +22,7 @@ import java.io.IOException; import 
java.io.UncheckedIOException; import java.util.List; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -155,19 +156,21 @@ public static void waitForCompletion( * Wait for a single of future to complete, extracting IOEs afterwards. * @param future future to wait for. * @param type + * @return the result * @throws IOException if one of the called futures raised an IOE. * @throws RuntimeException if one of the futures raised one. */ - public static void waitForCompletion( + public static T waitForCompletion( final CompletableFuture future) throws IOException { try (DurationInfo ignore = new DurationInfo(LOG, false, "Waiting for task completion")) { - future.join(); + return future.join(); } catch (CancellationException e) { throw new IOException(e); } catch (CompletionException e) { raiseInnerCause(e); + return null; } } @@ -175,31 +178,35 @@ public static void waitForCompletion( * Wait for a single of future to complete, ignoring exceptions raised. * @param future future to wait for. * @param type + * @return the outcome if successfully retrieved. */ - public static void waitForCompletionIgnoringExceptions( + public static Optional waitForCompletionIgnoringExceptions( @Nullable final CompletableFuture future) { - if (future != null) { - try (DurationInfo ignore = - new DurationInfo(LOG, false, "Waiting for task completion")) { - future.join(); - } catch (Exception e) { - LOG.debug("Ignoring exception raised in task completion: "); - } + + try { + return maybeAwaitCompletion(future); + } catch (Exception e) { + LOG.debug("Ignoring exception raised in task completion: ", e); + return Optional.empty(); } } /** * Block awaiting completion for any non-null future passed in; * No-op if a null arg was supplied. 
+ * @param return type * @param future future + * @return the outcome; is empty if the future was null/had no return value * @throws IOException if one of the called futures raised an IOE. * @throws RuntimeException if one of the futures raised one. */ - public static void maybeAwaitCompletion( - @Nullable final CompletableFuture future) + public static Optional maybeAwaitCompletion( + @Nullable final CompletableFuture future) throws IOException { if (future != null) { - waitForCompletion(future); + return Optional.ofNullable(waitForCompletion(future)); + } else { + return Optional.empty(); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java index 314d7cb82d..11e73aeb75 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -41,6 +42,7 @@ import org.apache.hadoop.util.DurationInfo; +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan; import static org.apache.hadoop.util.Preconditions.checkArgument; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.maybeAwaitCompletion; @@ -110,6 +112,16 @@ public class DeleteOperation extends ExecutingStoreOperation { */ private long filesDeleted; + /** + * Do directory operations purge pending uploads? + */ + private final boolean dirOperationsPurgeUploads; + + /** + * Count of uploads aborted. + */ + private Optional uploadsAborted = Optional.empty(); + /** * Constructor. 
* @param context store context @@ -117,12 +129,14 @@ public class DeleteOperation extends ExecutingStoreOperation { * @param recursive recursive delete? * @param callbacks callback provider * @param pageSize size of delete pages + * @param dirOperationsPurgeUploads Do directory operations purge pending uploads? */ public DeleteOperation(final StoreContext context, final S3AFileStatus status, final boolean recursive, final OperationCallbacks callbacks, - final int pageSize) { + final int pageSize, + final boolean dirOperationsPurgeUploads) { super(context); this.status = status; @@ -134,12 +148,22 @@ public DeleteOperation(final StoreContext context, this.pageSize = pageSize; executor = MoreExecutors.listeningDecorator( context.createThrottledExecutor(1)); + this.dirOperationsPurgeUploads = dirOperationsPurgeUploads; } public long getFilesDeleted() { return filesDeleted; } + /** + * Get the count of uploads aborted. + * Non-empty iff enabled, and the operations completed without errors. + * @return count of aborted uploads. + */ + public Optional getUploadsAborted() { + return uploadsAborted; + } + /** * Delete a file or directory tree. *

@@ -236,6 +260,17 @@ protected void deleteDirectoryTree(final Path path, try (DurationInfo ignored = new DurationInfo(LOG, false, "deleting %s", dirKey)) { + final CompletableFuture abortUploads; + if (dirOperationsPurgeUploads) { + final StoreContext sc = getStoreContext(); + final String key = sc.pathToKey(path) + "/"; + LOG.debug("All uploads under {} will be deleted", key); + abortUploads = submit(sc.getExecutor(), sc.getActiveAuditSpan(), () -> + callbacks.abortMultipartUploadsUnderPrefix(key)); + } else { + abortUploads = null; + } + // init the lists of keys and paths to delete resetDeleteList(); deleteFuture = null; @@ -257,10 +292,10 @@ protected void deleteDirectoryTree(final Path path, LOG.debug("Deleting final batch of listed files"); submitNextBatch(); maybeAwaitCompletion(deleteFuture); - + uploadsAborted = waitForCompletionIgnoringExceptions(abortUploads); } - LOG.debug("Delete \"{}\" completed; deleted {} objects", path, - filesDeleted); + LOG.debug("Delete \"{}\" completed; deleted {} objects and aborted {} uploads", path, + filesDeleted, uploadsAborted.orElse(0L)); } /** @@ -313,7 +348,8 @@ private void submitNextBatch() throws IOException { // delete a single page of keys and the metadata. // block for any previous batch. 
- maybeAwaitCompletion(deleteFuture); + maybeAwaitCompletion(deleteFuture).ifPresent(count -> + LOG.debug("Deleted {} uploads", count)); // delete the current page of keys and paths deleteFuture = submitDelete(keys); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java index e0d9c7c6aa..9c88870633 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java @@ -164,4 +164,16 @@ RemoteIterator listObjects( Path path, String key) throws IOException; + + /** + * Abort multipart uploads under a path; paged. + * @param prefix prefix for uploads to abort + * @return a count of aborts + * @throws IOException trouble; FileNotFoundExceptions are swallowed. + */ + @Retries.RetryTranslated + default long abortMultipartUploadsUnderPrefix(String prefix) + throws IOException { + return 0; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java index 4bb15f7496..288b3c0aae 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -44,6 +45,7 @@ import org.apache.hadoop.util.OperationDuration; import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; import static 
org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; @@ -124,9 +126,18 @@ public class RenameOperation extends ExecutingStoreOperation { private final List keysToDelete = new ArrayList<>(); + /** + * Do directory operations purge pending uploads? + */ + private final boolean dirOperationsPurgeUploads; + + /** + * Count of uploads aborted. + */ + private Optional uploadsAborted = Optional.empty(); + /** * Initiate the rename. - * * @param storeContext store context * @param sourcePath source path * @param sourceKey key of source @@ -136,6 +147,7 @@ public class RenameOperation extends ExecutingStoreOperation { * @param destStatus destination status. * @param callbacks callback provider * @param pageSize size of delete requests + * @param dirOperationsPurgeUploads Do directory operations purge pending uploads? */ public RenameOperation( final StoreContext storeContext, @@ -146,7 +158,8 @@ public RenameOperation( final String destKey, final S3AFileStatus destStatus, final OperationCallbacks callbacks, - final int pageSize) { + final int pageSize, + final boolean dirOperationsPurgeUploads) { super(storeContext); this.sourcePath = sourcePath; this.sourceKey = sourceKey; @@ -159,6 +172,16 @@ public RenameOperation( && pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE, "page size out of range: %s", pageSize); this.pageSize = pageSize; + this.dirOperationsPurgeUploads = dirOperationsPurgeUploads; + } + + /** + * Get the count of uploads aborted. + * Non-empty iff enabled, and the operations completed without errors. + * @return count of aborted uploads. 
+ */ + public Optional getUploadsAborted() { + return uploadsAborted; } /** @@ -341,6 +364,16 @@ protected void recursiveDirectoryRename() throws IOException { throw new RenameFailedException(srcKey, dstKey, "cannot rename a directory to a subdirectory of itself "); } + // start the async dir cleanup + final CompletableFuture abortUploads; + if (dirOperationsPurgeUploads) { + final String key = srcKey; + LOG.debug("All uploads under {} will be deleted", key); + abortUploads = submit(getStoreContext().getExecutor(), () -> + callbacks.abortMultipartUploadsUnderPrefix(key)); + } else { + abortUploads = null; + } if (destStatus != null && destStatus.isEmptyDirectory() == Tristate.TRUE) { @@ -422,6 +455,8 @@ protected void recursiveDirectoryRename() throws IOException { // have been deleted. completeActiveCopiesAndDeleteSources("final copy and delete"); + // and if uploads were being aborted, wait for that to finish + uploadsAborted = waitForCompletionIgnoringExceptions(abortUploads); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java index 22fc630dad..ea1ea90848 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java @@ -47,8 +47,8 @@ import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.s3a.Constants; -import org.apache.hadoop.fs.s3a.MultipartUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.WriteOperationHelper; import org.apache.hadoop.fs.s3a.auth.RolePolicies; @@ -683,7 +683,7 @@ private void promptBeforeAbort(PrintStream out) throws IOException { private void processUploads(PrintStream out) throws IOException { 
final S3AFileSystem fs = getFilesystem(); - MultipartUtils.UploadIterator uploads = fs.listUploads(prefix); + RemoteIterator uploads = fs.listUploads(prefix); // create a span so that the write operation helper // is within one AuditSpan span = diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md index a7ea7b2e59..0216e46014 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md @@ -39,11 +39,12 @@ The features which may be unavailable include: * Optional Bucket Probes at startup (`fs.s3a.bucket.probe = 0`). This is now the default -do not change it. * List API to use (`fs.s3a.list.version = 1`) +* Bucket lifecycle rules to clean up pending uploads. ## Configuring s3a to connect to a third party store -### Connecting to a third party object store over HTTPS +## Connecting to a third party object store over HTTPS The core setting for a third party store is to change the endpoint in `fs.s3a.endpoint`. @@ -89,6 +90,57 @@ then these must be set, either in XML or (preferred) in a JCEKS file. If per-bucket settings are used here, then third-party stores and credentials may be used alongside an AWS store. + + +## Other issues + +### Coping without bucket lifecycle rules + +Not all third-party stores support bucket lifecycle rules to clean up buckets +of incomplete uploads. + +This can be addressed in three ways: +* Command line: `hadoop s3guard uploads -abort -force \`. +* With `fs.s3a.multipart.purge` and a purge age set in `fs.s3a.multipart.purge.age` +* In rename/delete `fs.s3a.directory.operations.purge.uploads = true`.
+ +#### S3Guard uploads command + +This can be executed on a schedule, or manually + +``` +hadoop s3guard uploads -abort -force s3a://bucket/ +``` + +Consult the [S3Guard documentation](s3guard.html) for the full set of parameters. + +#### In startup: `fs.s3a.multipart.purge` + +This lists all uploads in a bucket when a filesystem is created and deletes +all of those above a certain age. + +This can hurt performance on a large bucket, as the purge scans the entire tree, +and is executed whenever a filesystem is created -which can happen many times during +hive, spark, distcp jobs. + +For this reason, this option may be deleted in future, however it has long been +available in the S3A client and so guaranteed to work across versions. + +#### During rename and delete: `fs.s3a.directory.operations.purge.uploads` + +When `fs.s3a.directory.operations.purge.uploads` is set and a directory is renamed +or deleted, then in parallel with that operation an attempt is made to list +all pending uploads. +If there are any, they are aborted (sequentially). + +* This is disabled by default: it adds overhead and extra cost. +* Because it only applies to the directories being processed, directories which + are not renamed or deleted will retain all incomplete uploads. +* There is no age checking: all uploads will be aborted. +* If any other process is writing to the same directory tree, its pending uploads +will be aborted. + + # Troubleshooting The most common problem when talking to third-party stores are @@ -412,4 +464,5 @@ It is also a way to regression test foundational S3A third-party store compatibi ``` _Note_ If anyone is set up to test this reguarly, please let the hadoop developer team know if regressions do surface, -as it is not a common test configuration. \ No newline at end of file +as it is not a common test configuration.
+[] \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java index 263a857e03..e0559b7c49 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.store.audit.AuditSpan; import java.io.IOException; @@ -76,7 +77,7 @@ public void testListMultipartUploads() throws Exception { // 2. Verify all uploads are found listing by prefix describe("Verifying upload list by prefix"); - MultipartUtils.UploadIterator uploads = fs.listUploads(getPartPrefix(fs)); + RemoteIterator uploads = fs.listUploads(getPartPrefix(fs)); assertUploadsPresent(uploads, keySet); // 3. Verify all uploads are found listing without prefix @@ -97,7 +98,7 @@ public void testListMultipartUploads() throws Exception { * @param ourUploads set up uploads that should be present * @throws IOException on I/O error */ - private void assertUploadsPresent(MultipartUtils.UploadIterator list, + private void assertUploadsPresent(RemoteIterator list, Set ourUploads) throws IOException { // Don't modify passed-in set, use copy. 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java index 3e343a9ea8..3f6870be46 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.store.audit.AuditSpan; import org.apache.hadoop.io.IOUtils; @@ -96,7 +97,7 @@ public static void clearAnyUploads(S3AFileSystem fs, Path path) { String key = fs.pathToKey(path); AuditSpan span = null; try { - MultipartUtils.UploadIterator uploads = fs.listUploads(key); + RemoteIterator uploads = fs.listUploads(key); span = fs.createSpan("multipart", path.toString(), null); final WriteOperationHelper helper = fs.getWriteOperationHelper(); @@ -118,7 +119,7 @@ public static void clearAnyUploads(S3AFileSystem fs, Path path) { public static void assertNoUploadsAt(S3AFileSystem fs, Path path) throws Exception { String key = fs.pathToKey(path); - MultipartUtils.UploadIterator uploads = fs.listUploads(key); + RemoteIterator uploads = fs.listUploads(key); while (uploads.hasNext()) { MultipartUpload upload = uploads.next(); Assert.fail("Found unexpected upload " + upload.key() + " " + @@ -130,7 +131,7 @@ public static void assertNoUploadsAt(S3AFileSystem fs, Path path) throws public static int countUploadsAt(S3AFileSystem fs, Path path) throws IOException { String key = fs.pathToKey(path); - MultipartUtils.UploadIterator uploads = fs.listUploads(key); + RemoteIterator uploads = fs.listUploads(key); int count = 0; while (uploads.hasNext()) { MultipartUpload upload 
= uploads.next(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java index 5534bb77c0..12234301b5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java @@ -27,6 +27,7 @@ import java.util.stream.IntStream; import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.services.s3.model.MultipartUpload; import software.amazon.awssdk.services.sts.model.StsException; import com.fasterxml.jackson.core.JsonProcessingException; import org.assertj.core.api.Assertions; @@ -40,10 +41,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.AWSBadRequestException; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; -import org.apache.hadoop.fs.s3a.MultipartUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestConstants; import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider; @@ -463,7 +464,7 @@ public void testReadOnlyOperations() throws Throwable { // list multipart uploads. // This is part of the read policy. 
int counter = 0; - MultipartUtils.UploadIterator iterator = roleFS.listUploads("/"); + RemoteIterator iterator = roleFS.listUploads("/"); while (iterator.hasNext()) { counter++; iterator.next(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java new file mode 100644 index 0000000000..9e07027375 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import software.amazon.awssdk.services.s3.model.MultipartUpload; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.store.audit.AuditSpan; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertFileHasLength; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities; +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX; +import static org.apache.hadoop.util.functional.RemoteIterators.toList; + +/** + * Test behavior of purging uploads in rename and delete. 
+ */ +public class ITestUploadPurgeOnDirectoryOperations extends AbstractS3ACostTest { + + @Override + public Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + removeBaseAndBucketOverrides(conf, + DIRECTORY_OPERATIONS_PURGE_UPLOADS, + MAGIC_COMMITTER_ENABLED); + conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true); + conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); + return conf; + } + + @Override + public void setup() throws Exception { + super.setup(); + final S3AFileSystem fs = getFileSystem(); + assertHasPathCapabilities(fs, new Path("/"), + DIRECTORY_OPERATIONS_PURGE_UPLOADS); + clearAnyUploads(fs, methodPath()); + } + + @Test + public void testDeleteWithPendingUpload() throws Throwable { + + final S3AFileSystem fs = getFileSystem(); + final Path dir = methodPath(); + + // create a magic file. + createMagicFile(fs, dir); + + // and there's a pending upload + assertUploadCount(dir, 1); + + // delete the dir, with a cost of 1 abort, 1 list. + verifyMetrics(() -> fs.delete(dir, true), + with(OBJECT_MULTIPART_UPLOAD_ABORTED, 1), // abort + with(OBJECT_MULTIPART_UPLOAD_LIST, 1), // HTTP request inside iterator + with(MULTIPART_UPLOAD_LIST, 0)); // api list call + + + // and the pending upload is gone + assertUploadCount(dir, 0); + } + + @Test + public void testRenameWithPendingUpload() throws Throwable { + + final S3AFileSystem fs = getFileSystem(); + final Path base = methodPath(); + final Path dir = new Path(base, "src"); + final Path dest = new Path(base, "dest"); + + // create a magic file. + createMagicFile(fs, dir); + + // and there's a pending upload + assertUploadCount(dir, 1); + + // rename the dir, with a cost of 1 abort, 1 list. 
+ verifyMetrics(() -> fs.rename(dir, dest), + with(OBJECT_MULTIPART_UPLOAD_ABORTED, 1), // abort + with(OBJECT_MULTIPART_UPLOAD_LIST, 1), // HTTP request inside iterator + with(MULTIPART_UPLOAD_LIST, 0)); // api list call + + // and the pending upload is gone + assertUploadCount(dir, 0); + } + + /** + * Create a magic file of "real" length more than 0 bytes long. + * @param fs filesystem + * @param dir directory + * @return the path + * @throws IOException creation failure. + */ + private static Path createMagicFile(final S3AFileSystem fs, final Path dir) throws IOException { + Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "001/file.txt"); + createFile(fs, magicFile, true, "123".getBytes(StandardCharsets.UTF_8)); + + // the file exists but is a 0 byte marker file. + assertFileHasLength(fs, magicFile, 0); + return magicFile; + } + + /** + * Assert the upload count under a dir is the expected value. + * Failure message will include the list of entries. + * @param dir dir + * @param expected expected count + * @throws IOException listing problem + */ + private void assertUploadCount(final Path dir, final int expected) throws IOException { + Assertions.assertThat(toList(listUploads(dir))) + .describedAs("uploads under %s", dir) + .hasSize(expected); + } + + /** + * List uploads; use the same APIs that the directory operations use, + * so implicitly validating them.
+ * @param dir directory to list + * @return iterator over the uploads found under the directory + * @throws IOException listing problem + */ + private RemoteIterator listUploads(Path dir) throws IOException { + final S3AFileSystem fs = getFileSystem(); + try (AuditSpan ignored = span()) { + final StoreContext sc = fs.createStoreContext(); + return fs.listUploadsUnderPrefix(sc, sc.pathToKey(dir)); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java index 48378ce75d..e37717bfa1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java @@ -91,6 +91,13 @@ protected AbstractS3ACostTest( this.keepMarkers = keepMarkers; } + /** + * Constructor with markers kept. + */ + public AbstractS3ACostTest() { + this(true); + } + @Override public Configuration createConfiguration() { Configuration conf = super.createConfiguration(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java index 844230e8be..28bc2a246a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardTool.java @@ -97,22 +97,22 @@ public void testStoreInfo() throws Throwable { LOG.info("Exec output=\n{}", output); } - private final static String UPLOAD_PREFIX = "test-upload-prefix"; private final static String UPLOAD_NAME = "test-upload"; @Test public void testUploads() throws Throwable { S3AFileSystem fs = getFileSystem(); - Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME); + Path path = methodPath(); + Path file = new Path(path,
UPLOAD_NAME); describe("Cleaning up any leftover uploads from previous runs."); - final String key = fs.pathToKey(path); + final String key = fs.pathToKey(file); try { // 1. Make sure key doesn't already exist clearAnyUploads(fs, path); // 2. Confirm no uploads are listed via API - assertNoUploadsAt(fs, path.getParent()); + assertNoUploadsAt(fs, path); // 3. Confirm no uploads are listed via CLI describe("Confirming CLI lists nothing."); @@ -127,8 +127,6 @@ public void testUploads() throws Throwable { // 6. Confirm part exists via CLI, direct path and parent path describe("Confirming CLI lists one part"); assertNumUploads(path, 1); - assertNumUploads(path.getParent(), 1); - // 7. Use CLI to delete part, assert it worked describe("Deleting part via CLI"); assertNumDeleted(fs, path, 1); @@ -150,22 +148,23 @@ public void testUploads() throws Throwable { @Test public void testUploadListByAge() throws Throwable { S3AFileSystem fs = getFileSystem(); - Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME); + Path path = methodPath(); + Path file = new Path(path, UPLOAD_NAME); describe("Cleaning up any leftover uploads from previous runs."); + // 1. Make sure key doesn't already exist clearAnyUploads(fs, path); // 2. Create a upload part describe("Uploading single part."); - final String key = fs.pathToKey(path); + final String key = fs.pathToKey(file); createPartUpload(fs, key, 128, 1); //try (AuditSpan span = fs.startOperation("multipart", key, null)) { try { - // 3. Confirm it exists via API.. may want to wrap with - // LambdaTestUtils.eventually() ? + // 3. Confirm it exists via API assertEquals("Should be one upload", 1, countUploadsAt(fs, path)); // 4. Confirm part does appear in listing with long age filter