diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java
new file mode 100644
index 0000000000..ab5f73b562
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDelete.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * API for bulk deletion of objects/files,
+ * but not directories.
+ * After use, call {@code close()} to release any resources and
+ * to guarantee store IOStatistics are updated.
+ *
+ * Callers MUST have no expectation that parent directories will exist after the
+ * operation completes; if an object store needs to explicitly look for and create
+ * directory markers, that step will be omitted.
+ *
+ * Be aware that on some stores (AWS S3) each object listed in a bulk delete counts
+ * against the write IOPS limit; large page sizes are counterproductive here, as
+ * are attempts at parallel submissions across multiple threads.
+ * @see <a href="https://issues.apache.org/jira/browse/HADOOP-16823">HADOOP-16823.
+ * Large DeleteObject requests are their own Thundering Herd</a>.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface BulkDelete extends IOStatisticsSource, Closeable {
+
+ /**
+ * The maximum number of objects/files to delete in a single request.
+ * @return a number greater than zero.
+ */
+ int pageSize();
+
+ /**
+ * Base path of a bulk delete operation.
+ * All paths submitted in {@link #bulkDelete(Collection)} must be under this path.
+ * @return base path of a bulk delete operation.
+ */
+ Path basePath();
+
+ /**
+ * Delete a list of files/objects.
+ *
+ * <ul>
+ * <li>Files must be under the path provided in {@link #basePath()}.</li>
+ * <li>The size of the list must be equal to or less than the page size
+ * declared in {@link #pageSize()}.</li>
+ * <li>Directories are not supported; the outcome of attempting to delete
+ * directories is undefined (ignored; undetected, listed as failures...).</li>
+ * <li>The operation is not atomic.</li>
+ * <li>The operation is treated as idempotent: network failures may
+ * trigger resubmission of the request -any new objects created under a
+ * path in the list may then be deleted.</li>
+ * <li>There is no guarantee that any parent directories exist after this call.</li>
+ * </ul>
+ *
+ * @param paths list of paths which must be absolute and under the base path
+ * provided in {@link #basePath()}.
+ * @return a list of paths which failed to delete, with the exception message.
+ * @throws IOException IO problems including networking, authentication and more.
+ * @throws IllegalArgumentException if a path argument is invalid.
+ */
+ List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths)
+ throws IOException, IllegalArgumentException;
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java
new file mode 100644
index 0000000000..cad24babb3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteSource.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface for bulk deletion.
+ * Filesystems which support bulk deletion should implement this interface
+ * and MUST also declare their support in the path capability
+ * {@link CommonPathCapabilities#BULK_DELETE}.
+ * Exporting the interface does not guarantee that the operation is supported;
+ * a successful return of a {@link BulkDelete} instance from the call
+ * {@link #createBulkDelete(Path)} does.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface BulkDeleteSource {
+
+ /**
+ * Create a bulk delete operation.
+ * There is no network IO at this point, simply the creation of
+ * a bulk delete object.
+ * A path must be supplied to assist in link resolution.
+ * @param path path to delete under.
+ * @return the bulk delete.
+ * @throws UnsupportedOperationException bulk delete under that path is not supported.
+ * @throws IllegalArgumentException path not valid.
+ * @throws IOException problems resolving paths
+ */
+ BulkDelete createBulkDelete(Path path)
+ throws UnsupportedOperationException, IllegalArgumentException, IOException;
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteUtils.java
new file mode 100644
index 0000000000..d991642942
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BulkDeleteUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.util.Collection;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for bulk delete operations.
+ */
+public final class BulkDeleteUtils {
+
+ private BulkDeleteUtils() {
+ }
+
+ /**
+ * Preconditions for bulk delete paths.
+ * @param paths paths to delete.
+ * @param pageSize maximum number of paths to delete in a single operation.
+ * @param basePath base path for the delete operation.
+ */
+ public static void validateBulkDeletePaths(Collection<Path> paths, int pageSize, Path basePath) {
+ requireNonNull(paths);
+ checkArgument(paths.size() <= pageSize,
+ "Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize);
+ paths.forEach(p -> {
+ checkArgument(p.isAbsolute(), "Path %s is not absolute", p);
+ checkArgument(validatePathIsUnderParent(p, basePath),
+ "Path %s is not under the base path %s", p, basePath);
+ });
+ }
+
+ /**
+ * Check if a path is under a base path.
+ * @param p path to check.
+ * @param basePath base path.
+ * @return true if the path is under the base path.
+ */
+ public static boolean validatePathIsUnderParent(Path p, Path basePath) {
+ while (p.getParent() != null) {
+ if (p.getParent().equals(basePath)) {
+ return true;
+ }
+ p = p.getParent();
+ }
+ return false;
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java
index 9ec07cbe96..2005f0ae3b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java
@@ -181,4 +181,10 @@ private CommonPathCapabilities() {
*/
public static final String DIRECTORY_LISTING_INCONSISTENT =
"fs.capability.directory.listing.inconsistent";
+
+ /**
+ * Capability string to probe for bulk delete: {@value}.
+ */
+ public static final String BULK_DELETE = "fs.capability.bulk.delete";
+
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 768fd5b5e1..2155e17328 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
@@ -169,7 +170,8 @@
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured
- implements Closeable, DelegationTokenIssuer, PathCapabilities {
+ implements Closeable, DelegationTokenIssuer,
+ PathCapabilities, BulkDeleteSource {
public static final String FS_DEFAULT_NAME_KEY =
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
public static final String DEFAULT_FS =
@@ -3485,12 +3487,16 @@ public Collection getTrashRoots(boolean allUsers) {
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
- case CommonPathCapabilities.FS_SYMLINKS:
- // delegate to the existing supportsSymlinks() call.
- return supportsSymlinks() && areSymlinksEnabled();
- default:
- // the feature is not implemented.
- return false;
+ case CommonPathCapabilities.BULK_DELETE:
+ // bulk delete has a default implementation which
+ // can be called on any FileSystem.
+ return true;
+ case CommonPathCapabilities.FS_SYMLINKS:
+ // delegate to the existing supportsSymlinks() call.
+ return supportsSymlinks() && areSymlinksEnabled();
+ default:
+ // the feature is not implemented.
+ return false;
}
}
@@ -4976,4 +4982,18 @@ public MultipartUploaderBuilder createMultipartUploader(Path basePath)
methodNotSupported();
return null;
}
+
+ /**
+ * Create a bulk delete operation.
+ * The default implementation returns an instance of {@link DefaultBulkDeleteOperation}.
+ * @param path base path for the operation.
+ * @return an instance of the bulk delete.
+ * @throws IllegalArgumentException any argument is invalid.
+ * @throws IOException if there is an IO problem.
+ */
+ @Override
+ public BulkDelete createBulkDelete(Path path)
+ throws IllegalArgumentException, IOException {
+ return new DefaultBulkDeleteOperation(path, this);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/DefaultBulkDeleteOperation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/DefaultBulkDeleteOperation.java
new file mode 100644
index 0000000000..56f6a4622f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/DefaultBulkDeleteOperation.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.BulkDeleteUtils.validateBulkDeletePaths;
+
+/**
+ * Default implementation of the {@link BulkDelete} interface.
+ */
+public class DefaultBulkDeleteOperation implements BulkDelete {
+
+ private static Logger LOG = LoggerFactory.getLogger(DefaultBulkDeleteOperation.class);
+
+ /** Default page size for bulk delete. */
+ private static final int DEFAULT_PAGE_SIZE = 1;
+
+ /** Base path for the bulk delete operation. */
+ private final Path basePath;
+
+ /** Delegate filesystem used to make the actual delete calls. */
+ private final FileSystem fs;
+
+ public DefaultBulkDeleteOperation(Path basePath,
+ FileSystem fs) {
+ this.basePath = requireNonNull(basePath);
+ this.fs = fs;
+ }
+
+ @Override
+ public int pageSize() {
+ return DEFAULT_PAGE_SIZE;
+ }
+
+ @Override
+ public Path basePath() {
+ return basePath;
+ }
+
+ /**
+ * {@inheritDoc}.
+ * The default impl just calls {@code FileSystem.delete(path, false)}
+ * on the single path in the list.
+ */
+ @Override
+ public List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths)
+ throws IOException, IllegalArgumentException {
+ validateBulkDeletePaths(paths, DEFAULT_PAGE_SIZE, basePath);
+ List<Map.Entry<Path, String>> result = new ArrayList<>();
+ if (!paths.isEmpty()) {
+ // As the page size is always 1, this should be the only one
+ // path in the collection.
+ Path pathToDelete = paths.iterator().next();
+ try {
+ fs.delete(pathToDelete, false);
+ } catch (IOException ex) {
+ LOG.debug("Couldn't delete {} - exception occurred: {}", pathToDelete, ex);
+ result.add(Tuples.pair(pathToDelete, ex.toString()));
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
index 19ee9d1414..a513cffd84 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java
@@ -46,6 +46,9 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_APPEND = "op_append";
+ /** {@value}. */
+ public static final String OP_BULK_DELETE = "op_bulk-delete";
+
/** {@value}. */
public static final String OP_COPY_FROM_LOCAL_FILE =
"op_copy_from_local_file";
@@ -194,6 +197,9 @@ public final class StoreStatisticNames {
public static final String STORE_IO_RETRY
= "store_io_retry";
+ public static final String STORE_IO_RATE_LIMITED_DURATION
+ = "store_io_rate_limited_duration";
+
/**
* A store's equivalent of a paged LIST request was initiated: {@value}.
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java
new file mode 100644
index 0000000000..696055895a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/WrappedIO.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.wrappedio;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Reflection-friendly access to APIs which are not available in
+ * some of the older Hadoop versions which libraries still
+ * compile against.
+ *
+ * The intent is to avoid the need for complex reflection operations
+ * including wrapping of parameter classes, direct instantiation of
+ * new classes etc.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class WrappedIO {
+
+ private WrappedIO() {
+ }
+
+ /**
+ * Get the maximum number of objects/files to delete in a single request.
+ * @param fs filesystem
+ * @param path path to delete under.
+ * @return a number greater than or equal to zero.
+ * @throws UnsupportedOperationException bulk delete under that path is not supported.
+ * @throws IllegalArgumentException path not valid.
+ * @throws IOException problems resolving paths
+ */
+ public static int bulkDelete_PageSize(FileSystem fs, Path path) throws IOException {
+ try (BulkDelete bulk = fs.createBulkDelete(path)) {
+ return bulk.pageSize();
+ }
+ }
+
+ /**
+ * Delete a list of files/objects.
+ *
+ * <ul>
+ * <li>Files must be under the path provided in {@code base}.</li>
+ * <li>The size of the list must be equal to or less than the page size.</li>
+ * <li>Directories are not supported; the outcome of attempting to delete
+ * directories is undefined (ignored; undetected, listed as failures...).</li>
+ * <li>The operation is not atomic.</li>
+ * <li>The operation is treated as idempotent: network failures may
+ * trigger resubmission of the request -any new objects created under a
+ * path in the list may then be deleted.</li>
+ * <li>There is no guarantee that any parent directories exist after this call.</li>
+ * </ul>
+ *
+ * @param fs filesystem
+ * @param base path to delete under.
+ * @param paths list of paths which must be absolute and under the base path.
+ * @return a list of all the paths which couldn't be deleted for a reason other
+ * than "not found", with any associated error message.
+ * @throws UnsupportedOperationException bulk delete under that path is not supported.
+ * @throws IOException IO problems including networking, authentication and more.
+ * @throws IllegalArgumentException if a path argument is invalid.
+ */
+ public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
+ Path base,
+ Collection<Path> paths)
+ throws IOException {
+ try (BulkDelete bulk = fs.createBulkDelete(base)) {
+ return bulk.bulkDelete(paths);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java
new file mode 100644
index 0000000000..ed80c1daca
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/Tuples.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Tuple support.
+ * This allows for tuples to be passed around as part of the public API without
+ * committing to a third-party library tuple implementation.
+ */
+@InterfaceStability.Unstable
+public final class Tuples {
+
+ private Tuples() {
+ }
+
+ /**
+ * Create a 2-tuple.
+ * @param key element 1
+ * @param value element 2
+ * @return a tuple.
+ * @param <K> element 1 type
+ * @param <V> element 2 type
+ */
+ public static <K, V> Map.Entry<K, V> pair(final K key, final V value) {
+ return new Tuple<>(key, value);
+ }
+
+ /**
+ * Simple tuple class: uses the Map.Entry interface as other
+ * implementations have done, so the API is available across
+ * all java versions.
+ * @param <K> key
+ * @param <V> value
+ */
+ private static final class Tuple<K, V> implements Map.Entry<K, V> {
+
+ private final K key;
+
+ private final V value;
+
+ private Tuple(final K key, final V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+
+ @Override
+ public V setValue(final V value) {
+ throw new UnsupportedOperationException("Tuple is immutable");
+ }
+
+ @Override
+ public String toString() {
+ return "(" + key + ", " + value + ')';
+ }
+
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md
new file mode 100644
index 0000000000..de0e4e893b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/bulkdelete.md
@@ -0,0 +1,139 @@
+
+
+# interface `BulkDelete`
+
+
+
+The `BulkDelete` interface provides an API to perform bulk delete of files/objects
+in an object store or filesystem.
+
+## Key Features
+
+* An API for submitting a list of paths to delete.
+* This list must be no larger than the "page size" supported by the client; this size is also exposed as a method.
+* Triggers a request to delete files at the specific paths.
+* Returns a list of which paths were reported as delete failures by the store.
+* Does not consider a nonexistent file to be a failure.
+* Does not offer any atomicity guarantees.
+* Idempotency guarantees are weak: retries may delete files newly created by other clients.
+* Provides no guarantees as to the outcome if a path references a directory.
+* Provides no guarantees that parent directories will exist after the call.
+
+
+The API is designed to match the semantics of the AWS S3 [Bulk Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) REST API call, but it is not
+exclusively restricted to this store. This is why the "provides no guarantees"
+restrictions do not state what the outcome will be when executed on other stores.
+
+### Interface `org.apache.hadoop.fs.BulkDeleteSource`
+
+The interface `BulkDeleteSource` is offered by a FileSystem/FileContext class if
+it supports the API. A default implementation is provided by the base `FileSystem`
+class; it returns an instance of `org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation`.
+The details of the default implementation are covered in the sections below.
+
+
+```java
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface BulkDeleteSource {
+ BulkDelete createBulkDelete(Path path)
+ throws UnsupportedOperationException, IllegalArgumentException, IOException;
+
+}
+
+```
+
+### Interface `org.apache.hadoop.fs.BulkDelete`
+
+This is the bulk delete implementation returned by the `createBulkDelete()` call.
+
+```java
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface BulkDelete extends IOStatisticsSource, Closeable {
+ int pageSize();
+ Path basePath();
+ List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths)
+ throws IOException, IllegalArgumentException;
+
+}
+
+```
+
+### `bulkDelete(paths)`
+
+#### Preconditions
+
+```python
+if length(paths) > pageSize: throw IllegalArgumentException
+```
+
+#### Postconditions
+
+All paths which refer to files are removed from the set of files.
+```python
+FS'Files = FS.Files - [paths]
+```
+
+No other restrictions are placed upon the outcome.
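+
+The sketch below is illustrative rather than normative: it shows how a caller
+might honour these conditions by asking the store for its page size and then
+submitting the paths in batches no larger than that size. The helper name
+`deleteAllUnder` and its surrounding setup are assumptions of this example,
+not part of the API.
+
+```java
+// Sketch only. Assumes the usual imports: org.apache.hadoop.fs.{BulkDelete, FileSystem, Path},
+// java.io.IOException and java.util.{ArrayList, List, Map}.
+// All paths passed in must be absolute and under `base`.
+public static List<Map.Entry<Path, String>> deleteAllUnder(
+    FileSystem fs, Path base, List<Path> allPaths) throws IOException {
+  List<Map.Entry<Path, String>> failures = new ArrayList<>();
+  try (BulkDelete bulk = fs.createBulkDelete(base)) {
+    int size = bulk.pageSize();
+    for (int i = 0; i < allPaths.size(); i += size) {
+      // each page is no larger than the store's declared page size
+      List<Path> page = allPaths.subList(i, Math.min(i + size, allPaths.size()));
+      failures.addAll(bulk.bulkDelete(page));
+    }
+  }
+  // the entries name the paths the store reported as delete failures, with messages
+  return failures;
+}
+```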
+
+
+### Availability
+
+The `BulkDeleteSource` interface is exported by `FileSystem` and `FileContext` storage clients,
+and a default implementation is available to every filesystem through the base `FileSystem`
+class. For integration in applications like Apache Iceberg to work seamlessly, all
+implementations of this interface MUST NOT reject the request but instead return a
+`BulkDelete` instance with a page size >= 1.
+
+Use the `PathCapabilities` probe `fs.capability.bulk.delete`.
+
+```java
+store.hasPathCapability(path, "fs.capability.bulk.delete")
+```
+
+### Invocation through Reflection.
+
+The need for many libraries to compile against very old versions of Hadoop
+means that most of the cloud-first Filesystem API calls cannot be used except
+through reflection, and the more complicated the API and its data types are,
+the harder that reflection is to implement.
+
+To assist this, the class `org.apache.hadoop.io.wrappedio.WrappedIO` has a few methods
+which are intended to provide simple access to the API, especially
+through reflection.
+
+```java
+
+ public static int bulkDelete_PageSize(FileSystem fs, Path path) throws IOException;
+
+ public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
+ Path base, Collection<Path> paths) throws IOException;
+```
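+
+A hedged sketch of a reflection-based binding follows. The class and method names
+match those added in this patch; the wrapper method `reflectiveBulkDelete` itself is
+hypothetical, and caching of the resolved `java.lang.reflect.Method` is omitted.
+
+```java
+// Sketch: locate and invoke WrappedIO.bulkDelete_delete() reflectively, so that the
+// calling library still links against Hadoop releases which predate this API.
+static List<Map.Entry<Path, String>> reflectiveBulkDelete(
+    FileSystem fs, Path base, Collection<Path> paths) throws Exception {
+  Class<?> wrappedIO = Class.forName("org.apache.hadoop.io.wrappedio.WrappedIO");
+  Method delete = wrappedIO.getMethod("bulkDelete_delete",
+      FileSystem.class, Path.class, Collection.class);
+  @SuppressWarnings("unchecked")
+  List<Map.Entry<Path, String>> failures = (List<Map.Entry<Path, String>>)
+      delete.invoke(null, fs, base, paths);  // static method, so the target is null
+  return failures;
+}
+```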
+
+### Implementations
+
+#### Default Implementation
+
+The default implementation of the `BulkDelete` interface, used by any `FileSystem`
+which does not provide its own, is `org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation`.
+It fixes the page size at 1 and calls `FileSystem.delete(path, false)` on the single
+path in the list.
+
+
+#### S3A Implementation
+The S3A implementation is `org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation`, which implements
+the multi-object delete semantics of the AWS S3 [Bulk Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) API.
+For more details, please refer to the S3A performance documentation.
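+
+As a sketch only, a client wanting larger delete pages against S3 might tune the
+existing page size option (assumed here to be `fs.s3a.bulk.delete.page.size`) together
+with the `fs.s3a.io.rate.limit` option added in this patch before creating the
+filesystem; the values shown are purely illustrative.
+
+```java
+// Sketch: tune the S3A bulk delete page size and the new IO rate limit.
+Configuration conf = new Configuration();
+conf.setInt("fs.s3a.bulk.delete.page.size", 250);  // objects per multi-object delete call
+conf.setInt("fs.s3a.io.rate.limit", 1000);         // rate limit capacity; 0 means no limiting
+FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
+```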
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
index df39839e83..be72f35789 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/index.md
@@ -43,4 +43,5 @@ HDFS as these are commonly expected by Hadoop client applications.
1. [IOStatistics](iostatistics.html)
1. [openFile()](openfile.html)
1. [SafeMode](safemode.html)
-1. [LeaseRecoverable](leaserecoverable.html)
\ No newline at end of file
+1. [LeaseRecoverable](leaserecoverable.html)
+1. [BulkDelete](bulkdelete.html)
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java
new file mode 100644
index 0000000000..9ebf9923f3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractBulkDeleteTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.wrappedio.WrappedIO;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete_delete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation.
+ */
+public abstract class AbstractContractBulkDeleteTest extends AbstractFSContractTestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class);
+
+ /**
+ * Page size for bulk delete. This is calculated based
+ * on the store implementation.
+ */
+ protected int pageSize;
+
+ /**
+ * Base path for the bulk delete tests.
+ * All the paths to be deleted should be under this base path.
+ */
+ protected Path basePath;
+
+ /**
+ * Test file system.
+ */
+ protected FileSystem fs;
+
+ @Before
+ public void setUp() throws Exception {
+ fs = getFileSystem();
+ basePath = path(getClass().getName());
+ pageSize = WrappedIO.bulkDelete_PageSize(getFileSystem(), basePath);
+ fs.mkdirs(basePath);
+ }
+
+ public Path getBasePath() {
+ return basePath;
+ }
+
+ protected int getExpectedPageSize() {
+ return 1;
+ }
+
+ /**
+ * Validate the page size for bulk delete operation. Different stores can have different
+ * implementations for bulk delete operation thus different page size.
+ */
+ @Test
+ public void validatePageSize() throws Exception {
+ Assertions.assertThat(pageSize)
+ .describedAs("Page size should be 1 by default for all stores")
+ .isEqualTo(getExpectedPageSize());
+ }
+
+ @Test
+ public void testPathsSizeEqualsPageSizePrecondition() throws Exception {
+ List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
+ // Bulk delete call should pass with no exception.
+ bulkDelete_delete(getFileSystem(), basePath, listOfPaths);
+ }
+
+ @Test
+ public void testPathsSizeGreaterThanPageSizePrecondition() throws Exception {
+ List<Path> listOfPaths = createListOfPaths(pageSize + 1, basePath);
+ intercept(IllegalArgumentException.class,
+ () -> bulkDelete_delete(getFileSystem(), basePath, listOfPaths));
+ }
+
+ @Test
+ public void testPathsSizeLessThanPageSizePrecondition() throws Exception {
+ List<Path> listOfPaths = createListOfPaths(pageSize - 1, basePath);
+ // Bulk delete call should pass with no exception.
+ bulkDelete_delete(getFileSystem(), basePath, listOfPaths);
+ }
+
+ @Test
+ public void testBulkDeleteSuccessful() throws Exception {
+ runBulkDelete(false);
+ }
+
+ @Test
+ public void testBulkDeleteSuccessfulUsingDirectFS() throws Exception {
+ runBulkDelete(true);
+ }
+
+ private void runBulkDelete(boolean useDirectFS) throws IOException {
+ List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
+ for (Path path : listOfPaths) {
+ touch(fs, path);
+ }
+ FileStatus[] fileStatuses = fs.listStatus(basePath);
+ Assertions.assertThat(fileStatuses)
+ .describedAs("File count after create")
+ .hasSize(pageSize);
+ if (useDirectFS) {
+ assertSuccessfulBulkDelete(
+ fs.createBulkDelete(basePath).bulkDelete(listOfPaths));
+ } else {
+ // Using WrappedIO to call bulk delete.
+ assertSuccessfulBulkDelete(
+ bulkDelete_delete(getFileSystem(), basePath, listOfPaths));
+ }
+
+ FileStatus[] fileStatusesAfterDelete = fs.listStatus(basePath);
+ Assertions.assertThat(fileStatusesAfterDelete)
+ .describedAs("File statuses should be empty after delete")
+ .isEmpty();
+ }
+
+
+ @Test
+ public void validatePathCapabilityDeclared() throws Exception {
+ Assertions.assertThat(fs.hasPathCapability(basePath, CommonPathCapabilities.BULK_DELETE))
+ .describedAs("Path capability BULK_DELETE should be declared")
+ .isTrue();
+ }
+
+ /**
+ * This test should fail as path is not under the base path.
+ */
+ @Test
+ public void testDeletePathsNotUnderBase() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path pathNotUnderBase = path("not-under-base");
+ paths.add(pathNotUnderBase);
+ intercept(IllegalArgumentException.class,
+ () -> bulkDelete_delete(getFileSystem(), basePath, paths));
+ }
+
+ /**
+ * This test should fail as path is not absolute.
+ */
+ @Test
+ public void testDeletePathsNotAbsolute() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path pathNotAbsolute = new Path("not-absolute");
+ paths.add(pathNotAbsolute);
+ intercept(IllegalArgumentException.class,
+ () -> bulkDelete_delete(getFileSystem(), basePath, paths));
+ }
+
+ @Test
+ public void testDeletePathsNotExists() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path pathNotExists = new Path(basePath, "not-exists");
+ paths.add(pathNotExists);
+ // bulk delete call doesn't verify whether a path exists before deleting.
+ assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
+ }
+
+ @Test
+ public void testDeletePathsDirectory() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path dirPath = new Path(basePath, "dir");
+ paths.add(dirPath);
+ Path filePath = new Path(dirPath, "file");
+ paths.add(filePath);
+ pageSizePreconditionForTest(paths.size());
+ fs.mkdirs(dirPath);
+ touch(fs, filePath);
+ // Outcome is undefined. But call shouldn't fail.
+ assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
+ }
+
+ @Test
+ public void testBulkDeleteParentDirectoryWithDirectories() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path dirPath = new Path(basePath, "dir");
+ fs.mkdirs(dirPath);
+ Path subDir = new Path(dirPath, "subdir");
+ fs.mkdirs(subDir);
+ // adding parent directory to the list of paths.
+ paths.add(dirPath);
+ List<Map.Entry<Path, String>> entries = bulkDelete_delete(getFileSystem(), basePath, paths);
+ Assertions.assertThat(entries)
+ .describedAs("Parent non empty directory should not be deleted")
+ .hasSize(1);
+ // During the bulk delete operation, non-empty directories are not deleted by the default implementation.
+ assertIsDirectory(dirPath);
+ }
+
+ @Test
+ public void testBulkDeleteParentDirectoryWithFiles() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path dirPath = new Path(basePath, "dir");
+ fs.mkdirs(dirPath);
+ Path file = new Path(dirPath, "file");
+ touch(fs, file);
+ // adding parent directory to the list of paths.
+ paths.add(dirPath);
+ List<Map.Entry<Path, String>> entries = bulkDelete_delete(getFileSystem(), basePath, paths);
+ Assertions.assertThat(entries)
+ .describedAs("Parent non empty directory should not be deleted")
+ .hasSize(1);
+ // During the bulk delete operation, non-empty directories are not deleted by the default implementation.
+ assertIsDirectory(dirPath);
+ }
+
+
+ @Test
+ public void testDeleteEmptyDirectory() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path emptyDirPath = new Path(basePath, "empty-dir");
+ fs.mkdirs(emptyDirPath);
+ paths.add(emptyDirPath);
+ // Should pass as empty directory.
+ assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
+ }
+
+ @Test
+ public void testDeleteEmptyList() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ // Empty list should pass.
+ assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
+ }
+
+ @Test
+ public void testDeleteSamePathsMoreThanOnce() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path path = new Path(basePath, "file");
+ paths.add(path);
+ paths.add(path);
+ Path another = new Path(basePath, "another-file");
+ paths.add(another);
+ pageSizePreconditionForTest(paths.size());
+ touch(fs, path);
+ touch(fs, another);
+ assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
+ }
+
+ /**
+ * Skip test if paths size is greater than page size.
+ */
+ protected void pageSizePreconditionForTest(int size) {
+ if (size > pageSize) {
+ skip("Test requires paths size less than or equal to page size: " + pageSize);
+ }
+ }
+
+ /**
+ * This test validates that files to be deleted don't have
+ * to be direct children of the base path.
+ */
+ @Test
+ public void testDeepDirectoryFilesDelete() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path dir1 = new Path(basePath, "dir1");
+ Path dir2 = new Path(dir1, "dir2");
+ Path dir3 = new Path(dir2, "dir3");
+ fs.mkdirs(dir3);
+ Path file1 = new Path(dir3, "file1");
+ touch(fs, file1);
+ paths.add(file1);
+ assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
+ }
+
+
+ @Test
+ public void testChildPaths() throws Exception {
+ List<Path> paths = new ArrayList<>();
+ Path dirPath = new Path(basePath, "dir");
+ fs.mkdirs(dirPath);
+ paths.add(dirPath);
+ Path filePath = new Path(dirPath, "file");
+ touch(fs, filePath);
+ paths.add(filePath);
+ pageSizePreconditionForTest(paths.size());
+ // Should pass as both paths are under the base path.
+ assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
+ }
+
+
+ /**
+ * Assert on returned entries after bulk delete operation.
+ * Entries should be empty after successful delete.
+ */
+ public static void assertSuccessfulBulkDelete(List<Map.Entry<Path, String>> entries) {
+ Assertions.assertThat(entries)
+ .describedAs("Bulk delete failed, " +
+ "return entries should be empty after successful delete")
+ .isEmpty();
+ }
+
+ /**
+ * Create a list of paths with the given count
+ * under the given base path.
+ */
+ private List<Path> createListOfPaths(int count, Path basePath) {
+ List<Path> paths = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ Path path = new Path(basePath, "file-" + i);
+ paths.add(path);
+ }
+ return paths;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractBulkDelete.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractBulkDelete.java
new file mode 100644
index 0000000000..f1bd641806
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractBulkDelete.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.localfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Bulk delete contract tests for the local filesystem.
+ */
+public class TestLocalFSContractBulkDelete extends AbstractContractBulkDeleteTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new LocalFSContract(conf);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractBulkDelete.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractBulkDelete.java
new file mode 100644
index 0000000000..46d98249ab
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractBulkDelete.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.rawlocal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Bulk delete contract tests for the raw local filesystem.
+ */
+public class TestRawLocalContractBulkDelete extends AbstractContractBulkDeleteTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new RawlocalFSContract(conf);
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractBulkDelete.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractBulkDelete.java
new file mode 100644
index 0000000000..3a851b6ff1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractBulkDelete.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.hdfs;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Bulk delete contract tests for the HDFS filesystem.
+ */
+public class TestHDFSContractBulkDelete extends AbstractContractBulkDeleteTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new HDFSContract(conf);
+ }
+
+ @BeforeClass
+ public static void createCluster() throws IOException {
+ HDFSContract.createCluster();
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws IOException {
+ HDFSContract.destroyCluster();
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 67df37e5eb..185389739c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1641,4 +1641,16 @@ private Constants() {
*/
public static final String AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED =
"fs.s3a.access.grants.fallback.to.iam";
+ /**
+ * Default value for {@link #S3A_IO_RATE_LIMIT}.
+ * Value: {@value}.
+ * 0 means no rate limiting.
+ */
+ public static final int DEFAULT_S3A_IO_RATE_LIMIT = 0;
+
+ /**
+ * Config to set the rate limit for S3A IO operations.
+ * Value: {@value}.
+ */
+ public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 0e2ae0f74d..d04ca70a68 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -81,7 +81,6 @@
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
-import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
@@ -103,6 +102,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -120,7 +120,8 @@
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
-import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
+import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
+import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
@@ -141,9 +142,11 @@
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
+import org.apache.hadoop.fs.s3a.impl.S3AStoreBuilder;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
+import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
@@ -162,10 +165,6 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.DurationInfo;
-import org.apache.hadoop.util.LambdaUtils;
-import org.apache.hadoop.util.Lists;
-import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -203,10 +202,15 @@
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.fs.store.EtagChecksum;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.LambdaUtils;
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.RateLimitingFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -244,7 +248,6 @@
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
-import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
@@ -258,11 +261,11 @@
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.pairedTrackerFactory;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
-import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.apache.hadoop.util.RateLimitingFactory.unlimitedRate;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;
@@ -283,7 +286,8 @@
@InterfaceStability.Evolving
public class S3AFileSystem extends FileSystem implements StreamCapabilities,
AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource,
- AuditSpanSource, ActiveThreadSpanSource {
+ AuditSpanSource, ActiveThreadSpanSource,
+ StoreContextFactory {
/**
* Default blocksize as used in blocksize and FS status queries.
@@ -296,6 +300,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private String username;
+ /**
+ * Store back end.
+ */
+ private S3AStore store;
+
private S3Client s3Client;
/** Async client is used for transfer manager. */
@@ -680,9 +689,6 @@ public void initialize(URI name, Configuration originalConf)
// the encryption algorithms)
bindAWSClient(name, delegationTokensEnabled);
- // This initiates a probe against S3 for the bucket existing.
- doBucketProbing();
-
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE,
Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
@@ -729,9 +735,6 @@ public void initialize(URI name, Configuration originalConf)
directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf,
this::allowAuthoritative);
LOG.debug("Directory marker retention policy is {}", directoryPolicy);
-
- initMultipartUploads(conf);
-
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
@@ -756,6 +759,26 @@ public void initialize(URI name, Configuration originalConf)
OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);
+
+ int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
+ // now create the store
+ store = new S3AStoreBuilder()
+ .withS3Client(s3Client)
+ .withDurationTrackerFactory(getDurationTrackerFactory())
+ .withStoreContextFactory(this)
+ .withAuditSpanSource(getAuditManager())
+ .withInstrumentation(getInstrumentation())
+ .withStatisticsContext(statisticsContext)
+ .withStorageStatistics(getStorageStatistics())
+ .withReadRateLimiter(unlimitedRate())
+ .withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
+ .build();
+
+ // The filesystem is now ready to perform operations against
+ // S3
+ // This initiates a probe against S3 for the bucket existing.
+ doBucketProbing();
+ initMultipartUploads(conf);
} catch (SdkException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
@@ -1417,6 +1440,11 @@ public S3Client getAmazonS3Client(String reason) {
return s3Client;
}
+ @Override
+ public S3AStore getStore() {
+ return store;
+ }
+
/**
* S3AInternals method.
* {@inheritDoc}.
@@ -3064,29 +3092,10 @@ public void incrementWriteOperations() {
@Retries.RetryRaw
protected void deleteObject(String key)
throws SdkException, IOException {
- blockRootDelete(key);
incrementWriteOperations();
- try (DurationInfo ignored =
- new DurationInfo(LOG, false,
- "deleting %s", key)) {
- invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
- DELETE_CONSIDERED_IDEMPOTENT,
- () -> {
- incrementStatistic(OBJECT_DELETE_OBJECTS);
- trackDurationOfInvocation(getDurationTrackerFactory(),
- OBJECT_DELETE_REQUEST.getSymbol(),
- () -> s3Client.deleteObject(getRequestFactory()
- .newDeleteObjectRequestBuilder(key)
- .build()));
- return null;
- });
- } catch (AwsServiceException ase) {
- // 404 errors get swallowed; this can be raised by
- // third party stores (GCS).
- if (!isObjectNotFound(ase)) {
- throw ase;
- }
- }
+ store.deleteObject(getRequestFactory()
+ .newDeleteObjectRequestBuilder(key)
+ .build());
}
/**
@@ -3112,19 +3121,6 @@ void deleteObjectAtPath(Path f,
deleteObject(key);
}
- /**
- * Reject any request to delete an object where the key is root.
- * @param key key to validate
- * @throws InvalidRequestException if the request was rejected due to
- * a mistaken attempt to delete the root directory.
- */
- private void blockRootDelete(String key) throws InvalidRequestException {
- if (key.isEmpty() || "/".equals(key)) {
- throw new InvalidRequestException("Bucket "+ bucket
- +" cannot be deleted");
- }
- }
-
/**
* Perform a bulk object delete operation against S3.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
@@ -3151,38 +3147,11 @@ private void blockRootDelete(String key) throws InvalidRequestException {
private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, SdkException, IOException {
incrementWriteOperations();
- BulkDeleteRetryHandler retryHandler =
- new BulkDeleteRetryHandler(createStoreContext());
- int keyCount = deleteRequest.delete().objects().size();
- try (DurationInfo ignored =
- new DurationInfo(LOG, false, "DELETE %d keys",
- keyCount)) {
- DeleteObjectsResponse response =
- invoker.retryUntranslated("delete", DELETE_CONSIDERED_IDEMPOTENT,
- (text, e, r, i) -> {
- // handle the failure
- retryHandler.bulkDeleteRetried(deleteRequest, e);
- },
- // duration is tracked in the bulk delete counters
- trackDurationOfOperation(getDurationTrackerFactory(),
- OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
- incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
- return s3Client.deleteObjects(deleteRequest);
- }));
-
- if (!response.errors().isEmpty()) {
- // one or more of the keys could not be deleted.
- // log and then throw
- List<S3Error> errors = response.errors();
- LOG.debug("Partial failure of delete, {} errors", errors.size());
- for (S3Error error : errors) {
- LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message());
- }
- throw new MultiObjectDeleteException(errors);
- }
-
- return response;
+ DeleteObjectsResponse response = store.deleteObjects(deleteRequest).getValue();
+ if (!response.errors().isEmpty()) {
+ throw new MultiObjectDeleteException(response.errors());
}
+ return response;
}
/**
@@ -3391,20 +3360,16 @@ private void removeKeysS3(
List<ObjectIdentifier> keysToDelete,
boolean deleteFakeDir)
throws MultiObjectDeleteException, AwsServiceException, IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initiating delete operation for {} objects",
- keysToDelete.size());
- for (ObjectIdentifier objectIdentifier : keysToDelete) {
- LOG.debug(" \"{}\" {}", objectIdentifier.key(),
- objectIdentifier.versionId() != null ? objectIdentifier.versionId() : "");
- }
- }
if (keysToDelete.isEmpty()) {
// exit fast if there are no keys to delete
return;
}
- for (ObjectIdentifier objectIdentifier : keysToDelete) {
- blockRootDelete(objectIdentifier.key());
+ if (keysToDelete.size() == 1) {
+ // a single object is deleted with a single DELETE call;
+ // this is more informative in server logs and may be more efficient.
+ deleteObject(keysToDelete.get(0).key());
+ noteDeleted(1, deleteFakeDir);
+ return;
}
try {
if (enableMultiObjectsDelete) {
@@ -5481,7 +5446,6 @@ public boolean hasPathCapability(final Path path, final String capability)
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
return true;
- // multi object delete flag
case ENABLE_MULTI_DELETE:
return enableMultiObjectsDelete;
@@ -5667,6 +5631,7 @@ public S3AMultipartUploaderBuilder createMultipartUploader(
* new store context instances should be created as appropriate.
* @return the store context of this FS.
*/
+ @Override
@InterfaceAudience.Private
public StoreContext createStoreContext() {
return new StoreContextBuilder().setFsURI(getUri())
@@ -5768,4 +5733,36 @@ public boolean isMultipartUploadEnabled() {
return isMultipartUploadEnabled;
}
+ /**
+ * S3A implementation: create a bulk delete operation through
+ * which the actual bulk delete calls can be made.
+ * @param path base path for the bulk delete operation.
+ * @return an implementation of the bulk delete API.
+ */
+ @Override
+ public BulkDelete createBulkDelete(final Path path)
+ throws IllegalArgumentException, IOException {
+
+ final Path p = makeQualified(path);
+ final AuditSpanS3A span = createSpan("bulkdelete", p.toString(), null);
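+ // when multi-object delete is disabled the page size is 1:
+ // each path is deleted with its own request.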
+ final int size = enableMultiObjectsDelete ? pageSize : 1;
+ return new BulkDeleteOperation(
+ createStoreContext(),
+ createBulkDeleteCallbacks(p, size, span),
+ p,
+ size,
+ span);
+ }
+
+ /**
+ * Create the callbacks for the bulk delete operation.
+ * @param path path to delete.
+ * @param pageSize page size.
+ * @param span span for operations.
+ * @return an instance of the Bulk Delete callbacks.
+ */
+ protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks(
+ Path path, int pageSize, AuditSpanS3A span) {
+ return new BulkDeleteOperationCallbacksImpl(store, pathToKey(path), pageSize, span);
+ }
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
index b411606856..3f3178c7e6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java
@@ -33,6 +33,9 @@
/**
* This is an unstable interface for access to S3A Internal state, S3 operations
* and the S3 client connector itself.
+ *
+ * Note for maintainers: this is documented in {@code aws_sdk_upgrade.md};
+ * keep that document in sync with any changes here.
*/
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate("testing/diagnostics")
@@ -52,13 +55,19 @@ public interface S3AInternals {
* set to false.
*
* Mocking note: this is the same S3Client as is used by the owning
- * filesystem; changes to this client will be reflected by changes
+ * filesystem and S3AStore; changes to this client will be reflected by changes
* in the behavior of that filesystem.
* @param reason a justification for requesting access.
* @return S3Client
*/
S3Client getAmazonS3Client(String reason);
+ /**
+ * Get the store for low-level operations.
+ * @return the store the S3A FS is working through.
+ */
+ S3AStore getStore();
+
/**
* Get the region of a bucket.
* Invoked from StoreContext; consider an entry point.
@@ -131,4 +140,5 @@ public interface S3AInternals {
@AuditEntryPoint
@Retries.RetryTranslated
long abortMultipartUploads(Path path) throws IOException;
+
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
new file mode 100644
index 0000000000..68eacc35b1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+/**
+ * Interface for the S3A Store;
+ * S3 client interactions should go through this layer,
+ * so that it can be mocked for unit tests.
+ */
+@InterfaceAudience.LimitedPrivate("Extensions")
+@InterfaceStability.Unstable
+public interface S3AStore extends IOStatisticsSource {
+
+ /**
+ * Acquire write capacity for operations.
+ * This should be done within retry loops.
+ * @param capacity capacity to acquire.
+ * @return time spent waiting for the capacity.
+ */
+ Duration acquireWriteCapacity(int capacity);
+
+ /**
+ * Acquire read capacity for operations.
+ * This should be done within retry loops.
+ * @param capacity capacity to acquire.
+ * @return time spent waiting for the capacity.
+ */
+ Duration acquireReadCapacity(int capacity);
+
+ StoreContext getStoreContext();
+
+ DurationTrackerFactory getDurationTrackerFactory();
+
+ S3AStatisticsContext getStatisticsContext();
+
+ RequestFactory getRequestFactory();
+
+ /**
+ * Perform a bulk object delete operation against S3.
+ * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+ * operation statistics
+ *
+ * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
+ * of objects deleted in the request.
+ *
+ * Retry policy: retry untranslated; delete considered idempotent.
+ * If the request is throttled, this is logged in the throttle statistics,
+ * with the counter set to the number of keys, rather than the number
+ * of invocations of the delete operation.
+ * This is because S3 considers each key as one mutating operation on
+ * the store when updating its load counters on a specific partition
+ * of an S3 bucket.
+ * If only the request was measured, this operation would under-report.
+ * A write capacity proportional to the number of keys
+ * present in the request will be requested, and re-requested during retries,
+ * so that retries throttle better. If the request is throttled, the time spent is
+ * recorded in a duration IOStat named {@code STORE_IO_RATE_LIMITED_DURATION}.
+ * @param deleteRequest keys to delete on the s3-backend
+ * @return the AWS response
+ * @throws MultiObjectDeleteException one or more of the keys could not
+ * be deleted.
+ * @throws SdkException amazon-layer failure.
+ * @throws IOException IO problems.
+ */
+ @Retries.RetryRaw
+ Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(DeleteObjectsRequest deleteRequest)
+ throws MultiObjectDeleteException, SdkException, IOException;
+
+ /**
+ * Delete an object.
+ * Increments the {@code OBJECT_DELETE_REQUESTS} statistics.
+ *
+ * Retry policy: retry untranslated; delete considered idempotent.
+ * 404 errors other than bucket not found are swallowed;
+ * this can be raised by third party stores (GCS).
+ *
+ * A write capacity of 1 (as this is a single object delete) will be requested before
+ * the delete call and will be re-requested during retries such that
+ * retries throttle better. If the request is throttled, the time spent is
+ * recorded in a duration IOStat named {@code STORE_IO_RATE_LIMITED_DURATION}.
+ * If an exception is caught and swallowed, the response will be empty;
+ * otherwise it will be the response from the delete operation.
+ * @param request request to make
+ * @return the total duration and response.
+ * @throws SdkException problems working with S3
+ * @throws IllegalArgumentException if the request was rejected due to
+ * a mistaken attempt to delete the root directory.
+ */
+ @Retries.RetryRaw
+ Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(
+ DeleteObjectRequest request) throws SdkException;
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index ce3af3de80..7c4883c3d9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -103,6 +103,10 @@ public enum Statistic {
StoreStatisticNames.OP_ACCESS,
"Calls of access()",
TYPE_DURATION),
+ INVOCATION_BULK_DELETE(
+ StoreStatisticNames.OP_BULK_DELETE,
+ "Calls of bulk delete()",
+ TYPE_COUNTER),
INVOCATION_COPY_FROM_LOCAL_FILE(
StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE,
"Calls of copyFromLocalFile()",
@@ -539,6 +543,10 @@ public enum Statistic {
"retried requests made of the remote store",
TYPE_COUNTER),
+ STORE_IO_RATE_LIMITED(StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION,
+ "Duration of rate limited operations",
+ TYPE_DURATION),
+
STORE_IO_THROTTLED(
StoreStatisticNames.STORE_IO_THROTTLED,
"Requests throttled and retried",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java
new file mode 100644
index 0000000000..64bebd880c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperation.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Collections.emptyList;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.fs.BulkDeleteUtils.validatePathIsUnderParent;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * S3A Implementation of the {@link BulkDelete} interface.
+ */
+public class BulkDeleteOperation extends AbstractStoreOperation implements BulkDelete {
+
+ private final BulkDeleteOperationCallbacks callbacks;
+
+ private final Path basePath;
+
+ private final int pageSize;
+
+ public BulkDeleteOperation(
+ final StoreContext storeContext,
+ final BulkDeleteOperationCallbacks callbacks,
+ final Path basePath,
+ final int pageSize,
+ final AuditSpan span) {
+ super(storeContext, span);
+ this.callbacks = requireNonNull(callbacks);
+ this.basePath = requireNonNull(basePath);
+ checkArgument(pageSize > 0, "Page size must be greater than 0");
+ this.pageSize = pageSize;
+ }
+
+ @Override
+ public int pageSize() {
+ return pageSize;
+ }
+
+ @Override
+ public Path basePath() {
+ return basePath;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public List<Map.Entry<Path, String>> bulkDelete(final Collection<Path> paths)
+ throws IOException, IllegalArgumentException {
+ requireNonNull(paths);
+ checkArgument(paths.size() <= pageSize,
+ "Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize);
+ final StoreContext context = getStoreContext();
+ final List<ObjectIdentifier> objects = paths.stream().map(p -> {
+ checkArgument(p.isAbsolute(), "Path %s is not absolute", p);
+ checkArgument(validatePathIsUnderParent(p, basePath),
+ "Path %s is not under the base path %s", p, basePath);
+ final String k = context.pathToKey(p);
+ return ObjectIdentifier.builder().key(k).build();
+ }).collect(toList());
+
+ final List<Map.Entry<String, String>> errors = callbacks.bulkDelete(objects);
+ if (!errors.isEmpty()) {
+
+ final List<Map.Entry<Path, String>> outcomeElements = errors
+ .stream()
+ .map(error -> Tuples.pair(
+ context.keyToPath(error.getKey()),
+ error.getValue()
+ ))
+ .collect(toList());
+ return outcomeElements;
+ }
+ return emptyList();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ /**
+ * Callbacks for the bulk delete operation.
+ */
+ public interface BulkDeleteOperationCallbacks {
+
+ /**
+ * Perform a bulk delete operation.
+ * @param keys key list
+ * @return keys which failed to delete, with their error strings (if any).
+ * @throws IOException IO Exception.
+ * @throws IllegalArgumentException illegal arguments
+ */
+ @Retries.RetryTranslated
+ List<Map.Entry<String, String>> bulkDelete(final List<ObjectIdentifier> keys)
+ throws IOException, IllegalArgumentException;
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java
new file mode 100644
index 0000000000..2edcc3c7bb
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BulkDeleteOperationCallbacksImpl.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Error;
+
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.hadoop.fs.s3a.Invoker.once;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.apache.hadoop.util.functional.Tuples.pair;
+
+/**
+ * Callbacks for the bulk delete operation.
+ */
+public class BulkDeleteOperationCallbacksImpl implements
+ BulkDeleteOperation.BulkDeleteOperationCallbacks {
+
+ /**
+ * Path for logging.
+ */
+ private final String path;
+
+ /** Page size for bulk delete. */
+ private final int pageSize;
+
+ /** span for operations. */
+ private final AuditSpan span;
+
+ /**
+ * Store.
+ */
+ private final S3AStore store;
+
+
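+ /**
+ * Constructor.
+ * @param store store through which the delete calls are issued.
+ * @param path path of the bulk delete operation, used for logging.
+ * @param pageSize maximum number of keys permitted in a single delete call.
+ * @param span audit span to activate for the operations.
+ */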
+ public BulkDeleteOperationCallbacksImpl(final S3AStore store,
+ String path, int pageSize, AuditSpan span) {
+ this.span = span;
+ this.pageSize = pageSize;
+ this.path = path;
+ this.store = store;
+ }
+
+ @Override
+ @Retries.RetryTranslated
+ public List<Map.Entry<String, String>> bulkDelete(final List<ObjectIdentifier> keysToDelete)
+ throws IOException, IllegalArgumentException {
+ span.activate();
+ final int size = keysToDelete.size();
+ checkArgument(size <= pageSize,
+ "Too many paths to delete in one operation: %s", size);
+ if (size == 0) {
+ return emptyList();
+ }
+
+ if (size == 1) {
+ return deleteSingleObject(keysToDelete.get(0).key());
+ }
+
+ final DeleteObjectsResponse response = once("bulkDelete", path, () ->
+ store.deleteObjects(store.getRequestFactory()
+ .newBulkDeleteRequestBuilder(keysToDelete)
+ .build())).getValue();
+ final List<S3Error> errors = response.errors();
+ if (errors.isEmpty()) {
+ // all good.
+ return emptyList();
+ } else {
+ return errors.stream()
+ .map(e -> pair(e.key(), e.toString()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ /**
+ * Delete a single object.
+ * @param key key to delete
+ * @return list of keys which failed to delete: length 0 or 1.
+ * @throws IOException IO problem other than AccessDeniedException
+ */
+ @Retries.RetryTranslated
+ private List<Map.Entry<String, String>> deleteSingleObject(final String key) throws IOException {
+ try {
+ once("bulkDelete", path, () ->
+ store.deleteObject(store.getRequestFactory()
+ .newDeleteObjectRequestBuilder(key)
+ .build()));
+ } catch (AccessDeniedException e) {
+ return singletonList(pair(key, e.toString()));
+ }
+ return emptyList();
+
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java
index 72ead1fb15..14ad559ead 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteException.java
@@ -118,11 +118,7 @@ public IOException translateException(final String message) {
String exitCode = "";
for (S3Error error : errors()) {
String code = error.code();
- String item = String.format("%s: %s%s: %s%n", code, error.key(),
- (error.versionId() != null
- ? (" (" + error.versionId() + ")")
- : ""),
- error.message());
+ String item = errorToString(error);
LOG.info(item);
result.append(item);
if (exitCode == null || exitCode.isEmpty() || ACCESS_DENIED.equals(code)) {
@@ -136,4 +132,18 @@ public IOException translateException(final String message) {
return new AWSS3IOException(result.toString(), this);
}
}
+
+ /**
+ * Convert an error to a string.
+ * @param error error from a delete request
+ * @return string value
+ */
+ public static String errorToString(final S3Error error) {
+ String code = error.code();
+ return String.format("%s: %s%s: %s%n", code, error.key(),
+ (error.versionId() != null
+ ? (" (" + error.versionId() + ")")
+ : ""),
+ error.message());
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java
new file mode 100644
index 0000000000..c1a6fcffab
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import software.amazon.awssdk.services.s3.S3Client;
+
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import org.apache.hadoop.util.RateLimiting;
+
+/**
+ * Builder for the S3AStore.
+ */
+public class S3AStoreBuilder {
+
+ private StoreContextFactory storeContextFactory;
+
+ private S3Client s3Client;
+
+ private DurationTrackerFactory durationTrackerFactory;
+
+ private S3AInstrumentation instrumentation;
+
+ private S3AStatisticsContext statisticsContext;
+
+ private S3AStorageStatistics storageStatistics;
+
+ private RateLimiting readRateLimiter;
+
+ private RateLimiting writeRateLimiter;
+
+ private AuditSpanSource<AuditSpanS3A> auditSpanSource;
+
+ public S3AStoreBuilder withStoreContextFactory(
+ final StoreContextFactory storeContextFactoryValue) {
+ this.storeContextFactory = storeContextFactoryValue;
+ return this;
+ }
+
+ public S3AStoreBuilder withS3Client(
+ final S3Client s3ClientValue) {
+ this.s3Client = s3ClientValue;
+ return this;
+ }
+
+ public S3AStoreBuilder withDurationTrackerFactory(
+ final DurationTrackerFactory durationTrackerFactoryValue) {
+ this.durationTrackerFactory = durationTrackerFactoryValue;
+ return this;
+ }
+
+ public S3AStoreBuilder withInstrumentation(
+ final S3AInstrumentation instrumentationValue) {
+ this.instrumentation = instrumentationValue;
+ return this;
+ }
+
+ public S3AStoreBuilder withStatisticsContext(
+ final S3AStatisticsContext statisticsContextValue) {
+ this.statisticsContext = statisticsContextValue;
+ return this;
+ }
+
+ public S3AStoreBuilder withStorageStatistics(
+ final S3AStorageStatistics storageStatisticsValue) {
+ this.storageStatistics = storageStatisticsValue;
+ return this;
+ }
+
+ public S3AStoreBuilder withReadRateLimiter(
+ final RateLimiting readRateLimiterValue) {
+ this.readRateLimiter = readRateLimiterValue;
+ return this;
+ }
+
+ public S3AStoreBuilder withWriteRateLimiter(
+ final RateLimiting writeRateLimiterValue) {
+ this.writeRateLimiter = writeRateLimiterValue;
+ return this;
+ }
+
+ public S3AStoreBuilder withAuditSpanSource(
+ final AuditSpanSource<AuditSpanS3A> auditSpanSourceValue) {
+ this.auditSpanSource = auditSpanSourceValue;
+ return this;
+ }
+
+ public S3AStore build() {
+ return new S3AStoreImpl(storeContextFactory, s3Client, durationTrackerFactory, instrumentation,
+ statisticsContext, storageStatistics, readRateLimiter, writeRateLimiter, auditSpanSource);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
new file mode 100644
index 0000000000..6bfe42767d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+import software.amazon.awssdk.services.s3.model.S3Error;
+
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.store.audit.AuditSpanSource;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.RateLimiting;
+import org.apache.hadoop.util.functional.Tuples;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Store layer.
+ * This is where lower-level storage operations are intended
+ * to move over time.
+ */
+public class S3AStoreImpl implements S3AStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(S3AStoreImpl.class);
+
+ /** Factory to create store contexts. */
+ private final StoreContextFactory storeContextFactory;
+
+ /** The S3 client used to communicate with S3 bucket. */
+ private final S3Client s3Client;
+
+ /** The S3 bucket to communicate with. */
+ private final String bucket;
+
+ /** Request factory for creating requests. */
+ private final RequestFactory requestFactory;
+
+ /** Async client is used for transfer manager. */
+ private S3AsyncClient s3AsyncClient;
+
+ /** Duration tracker factory. */
+ private final DurationTrackerFactory durationTrackerFactory;
+
+ /** The core instrumentation. */
+ private final S3AInstrumentation instrumentation;
+
+ /** Accessors to statistics for this FS. */
+ private final S3AStatisticsContext statisticsContext;
+
+ /** Storage Statistics Bonded to the instrumentation. */
+ private final S3AStorageStatistics storageStatistics;
+
+ /** Rate limiter for read operations. */
+ private final RateLimiting readRateLimiter;
+
+ /** Rate limiter for write operations. */
+ private final RateLimiting writeRateLimiter;
+
+ /** Store context. */
+ private final StoreContext storeContext;
+
+ /** Invoker for retry operations. */
+ private final Invoker invoker;
+
+ /** Audit span source. */
+ private final AuditSpanSource<AuditSpanS3A> auditSpanSource;
+
+ /** Constructor to create S3A store. */
+ S3AStoreImpl(StoreContextFactory storeContextFactory,
+ S3Client s3Client,
+ DurationTrackerFactory durationTrackerFactory,
+ S3AInstrumentation instrumentation,
+ S3AStatisticsContext statisticsContext,
+ S3AStorageStatistics storageStatistics,
+ RateLimiting readRateLimiter,
+ RateLimiting writeRateLimiter,
+ AuditSpanSource<AuditSpanS3A> auditSpanSource) {
+ this.storeContextFactory = requireNonNull(storeContextFactory);
+ this.s3Client = requireNonNull(s3Client);
+ this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
+ this.instrumentation = requireNonNull(instrumentation);
+ this.statisticsContext = requireNonNull(statisticsContext);
+ this.storageStatistics = requireNonNull(storageStatistics);
+ this.readRateLimiter = requireNonNull(readRateLimiter);
+ this.writeRateLimiter = requireNonNull(writeRateLimiter);
+ this.auditSpanSource = requireNonNull(auditSpanSource);
+ this.storeContext = requireNonNull(storeContextFactory.createStoreContext());
+ this.invoker = storeContext.getInvoker();
+ this.bucket = storeContext.getBucket();
+ this.requestFactory = storeContext.getRequestFactory();
+ }
+
+ /** Acquire write capacity for rate limiting {@inheritDoc}. */
+ @Override
+ public Duration acquireWriteCapacity(final int capacity) {
+ return writeRateLimiter.acquire(capacity);
+ }
+
+ /** Acquire read capacity for rate limiting {@inheritDoc}. */
+ @Override
+ public Duration acquireReadCapacity(final int capacity) {
+ return readRateLimiter.acquire(capacity);
+
+ }
+
+ /**
+ * Create a new store context.
+ * @return a new store context.
+ */
+ private StoreContext createStoreContext() {
+ return storeContextFactory.createStoreContext();
+ }
+
+ @Override
+ public StoreContext getStoreContext() {
+ return storeContext;
+ }
+
+ private S3Client getS3Client() {
+ return s3Client;
+ }
+
+ @Override
+ public DurationTrackerFactory getDurationTrackerFactory() {
+ return durationTrackerFactory;
+ }
+
+ private S3AInstrumentation getInstrumentation() {
+ return instrumentation;
+ }
+
+ @Override
+ public S3AStatisticsContext getStatisticsContext() {
+ return statisticsContext;
+ }
+
+ private S3AStorageStatistics getStorageStatistics() {
+ return storageStatistics;
+ }
+
+ @Override
+ public RequestFactory getRequestFactory() {
+ return requestFactory;
+ }
+
+ /**
+ * Increment a statistic by 1.
+ * This increments both the instrumentation and storage statistics.
+ * @param statistic The operation to increment
+ */
+ protected void incrementStatistic(Statistic statistic) {
+ incrementStatistic(statistic, 1);
+ }
+
+ /**
+ * Increment a statistic by a specific value.
+ * This increments both the instrumentation and storage statistics.
+ * @param statistic The operation to increment
+ * @param count the count to increment
+ */
+ protected void incrementStatistic(Statistic statistic, long count) {
+ statisticsContext.incrementCounter(statistic, count);
+ }
+
+ /**
+ * Decrement a gauge by a specific value.
+ * @param statistic The operation to decrement
+ * @param count the count to decrement
+ */
+ protected void decrementGauge(Statistic statistic, long count) {
+ statisticsContext.decrementGauge(statistic, count);
+ }
+
+ /**
+ * Increment a gauge by a specific value.
+ * @param statistic The operation to increment
+ * @param count the count to increment
+ */
+ protected void incrementGauge(Statistic statistic, long count) {
+ statisticsContext.incrementGauge(statistic, count);
+ }
+
+ /**
+ * Callback when an operation was retried.
+ * Increments the statistics of ignored errors or throttled requests,
+ * depending up on the exception class.
+ * @param ex exception.
+ */
+ public void operationRetried(Exception ex) {
+ if (isThrottleException(ex)) {
+ LOG.debug("Request throttled");
+ incrementStatistic(STORE_IO_THROTTLED);
+ statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
+ } else {
+ incrementStatistic(STORE_IO_RETRY);
+ incrementStatistic(IGNORED_ERRORS);
+ }
+ }
+
+ /**
+ * Callback from {@link Invoker} when an operation is retried.
+ * @param text text of the operation
+ * @param ex exception
+ * @param retries number of retries
+ * @param idempotent is the method idempotent
+ */
+ public void operationRetried(String text, Exception ex, int retries, boolean idempotent) {
+ operationRetried(ex);
+ }
+
+ /**
+ * Get the instrumentation's IOStatistics.
+ * @return statistics
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return instrumentation.getIOStatistics();
+ }
+
+ /**
+ * Start an operation; this informs the audit service of the event
+ * and then sets it as the active span.
+ * @param operation operation name.
+ * @param path1 first path of operation
+ * @param path2 second path of operation
+ * @return a span for the audit
+ * @throws IOException failure
+ */
+ public AuditSpanS3A createSpan(String operation, @Nullable String path1, @Nullable String path2)
+ throws IOException {
+
+ return auditSpanSource.createSpan(operation, path1, path2);
+ }
+
+ /**
+ * Reject any request to delete an object where the key is root.
+ * @param key key to validate
+ * @throws IllegalArgumentException if the request was rejected due to
+ * a mistaken attempt to delete the root directory.
+ */
+ private void blockRootDelete(String key) throws IllegalArgumentException {
+ checkArgument(!key.isEmpty() && !"/".equals(key), "Bucket %s cannot be deleted", bucket);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ @Retries.RetryRaw
+ public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(
+ final DeleteObjectsRequest deleteRequest)
+ throws SdkException {
+
+ DeleteObjectsResponse response;
+ BulkDeleteRetryHandler retryHandler = new BulkDeleteRetryHandler(createStoreContext());
+
+ final List<ObjectIdentifier> keysToDelete = deleteRequest.delete().objects();
+ int keyCount = keysToDelete.size();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating delete operation for {} objects", keysToDelete.size());
+ keysToDelete.stream().forEach(objectIdentifier -> {
+ LOG.debug(" \"{}\" {}", objectIdentifier.key(),
+ objectIdentifier.versionId() != null ? objectIdentifier.versionId() : "");
+ });
+ }
+ // block root calls
+ keysToDelete.stream().map(ObjectIdentifier::key).forEach(this::blockRootDelete);
+
+ try (DurationInfo d = new DurationInfo(LOG, false, "DELETE %d keys", keyCount)) {
+ response =
+ invoker.retryUntranslated("delete",
+ DELETE_CONSIDERED_IDEMPOTENT, (text, e, r, i) -> {
+ // handle the failure
+ retryHandler.bulkDeleteRetried(deleteRequest, e);
+ },
+ // duration is tracked in the bulk delete counters
+ trackDurationOfOperation(getDurationTrackerFactory(),
+ OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
+ // acquire the write capacity for the number of keys to delete and record the duration.
+ Duration durationToAcquireWriteCapacity = acquireWriteCapacity(keyCount);
+ instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
+ true,
+ durationToAcquireWriteCapacity);
+ incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
+ return s3Client.deleteObjects(deleteRequest);
+ }));
+ if (!response.errors().isEmpty()) {
+ // one or more of the keys could not be deleted.
+ // log and then throw
+ List<S3Error> errors = response.errors();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Partial failure of delete, {} errors", errors.size());
+ for (S3Error error : errors) {
+ LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message());
+ }
+ }
+ }
+ d.close();
+ return Tuples.pair(d.asDuration(), response);
+
+ } catch (IOException e) {
+ // this is part of the retry signature, nothing else.
+ // convert to unchecked.
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ @Retries.RetryRaw
+ public Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(
+ final DeleteObjectRequest request)
+ throws SdkException {
+
+ String key = request.key();
+ blockRootDelete(key);
+ DurationInfo d = new DurationInfo(LOG, false, "deleting %s", key);
+ try {
+ DeleteObjectResponse response =
+ invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
+ DELETE_CONSIDERED_IDEMPOTENT,
+ trackDurationOfOperation(getDurationTrackerFactory(),
+ OBJECT_DELETE_REQUEST.getSymbol(), () -> {
+ incrementStatistic(OBJECT_DELETE_OBJECTS);
+ // We try to acquire write capacity just before the delete call.
+ Duration durationToAcquireWriteCapacity = acquireWriteCapacity(1);
+ instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
+ true, durationToAcquireWriteCapacity);
+ return s3Client.deleteObject(request);
+ }));
+ d.close();
+ return Tuples.pair(d.asDuration(), Optional.of(response));
+ } catch (AwsServiceException ase) {
+ // 404 errors get swallowed; this can be raised by
+ // third party stores (GCS).
+ if (!isObjectNotFound(ase)) {
+ throw ase;
+ }
+ d.close();
+ return Tuples.pair(d.asDuration(), Optional.empty());
+ } catch (IOException e) {
+ // this is part of the retry signature, nothing else.
+ // convert to unchecked.
+ throw new UncheckedIOException(e);
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java
new file mode 100644
index 0000000000..9d8d708b2b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Factory for creating store contexts.
+ */
+@InterfaceAudience.Private
+public interface StoreContextFactory {
+
+ /**
+ * Build an immutable store context, including picking
+ * up the current audit span.
+ * @return the store context.
+ */
+ StoreContext createStoreContext();
+}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md
index e2c095e531..abd58bffc6 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md
@@ -324,6 +324,7 @@ They have also been updated to return V2 SDK classes.
public interface S3AInternals {
S3Client getAmazonS3V2Client(String reason);
+ S3AStore getStore();
@Retries.RetryTranslated
@AuditEntryPoint
String getBucketLocation() throws IOException;
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
index 4bb824356e..954823f217 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
@@ -59,7 +59,7 @@ To make most efficient use of S3, care is needed.
The S3A FileSystem supports implementation of vectored read api using which
a client can provide a list of file ranges to read returning a future read
object associated with each range. For full api specification please see
-[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html).
+[FSDataInputStream](../../../../../../hadoop-common-project/hadoop-common/target/site/filesystem/fsdatainputstream.html).
The following properties can be configured to optimise vectored reads based
on the client requirements.
@@ -94,6 +94,86 @@ on the client requirements.
```
+## Improving delete performance through the bulk delete API
+
+For the bulk delete API specification, see the filesystem specification: [BulkDelete](../../../../../../hadoop-common-project/hadoop-common/target/site/filesystem/bulkdelete.html)
+
+The S3A client exports this API.
+
+### S3A Implementation of Bulk Delete
+If multi-object delete is enabled (`fs.s3a.multiobjectdelete.enable` = true), as
+it is by default, then the page size is limited to that defined in
+`fs.s3a.bulk.delete.page.size`, which MUST be less than or equal to 1000.
+* The entire list of paths to delete is aggregated into a single bulk delete request,
+ issued to the store.
+* Provided the caller has the correct permissions, every entry in the list
+ will, if the path references an object, cause that object to be deleted.
+* If the path does not reference an object, it will not be deleted:
+  "This is for deleting objects, not directories".
+* No probes for the existence of parent directories will take place; no
+  parent directory markers will be created.
+  "If you need parent directories, call mkdir() yourself".
+* The keys which failed to delete, as listed in the `DeleteObjectsResponse`,
+  are converted into paths and returned along with their error messages.
+* Network and other IO errors are raised as exceptions.
+
+If multi-object delete is disabled (or the list is of size 1):
+* A single `DELETE` call is issued
+* Any `AccessDeniedException` raised is converted to a result in the error list.
+* Any 404 response from a (non-AWS) store will be ignored.
+* Network and other IO errors are raised as exceptions.
+
+Because there are no probes to ensure the call does not overwrite a directory,
+or to see if a parent directory marker needs to be created,
+this API is faster than issuing a normal `FileSystem.delete(path)` call.
+
+That is: all the overhead normally incurred to preserve the POSIX filesystem model is omitted.
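+
+As an illustration, here is a minimal sketch of invoking the API against an
+S3A filesystem. The bucket name, file names and failure handling are purely
+illustrative, and imports and exception handling are omitted for brevity.
+
+```java
+// bind to the target filesystem (hypothetical bucket and paths).
+Path base = new Path("s3a://example-bucket/data/");
+S3AFileSystem fs = (S3AFileSystem) base.getFileSystem(new Configuration());
+
+// create a bulk delete operation scoped under the base path;
+// each call may delete at most pageSize() objects.
+try (BulkDelete bulkDelete = fs.createBulkDelete(base)) {
+  List<Path> page = Arrays.asList(
+      new Path(base, "file-0001"),
+      new Path(base, "file-0002"));
+  // entries which could not be deleted come back as (path, error) pairs.
+  List<Map.Entry<Path, String>> failures = bulkDelete.bulkDelete(page);
+  failures.forEach(entry ->
+      System.err.println(entry.getKey() + ": " + entry.getValue()));
+}
+```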
+
+
+### S3 Scalability and Performance
+
+Every entry in a bulk delete request counts as one write operation
+against AWS S3 storage.
+With the default write rate under a prefix on AWS S3 Standard storage
+restricted to 3,500 writes/second, it is very easy to overload
+the store by issuing a few bulk delete requests simultaneously.
+
+* If throttling is triggered then all clients interacting with
+ the store may observe performance issues.
+* The write quota applies even for paths which do not exist.
+* The S3A client *may* perform rate throttling as well as page size limiting.
+
+What does that mean? It means that attempting to issue multiple
+bulk delete calls in parallel can be counterproductive.
+
+When overloaded, the S3 store returns a 503 "slow down" throttle response.
+This triggers the client to back off and retry posting the request.
+However, the repeated request will still include the same number of objects and
+*so generate the same load*.
+
+This can lead to a pathological situation where the repeated requests will
+never be satisfied because the request itself is sufficient to overload the store.
+See [HADOOP-16823. Large DeleteObject requests are their own Thundering Herd](https://issues.apache.org/jira/browse/HADOOP-16823)
+for an example of where this did actually surface in production.
+
+This is why the default page size of S3A clients is 250 paths, not the store limit of 1000 entries.
+It is also why the S3A delete/rename operations do not attempt massive parallel deletions.
+Instead, bulk delete requests are queued and issued by a single blocking thread.
+Consider a similar design; a sketch of the pattern follows.
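+
+A minimal sketch of that pattern, assuming the pages of paths (`pages`) and a
+`BulkDelete` instance (`bulkDelete`) have already been created; these names are
+purely illustrative:
+
+```java
+// issue all bulk delete pages through a single-threaded executor,
+// so at most one DeleteObjects request is in flight at any time.
+ExecutorService deleteExecutor = Executors.newSingleThreadExecutor();
+List<Future<List<Map.Entry<Path, String>>>> results = new ArrayList<>();
+for (List<Path> page : pages) {
+  results.add(deleteExecutor.submit(() -> bulkDelete.bulkDelete(page)));
+}
+deleteExecutor.shutdown();
+```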
+
+
+When working with versioned S3 buckets, every path deleted will add a tombstone marker
+to the store at that location, even if there was no object at that path.
+While this has no negative performance impact on the bulk delete call,
+it will slow down list requests subsequently made against that path.
+That is: bulk delete requests of paths which do not exist will hurt future queries.
+
+Avoid this. Note also that the TPC-DS benchmarks do not create the right load to make these
+performance problems observable, but they can surface in production.
+* Configure buckets to have a limited number of days for tombstones to be preserved.
+* Do not delete paths which you know reference nonexistent files or directories.
+
## Improving data input performance through fadvise
The S3A Filesystem client supports the notion of input policies, similar
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java
new file mode 100644
index 0000000000..71c3a30359
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractBulkDelete.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
+import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete_delete;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Contract tests for bulk delete operation for S3A Implementation.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3AContractBulkDelete extends AbstractContractBulkDeleteTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractBulkDelete.class);
+
+ /**
+ * Delete Page size: {@value}.
+ * This is the default page size for bulk delete operation for this contract test.
+ * All the tests in this class should pass number of paths equal to or less than
+ * this page size during the bulk delete operation.
+ */
+ private static final int DELETE_PAGE_SIZE = 20;
+
+ private final boolean enableMultiObjectDelete;
+
+ @Parameterized.Parameters(name = "enableMultiObjectDelete = {0}")
+ public static Iterable