From 47be1ab3b68b987ed8ab349fc351f438c00d9871 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Mon, 20 May 2024 11:05:25 -0500 Subject: [PATCH] HADOOP-18679. Add API for bulk/paged delete of files (#6726) Applications can create a BulkDelete instance from a BulkDeleteSource; the BulkDelete interface provides the pageSize(): the maximum number of entries which can be deleted, and a bulkDelete(Collection paths) method which can take a collection up to pageSize() long. This is optimized for object stores with bulk delete APIs; the S3A connector will offer the page size of fs.s3a.bulk.delete.page.size unless bulk delete has been disabled. Even with a page size of 1, the S3A implementation is more efficient than delete(path) as there are no safety checks for the path being a directory or probes for the need to recreate directories. The interface BulkDeleteSource is implemented by all FileSystem implementations, with a page size of 1 and mapped to delete(pathToDelete, false). This means that callers do not need to have special case handling for object stores versus classic filesystems. To aid use through reflection APIs, the class org.apache.hadoop.io.wrappedio.WrappedIO has been created with "reflection friendly" methods. Contributed by Mukund Thakur and Steve Loughran --- .../java/org/apache/hadoop/fs/BulkDelete.java | 90 ++++ .../apache/hadoop/fs/BulkDeleteSource.java | 53 +++ .../org/apache/hadoop/fs/BulkDeleteUtils.java | 66 +++ .../hadoop/fs/CommonPathCapabilities.java | 6 + .../java/org/apache/hadoop/fs/FileSystem.java | 34 +- .../fs/impl/DefaultBulkDeleteOperation.java | 97 +++++ .../fs/statistics/StoreStatisticNames.java | 6 + .../apache/hadoop/io/wrappedio/WrappedIO.java | 93 ++++ .../apache/hadoop/util/functional/Tuples.java | 87 ++++ .../site/markdown/filesystem/bulkdelete.md | 139 ++++++ .../src/site/markdown/filesystem/index.md | 3 +- .../AbstractContractBulkDeleteTest.java | 336 +++++++++++++++ .../TestLocalFSContractBulkDelete.java | 34 ++ .../TestRawLocalContractBulkDelete.java | 35 ++ .../hdfs/TestHDFSContractBulkDelete.java | 49 +++ .../org/apache/hadoop/fs/s3a/Constants.java | 12 + .../apache/hadoop/fs/s3a/S3AFileSystem.java | 183 ++++---- .../apache/hadoop/fs/s3a/S3AInternals.java | 12 +- .../org/apache/hadoop/fs/s3a/S3AStore.java | 129 ++++++ .../org/apache/hadoop/fs/s3a/Statistic.java | 8 + .../fs/s3a/impl/BulkDeleteOperation.java | 128 ++++++ .../BulkDeleteOperationCallbacksImpl.java | 125 ++++++ .../s3a/impl/MultiObjectDeleteException.java | 20 +- .../hadoop/fs/s3a/impl/S3AStoreBuilder.java | 113 +++++ .../hadoop/fs/s3a/impl/S3AStoreImpl.java | 400 ++++++++++++++++++ .../fs/s3a/impl/StoreContextFactory.java | 35 ++ .../tools/hadoop-aws/aws_sdk_upgrade.md | 1 + .../markdown/tools/hadoop-aws/performance.md | 82 +++- .../s3a/ITestS3AContractBulkDelete.java | 230 ++++++++++ .../hadoop/fs/s3a/AbstractS3AMockTest.java | 3 +- .../hadoop/fs/s3a/TestS3ADeleteOnExit.java | 3 +- .../hadoop/fs/s3a/auth/ITestAssumeRole.java | 133 +++++- .../s3a/scale/AbstractSTestS3AHugeFiles.java | 2 + .../contract/ITestAbfsContractBulkDelete.java | 50 +++ .../src/test/resources/log4j.properties | 1 + 35 files changed, 2679 insertions(+), 119 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteUtils.java create mode 100644 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/DefaultBulkDeleteOperation.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java create mode 100644 hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractBulkDelete.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractBulkDelete.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractBulkDelete.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractBulkDelete.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java new file mode 100644 index 0000000000..ab5f73b562 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; + +import static java.util.Objects.requireNonNull; + +/** + * API for bulk deletion of objects/files, + * but not directories. + * After use, call {@code close()} to release any resources and + * to guarantee store IOStatistics are updated. + *
<p>
+ * Callers MUST have no expectation that parent directories will exist after the + * operation completes; if an object store needs to explicitly look for and create + * directory markers, that step will be omitted. + *
<p>
+ * Be aware that on some stores (AWS S3) each object listed in a bulk delete counts + * against the write IOPS limit; large page sizes are counterproductive here, as + * are attempts at parallel submissions across multiple threads. + * @see HADOOP-16823. + * Large DeleteObject requests are their own Thundering Herd + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDelete extends IOStatisticsSource, Closeable { + + /** + * The maximum number of objects/files to delete in a single request. + * @return a number greater than zero. + */ + int pageSize(); + + /** + * Base path of a bulk delete operation. + * All paths submitted in {@link #bulkDelete(Collection)} must be under this path. + * @return base path of a bulk delete operation. + */ + Path basePath(); + + /** + * Delete a list of files/objects. + *

+ * @param paths list of paths which must be absolute and under the base path. + * provided in {@link #basePath()}. + * @return a list of paths which failed to delete, with the exception message. + * @throws IOException IO problems including networking, authentication and more. + * @throws IllegalArgumentException if a path argument is invalid. + */ + List> bulkDelete(Collection paths) + throws IOException, IllegalArgumentException; + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java new file mode 100644 index 0000000000..cad24babb3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface for bulk deletion. + * Filesystems which support bulk deletion should implement this interface + * and MUST also declare their support in the path capability + * {@link CommonPathCapabilities#BULK_DELETE}. + * Exporting the interface does not guarantee that the operation is supported; + * returning a {@link BulkDelete} object from the call {@link #createBulkDelete(Path)} + * is. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDeleteSource { + + /** + * Create a bulk delete operation. + * There is no network IO at this point, simply the creation of + * a bulk delete object. + * A path must be supplied to assist in link resolution. + * @param path path to delete under. + * @return the bulk delete. + * @throws UnsupportedOperationException bulk delete under that path is not supported. + * @throws IllegalArgumentException path not valid. + * @throws IOException problems resolving paths + */ + BulkDelete createBulkDelete(Path path) + throws UnsupportedOperationException, IllegalArgumentException, IOException; + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteUtils.java new file mode 100644 index 0000000000..d991642942 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteUtils.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Collection; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * Utility class for bulk delete operations. + */ +public final class BulkDeleteUtils { + + private BulkDeleteUtils() { + } + + /** + * Preconditions for bulk delete paths. + * @param paths paths to delete. + * @param pageSize maximum number of paths to delete in a single operation. + * @param basePath base path for the delete operation. + */ + public static void validateBulkDeletePaths(Collection paths, int pageSize, Path basePath) { + requireNonNull(paths); + checkArgument(paths.size() <= pageSize, + "Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize); + paths.forEach(p -> { + checkArgument(p.isAbsolute(), "Path %s is not absolute", p); + checkArgument(validatePathIsUnderParent(p, basePath), + "Path %s is not under the base path %s", p, basePath); + }); + } + + /** + * Check if a path is under a base path. + * @param p path to check. + * @param basePath base path. + * @return true if the path is under the base path. + */ + public static boolean validatePathIsUnderParent(Path p, Path basePath) { + while (p.getParent() != null) { + if (p.getParent().equals(basePath)) { + return true; + } + p = p.getParent(); + } + return false; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java index 9ec07cbe96..2005f0ae3b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java @@ -181,4 +181,10 @@ private CommonPathCapabilities() { */ public static final String DIRECTORY_LISTING_INCONSISTENT = "fs.capability.directory.listing.inconsistent"; + + /** + * Capability string to probe for bulk delete: {@value}. 
+ */ + public static final String BULK_DELETE = "fs.capability.bulk.delete"; + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 768fd5b5e1..2155e17328 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.Options.HandleOpt; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; +import org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; @@ -169,7 +170,8 @@ @InterfaceAudience.Public @InterfaceStability.Stable public abstract class FileSystem extends Configured - implements Closeable, DelegationTokenIssuer, PathCapabilities { + implements Closeable, DelegationTokenIssuer, + PathCapabilities, BulkDeleteSource { public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY; public static final String DEFAULT_FS = @@ -3485,12 +3487,16 @@ public Collection getTrashRoots(boolean allUsers) { public boolean hasPathCapability(final Path path, final String capability) throws IOException { switch (validatePathCapabilityArgs(makeQualified(path), capability)) { - case CommonPathCapabilities.FS_SYMLINKS: - // delegate to the existing supportsSymlinks() call. - return supportsSymlinks() && areSymlinksEnabled(); - default: - // the feature is not implemented. - return false; + case CommonPathCapabilities.BULK_DELETE: + // bulk delete has default implementation which + // can called on any FileSystem. + return true; + case CommonPathCapabilities.FS_SYMLINKS: + // delegate to the existing supportsSymlinks() call. + return supportsSymlinks() && areSymlinksEnabled(); + default: + // the feature is not implemented. + return false; } } @@ -4976,4 +4982,18 @@ public MultipartUploaderBuilder createMultipartUploader(Path basePath) methodNotSupported(); return null; } + + /** + * Create a bulk delete operation. + * The default implementation returns an instance of {@link DefaultBulkDeleteOperation}. + * @param path base path for the operation. + * @return an instance of the bulk delete. + * @throws IllegalArgumentException any argument is invalid. + * @throws IOException if there is an IO problem. + */ + @Override + public BulkDelete createBulkDelete(Path path) + throws IllegalArgumentException, IOException { + return new DefaultBulkDeleteOperation(path, this); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/DefaultBulkDeleteOperation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/DefaultBulkDeleteOperation.java new file mode 100644 index 0000000000..56f6a4622f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/DefaultBulkDeleteOperation.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.functional.Tuples; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.BulkDeleteUtils.validateBulkDeletePaths; + +/** + * Default implementation of the {@link BulkDelete} interface. + */ +public class DefaultBulkDeleteOperation implements BulkDelete { + + private static Logger LOG = LoggerFactory.getLogger(DefaultBulkDeleteOperation.class); + + /** Default page size for bulk delete. */ + private static final int DEFAULT_PAGE_SIZE = 1; + + /** Base path for the bulk delete operation. */ + private final Path basePath; + + /** Delegate File system make actual delete calls. */ + private final FileSystem fs; + + public DefaultBulkDeleteOperation(Path basePath, + FileSystem fs) { + this.basePath = requireNonNull(basePath); + this.fs = fs; + } + + @Override + public int pageSize() { + return DEFAULT_PAGE_SIZE; + } + + @Override + public Path basePath() { + return basePath; + } + + /** + * {@inheritDoc}. + * The default impl just calls {@code FileSystem.delete(path, false)} + * on the single path in the list. + */ + @Override + public List> bulkDelete(Collection paths) + throws IOException, IllegalArgumentException { + validateBulkDeletePaths(paths, DEFAULT_PAGE_SIZE, basePath); + List> result = new ArrayList<>(); + if (!paths.isEmpty()) { + // As the page size is always 1, this should be the only one + // path in the collection. + Path pathToDelete = paths.iterator().next(); + try { + fs.delete(pathToDelete, false); + } catch (IOException ex) { + LOG.debug("Couldn't delete {} - exception occurred: {}", pathToDelete, ex); + result.add(Tuples.pair(pathToDelete, ex.toString())); + } + } + return result; + } + + @Override + public void close() throws IOException { + + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java index 19ee9d1414..a513cffd84 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java @@ -46,6 +46,9 @@ public final class StoreStatisticNames { /** {@value}. */ public static final String OP_APPEND = "op_append"; + /** {@value}. */ + public static final String OP_BULK_DELETE = "op_bulk-delete"; + /** {@value}. 
*/ public static final String OP_COPY_FROM_LOCAL_FILE = "op_copy_from_local_file"; @@ -194,6 +197,9 @@ public final class StoreStatisticNames { public static final String STORE_IO_RETRY = "store_io_retry"; + public static final String STORE_IO_RATE_LIMITED_DURATION + = "store_io_rate_limited_duration"; + /** * A store's equivalent of a paged LIST request was initiated: {@value}. */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java new file mode 100644 index 0000000000..696055895a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.wrappedio; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +/** + * Reflection-friendly access to APIs which are not available in + * some of the older Hadoop versions which libraries still + * compile against. + *
<p>
+ * The intent is to avoid the need for complex reflection operations + * including wrapping of parameter classes, direct instantiation of + * new classes etc. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class WrappedIO { + + private WrappedIO() { + } + + /** + * Get the maximum number of objects/files to delete in a single request. + * @param fs filesystem + * @param path path to delete under. + * @return a number greater than or equal to zero. + * @throws UnsupportedOperationException bulk delete under that path is not supported. + * @throws IllegalArgumentException path not valid. + * @throws IOException problems resolving paths + */ + public static int bulkDelete_PageSize(FileSystem fs, Path path) throws IOException { + try (BulkDelete bulk = fs.createBulkDelete(path)) { + return bulk.pageSize(); + } + } + + /** + * Delete a list of files/objects. + *

+ * <ul>
+ *   <li>Files must be under the path provided in {@code base}.</li>
+ *   <li>The size of the list must be equal to or less than the page size.</li>
+ *   <li>Directories are not supported; the outcome of attempting to delete
+ *       directories is undefined (ignored; undetected, listed as failures...).</li>
+ *   <li>The operation is not atomic.</li>
+ *   <li>The operation is treated as idempotent: network failures may
+ *       trigger resubmission of the request -any new objects created under a
+ *       path in the list may then be deleted.</li>
+ *   <li>There is no guarantee that any parent directories exist after this call.</li>
+ * </ul>
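+ * <p>
+ * A usage sketch, not part of the API contract; the names {@code fs},
+ * {@code base} and {@code paths} are assumed to be in scope:
+ * <pre>{@code
+ * List<Map.Entry<Path, String>> failures =
+ *     WrappedIO.bulkDelete_delete(fs, base, paths);
+ * if (!failures.isEmpty()) {
+ *   // each entry maps a path to the failure message reported by the store
+ *   failures.forEach(entry ->
+ *       System.err.println(entry.getKey() + ": " + entry.getValue()));
+ * }
+ * }</pre>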
+ * @param fs filesystem + * @param base path to delete under. + * @param paths list of paths which must be absolute and under the base path. + * @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message. + * @throws UnsupportedOperationException bulk delete under that path is not supported. + * @throws IOException IO problems including networking, authentication and more. + * @throws IllegalArgumentException if a path argument is invalid. + */ + public static List> bulkDelete_delete(FileSystem fs, + Path base, + Collection paths) + throws IOException { + try (BulkDelete bulk = fs.createBulkDelete(base)) { + return bulk.bulkDelete(paths); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java new file mode 100644 index 0000000000..ed80c1daca --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util.functional; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Tuple support. + * This allows for tuples to be passed around as part of the public API without + * committing to a third-party library tuple implementation. + */ +@InterfaceStability.Unstable +public final class Tuples { + + private Tuples() { + } + + /** + * Create a 2-tuple. + * @param key element 1 + * @param value element 2 + * @return a tuple. + * @param element 1 type + * @param element 2 type + */ + public static Map.Entry pair(final K key, final V value) { + return new Tuple<>(key, value); + } + + /** + * Simple tuple class: uses the Map.Entry interface as other + * implementations have done, so the API is available across + * all java versions. 
+ * @param key + * @param value + */ + private static final class Tuple implements Map.Entry { + + private final K key; + + private final V value; + + private Tuple(final K key, final V value) { + this.key = key; + this.value = value; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(final V value) { + throw new UnsupportedOperationException("Tuple is immutable"); + } + + @Override + public String toString() { + return "(" + key + ", " + value + ')'; + } + + } +} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md new file mode 100644 index 0000000000..de0e4e893b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md @@ -0,0 +1,139 @@ + + +# interface `BulkDelete` + + + +The `BulkDelete` interface provides an API to perform bulk delete of files/objects +in an object store or filesystem. + +## Key Features + +* An API for submitting a list of paths to delete. +* This list must be no larger than the "page size" supported by the client; This size is also exposed as a method. +* Triggers a request to delete files at the specific paths. +* Returns a list of which paths were reported as delete failures by the store. +* Does not consider a nonexistent file to be a failure. +* Does not offer any atomicity guarantees. +* Idempotency guarantees are weak: retries may delete files newly created by other clients. +* Provides no guarantees as to the outcome if a path references a directory. +* Provides no guarantees that parent directories will exist after the call. + + +The API is designed to match the semantics of the AWS S3 [Bulk Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) REST API call, but it is not +exclusively restricted to this store. This is why the "provides no guarantees" +restrictions do not state what the outcome will be when executed on other stores. + +### Interface `org.apache.hadoop.fs.BulkDeleteSource` + +The interface `BulkDeleteSource` is offered by a FileSystem/FileContext class if +it supports the API. The default implementation is implemented in base FileSystem +class that returns an instance of `org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation`. +The default implementation details are provided in below sections. + + +```java +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDeleteSource { + BulkDelete createBulkDelete(Path path) + throws UnsupportedOperationException, IllegalArgumentException, IOException; + +} + +``` + +### Interface `org.apache.hadoop.fs.BulkDelete` + +This is the bulk delete implementation returned by the `createBulkDelete()` call. + +```java +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDelete extends IOStatisticsSource, Closeable { + int pageSize(); + Path basePath(); + List> bulkDelete(List paths) + throws IOException, IllegalArgumentException; + +} + +``` + +### `bulkDelete(paths)` + +#### Preconditions + +```python +if length(paths) > pageSize: throw IllegalArgumentException +``` + +#### Postconditions + +All paths which refer to files are removed from the set of files. +```python +FS'Files = FS.Files - [paths] +``` + +No other restrictions are placed upon the outcome. 
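+
+A minimal usage sketch follows; it is illustrative only and not part of the
+specification. It assumes a `FileSystem fs`, a base `Path base` and a
+`List<Path> paths` of files to delete, and pages the submissions so that no
+call exceeds `pageSize()`:
+
+```java
+try (BulkDelete bulkDelete = fs.createBulkDelete(base)) {
+  final int pageSize = bulkDelete.pageSize();
+  for (int i = 0; i < paths.size(); i += pageSize) {
+    // submit at most pageSize paths per call
+    List<Path> page = paths.subList(i, Math.min(i + pageSize, paths.size()));
+    // entries in the returned list name the paths the store reported as delete failures
+    List<Map.Entry<Path, String>> failures = bulkDelete.bulkDelete(page);
+    failures.forEach(entry ->
+        System.err.println(entry.getKey() + ": " + entry.getValue()));
+  }
+}
+```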
+ + +### Availability + +The `BulkDeleteSource` interface is exported by `FileSystem` and `FileContext` storage clients +which is available for all FS via `org.apache.hadoop.fs.impl.DefaultBulkDeleteSource`. For +integration in applications like Apache Iceberg to work seamlessly, all implementations +of this interface MUST NOT reject the request but instead return a BulkDelete instance +of size >= 1. + +Use the `PathCapabilities` probe `fs.capability.bulk.delete`. + +```java +store.hasPathCapability(path, "fs.capability.bulk.delete") +``` + +### Invocation through Reflection. + +The need for many libraries to compile against very old versions of Hadoop +means that most of the cloud-first Filesystem API calls cannot be used except +through reflection -And the more complicated The API and its data types are, +The harder that reflection is to implement. + +To assist this, the class `org.apache.hadoop.io.wrappedio.WrappedIO` has few methods +which are intended to provide simple access to the API, especially +through reflection. + +```java + + public static int bulkDeletePageSize(FileSystem fs, Path path) throws IOException; + + public static int bulkDeletePageSize(FileSystem fs, Path path) throws IOException; + + public static List> bulkDelete(FileSystem fs, Path base, Collection paths); +``` + +### Implementations + +#### Default Implementation + +The default implementation which will be used by all implementation of `FileSystem` of the +`BulkDelete` interface is `org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation` which fixes the page +size to be 1 and calls `FileSystem.delete(path, false)` on the single path in the list. + + +#### S3A Implementation +The S3A implementation is `org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation` which implements the +multi object delete semantics of the AWS S3 API [Bulk Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) +For more details please refer to the S3A Performance documentation. \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md index df39839e83..be72f35789 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md @@ -43,4 +43,5 @@ HDFS as these are commonly expected by Hadoop client applications. 1. [IOStatistics](iostatistics.html) 1. [openFile()](openfile.html) 1. [SafeMode](safemode.html) -1. [LeaseRecoverable](leaserecoverable.html) \ No newline at end of file +1. [LeaseRecoverable](leaserecoverable.html) +1. [BulkDelete](bulkdelete.html) \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java new file mode 100644 index 0000000000..9ebf9923f3 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.CommonPathCapabilities; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.wrappedio.WrappedIO; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; +import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; +import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete_delete; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Contract tests for bulk delete operation. + */ +public abstract class AbstractContractBulkDeleteTest extends AbstractFSContractTestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class); + + /** + * Page size for bulk delete. This is calculated based + * on the store implementation. + */ + protected int pageSize; + + /** + * Base path for the bulk delete tests. + * All the paths to be deleted should be under this base path. + */ + protected Path basePath; + + /** + * Test file system. + */ + protected FileSystem fs; + + @Before + public void setUp() throws Exception { + fs = getFileSystem(); + basePath = path(getClass().getName()); + pageSize = WrappedIO.bulkDelete_PageSize(getFileSystem(), basePath); + fs.mkdirs(basePath); + } + + public Path getBasePath() { + return basePath; + } + + protected int getExpectedPageSize() { + return 1; + } + + /** + * Validate the page size for bulk delete operation. Different stores can have different + * implementations for bulk delete operation thus different page size. + */ + @Test + public void validatePageSize() throws Exception { + Assertions.assertThat(pageSize) + .describedAs("Page size should be 1 by default for all stores") + .isEqualTo(getExpectedPageSize()); + } + + @Test + public void testPathsSizeEqualsPageSizePrecondition() throws Exception { + List listOfPaths = createListOfPaths(pageSize, basePath); + // Bulk delete call should pass with no exception. + bulkDelete_delete(getFileSystem(), basePath, listOfPaths); + } + + @Test + public void testPathsSizeGreaterThanPageSizePrecondition() throws Exception { + List listOfPaths = createListOfPaths(pageSize + 1, basePath); + intercept(IllegalArgumentException.class, + () -> bulkDelete_delete(getFileSystem(), basePath, listOfPaths)); + } + + @Test + public void testPathsSizeLessThanPageSizePrecondition() throws Exception { + List listOfPaths = createListOfPaths(pageSize - 1, basePath); + // Bulk delete call should pass with no exception. 
+ bulkDelete_delete(getFileSystem(), basePath, listOfPaths); + } + + @Test + public void testBulkDeleteSuccessful() throws Exception { + runBulkDelete(false); + } + + @Test + public void testBulkDeleteSuccessfulUsingDirectFS() throws Exception { + runBulkDelete(true); + } + + private void runBulkDelete(boolean useDirectFS) throws IOException { + List listOfPaths = createListOfPaths(pageSize, basePath); + for (Path path : listOfPaths) { + touch(fs, path); + } + FileStatus[] fileStatuses = fs.listStatus(basePath); + Assertions.assertThat(fileStatuses) + .describedAs("File count after create") + .hasSize(pageSize); + if (useDirectFS) { + assertSuccessfulBulkDelete( + fs.createBulkDelete(basePath).bulkDelete(listOfPaths)); + } else { + // Using WrappedIO to call bulk delete. + assertSuccessfulBulkDelete( + bulkDelete_delete(getFileSystem(), basePath, listOfPaths)); + } + + FileStatus[] fileStatusesAfterDelete = fs.listStatus(basePath); + Assertions.assertThat(fileStatusesAfterDelete) + .describedAs("File statuses should be empty after delete") + .isEmpty(); + } + + + @Test + public void validatePathCapabilityDeclared() throws Exception { + Assertions.assertThat(fs.hasPathCapability(basePath, CommonPathCapabilities.BULK_DELETE)) + .describedAs("Path capability BULK_DELETE should be declared") + .isTrue(); + } + + /** + * This test should fail as path is not under the base path. + */ + @Test + public void testDeletePathsNotUnderBase() throws Exception { + List paths = new ArrayList<>(); + Path pathNotUnderBase = path("not-under-base"); + paths.add(pathNotUnderBase); + intercept(IllegalArgumentException.class, + () -> bulkDelete_delete(getFileSystem(), basePath, paths)); + } + + /** + * This test should fail as path is not absolute. + */ + @Test + public void testDeletePathsNotAbsolute() throws Exception { + List paths = new ArrayList<>(); + Path pathNotAbsolute = new Path("not-absolute"); + paths.add(pathNotAbsolute); + intercept(IllegalArgumentException.class, + () -> bulkDelete_delete(getFileSystem(), basePath, paths)); + } + + @Test + public void testDeletePathsNotExists() throws Exception { + List paths = new ArrayList<>(); + Path pathNotExists = new Path(basePath, "not-exists"); + paths.add(pathNotExists); + // bulk delete call doesn't verify if a path exist or not before deleting. + assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths)); + } + + @Test + public void testDeletePathsDirectory() throws Exception { + List paths = new ArrayList<>(); + Path dirPath = new Path(basePath, "dir"); + paths.add(dirPath); + Path filePath = new Path(dirPath, "file"); + paths.add(filePath); + pageSizePreconditionForTest(paths.size()); + fs.mkdirs(dirPath); + touch(fs, filePath); + // Outcome is undefined. But call shouldn't fail. + assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths)); + } + + @Test + public void testBulkDeleteParentDirectoryWithDirectories() throws Exception { + List paths = new ArrayList<>(); + Path dirPath = new Path(basePath, "dir"); + fs.mkdirs(dirPath); + Path subDir = new Path(dirPath, "subdir"); + fs.mkdirs(subDir); + // adding parent directory to the list of paths. + paths.add(dirPath); + List> entries = bulkDelete_delete(getFileSystem(), basePath, paths); + Assertions.assertThat(entries) + .describedAs("Parent non empty directory should not be deleted") + .hasSize(1); + // During the bulk delete operation, the non-empty directories are not deleted in default implementation. 
+ assertIsDirectory(dirPath); + } + + @Test + public void testBulkDeleteParentDirectoryWithFiles() throws Exception { + List paths = new ArrayList<>(); + Path dirPath = new Path(basePath, "dir"); + fs.mkdirs(dirPath); + Path file = new Path(dirPath, "file"); + touch(fs, file); + // adding parent directory to the list of paths. + paths.add(dirPath); + List> entries = bulkDelete_delete(getFileSystem(), basePath, paths); + Assertions.assertThat(entries) + .describedAs("Parent non empty directory should not be deleted") + .hasSize(1); + // During the bulk delete operation, the non-empty directories are not deleted in default implementation. + assertIsDirectory(dirPath); + } + + + @Test + public void testDeleteEmptyDirectory() throws Exception { + List paths = new ArrayList<>(); + Path emptyDirPath = new Path(basePath, "empty-dir"); + fs.mkdirs(emptyDirPath); + paths.add(emptyDirPath); + // Should pass as empty directory. + assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths)); + } + + @Test + public void testDeleteEmptyList() throws Exception { + List paths = new ArrayList<>(); + // Empty list should pass. + assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths)); + } + + @Test + public void testDeleteSamePathsMoreThanOnce() throws Exception { + List paths = new ArrayList<>(); + Path path = new Path(basePath, "file"); + paths.add(path); + paths.add(path); + Path another = new Path(basePath, "another-file"); + paths.add(another); + pageSizePreconditionForTest(paths.size()); + touch(fs, path); + touch(fs, another); + assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths)); + } + + /** + * Skip test if paths size is greater than page size. + */ + protected void pageSizePreconditionForTest(int size) { + if (size > pageSize) { + skip("Test requires paths size less than or equal to page size: " + pageSize); + } + } + + /** + * This test validates that files to be deleted don't have + * to be direct children of the base path. + */ + @Test + public void testDeepDirectoryFilesDelete() throws Exception { + List paths = new ArrayList<>(); + Path dir1 = new Path(basePath, "dir1"); + Path dir2 = new Path(dir1, "dir2"); + Path dir3 = new Path(dir2, "dir3"); + fs.mkdirs(dir3); + Path file1 = new Path(dir3, "file1"); + touch(fs, file1); + paths.add(file1); + assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths)); + } + + + @Test + public void testChildPaths() throws Exception { + List paths = new ArrayList<>(); + Path dirPath = new Path(basePath, "dir"); + fs.mkdirs(dirPath); + paths.add(dirPath); + Path filePath = new Path(dirPath, "file"); + touch(fs, filePath); + paths.add(filePath); + pageSizePreconditionForTest(paths.size()); + // Should pass as both paths are under the base path. + assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths)); + } + + + /** + * Assert on returned entries after bulk delete operation. + * Entries should be empty after successful delete. + */ + public static void assertSuccessfulBulkDelete(List> entries) { + Assertions.assertThat(entries) + .describedAs("Bulk delete failed, " + + "return entries should be empty after successful delete") + .isEmpty(); + } + + /** + * Create a list of paths with the given count + * under the given base path. 
+ */ + private List createListOfPaths(int count, Path basePath) { + List paths = new ArrayList<>(); + for (int i = 0; i < count; i++) { + Path path = new Path(basePath, "file-" + i); + paths.add(path); + } + return paths; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractBulkDelete.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractBulkDelete.java new file mode 100644 index 0000000000..f1bd641806 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractBulkDelete.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.localfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Bulk delete contract tests for the local filesystem. + */ +public class TestLocalFSContractBulkDelete extends AbstractContractBulkDeleteTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new LocalFSContract(conf); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractBulkDelete.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractBulkDelete.java new file mode 100644 index 0000000000..46d98249ab --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractBulkDelete.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.contract.rawlocal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Bulk delete contract tests for the raw local filesystem. + */ +public class TestRawLocalContractBulkDelete extends AbstractContractBulkDeleteTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RawlocalFSContract(conf); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractBulkDelete.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractBulkDelete.java new file mode 100644 index 0000000000..3a851b6ff1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractBulkDelete.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.hdfs; + +import java.io.IOException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +/** + * Bulk delete contract tests for the HDFS filesystem. + */ +public class TestHDFSContractBulkDelete extends AbstractContractBulkDeleteTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new HDFSContract(conf); + } + + @BeforeClass + public static void createCluster() throws IOException { + HDFSContract.createCluster(); + } + + @AfterClass + public static void teardownCluster() throws IOException { + HDFSContract.destroyCluster(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 67df37e5eb..185389739c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1641,4 +1641,16 @@ private Constants() { */ public static final String AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED = "fs.s3a.access.grants.fallback.to.iam"; + /** + * Default value for {@link #S3A_IO_RATE_LIMIT}. + * Value: {@value}. + * 0 means no rate limiting. + */ + public static final int DEFAULT_S3A_IO_RATE_LIMIT = 0; + + /** + * Config to set the rate limit for S3A IO operations. + * Value: {@value}. 
+ */ + public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit"; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 0e2ae0f74d..d04ca70a68 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -81,7 +81,6 @@ import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; -import software.amazon.awssdk.services.s3.model.S3Error; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.model.StorageClass; import software.amazon.awssdk.services.s3.model.UploadPartRequest; @@ -103,6 +102,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; import org.apache.hadoop.fs.CommonPathCapabilities; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -120,7 +120,8 @@ import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; import org.apache.hadoop.fs.s3a.impl.AWSCannedACL; import org.apache.hadoop.fs.s3a.impl.AWSHeaders; -import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler; +import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation; +import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper; import org.apache.hadoop.fs.s3a.impl.ContextAccessors; @@ -141,9 +142,11 @@ import org.apache.hadoop.fs.s3a.impl.RenameOperation; import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl; import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder; +import org.apache.hadoop.fs.s3a.impl.S3AStoreBuilder; import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; +import org.apache.hadoop.fs.s3a.impl.StoreContextFactory; import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; @@ -162,10 +165,6 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.DurationInfo; -import org.apache.hadoop.util.LambdaUtils; -import org.apache.hadoop.util.Lists; -import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -203,10 +202,15 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.fs.store.EtagChecksum; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.BlockingThreadPoolExecutorService; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.LambdaUtils; +import 
org.apache.hadoop.util.Lists; +import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.RateLimitingFactory; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -244,7 +248,6 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; -import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; @@ -258,11 +261,11 @@ import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.pairedTrackerFactory; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; -import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; import static org.apache.hadoop.util.Preconditions.checkArgument; +import static org.apache.hadoop.util.RateLimitingFactory.unlimitedRate; import static org.apache.hadoop.util.functional.RemoteIterators.foreach; import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator; @@ -283,7 +286,8 @@ @InterfaceStability.Evolving public class S3AFileSystem extends FileSystem implements StreamCapabilities, AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource, - AuditSpanSource, ActiveThreadSpanSource { + AuditSpanSource, ActiveThreadSpanSource, + StoreContextFactory { /** * Default blocksize as used in blocksize and FS status queries. @@ -296,6 +300,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private String username; + /** + * Store back end. + */ + private S3AStore store; + private S3Client s3Client; /** Async client is used for transfer manager. */ @@ -680,9 +689,6 @@ public void initialize(URI name, Configuration originalConf) // the encryption algorithms) bindAWSClient(name, delegationTokensEnabled); - // This initiates a probe against S3 for the bucket existing. 
- doBucketProbing(); - inputPolicy = S3AInputPolicy.getPolicy( conf.getTrimmed(INPUT_FADVISE, Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT), @@ -729,9 +735,6 @@ public void initialize(URI name, Configuration originalConf) directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf, this::allowAuthoritative); LOG.debug("Directory marker retention policy is {}", directoryPolicy); - - initMultipartUploads(conf); - pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE, BULK_DELETE_PAGE_SIZE_DEFAULT, 0); checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE, @@ -756,6 +759,26 @@ public void initialize(URI name, Configuration originalConf) OPTIMIZED_COPY_FROM_LOCAL_DEFAULT); LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal); s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false); + + int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0); + // now create the store + store = new S3AStoreBuilder() + .withS3Client(s3Client) + .withDurationTrackerFactory(getDurationTrackerFactory()) + .withStoreContextFactory(this) + .withAuditSpanSource(getAuditManager()) + .withInstrumentation(getInstrumentation()) + .withStatisticsContext(statisticsContext) + .withStorageStatistics(getStorageStatistics()) + .withReadRateLimiter(unlimitedRate()) + .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity)) + .build(); + + // The filesystem is now ready to perform operations against + // S3 + // This initiates a probe against S3 for the bucket existing. + doBucketProbing(); + initMultipartUploads(conf); } catch (SdkException e) { // amazon client exception: stop all services then throw the translation cleanupWithLogger(LOG, span); @@ -1417,6 +1440,11 @@ public S3Client getAmazonS3Client(String reason) { return s3Client; } + @Override + public S3AStore getStore() { + return store; + } + /** * S3AInternals method. * {@inheritDoc}. @@ -3064,29 +3092,10 @@ public void incrementWriteOperations() { @Retries.RetryRaw protected void deleteObject(String key) throws SdkException, IOException { - blockRootDelete(key); incrementWriteOperations(); - try (DurationInfo ignored = - new DurationInfo(LOG, false, - "deleting %s", key)) { - invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key), - DELETE_CONSIDERED_IDEMPOTENT, - () -> { - incrementStatistic(OBJECT_DELETE_OBJECTS); - trackDurationOfInvocation(getDurationTrackerFactory(), - OBJECT_DELETE_REQUEST.getSymbol(), - () -> s3Client.deleteObject(getRequestFactory() - .newDeleteObjectRequestBuilder(key) - .build())); - return null; - }); - } catch (AwsServiceException ase) { - // 404 errors get swallowed; this can be raised by - // third party stores (GCS). - if (!isObjectNotFound(ase)) { - throw ase; - } - } + store.deleteObject(getRequestFactory() + .newDeleteObjectRequestBuilder(key) + .build()); } /** @@ -3112,19 +3121,6 @@ void deleteObjectAtPath(Path f, deleteObject(key); } - /** - * Reject any request to delete an object where the key is root. - * @param key key to validate - * @throws InvalidRequestException if the request was rejected due to - * a mistaken attempt to delete the root directory. - */ - private void blockRootDelete(String key) throws InvalidRequestException { - if (key.isEmpty() || "/".equals(key)) { - throw new InvalidRequestException("Bucket "+ bucket - +" cannot be deleted"); - } - } - /** * Perform a bulk object delete operation against S3. 
* Increments the {@code OBJECT_DELETE_REQUESTS} and write @@ -3151,38 +3147,11 @@ private void blockRootDelete(String key) throws InvalidRequestException { private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest) throws MultiObjectDeleteException, SdkException, IOException { incrementWriteOperations(); - BulkDeleteRetryHandler retryHandler = - new BulkDeleteRetryHandler(createStoreContext()); - int keyCount = deleteRequest.delete().objects().size(); - try (DurationInfo ignored = - new DurationInfo(LOG, false, "DELETE %d keys", - keyCount)) { - DeleteObjectsResponse response = - invoker.retryUntranslated("delete", DELETE_CONSIDERED_IDEMPOTENT, - (text, e, r, i) -> { - // handle the failure - retryHandler.bulkDeleteRetried(deleteRequest, e); - }, - // duration is tracked in the bulk delete counters - trackDurationOfOperation(getDurationTrackerFactory(), - OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> { - incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount); - return s3Client.deleteObjects(deleteRequest); - })); - - if (!response.errors().isEmpty()) { - // one or more of the keys could not be deleted. - // log and then throw - List errors = response.errors(); - LOG.debug("Partial failure of delete, {} errors", errors.size()); - for (S3Error error : errors) { - LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message()); - } - throw new MultiObjectDeleteException(errors); - } - - return response; + DeleteObjectsResponse response = store.deleteObjects(deleteRequest).getValue(); + if (!response.errors().isEmpty()) { + throw new MultiObjectDeleteException(response.errors()); } + return response; } /** @@ -3391,20 +3360,16 @@ private void removeKeysS3( List keysToDelete, boolean deleteFakeDir) throws MultiObjectDeleteException, AwsServiceException, IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Initiating delete operation for {} objects", - keysToDelete.size()); - for (ObjectIdentifier objectIdentifier : keysToDelete) { - LOG.debug(" \"{}\" {}", objectIdentifier.key(), - objectIdentifier.versionId() != null ? objectIdentifier.versionId() : ""); - } - } if (keysToDelete.isEmpty()) { // exit fast if there are no keys to delete return; } - for (ObjectIdentifier objectIdentifier : keysToDelete) { - blockRootDelete(objectIdentifier.key()); + if (keysToDelete.size() == 1) { + // single object is a single delete call. + // this is more informative in server logs and may be more efficient.. + deleteObject(keysToDelete.get(0).key()); + noteDeleted(1, deleteFakeDir); + return; } try { if (enableMultiObjectsDelete) { @@ -5481,7 +5446,6 @@ public boolean hasPathCapability(final Path path, final String capability) case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE: return true; - // multi object delete flag case ENABLE_MULTI_DELETE: return enableMultiObjectsDelete; @@ -5667,6 +5631,7 @@ public S3AMultipartUploaderBuilder createMultipartUploader( * new store context instances should be created as appropriate. * @return the store context of this FS. */ + @Override @InterfaceAudience.Private public StoreContext createStoreContext() { return new StoreContextBuilder().setFsURI(getUri()) @@ -5768,4 +5733,36 @@ public boolean isMultipartUploadEnabled() { return isMultipartUploadEnabled; } + /** + * S3A implementation to create a bulk delete operation using + * which actual bulk delete calls can be made. + * @return an implementation of the bulk delete. 
+ */ + @Override + public BulkDelete createBulkDelete(final Path path) + throws IllegalArgumentException, IOException { + + final Path p = makeQualified(path); + final AuditSpanS3A span = createSpan("bulkdelete", p.toString(), null); + final int size = enableMultiObjectsDelete ? pageSize : 1; + return new BulkDeleteOperation( + createStoreContext(), + createBulkDeleteCallbacks(p, size, span), + p, + size, + span); + } + + /** + * Create the callbacks for the bulk delete operation. + * @param path path to delete. + * @param pageSize page size. + * @param span span for operations. + * @return an instance of the Bulk Delete callbacks. + */ + protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks( + Path path, int pageSize, AuditSpanS3A span) { + return new BulkDeleteOperationCallbacksImpl(store, pathToKey(path), pageSize, span); + } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java index b411606856..3f3178c7e6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java @@ -33,6 +33,9 @@ /** * This is an unstable interface for access to S3A Internal state, S3 operations * and the S3 client connector itself. + *

+ * Note for maintainers: this is documented in {@code aws_sdk_upgrade.md}; update + * on changes. */ @InterfaceStability.Unstable @InterfaceAudience.LimitedPrivate("testing/diagnostics") @@ -52,13 +55,19 @@ public interface S3AInternals { * set to false. *

* Mocking note: this is the same S3Client as is used by the owning - * filesystem; changes to this client will be reflected by changes + * filesystem and S3AStore; changes to this client will be reflected by changes * in the behavior of that filesystem. * @param reason a justification for requesting access. * @return S3Client */ S3Client getAmazonS3Client(String reason); + /** + * Get the store for low-level operations. + * @return the store the S3A FS is working through. + */ + S3AStore getStore(); + /** * Get the region of a bucket. * Invoked from StoreContext; consider an entry point. @@ -131,4 +140,5 @@ public interface S3AInternals { @AuditEntryPoint @Retries.RetryTranslated long abortMultipartUploads(Path path) throws IOException; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java new file mode 100644 index 0000000000..68eacc35b1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; + +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException; +import org.apache.hadoop.fs.s3a.impl.StoreContext; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; + +/** + * Interface for the S3A Store; + * S3 client interactions should be via this; mocking + * is possible for unit tests. + */ +@InterfaceAudience.LimitedPrivate("Extensions") +@InterfaceStability.Unstable +public interface S3AStore extends IOStatisticsSource { + + /** + * Acquire write capacity for operations. + * This should be done within retry loops. + * @param capacity capacity to acquire. + * @return time spent waiting for output. + */ + Duration acquireWriteCapacity(int capacity); + + /** + * Acquire read capacity for operations. + * This should be done within retry loops. + * @param capacity capacity to acquire. + * @return time spent waiting for output. 
+ */ + Duration acquireReadCapacity(int capacity); + + StoreContext getStoreContext(); + + DurationTrackerFactory getDurationTrackerFactory(); + + S3AStatisticsContext getStatisticsContext(); + + RequestFactory getRequestFactory(); + + /** + * Perform a bulk object delete operation against S3. + * Increments the {@code OBJECT_DELETE_REQUESTS} and write + * operation statistics + *

+ * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number + * of objects deleted in the request. + *

+ * Retry policy: retry untranslated; delete considered idempotent. + * If the request is throttled, this is logged in the throttle statistics, + * with the counter set to the number of keys, rather than the number + * of invocations of the delete operation. + * This is because S3 considers each key as one mutating operation on + * the store when updating its load counters on a specific partition + * of an S3 bucket. + * If only the request was measured, this operation would under-report. + * A write capacity will be requested proportional to the number of keys + * present in the request and will be re-requested during retries such that + * retries throttle better. If the request is throttled, the time spent is + * recorded in a duration IOStat named {@code STORE_IO_RATE_LIMITED_DURATION}. + * @param deleteRequest keys to delete on the s3-backend + * @return the AWS response + * @throws MultiObjectDeleteException one or more of the keys could not + * be deleted. + * @throws SdkException amazon-layer failure. + * @throws IOException IO problems. + */ + @Retries.RetryRaw + Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(DeleteObjectsRequest deleteRequest) + throws MultiObjectDeleteException, SdkException, IOException; + + /** + * Delete an object. + * Increments the {@code OBJECT_DELETE_REQUESTS} statistics. + *

+ * Retry policy: retry untranslated; delete considered idempotent. + * 404 errors other than bucket not found are swallowed; + * this can be raised by third party stores (GCS). + *

+ * A write capacity of 1 (as it is a single object delete) will be requested before + * the delete call and will be re-requested during retries such that + * retries throttle better. If the request is throttled, the time spent is + * recorded in a duration IOStat named {@code STORE_IO_RATE_LIMITED_DURATION}. + * If an exception is caught and swallowed, the response will be empty; + * otherwise it will be the response from the delete operation. + * @param request request to make + * @return the total duration and response. + * @throws SdkException problems working with S3 + * @throws IllegalArgumentException if the request was rejected due to + * a mistaken attempt to delete the root directory. + */ + @Retries.RetryRaw + Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject( + DeleteObjectRequest request) throws SdkException; + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index ce3af3de80..7c4883c3d9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -103,6 +103,10 @@ public enum Statistic { StoreStatisticNames.OP_ACCESS, "Calls of access()", TYPE_DURATION), + INVOCATION_BULK_DELETE( + StoreStatisticNames.OP_BULK_DELETE, + "Calls of bulk delete()", + TYPE_COUNTER), INVOCATION_COPY_FROM_LOCAL_FILE( StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE, "Calls of copyFromLocalFile()", @@ -539,6 +543,10 @@ public enum Statistic { "retried requests made of the remote store", TYPE_COUNTER), + STORE_IO_RATE_LIMITED(StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION, + "Duration of rate limited operations", + TYPE_DURATION), + STORE_IO_THROTTLED( StoreStatisticNames.STORE_IO_THROTTLED, "Requests throttled and retried", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java new file mode 100644 index 0000000000..64bebd880c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; + +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.store.audit.AuditSpan; +import org.apache.hadoop.util.functional.Tuples; + +import static java.util.Collections.emptyList; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.fs.BulkDeleteUtils.validatePathIsUnderParent; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * S3A Implementation of the {@link BulkDelete} interface. + */ +public class BulkDeleteOperation extends AbstractStoreOperation implements BulkDelete { + + private final BulkDeleteOperationCallbacks callbacks; + + private final Path basePath; + + private final int pageSize; + + public BulkDeleteOperation( + final StoreContext storeContext, + final BulkDeleteOperationCallbacks callbacks, + final Path basePath, + final int pageSize, + final AuditSpan span) { + super(storeContext, span); + this.callbacks = requireNonNull(callbacks); + this.basePath = requireNonNull(basePath); + checkArgument(pageSize > 0, "Page size must be greater than 0"); + this.pageSize = pageSize; + } + + @Override + public int pageSize() { + return pageSize; + } + + @Override + public Path basePath() { + return basePath; + } + + /** + * {@inheritDoc} + */ + @Override + public List> bulkDelete(final Collection paths) + throws IOException, IllegalArgumentException { + requireNonNull(paths); + checkArgument(paths.size() <= pageSize, + "Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize); + final StoreContext context = getStoreContext(); + final List objects = paths.stream().map(p -> { + checkArgument(p.isAbsolute(), "Path %s is not absolute", p); + checkArgument(validatePathIsUnderParent(p, basePath), + "Path %s is not under the base path %s", p, basePath); + final String k = context.pathToKey(p); + return ObjectIdentifier.builder().key(k).build(); + }).collect(toList()); + + final List> errors = callbacks.bulkDelete(objects); + if (!errors.isEmpty()) { + + final List> outcomeElements = errors + .stream() + .map(error -> Tuples.pair( + context.keyToPath(error.getKey()), + error.getValue() + )) + .collect(toList()); + return outcomeElements; + } + return emptyList(); + } + + @Override + public void close() throws IOException { + + } + + /** + * Callbacks for the bulk delete operation. + */ + public interface BulkDeleteOperationCallbacks { + + /** + * Perform a bulk delete operation. + * @param keys key list + * @return paths which failed to delete (if any). + * @throws IOException IO Exception. 
+ * @throws IllegalArgumentException illegal arguments + */ + @Retries.RetryTranslated + List> bulkDelete(final List keys) + throws IOException, IllegalArgumentException; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java new file mode 100644 index 0000000000..2edcc3c7bb --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.AccessDeniedException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Error; + +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.store.audit.AuditSpan; +import org.apache.hadoop.util.functional.Tuples; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.apache.hadoop.fs.s3a.Invoker.once; +import static org.apache.hadoop.util.Preconditions.checkArgument; +import static org.apache.hadoop.util.functional.Tuples.pair; + +/** + * Callbacks for the bulk delete operation. + */ +public class BulkDeleteOperationCallbacksImpl implements + BulkDeleteOperation.BulkDeleteOperationCallbacks { + + /** + * Path for logging. + */ + private final String path; + + /** Page size for bulk delete. */ + private final int pageSize; + + /** span for operations. */ + private final AuditSpan span; + + /** + * Store. 
+ */ + private final S3AStore store; + + + public BulkDeleteOperationCallbacksImpl(final S3AStore store, + String path, int pageSize, AuditSpan span) { + this.span = span; + this.pageSize = pageSize; + this.path = path; + this.store = store; + } + + @Override + @Retries.RetryTranslated + public List> bulkDelete(final List keysToDelete) + throws IOException, IllegalArgumentException { + span.activate(); + final int size = keysToDelete.size(); + checkArgument(size <= pageSize, + "Too many paths to delete in one operation: %s", size); + if (size == 0) { + return emptyList(); + } + + if (size == 1) { + return deleteSingleObject(keysToDelete.get(0).key()); + } + + final DeleteObjectsResponse response = once("bulkDelete", path, () -> + store.deleteObjects(store.getRequestFactory() + .newBulkDeleteRequestBuilder(keysToDelete) + .build())).getValue(); + final List errors = response.errors(); + if (errors.isEmpty()) { + // all good. + return emptyList(); + } else { + return errors.stream() + .map(e -> pair(e.key(), e.toString())) + .collect(Collectors.toList()); + } + } + + /** + * Delete a single object. + * @param key key to delete + * @return list of keys which failed to delete: length 0 or 1. + * @throws IOException IO problem other than AccessDeniedException + */ + @Retries.RetryTranslated + private List> deleteSingleObject(final String key) throws IOException { + try { + once("bulkDelete", path, () -> + store.deleteObject(store.getRequestFactory() + .newDeleteObjectRequestBuilder(key) + .build())); + } catch (AccessDeniedException e) { + return singletonList(pair(key, e.toString())); + } + return emptyList(); + + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java index 72ead1fb15..14ad559ead 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java @@ -118,11 +118,7 @@ public IOException translateException(final String message) { String exitCode = ""; for (S3Error error : errors()) { String code = error.code(); - String item = String.format("%s: %s%s: %s%n", code, error.key(), - (error.versionId() != null - ? (" (" + error.versionId() + ")") - : ""), - error.message()); + String item = errorToString(error); LOG.info(item); result.append(item); if (exitCode == null || exitCode.isEmpty() || ACCESS_DENIED.equals(code)) { @@ -136,4 +132,18 @@ public IOException translateException(final String message) { return new AWSS3IOException(result.toString(), this); } } + + /** + * Convert an error to a string. + * @param error error from a delete request + * @return string value + */ + public static String errorToString(final S3Error error) { + String code = error.code(); + return String.format("%s: %s%s: %s%n", code, error.key(), + (error.versionId() != null + ? 
(" (" + error.versionId() + ")") + : ""), + error.message()); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java new file mode 100644 index 0000000000..c1a6fcffab --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import software.amazon.awssdk.services.s3.S3Client; + +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3AStorageStatistics; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.store.audit.AuditSpanSource; +import org.apache.hadoop.util.RateLimiting; + +/** + * Builder for the S3AStore. 
+ */ +public class S3AStoreBuilder { + + private StoreContextFactory storeContextFactory; + + private S3Client s3Client; + + private DurationTrackerFactory durationTrackerFactory; + + private S3AInstrumentation instrumentation; + + private S3AStatisticsContext statisticsContext; + + private S3AStorageStatistics storageStatistics; + + private RateLimiting readRateLimiter; + + private RateLimiting writeRateLimiter; + + private AuditSpanSource auditSpanSource; + + public S3AStoreBuilder withStoreContextFactory( + final StoreContextFactory storeContextFactoryValue) { + this.storeContextFactory = storeContextFactoryValue; + return this; + } + + public S3AStoreBuilder withS3Client( + final S3Client s3ClientValue) { + this.s3Client = s3ClientValue; + return this; + } + + public S3AStoreBuilder withDurationTrackerFactory( + final DurationTrackerFactory durationTrackerFactoryValue) { + this.durationTrackerFactory = durationTrackerFactoryValue; + return this; + } + + public S3AStoreBuilder withInstrumentation( + final S3AInstrumentation instrumentationValue) { + this.instrumentation = instrumentationValue; + return this; + } + + public S3AStoreBuilder withStatisticsContext( + final S3AStatisticsContext statisticsContextValue) { + this.statisticsContext = statisticsContextValue; + return this; + } + + public S3AStoreBuilder withStorageStatistics( + final S3AStorageStatistics storageStatisticsValue) { + this.storageStatistics = storageStatisticsValue; + return this; + } + + public S3AStoreBuilder withReadRateLimiter( + final RateLimiting readRateLimiterValue) { + this.readRateLimiter = readRateLimiterValue; + return this; + } + + public S3AStoreBuilder withWriteRateLimiter( + final RateLimiting writeRateLimiterValue) { + this.writeRateLimiter = writeRateLimiterValue; + return this; + } + + public S3AStoreBuilder withAuditSpanSource( + final AuditSpanSource auditSpanSourceValue) { + this.auditSpanSource = auditSpanSourceValue; + return this; + } + + public S3AStore build() { + return new S3AStoreImpl(storeContextFactory, s3Client, durationTrackerFactory, instrumentation, + statisticsContext, storageStatistics, readRateLimiter, writeRateLimiter, auditSpanSource); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java new file mode 100644 index 0000000000..6bfe42767d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Error; + +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3AInstrumentation; +import org.apache.hadoop.fs.s3a.S3AStorageStatistics; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.DurationTrackerFactory; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.store.audit.AuditSpanSource; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.RateLimiting; +import org.apache.hadoop.util.functional.Tuples; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException; +import static org.apache.hadoop.fs.s3a.Statistic.*; +import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * Store Layer. + * This is where lower level storage operations are intended + * to move. + */ +public class S3AStoreImpl implements S3AStore { + + private static final Logger LOG = LoggerFactory.getLogger(S3AStoreImpl.class); + + /** Factory to create store contexts. */ + private final StoreContextFactory storeContextFactory; + + /** The S3 client used to communicate with S3 bucket. */ + private final S3Client s3Client; + + /** The S3 bucket to communicate with. */ + private final String bucket; + + /** Request factory for creating requests. */ + private final RequestFactory requestFactory; + + /** Async client is used for transfer manager. */ + private S3AsyncClient s3AsyncClient; + + /** Duration tracker factory. */ + private final DurationTrackerFactory durationTrackerFactory; + + /** The core instrumentation. */ + private final S3AInstrumentation instrumentation; + + /** Accessors to statistics for this FS. */ + private final S3AStatisticsContext statisticsContext; + + /** Storage Statistics Bonded to the instrumentation. */ + private final S3AStorageStatistics storageStatistics; + + /** Rate limiter for read operations. 
*/ + private final RateLimiting readRateLimiter; + + /** Rate limiter for write operations. */ + private final RateLimiting writeRateLimiter; + + /** Store context. */ + private final StoreContext storeContext; + + /** Invoker for retry operations. */ + private final Invoker invoker; + + /** Audit span source. */ + private final AuditSpanSource auditSpanSource; + + /** Constructor to create S3A store. */ + S3AStoreImpl(StoreContextFactory storeContextFactory, + S3Client s3Client, + DurationTrackerFactory durationTrackerFactory, + S3AInstrumentation instrumentation, + S3AStatisticsContext statisticsContext, + S3AStorageStatistics storageStatistics, + RateLimiting readRateLimiter, + RateLimiting writeRateLimiter, + AuditSpanSource auditSpanSource) { + this.storeContextFactory = requireNonNull(storeContextFactory); + this.s3Client = requireNonNull(s3Client); + this.durationTrackerFactory = requireNonNull(durationTrackerFactory); + this.instrumentation = requireNonNull(instrumentation); + this.statisticsContext = requireNonNull(statisticsContext); + this.storageStatistics = requireNonNull(storageStatistics); + this.readRateLimiter = requireNonNull(readRateLimiter); + this.writeRateLimiter = requireNonNull(writeRateLimiter); + this.auditSpanSource = requireNonNull(auditSpanSource); + this.storeContext = requireNonNull(storeContextFactory.createStoreContext()); + this.invoker = storeContext.getInvoker(); + this.bucket = storeContext.getBucket(); + this.requestFactory = storeContext.getRequestFactory(); + } + + /** Acquire write capacity for rate limiting {@inheritDoc}. */ + @Override + public Duration acquireWriteCapacity(final int capacity) { + return writeRateLimiter.acquire(capacity); + } + + /** Acquire read capacity for rate limiting {@inheritDoc}. */ + @Override + public Duration acquireReadCapacity(final int capacity) { + return readRateLimiter.acquire(capacity); + + } + + /** + * Create a new store context. + * @return a new store context. + */ + private StoreContext createStoreContext() { + return storeContextFactory.createStoreContext(); + } + + @Override + public StoreContext getStoreContext() { + return storeContext; + } + + private S3Client getS3Client() { + return s3Client; + } + + @Override + public DurationTrackerFactory getDurationTrackerFactory() { + return durationTrackerFactory; + } + + private S3AInstrumentation getInstrumentation() { + return instrumentation; + } + + @Override + public S3AStatisticsContext getStatisticsContext() { + return statisticsContext; + } + + private S3AStorageStatistics getStorageStatistics() { + return storageStatistics; + } + + @Override + public RequestFactory getRequestFactory() { + return requestFactory; + } + + /** + * Increment a statistic by 1. + * This increments both the instrumentation and storage statistics. + * @param statistic The operation to increment + */ + protected void incrementStatistic(Statistic statistic) { + incrementStatistic(statistic, 1); + } + + /** + * Increment a statistic by a specific value. + * This increments both the instrumentation and storage statistics. + * @param statistic The operation to increment + * @param count the count to increment + */ + protected void incrementStatistic(Statistic statistic, long count) { + statisticsContext.incrementCounter(statistic, count); + } + + /** + * Decrement a gauge by a specific value. 
+ * @param statistic The operation to decrement + * @param count the count to decrement + */ + protected void decrementGauge(Statistic statistic, long count) { + statisticsContext.decrementGauge(statistic, count); + } + + /** + * Increment a gauge by a specific value. + * @param statistic The operation to increment + * @param count the count to increment + */ + protected void incrementGauge(Statistic statistic, long count) { + statisticsContext.incrementGauge(statistic, count); + } + + /** + * Callback when an operation was retried. + * Increments the statistics of ignored errors or throttled requests, + * depending up on the exception class. + * @param ex exception. + */ + public void operationRetried(Exception ex) { + if (isThrottleException(ex)) { + LOG.debug("Request throttled"); + incrementStatistic(STORE_IO_THROTTLED); + statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1); + } else { + incrementStatistic(STORE_IO_RETRY); + incrementStatistic(IGNORED_ERRORS); + } + } + + /** + * Callback from {@link Invoker} when an operation is retried. + * @param text text of the operation + * @param ex exception + * @param retries number of retries + * @param idempotent is the method idempotent + */ + public void operationRetried(String text, Exception ex, int retries, boolean idempotent) { + operationRetried(ex); + } + + /** + * Get the instrumentation's IOStatistics. + * @return statistics + */ + @Override + public IOStatistics getIOStatistics() { + return instrumentation.getIOStatistics(); + } + + /** + * Start an operation; this informs the audit service of the event + * and then sets it as the active span. + * @param operation operation name. + * @param path1 first path of operation + * @param path2 second path of operation + * @return a span for the audit + * @throws IOException failure + */ + public AuditSpanS3A createSpan(String operation, @Nullable String path1, @Nullable String path2) + throws IOException { + + return auditSpanSource.createSpan(operation, path1, path2); + } + + /** + * Reject any request to delete an object where the key is root. + * @param key key to validate + * @throws IllegalArgumentException if the request was rejected due to + * a mistaken attempt to delete the root directory. + */ + private void blockRootDelete(String key) throws IllegalArgumentException { + checkArgument(!key.isEmpty() && !"/".equals(key), "Bucket %s cannot be deleted", bucket); + } + + /** + * {@inheritDoc}. + */ + @Override + @Retries.RetryRaw + public Map.Entry deleteObjects( + final DeleteObjectsRequest deleteRequest) + throws SdkException { + + DeleteObjectsResponse response; + BulkDeleteRetryHandler retryHandler = new BulkDeleteRetryHandler(createStoreContext()); + + final List keysToDelete = deleteRequest.delete().objects(); + int keyCount = keysToDelete.size(); + if (LOG.isDebugEnabled()) { + LOG.debug("Initiating delete operation for {} objects", keysToDelete.size()); + keysToDelete.stream().forEach(objectIdentifier -> { + LOG.debug(" \"{}\" {}", objectIdentifier.key(), + objectIdentifier.versionId() != null ? 
objectIdentifier.versionId() : ""); + }); + } + // block root calls + keysToDelete.stream().map(ObjectIdentifier::key).forEach(this::blockRootDelete); + + try (DurationInfo d = new DurationInfo(LOG, false, "DELETE %d keys", keyCount)) { + response = + invoker.retryUntranslated("delete", + DELETE_CONSIDERED_IDEMPOTENT, (text, e, r, i) -> { + // handle the failure + retryHandler.bulkDeleteRetried(deleteRequest, e); + }, + // duration is tracked in the bulk delete counters + trackDurationOfOperation(getDurationTrackerFactory(), + OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> { + // acquire the write capacity for the number of keys to delete and record the duration. + Duration durationToAcquireWriteCapacity = acquireWriteCapacity(keyCount); + instrumentation.recordDuration(STORE_IO_RATE_LIMITED, + true, + durationToAcquireWriteCapacity); + incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount); + return s3Client.deleteObjects(deleteRequest); + })); + if (!response.errors().isEmpty()) { + // one or more of the keys could not be deleted. + // log and then throw + List errors = response.errors(); + if (LOG.isDebugEnabled()) { + LOG.debug("Partial failure of delete, {} errors", errors.size()); + for (S3Error error : errors) { + LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message()); + } + } + } + d.close(); + return Tuples.pair(d.asDuration(), response); + + } catch (IOException e) { + // this is part of the retry signature, nothing else. + // convert to unchecked. + throw new UncheckedIOException(e); + } + } + + /** + * {@inheritDoc}. + */ + @Override + @Retries.RetryRaw + public Map.Entry> deleteObject( + final DeleteObjectRequest request) + throws SdkException { + + String key = request.key(); + blockRootDelete(key); + DurationInfo d = new DurationInfo(LOG, false, "deleting %s", key); + try { + DeleteObjectResponse response = + invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key), + DELETE_CONSIDERED_IDEMPOTENT, + trackDurationOfOperation(getDurationTrackerFactory(), + OBJECT_DELETE_REQUEST.getSymbol(), () -> { + incrementStatistic(OBJECT_DELETE_OBJECTS); + // We try to acquire write capacity just before delete call. + Duration durationToAcquireWriteCapacity = acquireWriteCapacity(1); + instrumentation.recordDuration(STORE_IO_RATE_LIMITED, + true, durationToAcquireWriteCapacity); + return s3Client.deleteObject(request); + })); + d.close(); + return Tuples.pair(d.asDuration(), Optional.of(response)); + } catch (AwsServiceException ase) { + // 404 errors get swallowed; this can be raised by + // third party stores (GCS). + if (!isObjectNotFound(ase)) { + throw ase; + } + d.close(); + return Tuples.pair(d.asDuration(), Optional.empty()); + } catch (IOException e) { + // this is part of the retry signature, nothing else. + // convert to unchecked. + throw new UncheckedIOException(e); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java new file mode 100644 index 0000000000..9d8d708b2b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Factory for creating store contexts. + */ +@InterfaceAudience.Private +public interface StoreContextFactory { + + /** + * Build an immutable store context, including picking + * up the current audit span. + * @return the store context. + */ + StoreContext createStoreContext(); +} diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md index e2c095e531..abd58bffc6 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md @@ -324,6 +324,7 @@ They have also been updated to return V2 SDK classes. public interface S3AInternals { S3Client getAmazonS3V2Client(String reason); + S3AStore getStore(); @Retries.RetryTranslated @AuditEntryPoint String getBucketLocation() throws IOException; diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md index 4bb824356e..954823f217 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md @@ -59,7 +59,7 @@ To make most efficient use of S3, care is needed. The S3A FileSystem supports implementation of vectored read api using which a client can provide a list of file ranges to read returning a future read object associated with each range. For full api specification please see -[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html). +[FSDataInputStream](../../../../../../hadoop-common-project/hadoop-common/target/site/filesystem/fsdatainputstream.html). The following properties can be configured to optimise vectored reads based on the client requirements. @@ -94,6 +94,86 @@ on the client requirements. ``` +## Improving delete performance through bulkdelete API. + +For bulk delete API spec refer to File System specification. [BulkDelete](../../../../../../hadoop-common-project/hadoop-common/target/site/filesystem/bulkdelete.html) + +The S3A client exports this API. + +### S3A Implementation of Bulk Delete. +If multi-object delete is enabled (`fs.s3a.multiobjectdelete.enable` = true), as +it is by default, then the page size is limited to that defined in +`fs.s3a.bulk.delete.page.size`, which MUST be less than or equal to 1000. +* The entire list of paths to delete is aggregated into a single bulk delete request, + issued to the store. +* Provided the caller has the correct permissions, every entry in the list + will, if the path references an object, cause that object to be deleted. 
+* If the path does not reference an object: the path will not be deleted.
+  "This is for deleting objects, not directories"
+* No probes for the existence of parent directories will take place; no
+  parent directory markers will be created.
+  "If you need parent directories, call mkdir() yourself"
+* The list of failed keys in the `DeleteObjectsResponse` is converted
+  into paths and returned along with their error messages.
+* Network and other IO errors are raised as exceptions.
+
+If multi-object delete is disabled (or the list is of size 1):
+* A single `DELETE` call is issued.
+* Any `AccessDeniedException` raised is converted to a result in the error list.
+* Any 404 response from a (non-AWS) store will be ignored.
+* Network and other IO errors are raised as exceptions.
+
+Because there are no probes to ensure the call does not overwrite a directory,
+or to see if a parent directory marker needs to be created,
+this API is still faster than issuing a normal `FileSystem.delete(path)` call.
+
+That is: all the overhead normally undertaken to preserve the POSIX system model is omitted.
+
+
+### S3 Scalability and Performance
+
+Every entry in a bulk delete request counts as one write operation
+against AWS S3 storage.
+With the default write rate under a prefix on AWS S3 Standard storage
+restricted to 3,500 writes/second, it is very easy to overload
+the store by issuing a few bulk delete requests simultaneously.
+
+* If throttling is triggered then all clients interacting with
+  the store may observe performance issues.
+* The write quota applies even for paths which do not exist.
+* The S3A client *may* perform rate throttling as well as page size limiting.
+
+What does that mean? It means that attempting to issue multiple
+bulk delete calls in parallel can be counterproductive.
+
+When overloaded, the S3 store returns a 503 throttle response.
+This triggers a backoff and retry, reposting the request.
+However, the repeated request will still include the same number of objects and
+*so generate the same load*.
+
+This can lead to a pathological situation where the repeated requests will
+never be satisfied because the request itself is sufficient to overload the store.
+See [HADOOP-16823. Large DeleteObject requests are their own Thundering Herd](https://issues.apache.org/jira/browse/HADOOP-16823)
+for an example of where this did actually surface in production.
+
+This is why the default page size of S3A clients is 250 paths, not the store limit of 1000 entries.
+It is also why the S3A delete/rename operations do not attempt to do massive parallel deletions;
+instead bulk delete requests are queued for a single blocking thread to issue.
+Consider a similar design.
+
+
+When working with versioned S3 buckets, every path deleted will add a tombstone marker
+to the store at that location, even if there was no object at that path.
+While this has no negative performance impact on the bulk delete call,
+it will slow down list requests subsequently made against that path.
+That is: bulk delete requests of paths which do not exist will hurt future queries.
+
+Avoid this. Note also that TPC-DS benchmarks do not create the right load to make the
+performance problems observable, but they can surface in production.
+* Configure buckets to have a limited number of days for tombstones to be preserved.
+* Do not delete paths which you know reference nonexistent files or directories. 
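A minimal usage sketch (illustrative only; the `BulkDeleteUsage` class and `deletePaths` helper are invented for this example and are not part of the patch), relying only on the `createBulkDelete()`, `pageSize()` and `bulkDelete()` calls described above:

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative helper: delete a list of paths under a base path in pages. */
public final class BulkDeleteUsage {

  private BulkDeleteUsage() {
  }

  /**
   * Delete the given paths, all of which must be under basePath,
   * splitting the work into pages no larger than the store's page size.
   */
  public static void deletePaths(FileSystem fs, Path basePath, List<Path> paths)
      throws IOException {
    try (BulkDelete bulkDelete = fs.createBulkDelete(basePath)) {
      final int pageSize = bulkDelete.pageSize();
      for (int start = 0; start < paths.size(); start += pageSize) {
        List<Path> page = paths.subList(start,
            Math.min(start + pageSize, paths.size()));
        // Each returned entry is a (path, error string) pair for a key
        // which could not be deleted; an empty list means total success.
        List<Map.Entry<Path, String>> failures = bulkDelete.bulkDelete(page);
        for (Map.Entry<Path, String> failure : failures) {
          System.err.println("Failed to delete " + failure.getKey()
              + ": " + failure.getValue());
        }
      }
    }
  }
}
```

Because the page size is queried from the store rather than hard coded, the same loop works whether multi-object delete is enabled (page size limited by `fs.s3a.bulk.delete.page.size`) or disabled (page size 1).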
+ ## Improving data input performance through fadvise The S3A Filesystem client supports the notion of input policies, similar diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java new file mode 100644 index 0000000000..71c3a30359 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.contract.s3a; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BulkDelete; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.statistics.MeanStatistic; + +import static java.util.stream.Collectors.toList; +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; +import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN; +import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete_delete; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Contract tests for bulk delete operation for S3A Implementation. + */ +@RunWith(Parameterized.class) +public class ITestS3AContractBulkDelete extends AbstractContractBulkDeleteTest { + + private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractBulkDelete.class); + + /** + * Delete Page size: {@value}. + * This is the default page size for bulk delete operation for this contract test. + * All the tests in this class should pass number of paths equal to or less than + * this page size during the bulk delete operation. 
+ */ + private static final int DELETE_PAGE_SIZE = 20; + + private final boolean enableMultiObjectDelete; + + @Parameterized.Parameters(name = "enableMultiObjectDelete = {0}") + public static Iterable enableMultiObjectDelete() { + return Arrays.asList(new Object[][]{ + {true}, + {false} + }); + } + + public ITestS3AContractBulkDelete(boolean enableMultiObjectDelete) { + this.enableMultiObjectDelete = enableMultiObjectDelete; + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + conf = propagateBucketOptions(conf, getTestBucketName(conf)); + if (enableMultiObjectDelete) { + // if multi-object delete is disabled, skip the test. + skipIfNotEnabled(conf, Constants.ENABLE_MULTI_DELETE, + "Bulk delete is explicitly disabled for this bucket"); + } + S3ATestUtils.removeBaseAndBucketOverrides(conf, + Constants.BULK_DELETE_PAGE_SIZE); + conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, DELETE_PAGE_SIZE); + conf.setBoolean(Constants.ENABLE_MULTI_DELETE, enableMultiObjectDelete); + return conf; + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(createConfiguration()); + } + + @Override + protected int getExpectedPageSize() { + if (!enableMultiObjectDelete) { + // if multi-object delete is disabled, page size should be 1. + return 1; + } + return DELETE_PAGE_SIZE; + } + + @Override + public void validatePageSize() throws Exception { + Assertions.assertThat(pageSize) + .describedAs("Page size should match the configured page size") + .isEqualTo(getExpectedPageSize()); + } + + @Test + public void testBulkDeleteZeroPageSizePrecondition() throws Exception { + if (!enableMultiObjectDelete) { + // if multi-object delete is disabled, skip this test as + // page size is always 1. + skip("Multi-object delete is disabled"); + } + Configuration conf = getContract().getConf(); + conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, 0); + Path testPath = path(getMethodName()); + try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { + intercept(IllegalArgumentException.class, + () -> fs.createBulkDelete(testPath)); + } + } + + @Test + public void testPageSizeWhenMultiObjectsDisabled() throws Exception { + Configuration conf = getContract().getConf(); + conf.setBoolean(Constants.ENABLE_MULTI_DELETE, false); + Path testPath = path(getMethodName()); + try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) { + BulkDelete bulkDelete = fs.createBulkDelete(testPath); + Assertions.assertThat(bulkDelete.pageSize()) + .describedAs("Page size should be 1 when multi-object delete is disabled") + .isEqualTo(1); + } + } + + @Override + public void testDeletePathsDirectory() throws Exception { + List paths = new ArrayList<>(); + Path dirPath = new Path(basePath, "dir"); + fs.mkdirs(dirPath); + paths.add(dirPath); + Path filePath = new Path(dirPath, "file"); + touch(fs, filePath); + if (enableMultiObjectDelete) { + // Adding more paths only if multi-object delete is enabled. + paths.add(filePath); + } + assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths)); + // During the bulk delete operation, the directories are not deleted in S3A. 
+    assertIsDirectory(dirPath);
+  }
+
+  @Test
+  public void testBulkDeleteParentDirectoryWithDirectories() throws Exception {
+    List<Path> paths = new ArrayList<>();
+    Path dirPath = new Path(basePath, "dir");
+    fs.mkdirs(dirPath);
+    Path subDir = new Path(dirPath, "subdir");
+    fs.mkdirs(subDir);
+    // adding parent directory to the list of paths.
+    paths.add(dirPath);
+    assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
+    // During the bulk delete operation, the directories are not deleted in S3A.
+    assertIsDirectory(dirPath);
+    assertIsDirectory(subDir);
+  }
+
+  public void testBulkDeleteParentDirectoryWithFiles() throws Exception {
+    List<Path> paths = new ArrayList<>();
+    Path dirPath = new Path(basePath, "dir");
+    fs.mkdirs(dirPath);
+    Path file = new Path(dirPath, "file");
+    touch(fs, file);
+    // adding parent directory to the list of paths.
+    paths.add(dirPath);
+    assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
+    // During the bulk delete operation,
+    // the directories are not deleted in S3A.
+    assertIsDirectory(dirPath);
+  }
+
+  @Test
+  public void testRateLimiting() throws Exception {
+    if (!enableMultiObjectDelete) {
+      skip("Multi-object delete is disabled so hard to trigger rate limiting");
+    }
+    Configuration conf = getContract().getConf();
+    conf.setInt(Constants.S3A_IO_RATE_LIMIT, 5);
+    Path basePath = path(getMethodName());
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+      createFiles(fs, basePath, 1, 20, 0);
+      FileStatus[] fileStatuses = fs.listStatus(basePath);
+      List<Path> paths = Arrays.stream(fileStatuses)
+          .map(FileStatus::getPath)
+          .collect(toList());
+      pageSizePreconditionForTest(paths.size());
+      BulkDelete bulkDelete = fs.createBulkDelete(basePath);
+      bulkDelete.bulkDelete(paths);
+      MeanStatistic meanStatisticBefore = lookupMeanStatistic(fs.getIOStatistics(),
+          STORE_IO_RATE_LIMITED_DURATION + SUFFIX_MEAN);
+      Assertions.assertThat(meanStatisticBefore.mean())
+          .describedAs("Rate limiting should not have happened during first delete call")
+          .isEqualTo(0.0);
+      bulkDelete.bulkDelete(paths);
+      bulkDelete.bulkDelete(paths);
+      bulkDelete.bulkDelete(paths);
+      MeanStatistic meanStatisticAfter = lookupMeanStatistic(fs.getIOStatistics(),
+          STORE_IO_RATE_LIMITED_DURATION + SUFFIX_MEAN);
+      Assertions.assertThat(meanStatisticAfter.mean())
+          .describedAs("Rate limiting should have happened during multiple delete calls")
+          .isGreaterThan(0.0);
+    }
+  }
+}
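The contract test above drives the API the way an application would: obtain a `BulkDelete` for a base path, respect `pageSize()`, and submit batches through `bulkDelete()`. The following is a minimal sketch, not part of the patch, of how a caller might stay store-agnostic by splitting its work into pages of at most `pageSize()` entries; the class and method names are illustrative only.

```java
// Illustrative sketch only; not part of the patch. Assumes a FileSystem
// whose createBulkDelete() is the factory method exercised by the tests above.
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class BulkDeleteExample {

  private BulkDeleteExample() {
  }

  /**
   * Delete the given files under {@code base}, one page at a time.
   * @return all (path, error) entries reported by the store.
   */
  public static List<Map.Entry<Path, String>> deleteInPages(
      FileSystem fs, Path base, List<Path> files) throws IOException {
    List<Map.Entry<Path, String>> failures = new ArrayList<>();
    BulkDelete bulkDelete = fs.createBulkDelete(base);
    // page size is 1 when multi-object delete is disabled, so the same
    // loop degenerates to single-object deletes against such stores.
    int pageSize = bulkDelete.pageSize();
    for (int i = 0; i < files.size(); i += pageSize) {
      List<Path> page = files.subList(i, Math.min(i + pageSize, files.size()));
      // each call may return zero or more (path, error) entries.
      failures.addAll(bulkDelete.bulkDelete(page));
    }
    return failures;
  }
}
```

Keeping each page at or below `pageSize()` matters: the configured value comes from `Constants.BULK_DELETE_PAGE_SIZE` (checked by `validatePageSize()` above), and an invalid page size of zero is rejected when the `BulkDelete` is created, as `testBulkDeleteZeroPageSizePrecondition()` shows.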
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index 734bcfd9c5..f43710cf25 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -35,8 +35,7 @@
 /**
- * Abstract base class for S3A unit tests using a mock S3 client and a null
- * metadata store.
+ * Abstract base class for S3A unit tests using a mock S3 client.
  */
 public abstract class AbstractS3AMockTest {
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
index a4162f2121..28a443f04c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
@@ -61,9 +61,8 @@ public boolean deleteOnExit(Path f) throws IOException {
     // processDeleteOnExit.
     @Override
     protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
-      boolean result = super.deleteWithoutCloseCheck(f, recursive);
       deleteOnDnExitCount--;
-      return result;
+      return true;
     }
   }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
index a7ccc92e13..0676dd5b16 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
@@ -23,7 +23,11 @@
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.AccessDeniedException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import software.amazon.awssdk.auth.credentials.AwsCredentials;
@@ -35,9 +39,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -55,7 +60,10 @@
 import org.apache.hadoop.fs.s3a.impl.InstantiationIOException;
 import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool;
 import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
 
+import static org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest.assertSuccessfulBulkDelete;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION;
@@ -702,6 +710,122 @@ public void testPartialDeleteSingleDelete() throws Throwable {
     executePartialDelete(createAssumedRoleConfig(), true);
   }
 
+  @Test
+  public void testBulkDeleteOnReadOnlyAccess() throws Throwable {
+    describe("Bulk delete with part of the child tree read only");
+    executeBulkDeleteOnReadOnlyFiles(createAssumedRoleConfig());
+  }
+
+  @Test
+  public void testBulkDeleteWithReadWriteAccess() throws Throwable {
+    describe("Bulk delete with read write access");
+    executeBulkDeleteOnSomeReadOnlyFiles(createAssumedRoleConfig());
+  }
+
+  /**
+   * Execute bulk delete on read only files and some read write files.
+   */
+  private void executeBulkDeleteOnReadOnlyFiles(Configuration assumedRoleConfig) throws Exception {
+    Path destDir = methodPath();
+    Path readOnlyDir = new Path(destDir, "readonlyDir");
+
+    // the full FS
+    S3AFileSystem fs = getFileSystem();
+    WrappedIO.bulkDelete_delete(fs, destDir, new ArrayList<>());
+
+    bindReadOnlyRolePolicy(assumedRoleConfig, readOnlyDir);
+    roleFS = (S3AFileSystem) destDir.getFileSystem(assumedRoleConfig);
+    int bulkDeletePageSize = WrappedIO.bulkDelete_PageSize(roleFS, destDir);
+    int range = bulkDeletePageSize == 1 ? bulkDeletePageSize : 10;
+    touchFiles(fs, readOnlyDir, range);
+    touchFiles(roleFS, destDir, range);
+    FileStatus[] fileStatuses = roleFS.listStatus(readOnlyDir);
+    List<Path> pathsToDelete = Arrays.stream(fileStatuses)
+        .map(FileStatus::getPath)
+        .collect(Collectors.toList());
+    // bulk delete in the read only FS should fail.
+    BulkDelete bulkDelete = roleFS.createBulkDelete(readOnlyDir);
+    assertAccessDeniedForEachPath(bulkDelete.bulkDelete(pathsToDelete));
+    BulkDelete bulkDelete2 = roleFS.createBulkDelete(destDir);
+    assertAccessDeniedForEachPath(bulkDelete2.bulkDelete(pathsToDelete));
+    // deleting the files in the original FS should succeed.
+    BulkDelete bulkDelete3 = fs.createBulkDelete(readOnlyDir);
+    assertSuccessfulBulkDelete(bulkDelete3.bulkDelete(pathsToDelete));
+    FileStatus[] fileStatusesUnderDestDir = roleFS.listStatus(destDir);
+    List<Path> pathsToDeleteUnderDestDir = Arrays.stream(fileStatusesUnderDestDir)
+        .map(FileStatus::getPath)
+        .collect(Collectors.toList());
+    BulkDelete bulkDelete4 = fs.createBulkDelete(destDir);
+    assertSuccessfulBulkDelete(bulkDelete4.bulkDelete(pathsToDeleteUnderDestDir));
+  }
+
+  /**
+   * Execute bulk delete on some read only files and some read write files.
+   */
+  private void executeBulkDeleteOnSomeReadOnlyFiles(Configuration assumedRoleConfig)
+      throws IOException {
+    Path destDir = methodPath();
+    Path readOnlyDir = new Path(destDir, "readonlyDir");
+    bindReadOnlyRolePolicy(assumedRoleConfig, readOnlyDir);
+    roleFS = (S3AFileSystem) destDir.getFileSystem(assumedRoleConfig);
+    S3AFileSystem fs = getFileSystem();
+    if (WrappedIO.bulkDelete_PageSize(fs, destDir) == 1) {
+      String msg = "Skipping as this test requires more than one path to be deleted in bulk";
+      LOG.debug(msg);
+      skip(msg);
+    }
+    WrappedIO.bulkDelete_delete(fs, destDir, new ArrayList<>());
+    // creating 5 files in the read only dir.
+    int readOnlyRange = 5;
+    int readWriteRange = 3;
+    touchFiles(fs, readOnlyDir, readOnlyRange);
+    // creating 3 files in the base destination dir.
+    touchFiles(roleFS, destDir, readWriteRange);
+    RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = roleFS.listFiles(destDir, true);
+    List<Path> pathsToDelete2 = new ArrayList<>();
+    while (locatedFileStatusRemoteIterator.hasNext()) {
+      pathsToDelete2.add(locatedFileStatusRemoteIterator.next().getPath());
+    }
+    Assertions.assertThat(pathsToDelete2.size())
+        .describedAs("Number of paths to delete in base destination dir")
+        .isEqualTo(readOnlyRange + readWriteRange);
+    BulkDelete bulkDelete5 = roleFS.createBulkDelete(destDir);
+    List<Map.Entry<Path, String>> entries = bulkDelete5.bulkDelete(pathsToDelete2);
+    Assertions.assertThat(entries.size())
+        .describedAs("Number of error entries in bulk delete result")
+        .isEqualTo(readOnlyRange);
+    assertAccessDeniedForEachPath(entries);
+    // deleting the files in the original FS should succeed.
+    BulkDelete bulkDelete6 = fs.createBulkDelete(destDir);
+    assertSuccessfulBulkDelete(bulkDelete6.bulkDelete(pathsToDelete2));
+  }
+
+  /**
+   * Bind a read only role policy for a directory to the FS conf.
+   */
+  private static void bindReadOnlyRolePolicy(Configuration assumedRoleConfig,
+      Path readOnlyDir)
+      throws JsonProcessingException {
+    bindRolePolicyStatements(assumedRoleConfig, STATEMENT_ALLOW_KMS_RW,
+        statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
+        new Statement(Effects.Deny)
+            .addActions(S3_PATH_WRITE_OPERATIONS)
+            .addResources(directory(readOnlyDir))
+    );
+  }
+
+  /**
+   * Validate that the delete result for each path in the list
+   * has an access denied error.
+   */
+  private void assertAccessDeniedForEachPath(List<Map.Entry<Path, String>> entries) {
+    for (Map.Entry<Path, String> entry : entries) {
+      Assertions.assertThat(entry.getValue())
+          .describedAs("Error message for path %s is %s", entry.getKey(), entry.getValue())
+          .contains("AccessDenied");
+    }
+  }
+
   /**
    * Have a directory with full R/W permissions, but then remove
    * write access underneath, and try to delete it.
@@ -719,12 +843,7 @@ public void executePartialDelete(final Configuration conf,
     S3AFileSystem fs = getFileSystem();
     fs.delete(destDir, true);
 
-    bindRolePolicyStatements(conf, STATEMENT_ALLOW_KMS_RW,
-        statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
-        new Statement(Effects.Deny)
-            .addActions(S3_PATH_WRITE_OPERATIONS)
-            .addResources(directory(readOnlyDir))
-    );
+    bindReadOnlyRolePolicy(conf, readOnlyDir);
     roleFS = (S3AFileSystem) destDir.getFileSystem(conf);
 
     int range = 10;
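The assumed-role tests above rely on the partial-failure contract: `bulkDelete()` returns a list of `(path, error)` entries rather than throwing when individual objects cannot be deleted. Below is a minimal sketch, not part of the patch, of handling those entries through the same `WrappedIO.bulkDelete_delete()` helper used in the tests; `deleteAndReport` is an illustrative name.

```java
// Illustrative sketch only; not part of the patch.
// Uses the WrappedIO entry point exercised in the tests above.
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.wrappedio.WrappedIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BulkDeleteFailureHandling {

  private static final Logger LOG =
      LoggerFactory.getLogger(BulkDeleteFailureHandling.class);

  private BulkDeleteFailureHandling() {
  }

  /**
   * Delete a page of paths and log every failure reported by the store.
   * A non-empty result means a partial failure, not an exception:
   * each entry maps a failed path to its error string, for example
   * "AccessDenied" when the caller lacks write access to that path.
   */
  public static void deleteAndReport(FileSystem fs, Path base,
      List<Path> page) throws IOException {
    List<Map.Entry<Path, String>> failures =
        WrappedIO.bulkDelete_delete(fs, base, page);
    for (Map.Entry<Path, String> failure : failures) {
      LOG.warn("Failed to delete {}: {}", failure.getKey(), failure.getValue());
    }
  }
}
```

An empty result list means every path in the page was deleted; the "AccessDenied" entries asserted above identify exactly which paths were rejected while the rest of the page went through.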
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 79e5a93371..dc81077257 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -831,4 +831,6 @@ protected void delete(Path path, boolean recursive) throws IOException {
     timer.end("time to delete %s", path);
   }
 
+
+
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractBulkDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractBulkDelete.java
new file mode 100644
index 0000000000..7ec11abe73
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsContractBulkDelete.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class ITestAbfsContractBulkDelete extends AbstractContractBulkDeleteTest {
+
+  private final boolean isSecure;
+  private final ABFSContractTestBinding binding;
+
+  public ITestAbfsContractBulkDelete() throws Exception {
+    binding = new ABFSContractTestBinding();
+    this.isSecure = binding.isSecureMode();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    binding.setup();
+    super.setup();
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    return binding.getRawConfiguration();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new AbfsFileSystemContract(conf, isSecure);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties
index 9f72d03653..64562ecdcf 100644
--- a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties
@@ -26,6 +26,7 @@ log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
 log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG
 log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE
 log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG
+log4j.logger.org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation=DEBUG
 # after here: turn off log messages from other parts of the system
 # which only clutter test reports.
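Finally, the S3A tests assert on rate limiting through the IOStatistics published by the filesystem. As a small sketch (not part of the patch; `RateLimitProbe` is an illustrative name), the same mean statistic that `testRateLimiting()` reads can be inspected after a run, using the `lookupMeanStatistic()` test helper shown earlier:

```java
// Illustrative sketch only; not part of the patch. Reads the rate-limiting
// statistic that testRateLimiting() asserts on after repeated bulk deletes.
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.statistics.MeanStatistic;

import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;

public final class RateLimitProbe {

  private RateLimitProbe() {
  }

  /**
   * Mean duration that recent bulk delete calls spent waiting on the
   * store's rate limiter; 0.0 means no throttling has been observed yet.
   */
  public static double meanRateLimitedDuration(S3AFileSystem fs) {
    MeanStatistic stat = lookupMeanStatistic(fs.getIOStatistics(),
        STORE_IO_RATE_LIMITED_DURATION + SUFFIX_MEAN);
    return stat.mean();
  }
}
```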