HADOOP-18679. Add API for bulk/paged delete of files (#6726)

Applications can create a BulkDelete instance from a
BulkDeleteSource; the BulkDelete interface provides
the pageSize(): the maximum number of entries which can be
deleted in a single request, and a bulkDelete(Collection paths)
method which takes a collection of up to pageSize() paths.

This is optimized for object stores with bulk delete APIs;
the S3A connector will offer the page size of
fs.s3a.bulk.delete.page.size unless bulk delete has
been disabled.

Even with a page size of 1, the S3A implementation is
more efficient than delete(path)
as there are no safety checks for the path being a directory
or probes for the need to recreate directories.

The interface BulkDeleteSource is implemented by
all FileSystem implementations, with a page size
of 1 and mapped to delete(pathToDelete, false).
This means that callers do not need to have special
case handling for object stores versus classic filesystems.

To aid use through reflection APIs, the class
org.apache.hadoop.io.wrappedio.WrappedIO
has been created with "reflection friendly" methods.
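
A minimal usage sketch of the API (the bucket and file names are placeholders):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BulkDeleteSketch {
  public static void main(String[] args) throws Exception {
    Path base = new Path("s3a://example-bucket/data");
    FileSystem fs = base.getFileSystem(new Configuration());
    // create the operation, then close() it to release resources
    // and publish IOStatistics.
    try (BulkDelete bulk = fs.createBulkDelete(base)) {
      // the submitted collection must be no longer than bulk.pageSize().
      List<Map.Entry<Path, String>> failures =
          bulk.bulkDelete(Collections.singletonList(new Path(base, "file1")));
      // entries returned are the paths the store failed to delete,
      // with the associated error text.
      failures.forEach(e -> System.err.println(e.getKey() + ": " + e.getValue()));
    }
  }
}
```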

Contributed by Mukund Thakur and Steve Loughran
Mukund Thakur 2024-05-20 11:05:25 -05:00 committed by GitHub
parent 41eacf4914
commit 47be1ab3b6
35 changed files with 2679 additions and 119 deletions


@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import static java.util.Objects.requireNonNull;
/**
* API for bulk deletion of objects/files,
* <i>but not directories</i>.
* After use, call {@code close()} to release any resources and
* to guarantee store IOStatistics are updated.
* <p>
* Callers MUST have no expectation that parent directories will exist after the
* operation completes; if an object store needs to explicitly look for and create
* directory markers, that step will be omitted.
* <p>
* Be aware that on some stores (AWS S3) each object listed in a bulk delete counts
* against the write IOPS limit; large page sizes are counterproductive here, as
* are attempts at parallel submissions across multiple threads.
* @see <a href="https://issues.apache.org/jira/browse/HADOOP-16823">HADOOP-16823.
* Large DeleteObject requests are their own Thundering Herd</a>
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface BulkDelete extends IOStatisticsSource, Closeable {
/**
* The maximum number of objects/files to delete in a single request.
* @return a number greater than zero.
*/
int pageSize();
/**
* Base path of a bulk delete operation.
* All paths submitted in {@link #bulkDelete(Collection)} must be under this path.
* @return base path of a bulk delete operation.
*/
Path basePath();
/**
* Delete a list of files/objects.
* <ul>
* <li>Files must be under the path provided in {@link #basePath()}.</li>
* <li>The size of the list must be equal to or less than the page size
* declared in {@link #pageSize()}.</li>
* <li>Directories are not supported; the outcome of attempting to delete
* directories is undefined (ignored; undetected, listed as failures...).</li>
* <li>The operation is not atomic.</li>
* <li>The operation is treated as idempotent: network failures may
* trigger resubmission of the request; any new objects created under a
* path in the list may then be deleted.</li>
* <li>There is no guarantee that any parent directories exist after this call.
* </li>
* </ul>
* @param paths list of paths which must be absolute and under the base path.
* provided in {@link #basePath()}.
* @return a list of paths which failed to delete, with the exception message.
* @throws IOException IO problems including networking, authentication and more.
* @throws IllegalArgumentException if a path argument is invalid.
*/
List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths)
throws IOException, IllegalArgumentException;
}
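
A minimal sketch of paging a larger list of files through this interface; the class and method names are illustrative:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.BulkDeleteSource;
import org.apache.hadoop.fs.Path;

public final class PagedBulkDelete {

  private PagedBulkDelete() {
  }

  /**
   * Delete the given files under basePath, submitting them in pages
   * no larger than the store's page size.
   * @return all (path, error) entries reported by the store.
   */
  public static List<Map.Entry<Path, String>> deleteInPages(
      final BulkDeleteSource source,
      final Path basePath,
      final List<Path> files) throws IOException {
    final List<Map.Entry<Path, String>> failures = new ArrayList<>();
    try (BulkDelete bulk = source.createBulkDelete(basePath)) {
      final int pageSize = bulk.pageSize();
      for (int i = 0; i < files.size(); i += pageSize) {
        // submit one page at a time; each page is <= pageSize entries.
        failures.addAll(bulk.bulkDelete(
            files.subList(i, Math.min(i + pageSize, files.size()))));
      }
    }
    return failures;
  }
}
```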


@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Interface for bulk deletion.
* Filesystems which support bulk deletion should implement this interface
* and MUST also declare their support in the path capability
* {@link CommonPathCapabilities#BULK_DELETE}.
* Exporting the interface does not guarantee that the operation is supported;
* only the successful return of a {@link BulkDelete} instance from a call to
* {@link #createBulkDelete(Path)} does.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface BulkDeleteSource {
/**
* Create a bulk delete operation.
* There is no network IO at this point, simply the creation of
* a bulk delete object.
* A path must be supplied to assist in link resolution.
* @param path path to delete under.
* @return the bulk delete.
* @throws UnsupportedOperationException bulk delete under that path is not supported.
* @throws IllegalArgumentException path not valid.
* @throws IOException problems resolving paths
*/
BulkDelete createBulkDelete(Path path)
throws UnsupportedOperationException, IllegalArgumentException, IOException;
}
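
A minimal sketch of the probe-then-create pattern described above; the helper class is hypothetical:

```java
import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class BulkDeleteProbe {

  private BulkDeleteProbe() {
  }

  /** Create a BulkDelete only if the store declares the capability. */
  public static Optional<BulkDelete> createIfSupported(FileSystem fs, Path base)
      throws IOException {
    if (!fs.hasPathCapability(base, CommonPathCapabilities.BULK_DELETE)) {
      return Optional.empty();
    }
    return Optional.of(fs.createBulkDelete(base));
  }
}
```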


@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.util.Collection;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.Preconditions.checkArgument;
/**
* Utility class for bulk delete operations.
*/
public final class BulkDeleteUtils {
private BulkDeleteUtils() {
}
/**
* Preconditions for bulk delete paths.
* @param paths paths to delete.
* @param pageSize maximum number of paths to delete in a single operation.
* @param basePath base path for the delete operation.
*/
public static void validateBulkDeletePaths(Collection<Path> paths, int pageSize, Path basePath) {
requireNonNull(paths);
checkArgument(paths.size() <= pageSize,
"Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize);
paths.forEach(p -> {
checkArgument(p.isAbsolute(), "Path %s is not absolute", p);
checkArgument(validatePathIsUnderParent(p, basePath),
"Path %s is not under the base path %s", p, basePath);
});
}
/**
* Check if a path is under a base path.
* @param p path to check.
* @param basePath base path.
* @return true if the path is under the base path.
*/
public static boolean validatePathIsUnderParent(Path p, Path basePath) {
while (p.getParent() != null) {
if (p.getParent().equals(basePath)) {
return true;
}
p = p.getParent();
}
return false;
}
}
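
A minimal sketch of how these preconditions behave; the class name is hypothetical:

```java
import java.util.Arrays;

import org.apache.hadoop.fs.BulkDeleteUtils;
import org.apache.hadoop.fs.Path;

public class ValidateSketch {
  public static void main(String[] args) {
    Path base = new Path("/data");
    // passes: a single absolute path directly under the base, page size 2.
    BulkDeleteUtils.validateBulkDeletePaths(
        Arrays.asList(new Path("/data/file1")), 2, base);
    // throws IllegalArgumentException: the path is not absolute.
    BulkDeleteUtils.validateBulkDeletePaths(
        Arrays.asList(new Path("file1")), 2, base);
  }
}
```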


@ -181,4 +181,10 @@ private CommonPathCapabilities() {
*/
public static final String DIRECTORY_LISTING_INCONSISTENT =
"fs.capability.directory.listing.inconsistent";
/**
* Capability string to probe for bulk delete: {@value}.
*/
public static final String BULK_DELETE = "fs.capability.bulk.delete";
}


@ -56,6 +56,7 @@
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
@ -169,7 +170,8 @@
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured
implements Closeable, DelegationTokenIssuer, PathCapabilities {
implements Closeable, DelegationTokenIssuer,
PathCapabilities, BulkDeleteSource {
public static final String FS_DEFAULT_NAME_KEY =
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
public static final String DEFAULT_FS =
@ -3485,6 +3487,10 @@ public Collection<FileStatus> getTrashRoots(boolean allUsers) {
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
switch (validatePathCapabilityArgs(makeQualified(path), capability)) {
case CommonPathCapabilities.BULK_DELETE:
// bulk delete has a default implementation which
// can be called on any FileSystem.
return true;
case CommonPathCapabilities.FS_SYMLINKS:
// delegate to the existing supportsSymlinks() call.
return supportsSymlinks() && areSymlinksEnabled();
@ -4976,4 +4982,18 @@ public MultipartUploaderBuilder createMultipartUploader(Path basePath)
methodNotSupported();
return null;
}
/**
* Create a bulk delete operation.
* The default implementation returns an instance of {@link DefaultBulkDeleteOperation}.
* @param path base path for the operation.
* @return an instance of the bulk delete.
* @throws IllegalArgumentException any argument is invalid.
* @throws IOException if there is an IO problem.
*/
@Override
public BulkDelete createBulkDelete(Path path)
throws IllegalArgumentException, IOException {
return new DefaultBulkDeleteOperation(path, this);
}
}


@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.Tuples;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.BulkDeleteUtils.validateBulkDeletePaths;
/**
* Default implementation of the {@link BulkDelete} interface.
*/
public class DefaultBulkDeleteOperation implements BulkDelete {
private static final Logger LOG = LoggerFactory.getLogger(DefaultBulkDeleteOperation.class);
/** Default page size for bulk delete. */
private static final int DEFAULT_PAGE_SIZE = 1;
/** Base path for the bulk delete operation. */
private final Path basePath;
/** Delegate filesystem which makes the actual delete calls. */
private final FileSystem fs;
public DefaultBulkDeleteOperation(Path basePath,
FileSystem fs) {
this.basePath = requireNonNull(basePath);
this.fs = fs;
}
@Override
public int pageSize() {
return DEFAULT_PAGE_SIZE;
}
@Override
public Path basePath() {
return basePath;
}
/**
* {@inheritDoc}.
* The default impl just calls {@code FileSystem.delete(path, false)}
* on the single path in the list.
*/
@Override
public List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths)
throws IOException, IllegalArgumentException {
validateBulkDeletePaths(paths, DEFAULT_PAGE_SIZE, basePath);
List<Map.Entry<Path, String>> result = new ArrayList<>();
if (!paths.isEmpty()) {
// As the page size is always 1, this should be the only
// path in the collection.
Path pathToDelete = paths.iterator().next();
try {
fs.delete(pathToDelete, false);
} catch (IOException ex) {
LOG.debug("Couldn't delete {} - exception occurred: {}", pathToDelete, ex);
result.add(Tuples.pair(pathToDelete, ex.toString()));
}
}
return result;
}
@Override
public void close() throws IOException {
}
}


@ -46,6 +46,9 @@ public final class StoreStatisticNames {
/** {@value}. */
public static final String OP_APPEND = "op_append";
/** {@value}. */
public static final String OP_BULK_DELETE = "op_bulk-delete";
/** {@value}. */
public static final String OP_COPY_FROM_LOCAL_FILE =
"op_copy_from_local_file";
@ -194,6 +197,9 @@ public final class StoreStatisticNames {
public static final String STORE_IO_RETRY
= "store_io_retry";
public static final String STORE_IO_RATE_LIMITED_DURATION
= "store_io_rate_limited_duration";
/**
* A store's equivalent of a paged LIST request was initiated: {@value}.
*/


@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.wrappedio;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* Reflection-friendly access to APIs which are not available in
* some of the older Hadoop versions which libraries still
* compile against.
* <p>
* The intent is to avoid the need for complex reflection operations
* including wrapping of parameter classes, direct instantiation of
* new classes, etc.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class WrappedIO {
private WrappedIO() {
}
/**
* Get the maximum number of objects/files to delete in a single request.
* @param fs filesystem
* @param path path to delete under.
* @return a number greater than zero.
* @throws UnsupportedOperationException bulk delete under that path is not supported.
* @throws IllegalArgumentException path not valid.
* @throws IOException problems resolving paths
*/
public static int bulkDelete_PageSize(FileSystem fs, Path path) throws IOException {
try (BulkDelete bulk = fs.createBulkDelete(path)) {
return bulk.pageSize();
}
}
/**
* Delete a list of files/objects.
* <ul>
* <li>Files must be under the path provided in {@code base}.</li>
* <li>The size of the list must be equal to or less than the page size.</li>
* <li>Directories are not supported; the outcome of attempting to delete
* directories is undefined (ignored; undetected, listed as failures...).</li>
* <li>The operation is not atomic.</li>
* <li>The operation is treated as idempotent: network failures may
* trigger resubmission of the request; any new objects created under a
* path in the list may then be deleted.</li>
* <li>There is no guarantee that any parent directories exist after this call.
* </li>
* </ul>
* @param fs filesystem
* @param base path to delete under.
* @param paths list of paths which must be absolute and under the base path.
* @return a list of all the paths which couldn't be deleted for a reason other than "not found" and any associated error message.
* @throws UnsupportedOperationException bulk delete under that path is not supported.
* @throws IOException IO problems including networking, authentication and more.
* @throws IllegalArgumentException if a path argument is invalid.
*/
public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
Path base,
Collection<Path> paths)
throws IOException {
try (BulkDelete bulk = fs.createBulkDelete(base)) {
return bulk.bulkDelete(paths);
}
}
}


@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.functional;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Tuple support.
* This allows for tuples to be passed around as part of the public API without
* committing to a third-party library tuple implementation.
*/
@InterfaceStability.Unstable
public final class Tuples {
private Tuples() {
}
/**
* Create a 2-tuple.
* @param key element 1
* @param value element 2
* @return a tuple.
* @param <K> element 1 type
* @param <V> element 2 type
*/
public static <K, V> Map.Entry<K, V> pair(final K key, final V value) {
return new Tuple<>(key, value);
}
/**
* Simple tuple class: uses the Map.Entry interface as other
* implementations have done, so the API is available across
* all java versions.
* @param <K> key
* @param <V> value
*/
private static final class Tuple<K, V> implements Map.Entry<K, V> {
private final K key;
private final V value;
private Tuple(final K key, final V value) {
this.key = key;
this.value = value;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return value;
}
@Override
public V setValue(final V value) {
throw new UnsupportedOperationException("Tuple is immutable");
}
@Override
public String toString() {
return "(" + key + ", " + value + ')';
}
}
}
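
A minimal usage sketch:

```java
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.Tuples;

public class TupleSketch {
  public static void main(String[] args) {
    Map.Entry<Path, String> failure =
        Tuples.pair(new Path("/data/file1"), "AccessDenied");
    // prints "(/data/file1, AccessDenied)"
    System.out.println(failure);
    // the entry is immutable; setValue() throws UnsupportedOperationException.
  }
}
```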


@ -0,0 +1,139 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# <a name="BulkDelete"></a> interface `BulkDelete`
<!-- MACRO{toc|fromDepth=1|toDepth=2} -->
The `BulkDelete` interface provides an API to perform bulk delete of files/objects
in an object store or filesystem.
## Key Features
* An API for submitting a list of paths to delete.
* This list must be no larger than the "page size" supported by the client; this size is exposed by the `pageSize()` method.
* Triggers a request to delete files at the specific paths.
* Returns a list of which paths were reported as delete failures by the store.
* Does not consider a nonexistent file to be a failure.
* Does not offer any atomicity guarantees.
* Idempotency guarantees are weak: retries may delete files newly created by other clients.
* Provides no guarantees as to the outcome if a path references a directory.
* Provides no guarantees that parent directories will exist after the call.
The API is designed to match the semantics of the AWS S3 [Bulk Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) REST API call, but it is not
exclusively restricted to this store. This is why the "provides no guarantees"
restrictions do not state what the outcome will be when executed on other stores.
### Interface `org.apache.hadoop.fs.BulkDeleteSource`
The interface `BulkDeleteSource` is offered by a FileSystem/FileContext class if
it supports the API. A default implementation in the base `FileSystem` class
returns an instance of `org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation`;
its details are covered in the sections below.
```java
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface BulkDeleteSource {
BulkDelete createBulkDelete(Path path)
throws UnsupportedOperationException, IllegalArgumentException, IOException;
}
```
### Interface `org.apache.hadoop.fs.BulkDelete`
This is the bulk delete implementation returned by the `createBulkDelete()` call.
```java
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface BulkDelete extends IOStatisticsSource, Closeable {
int pageSize();
Path basePath();
List<Map.Entry<Path, String>> bulkDelete(List<Path> paths)
throws IOException, IllegalArgumentException;
}
```
### `bulkDelete(paths)`
#### Preconditions
```python
if length(paths) > pageSize: throw IllegalArgumentException
```
#### Postconditions
All paths which refer to files are removed from the set of files.
```python
FS'Files = FS.Files - [paths]
```
No other restrictions are placed upon the outcome.
### Availability
The `BulkDeleteSource` interface is exported by `FileSystem` and `FileContext` storage clients;
a default implementation is available for all filesystems through `org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation`. For
integration in applications like Apache Iceberg to work seamlessly, all implementations
of this interface MUST NOT reject the request but instead return a BulkDelete instance
of size >= 1.
Use the `PathCapabilities` probe `fs.capability.bulk.delete`.
```java
store.hasPathCapability(path, "fs.capability.bulk.delete")
```
### Invocation through Reflection.
The need for many libraries to compile against very old versions of Hadoop
means that most of the cloud-first Filesystem API calls cannot be used except
through reflection, and the more complicated the API and its data types are,
the harder that reflection is to implement.
To assist this, the class `org.apache.hadoop.io.wrappedio.WrappedIO` has a few methods
which are intended to provide simple access to the API, especially
through reflection.
```java
public static int bulkDelete_PageSize(FileSystem fs, Path path) throws IOException;

public static List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs, Path base, Collection<Path> paths) throws IOException;
```
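A minimal sketch of how a library might bind to these helpers lazily with `java.lang.invoke.MethodHandles`; the class and method names below are arbitrary:
```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class ReflectiveBulkDelete {

  private static final MethodHandle BULK_DELETE = bind();

  private ReflectiveBulkDelete() {
  }

  private static MethodHandle bind() {
    try {
      Class<?> wrappedIO = Class.forName("org.apache.hadoop.io.wrappedio.WrappedIO");
      return MethodHandles.lookup().findStatic(wrappedIO, "bulkDelete_delete",
          MethodType.methodType(List.class,
              FileSystem.class, Path.class, Collection.class));
    } catch (ReflectiveOperationException e) {
      // the API is not present in this Hadoop release.
      return null;
    }
  }

  public static boolean isAvailable() {
    return BULK_DELETE != null;
  }

  @SuppressWarnings("unchecked")
  public static List<Map.Entry<Path, String>> bulkDelete(
      FileSystem fs, Path base, Collection<Path> paths) throws Throwable {
    return (List<Map.Entry<Path, String>>) BULK_DELETE.invoke(fs, base, paths);
  }
}
```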
### Implementations
#### Default Implementation
The default implementation of the `BulkDelete` interface, used by all `FileSystem` implementations
which do not override it, is `org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation`, which fixes the page
size at 1 and calls `FileSystem.delete(path, false)` on the single path in the list.
#### S3A Implementation
The S3A implementation is `org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation`, which implements the
multi-object delete semantics of the AWS S3 [Bulk Delete](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html) REST API.
For more details, please refer to the S3A performance documentation.


@ -44,3 +44,4 @@ HDFS as these are commonly expected by Hadoop client applications.
1. [openFile()](openfile.html)
1. [SafeMode](safemode.html)
1. [LeaseRecoverable](leaserecoverable.html)
1. [BulkDelete](bulkdelete.html)


@ -0,0 +1,336 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.wrappedio.WrappedIO;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete_delete;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Contract tests for bulk delete operation.
*/
public abstract class AbstractContractBulkDeleteTest extends AbstractFSContractTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractBulkDeleteTest.class);
/**
* Page size for bulk delete. This is calculated based
* on the store implementation.
*/
protected int pageSize;
/**
* Base path for the bulk delete tests.
* All the paths to be deleted should be under this base path.
*/
protected Path basePath;
/**
* Test file system.
*/
protected FileSystem fs;
@Before
public void setUp() throws Exception {
fs = getFileSystem();
basePath = path(getClass().getName());
pageSize = WrappedIO.bulkDelete_PageSize(getFileSystem(), basePath);
fs.mkdirs(basePath);
}
public Path getBasePath() {
return basePath;
}
protected int getExpectedPageSize() {
return 1;
}
/**
* Validate the page size for the bulk delete operation. Different stores can have
* different implementations of bulk delete and thus different page sizes.
*/
@Test
public void validatePageSize() throws Exception {
Assertions.assertThat(pageSize)
.describedAs("Page size should be 1 by default for all stores")
.isEqualTo(getExpectedPageSize());
}
@Test
public void testPathsSizeEqualsPageSizePrecondition() throws Exception {
List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
// Bulk delete call should pass with no exception.
bulkDelete_delete(getFileSystem(), basePath, listOfPaths);
}
@Test
public void testPathsSizeGreaterThanPageSizePrecondition() throws Exception {
List<Path> listOfPaths = createListOfPaths(pageSize + 1, basePath);
intercept(IllegalArgumentException.class,
() -> bulkDelete_delete(getFileSystem(), basePath, listOfPaths));
}
@Test
public void testPathsSizeLessThanPageSizePrecondition() throws Exception {
List<Path> listOfPaths = createListOfPaths(pageSize - 1, basePath);
// Bulk delete call should pass with no exception.
bulkDelete_delete(getFileSystem(), basePath, listOfPaths);
}
@Test
public void testBulkDeleteSuccessful() throws Exception {
runBulkDelete(false);
}
@Test
public void testBulkDeleteSuccessfulUsingDirectFS() throws Exception {
runBulkDelete(true);
}
private void runBulkDelete(boolean useDirectFS) throws IOException {
List<Path> listOfPaths = createListOfPaths(pageSize, basePath);
for (Path path : listOfPaths) {
touch(fs, path);
}
FileStatus[] fileStatuses = fs.listStatus(basePath);
Assertions.assertThat(fileStatuses)
.describedAs("File count after create")
.hasSize(pageSize);
if (useDirectFS) {
assertSuccessfulBulkDelete(
fs.createBulkDelete(basePath).bulkDelete(listOfPaths));
} else {
// Using WrappedIO to call bulk delete.
assertSuccessfulBulkDelete(
bulkDelete_delete(getFileSystem(), basePath, listOfPaths));
}
FileStatus[] fileStatusesAfterDelete = fs.listStatus(basePath);
Assertions.assertThat(fileStatusesAfterDelete)
.describedAs("File statuses should be empty after delete")
.isEmpty();
}
@Test
public void validatePathCapabilityDeclared() throws Exception {
Assertions.assertThat(fs.hasPathCapability(basePath, CommonPathCapabilities.BULK_DELETE))
.describedAs("Path capability BULK_DELETE should be declared")
.isTrue();
}
/**
* This test should fail as path is not under the base path.
*/
@Test
public void testDeletePathsNotUnderBase() throws Exception {
List<Path> paths = new ArrayList<>();
Path pathNotUnderBase = path("not-under-base");
paths.add(pathNotUnderBase);
intercept(IllegalArgumentException.class,
() -> bulkDelete_delete(getFileSystem(), basePath, paths));
}
/**
* This test should fail as path is not absolute.
*/
@Test
public void testDeletePathsNotAbsolute() throws Exception {
List<Path> paths = new ArrayList<>();
Path pathNotAbsolute = new Path("not-absolute");
paths.add(pathNotAbsolute);
intercept(IllegalArgumentException.class,
() -> bulkDelete_delete(getFileSystem(), basePath, paths));
}
@Test
public void testDeletePathsNotExists() throws Exception {
List<Path> paths = new ArrayList<>();
Path pathNotExists = new Path(basePath, "not-exists");
paths.add(pathNotExists);
// the bulk delete call doesn't verify whether a path exists before deleting it.
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
}
@Test
public void testDeletePathsDirectory() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
paths.add(dirPath);
Path filePath = new Path(dirPath, "file");
paths.add(filePath);
pageSizePreconditionForTest(paths.size());
fs.mkdirs(dirPath);
touch(fs, filePath);
// Outcome is undefined. But call shouldn't fail.
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
}
@Test
public void testBulkDeleteParentDirectoryWithDirectories() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
Path subDir = new Path(dirPath, "subdir");
fs.mkdirs(subDir);
// adding parent directory to the list of paths.
paths.add(dirPath);
List<Map.Entry<Path, String>> entries = bulkDelete_delete(getFileSystem(), basePath, paths);
Assertions.assertThat(entries)
.describedAs("Parent non empty directory should not be deleted")
.hasSize(1);
// During the bulk delete operation, non-empty directories are not deleted by the default implementation.
assertIsDirectory(dirPath);
}
@Test
public void testBulkDeleteParentDirectoryWithFiles() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
Path file = new Path(dirPath, "file");
touch(fs, file);
// adding parent directory to the list of paths.
paths.add(dirPath);
List<Map.Entry<Path, String>> entries = bulkDelete_delete(getFileSystem(), basePath, paths);
Assertions.assertThat(entries)
.describedAs("Parent non empty directory should not be deleted")
.hasSize(1);
// During the bulk delete operation, non-empty directories are not deleted by the default implementation.
assertIsDirectory(dirPath);
}
@Test
public void testDeleteEmptyDirectory() throws Exception {
List<Path> paths = new ArrayList<>();
Path emptyDirPath = new Path(basePath, "empty-dir");
fs.mkdirs(emptyDirPath);
paths.add(emptyDirPath);
// Should pass as empty directory.
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
}
@Test
public void testDeleteEmptyList() throws Exception {
List<Path> paths = new ArrayList<>();
// Empty list should pass.
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
}
@Test
public void testDeleteSamePathsMoreThanOnce() throws Exception {
List<Path> paths = new ArrayList<>();
Path path = new Path(basePath, "file");
paths.add(path);
paths.add(path);
Path another = new Path(basePath, "another-file");
paths.add(another);
pageSizePreconditionForTest(paths.size());
touch(fs, path);
touch(fs, another);
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
}
/**
* Skip test if paths size is greater than page size.
*/
protected void pageSizePreconditionForTest(int size) {
if (size > pageSize) {
skip("Test requires paths size less than or equal to page size: " + pageSize);
}
}
/**
* This test validates that files to be deleted don't have
* to be direct children of the base path.
*/
@Test
public void testDeepDirectoryFilesDelete() throws Exception {
List<Path> paths = new ArrayList<>();
Path dir1 = new Path(basePath, "dir1");
Path dir2 = new Path(dir1, "dir2");
Path dir3 = new Path(dir2, "dir3");
fs.mkdirs(dir3);
Path file1 = new Path(dir3, "file1");
touch(fs, file1);
paths.add(file1);
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
}
@Test
public void testChildPaths() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
paths.add(dirPath);
Path filePath = new Path(dirPath, "file");
touch(fs, filePath);
paths.add(filePath);
pageSizePreconditionForTest(paths.size());
// Should pass as both paths are under the base path.
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
}
/**
* Assert on returned entries after bulk delete operation.
* Entries should be empty after successful delete.
*/
public static void assertSuccessfulBulkDelete(List<Map.Entry<Path, String>> entries) {
Assertions.assertThat(entries)
.describedAs("Bulk delete failed, " +
"return entries should be empty after successful delete")
.isEmpty();
}
/**
* Create a list of paths with the given count
* under the given base path.
*/
private List<Path> createListOfPaths(int count, Path basePath) {
List<Path> paths = new ArrayList<>();
for (int i = 0; i < count; i++) {
Path path = new Path(basePath, "file-" + i);
paths.add(path);
}
return paths;
}
}


@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.localfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Bulk delete contract tests for the local filesystem.
*/
public class TestLocalFSContractBulkDelete extends AbstractContractBulkDeleteTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new LocalFSContract(conf);
}
}


@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.rawlocal;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Bulk delete contract tests for the raw local filesystem.
*/
public class TestRawLocalContractBulkDelete extends AbstractContractBulkDeleteTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new RawlocalFSContract(conf);
}
}


@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.hdfs;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
* Bulk delete contract tests for the HDFS filesystem.
*/
public class TestHDFSContractBulkDelete extends AbstractContractBulkDeleteTest {
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new HDFSContract(conf);
}
@BeforeClass
public static void createCluster() throws IOException {
HDFSContract.createCluster();
}
@AfterClass
public static void teardownCluster() throws IOException {
HDFSContract.destroyCluster();
}
}


@ -1641,4 +1641,16 @@ private Constants() {
*/
public static final String AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED =
"fs.s3a.access.grants.fallback.to.iam";
/**
* Default value for {@link #S3A_IO_RATE_LIMIT}.
* Value: {@value}.
* 0 means no rate limiting.
*/
public static final int DEFAULT_S3A_IO_RATE_LIMIT = 0;
/**
* Config to set the rate limit for S3A IO operations.
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
}
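
A minimal sketch of setting the option programmatically; the capacity of 100 is an arbitrary example and the per-second unit is an assumption:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class RateLimitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // assumed: capacity of rate-limited S3 write operations per second;
    // the default of 0 disables rate limiting.
    conf.setInt("fs.s3a.io.rate.limit", 100);
    try (FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf)) {
      // ... use the filesystem ...
    }
  }
}
```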


@ -81,7 +81,6 @@
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
@ -103,6 +102,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@ -120,7 +120,8 @@
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
@ -141,9 +142,11 @@
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
import org.apache.hadoop.fs.s3a.impl.S3AStoreBuilder;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
@ -162,10 +165,6 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -203,10 +202,15 @@
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.fs.store.EtagChecksum;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.RateLimitingFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@ -244,7 +248,6 @@
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
@ -258,11 +261,11 @@
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.pairedTrackerFactory;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.RateLimitingFactory.unlimitedRate;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;
@ -283,7 +286,8 @@
@InterfaceStability.Evolving
public class S3AFileSystem extends FileSystem implements StreamCapabilities,
AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource,
AuditSpanSource<AuditSpanS3A>, ActiveThreadSpanSource<AuditSpanS3A> {
AuditSpanSource<AuditSpanS3A>, ActiveThreadSpanSource<AuditSpanS3A>,
StoreContextFactory {
/**
* Default blocksize as used in blocksize and FS status queries.
@ -296,6 +300,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private String username;
/**
* Store back end.
*/
private S3AStore store;
private S3Client s3Client;
/** Async client is used for transfer manager. */
@ -680,9 +689,6 @@ public void initialize(URI name, Configuration originalConf)
// the encryption algorithms)
bindAWSClient(name, delegationTokensEnabled);
// This initiates a probe against S3 for the bucket existing.
doBucketProbing();
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE,
Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
@ -729,9 +735,6 @@ public void initialize(URI name, Configuration originalConf)
directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf,
this::allowAuthoritative);
LOG.debug("Directory marker retention policy is {}", directoryPolicy);
initMultipartUploads(conf);
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
@ -756,6 +759,26 @@ public void initialize(URI name, Configuration originalConf)
OPTIMIZED_COPY_FROM_LOCAL_DEFAULT);
LOG.debug("Using optimized copyFromLocal implementation: {}", optimizedCopyFromLocal);
s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
// now create the store
store = new S3AStoreBuilder()
.withS3Client(s3Client)
.withDurationTrackerFactory(getDurationTrackerFactory())
.withStoreContextFactory(this)
.withAuditSpanSource(getAuditManager())
.withInstrumentation(getInstrumentation())
.withStatisticsContext(statisticsContext)
.withStorageStatistics(getStorageStatistics())
.withReadRateLimiter(unlimitedRate())
.withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
.build();
// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
doBucketProbing();
initMultipartUploads(conf);
} catch (SdkException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
@ -1417,6 +1440,11 @@ public S3Client getAmazonS3Client(String reason) {
return s3Client;
}
@Override
public S3AStore getStore() {
return store;
}
/**
* S3AInternals method.
* {@inheritDoc}.
@ -3064,29 +3092,10 @@ public void incrementWriteOperations() {
@Retries.RetryRaw
protected void deleteObject(String key)
throws SdkException, IOException {
blockRootDelete(key);
incrementWriteOperations();
try (DurationInfo ignored =
new DurationInfo(LOG, false,
"deleting %s", key)) {
invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
DELETE_CONSIDERED_IDEMPOTENT,
() -> {
incrementStatistic(OBJECT_DELETE_OBJECTS);
trackDurationOfInvocation(getDurationTrackerFactory(),
OBJECT_DELETE_REQUEST.getSymbol(),
() -> s3Client.deleteObject(getRequestFactory()
store.deleteObject(getRequestFactory()
.newDeleteObjectRequestBuilder(key)
.build()));
return null;
});
} catch (AwsServiceException ase) {
// 404 errors get swallowed; this can be raised by
// third party stores (GCS).
if (!isObjectNotFound(ase)) {
throw ase;
}
}
.build());
}
/**
@ -3112,19 +3121,6 @@ void deleteObjectAtPath(Path f,
deleteObject(key);
}
/**
* Reject any request to delete an object where the key is root.
* @param key key to validate
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
*/
private void blockRootDelete(String key) throws InvalidRequestException {
if (key.isEmpty() || "/".equals(key)) {
throw new InvalidRequestException("Bucket "+ bucket
+" cannot be deleted");
}
}
/**
* Perform a bulk object delete operation against S3.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
@ -3151,39 +3147,12 @@ private void blockRootDelete(String key) throws InvalidRequestException {
private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, SdkException, IOException {
incrementWriteOperations();
BulkDeleteRetryHandler retryHandler =
new BulkDeleteRetryHandler(createStoreContext());
int keyCount = deleteRequest.delete().objects().size();
try (DurationInfo ignored =
new DurationInfo(LOG, false, "DELETE %d keys",
keyCount)) {
DeleteObjectsResponse response =
invoker.retryUntranslated("delete", DELETE_CONSIDERED_IDEMPOTENT,
(text, e, r, i) -> {
// handle the failure
retryHandler.bulkDeleteRetried(deleteRequest, e);
},
// duration is tracked in the bulk delete counters
trackDurationOfOperation(getDurationTrackerFactory(),
OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
return s3Client.deleteObjects(deleteRequest);
}));
DeleteObjectsResponse response = store.deleteObjects(deleteRequest).getValue();
if (!response.errors().isEmpty()) {
// one or more of the keys could not be deleted.
// log and then throw
List<S3Error> errors = response.errors();
LOG.debug("Partial failure of delete, {} errors", errors.size());
for (S3Error error : errors) {
LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message());
throw new MultiObjectDeleteException(response.errors());
}
throw new MultiObjectDeleteException(errors);
}
return response;
}
}
/**
* Create a putObject request builder.
@ -3391,20 +3360,16 @@ private void removeKeysS3(
List<ObjectIdentifier> keysToDelete,
boolean deleteFakeDir)
throws MultiObjectDeleteException, AwsServiceException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Initiating delete operation for {} objects",
keysToDelete.size());
for (ObjectIdentifier objectIdentifier : keysToDelete) {
LOG.debug(" \"{}\" {}", objectIdentifier.key(),
objectIdentifier.versionId() != null ? objectIdentifier.versionId() : "");
}
}
if (keysToDelete.isEmpty()) {
// exit fast if there are no keys to delete
return;
}
for (ObjectIdentifier objectIdentifier : keysToDelete) {
blockRootDelete(objectIdentifier.key());
if (keysToDelete.size() == 1) {
// single object is a single delete call.
// this is more informative in server logs and may be more efficient.
deleteObject(keysToDelete.get(0).key());
noteDeleted(1, deleteFakeDir);
return;
}
try {
if (enableMultiObjectsDelete) {
@ -5481,7 +5446,6 @@ public boolean hasPathCapability(final Path path, final String capability)
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
return true;
// multi object delete flag
case ENABLE_MULTI_DELETE:
return enableMultiObjectsDelete;
@ -5667,6 +5631,7 @@ public S3AMultipartUploaderBuilder createMultipartUploader(
* new store context instances should be created as appropriate.
* @return the store context of this FS.
*/
@Override
@InterfaceAudience.Private
public StoreContext createStoreContext() {
return new StoreContextBuilder().setFsURI(getUri())
@ -5768,4 +5733,36 @@ public boolean isMultipartUploadEnabled() {
return isMultipartUploadEnabled;
}
/**
* S3A implementation to create a bulk delete operation through
* which the actual bulk delete calls can be made.
* @return an implementation of the bulk delete.
*/
@Override
public BulkDelete createBulkDelete(final Path path)
throws IllegalArgumentException, IOException {
final Path p = makeQualified(path);
final AuditSpanS3A span = createSpan("bulkdelete", p.toString(), null);
final int size = enableMultiObjectsDelete ? pageSize : 1;
return new BulkDeleteOperation(
createStoreContext(),
createBulkDeleteCallbacks(p, size, span),
p,
size,
span);
}
/**
* Create the callbacks for the bulk delete operation.
* @param path path to delete.
* @param pageSize page size.
* @param span span for operations.
* @return an instance of the Bulk Delete callbacks.
*/
protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks(
Path path, int pageSize, AuditSpanS3A span) {
return new BulkDeleteOperationCallbacksImpl(store, pathToKey(path), pageSize, span);
}
}


@ -33,6 +33,9 @@
/**
* This is an unstable interface for access to S3A Internal state, S3 operations
* and the S3 client connector itself.
* <p>
* Note for maintainers: this is documented in {@code aws_sdk_upgrade.md}; update
* on changes.
*/
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate("testing/diagnostics")
@ -52,13 +55,19 @@ public interface S3AInternals {
* set to false.
* <p>
* Mocking note: this is the same S3Client as is used by the owning
* filesystem; changes to this client will be reflected by changes
* filesystem and S3AStore; changes to this client will be reflected by changes
* in the behavior of that filesystem.
* @param reason a justification for requesting access.
* @return S3Client
*/
S3Client getAmazonS3Client(String reason);
/**
* Get the store for low-level operations.
* @return the store the S3A FS is working through.
*/
S3AStore getStore();
/**
* Get the region of a bucket.
* Invoked from StoreContext; consider an entry point.
@ -131,4 +140,5 @@ public interface S3AInternals {
@AuditEntryPoint
@Retries.RetryTranslated
long abortMultipartUploads(Path path) throws IOException;
}


@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
/**
* Interface for the S3A Store;
* S3 client interactions should be via this; mocking
* is possible for unit tests.
*/
@InterfaceAudience.LimitedPrivate("Extensions")
@InterfaceStability.Unstable
public interface S3AStore extends IOStatisticsSource {
/**
* Acquire write capacity for operations.
* This should be done within retry loops.
* @param capacity capacity to acquire.
* @return time spent waiting for output.
*/
Duration acquireWriteCapacity(int capacity);
/**
* Acquire read capacity for operations.
* This should be done within retry loops.
* @param capacity capacity to acquire.
* @return time spent waiting for output.
*/
Duration acquireReadCapacity(int capacity);
StoreContext getStoreContext();
DurationTrackerFactory getDurationTrackerFactory();
S3AStatisticsContext getStatisticsContext();
RequestFactory getRequestFactory();
/**
* Perform a bulk object delete operation against S3.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics
* <p>
* {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
* of objects deleted in the request.
* <p>
* Retry policy: retry untranslated; delete considered idempotent.
* If the request is throttled, this is logged in the throttle statistics,
* with the counter set to the number of keys, rather than the number
* of invocations of the delete operation.
* This is because S3 considers each key as one mutating operation on
* the store when updating its load counters on a specific partition
* of an S3 bucket.
* If only the request was measured, this operation would under-report.
* A write capacity will be requested proportional to the number of keys
present in the request and will be re-requested during retries such that
* retries throttle better. If the request is throttled, the time spent is
* recorded in a duration IOStat named {@code STORE_IO_RATE_LIMITED_DURATION}.
* @param deleteRequest keys to delete on the s3-backend
* @return the AWS response
* @throws MultiObjectDeleteException one or more of the keys could not
* be deleted.
* @throws SdkException amazon-layer failure.
* @throws IOException IO problems.
*/
@Retries.RetryRaw
Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, SdkException, IOException;
/**
* Delete an object.
* Increments the {@code OBJECT_DELETE_REQUESTS} statistics.
* <p>
* Retry policy: retry untranslated; delete considered idempotent.
* 404 errors other than bucket not found are swallowed;
these can be raised by third-party stores (GCS).
* <p>
A write capacity of 1 (as this is a single object delete) will be requested before
* the delete call and will be re-requested during retries such that
* retries throttle better. If the request is throttled, the time spent is
* recorded in a duration IOStat named {@code STORE_IO_RATE_LIMITED_DURATION}.
* If an exception is caught and swallowed, the response will be empty;
* otherwise it will be the response from the delete operation.
* @param request request to make
* @return the total duration and response.
* @throws SdkException problems working with S3
* @throws IllegalArgumentException if the request was rejected due to
* a mistaken attempt to delete the root directory.
*/
@Retries.RetryRaw
Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(
DeleteObjectRequest request) throws SdkException;
}

View File

@ -103,6 +103,10 @@ public enum Statistic {
StoreStatisticNames.OP_ACCESS,
"Calls of access()",
TYPE_DURATION),
INVOCATION_BULK_DELETE(
StoreStatisticNames.OP_BULK_DELETE,
"Calls of bulk delete()",
TYPE_COUNTER),
INVOCATION_COPY_FROM_LOCAL_FILE(
StoreStatisticNames.OP_COPY_FROM_LOCAL_FILE,
"Calls of copyFromLocalFile()",
@ -539,6 +543,10 @@ public enum Statistic {
"retried requests made of the remote store",
TYPE_COUNTER),
STORE_IO_RATE_LIMITED(StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION,
"Duration of rate limited operations",
TYPE_DURATION),
STORE_IO_THROTTLED(
StoreStatisticNames.STORE_IO_THROTTLED,
"Requests throttled and retried",

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.util.functional.Tuples;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.fs.BulkDeleteUtils.validatePathIsUnderParent;
import static org.apache.hadoop.util.Preconditions.checkArgument;
/**
* S3A Implementation of the {@link BulkDelete} interface.
*/
public class BulkDeleteOperation extends AbstractStoreOperation implements BulkDelete {
private final BulkDeleteOperationCallbacks callbacks;
private final Path basePath;
private final int pageSize;
public BulkDeleteOperation(
final StoreContext storeContext,
final BulkDeleteOperationCallbacks callbacks,
final Path basePath,
final int pageSize,
final AuditSpan span) {
super(storeContext, span);
this.callbacks = requireNonNull(callbacks);
this.basePath = requireNonNull(basePath);
checkArgument(pageSize > 0, "Page size must be greater than 0");
this.pageSize = pageSize;
}
@Override
public int pageSize() {
return pageSize;
}
@Override
public Path basePath() {
return basePath;
}
/**
* {@inheritDoc}
*/
@Override
public List<Map.Entry<Path, String>> bulkDelete(final Collection<Path> paths)
throws IOException, IllegalArgumentException {
requireNonNull(paths);
checkArgument(paths.size() <= pageSize,
"Number of paths (%d) is larger than the page size (%d)", paths.size(), pageSize);
final StoreContext context = getStoreContext();
final List<ObjectIdentifier> objects = paths.stream().map(p -> {
checkArgument(p.isAbsolute(), "Path %s is not absolute", p);
checkArgument(validatePathIsUnderParent(p, basePath),
"Path %s is not under the base path %s", p, basePath);
final String k = context.pathToKey(p);
return ObjectIdentifier.builder().key(k).build();
}).collect(toList());
final List<Map.Entry<String, String>> errors = callbacks.bulkDelete(objects);
if (!errors.isEmpty()) {
final List<Map.Entry<Path, String>> outcomeElements = errors
.stream()
.map(error -> Tuples.pair(
context.keyToPath(error.getKey()),
error.getValue()
))
.collect(toList());
return outcomeElements;
}
return emptyList();
}
@Override
public void close() throws IOException {
}
/**
* Callbacks for the bulk delete operation.
*/
public interface BulkDeleteOperationCallbacks {
/**
* Perform a bulk delete operation.
* @param keys key list
* @return paths which failed to delete (if any).
* @throws IOException IO Exception.
* @throws IllegalArgumentException illegal arguments
*/
@Retries.RetryTranslated
List<Map.Entry<String, String>> bulkDelete(final List<ObjectIdentifier> keys)
throws IOException, IllegalArgumentException;
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Error;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AStore;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.util.functional.Tuples;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.hadoop.fs.s3a.Invoker.once;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.functional.Tuples.pair;
/**
* Callbacks for the bulk delete operation.
*/
public class BulkDeleteOperationCallbacksImpl implements
BulkDeleteOperation.BulkDeleteOperationCallbacks {
/**
* Path for logging.
*/
private final String path;
/** Page size for bulk delete. */
private final int pageSize;
/** span for operations. */
private final AuditSpan span;
/**
* Store.
*/
private final S3AStore store;
public BulkDeleteOperationCallbacksImpl(final S3AStore store,
String path, int pageSize, AuditSpan span) {
this.span = span;
this.pageSize = pageSize;
this.path = path;
this.store = store;
}
@Override
@Retries.RetryTranslated
public List<Map.Entry<String, String>> bulkDelete(final List<ObjectIdentifier> keysToDelete)
throws IOException, IllegalArgumentException {
span.activate();
final int size = keysToDelete.size();
checkArgument(size <= pageSize,
"Too many paths to delete in one operation: %s", size);
if (size == 0) {
return emptyList();
}
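// a single key: issue one object DELETE rather than a bulk delete request.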
if (size == 1) {
return deleteSingleObject(keysToDelete.get(0).key());
}
final DeleteObjectsResponse response = once("bulkDelete", path, () ->
store.deleteObjects(store.getRequestFactory()
.newBulkDeleteRequestBuilder(keysToDelete)
.build())).getValue();
final List<S3Error> errors = response.errors();
if (errors.isEmpty()) {
// all good.
return emptyList();
} else {
return errors.stream()
.map(e -> pair(e.key(), e.toString()))
.collect(Collectors.toList());
}
}
/**
* Delete a single object.
* @param key key to delete
* @return list of keys which failed to delete: length 0 or 1.
* @throws IOException IO problem other than AccessDeniedException
*/
@Retries.RetryTranslated
private List<Map.Entry<String, String>> deleteSingleObject(final String key) throws IOException {
try {
once("bulkDelete", path, () ->
store.deleteObject(store.getRequestFactory()
.newDeleteObjectRequestBuilder(key)
.build()));
} catch (AccessDeniedException e) {
return singletonList(pair(key, e.toString()));
}
return emptyList();
}
}

View File

@ -118,11 +118,7 @@ public IOException translateException(final String message) {
String exitCode = "";
for (S3Error error : errors()) {
String code = error.code();
String item = String.format("%s: %s%s: %s%n", code, error.key(),
(error.versionId() != null
? (" (" + error.versionId() + ")")
: ""),
error.message());
String item = errorToString(error);
LOG.info(item);
result.append(item);
if (exitCode == null || exitCode.isEmpty() || ACCESS_DENIED.equals(code)) {
@ -136,4 +132,18 @@ public IOException translateException(final String message) {
return new AWSS3IOException(result.toString(), this);
}
}
/**
* Convert an error to a string.
* @param error error from a delete request
* @return string value
*/
public static String errorToString(final S3Error error) {
String code = error.code();
return String.format("%s: %s%s: %s%n", code, error.key(),
(error.versionId() != null
? (" (" + error.versionId() + ")")
: ""),
error.message());
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import software.amazon.awssdk.services.s3.S3Client;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.S3AStore;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.util.RateLimiting;
/**
* Builder for the S3AStore.
*/
public class S3AStoreBuilder {
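// Typical assembly (a sketch only; the real values are wired up by the
// owning filesystem instance):
//   S3AStore store = new S3AStoreBuilder()
//       .withStoreContextFactory(contextFactory)
//       .withS3Client(s3Client)
//       .withDurationTrackerFactory(durationTrackerFactory)
//       .withInstrumentation(instrumentation)
//       .withStatisticsContext(statisticsContext)
//       .withStorageStatistics(storageStatistics)
//       .withReadRateLimiter(readRateLimiter)
//       .withWriteRateLimiter(writeRateLimiter)
//       .withAuditSpanSource(auditSpanSource)
//       .build();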
private StoreContextFactory storeContextFactory;
private S3Client s3Client;
private DurationTrackerFactory durationTrackerFactory;
private S3AInstrumentation instrumentation;
private S3AStatisticsContext statisticsContext;
private S3AStorageStatistics storageStatistics;
private RateLimiting readRateLimiter;
private RateLimiting writeRateLimiter;
private AuditSpanSource<AuditSpanS3A> auditSpanSource;
public S3AStoreBuilder withStoreContextFactory(
final StoreContextFactory storeContextFactoryValue) {
this.storeContextFactory = storeContextFactoryValue;
return this;
}
public S3AStoreBuilder withS3Client(
final S3Client s3ClientValue) {
this.s3Client = s3ClientValue;
return this;
}
public S3AStoreBuilder withDurationTrackerFactory(
final DurationTrackerFactory durationTrackerFactoryValue) {
this.durationTrackerFactory = durationTrackerFactoryValue;
return this;
}
public S3AStoreBuilder withInstrumentation(
final S3AInstrumentation instrumentationValue) {
this.instrumentation = instrumentationValue;
return this;
}
public S3AStoreBuilder withStatisticsContext(
final S3AStatisticsContext statisticsContextValue) {
this.statisticsContext = statisticsContextValue;
return this;
}
public S3AStoreBuilder withStorageStatistics(
final S3AStorageStatistics storageStatisticsValue) {
this.storageStatistics = storageStatisticsValue;
return this;
}
public S3AStoreBuilder withReadRateLimiter(
final RateLimiting readRateLimiterValue) {
this.readRateLimiter = readRateLimiterValue;
return this;
}
public S3AStoreBuilder withWriteRateLimiter(
final RateLimiting writeRateLimiterValue) {
this.writeRateLimiter = writeRateLimiterValue;
return this;
}
public S3AStoreBuilder withAuditSpanSource(
final AuditSpanSource<AuditSpanS3A> auditSpanSourceValue) {
this.auditSpanSource = auditSpanSourceValue;
return this;
}
public S3AStore build() {
return new S3AStoreImpl(storeContextFactory, s3Client, durationTrackerFactory, instrumentation,
statisticsContext, storageStatistics, readRateLimiter, writeRateLimiter, auditSpanSource);
}
}

View File

@ -0,0 +1,400 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.S3Error;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.S3AStore;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.RateLimiting;
import org.apache.hadoop.util.functional.Tuples;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.util.Preconditions.checkArgument;
/**
* Store Layer.
* This is where lower level storage operations are intended
* to move.
*/
public class S3AStoreImpl implements S3AStore {
private static final Logger LOG = LoggerFactory.getLogger(S3AStoreImpl.class);
/** Factory to create store contexts. */
private final StoreContextFactory storeContextFactory;
/** The S3 client used to communicate with S3 bucket. */
private final S3Client s3Client;
/** The S3 bucket to communicate with. */
private final String bucket;
/** Request factory for creating requests. */
private final RequestFactory requestFactory;
/** Async client is used for transfer manager. */
private S3AsyncClient s3AsyncClient;
/** Duration tracker factory. */
private final DurationTrackerFactory durationTrackerFactory;
/** The core instrumentation. */
private final S3AInstrumentation instrumentation;
/** Accessors to statistics for this FS. */
private final S3AStatisticsContext statisticsContext;
/** Storage Statistics Bonded to the instrumentation. */
private final S3AStorageStatistics storageStatistics;
/** Rate limiter for read operations. */
private final RateLimiting readRateLimiter;
/** Rate limiter for write operations. */
private final RateLimiting writeRateLimiter;
/** Store context. */
private final StoreContext storeContext;
/** Invoker for retry operations. */
private final Invoker invoker;
/** Audit span source. */
private final AuditSpanSource<AuditSpanS3A> auditSpanSource;
/** Constructor to create S3A store. */
S3AStoreImpl(StoreContextFactory storeContextFactory,
S3Client s3Client,
DurationTrackerFactory durationTrackerFactory,
S3AInstrumentation instrumentation,
S3AStatisticsContext statisticsContext,
S3AStorageStatistics storageStatistics,
RateLimiting readRateLimiter,
RateLimiting writeRateLimiter,
AuditSpanSource<AuditSpanS3A> auditSpanSource) {
this.storeContextFactory = requireNonNull(storeContextFactory);
this.s3Client = requireNonNull(s3Client);
this.durationTrackerFactory = requireNonNull(durationTrackerFactory);
this.instrumentation = requireNonNull(instrumentation);
this.statisticsContext = requireNonNull(statisticsContext);
this.storageStatistics = requireNonNull(storageStatistics);
this.readRateLimiter = requireNonNull(readRateLimiter);
this.writeRateLimiter = requireNonNull(writeRateLimiter);
this.auditSpanSource = requireNonNull(auditSpanSource);
this.storeContext = requireNonNull(storeContextFactory.createStoreContext());
this.invoker = storeContext.getInvoker();
this.bucket = storeContext.getBucket();
this.requestFactory = storeContext.getRequestFactory();
}
/** Acquire write capacity for rate limiting {@inheritDoc}. */
@Override
public Duration acquireWriteCapacity(final int capacity) {
return writeRateLimiter.acquire(capacity);
}
/** Acquire read capacity for rate limiting {@inheritDoc}. */
@Override
public Duration acquireReadCapacity(final int capacity) {
return readRateLimiter.acquire(capacity);
}
/**
* Create a new store context.
* @return a new store context.
*/
private StoreContext createStoreContext() {
return storeContextFactory.createStoreContext();
}
@Override
public StoreContext getStoreContext() {
return storeContext;
}
private S3Client getS3Client() {
return s3Client;
}
@Override
public DurationTrackerFactory getDurationTrackerFactory() {
return durationTrackerFactory;
}
private S3AInstrumentation getInstrumentation() {
return instrumentation;
}
@Override
public S3AStatisticsContext getStatisticsContext() {
return statisticsContext;
}
private S3AStorageStatistics getStorageStatistics() {
return storageStatistics;
}
@Override
public RequestFactory getRequestFactory() {
return requestFactory;
}
/**
* Increment a statistic by 1.
* This increments both the instrumentation and storage statistics.
* @param statistic The operation to increment
*/
protected void incrementStatistic(Statistic statistic) {
incrementStatistic(statistic, 1);
}
/**
* Increment a statistic by a specific value.
* This increments both the instrumentation and storage statistics.
* @param statistic The operation to increment
* @param count the count to increment
*/
protected void incrementStatistic(Statistic statistic, long count) {
statisticsContext.incrementCounter(statistic, count);
}
/**
* Decrement a gauge by a specific value.
* @param statistic The operation to decrement
* @param count the count to decrement
*/
protected void decrementGauge(Statistic statistic, long count) {
statisticsContext.decrementGauge(statistic, count);
}
/**
* Increment a gauge by a specific value.
* @param statistic The operation to increment
* @param count the count to increment
*/
protected void incrementGauge(Statistic statistic, long count) {
statisticsContext.incrementGauge(statistic, count);
}
/**
* Callback when an operation was retried.
* Increments the statistics of ignored errors or throttled requests,
* depending up on the exception class.
* @param ex exception.
*/
public void operationRetried(Exception ex) {
if (isThrottleException(ex)) {
LOG.debug("Request throttled");
incrementStatistic(STORE_IO_THROTTLED);
statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
} else {
incrementStatistic(STORE_IO_RETRY);
incrementStatistic(IGNORED_ERRORS);
}
}
/**
* Callback from {@link Invoker} when an operation is retried.
* @param text text of the operation
* @param ex exception
* @param retries number of retries
* @param idempotent is the method idempotent
*/
public void operationRetried(String text, Exception ex, int retries, boolean idempotent) {
operationRetried(ex);
}
/**
* Get the instrumentation's IOStatistics.
* @return statistics
*/
@Override
public IOStatistics getIOStatistics() {
return instrumentation.getIOStatistics();
}
/**
* Start an operation; this informs the audit service of the event
* and then sets it as the active span.
* @param operation operation name.
* @param path1 first path of operation
* @param path2 second path of operation
* @return a span for the audit
* @throws IOException failure
*/
public AuditSpanS3A createSpan(String operation, @Nullable String path1, @Nullable String path2)
throws IOException {
return auditSpanSource.createSpan(operation, path1, path2);
}
/**
* Reject any request to delete an object where the key is root.
* @param key key to validate
* @throws IllegalArgumentException if the request was rejected due to
* a mistaken attempt to delete the root directory.
*/
private void blockRootDelete(String key) throws IllegalArgumentException {
checkArgument(!key.isEmpty() && !"/".equals(key), "Bucket %s cannot be deleted", bucket);
}
/**
* {@inheritDoc}.
*/
@Override
@Retries.RetryRaw
public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(
final DeleteObjectsRequest deleteRequest)
throws SdkException {
DeleteObjectsResponse response;
BulkDeleteRetryHandler retryHandler = new BulkDeleteRetryHandler(createStoreContext());
final List<ObjectIdentifier> keysToDelete = deleteRequest.delete().objects();
int keyCount = keysToDelete.size();
if (LOG.isDebugEnabled()) {
LOG.debug("Initiating delete operation for {} objects", keysToDelete.size());
keysToDelete.stream().forEach(objectIdentifier -> {
LOG.debug(" \"{}\" {}", objectIdentifier.key(),
objectIdentifier.versionId() != null ? objectIdentifier.versionId() : "");
});
}
// block root calls
keysToDelete.stream().map(ObjectIdentifier::key).forEach(this::blockRootDelete);
try (DurationInfo d = new DurationInfo(LOG, false, "DELETE %d keys", keyCount)) {
response =
invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT, (text, e, r, i) -> {
// handle the failure
retryHandler.bulkDeleteRetried(deleteRequest, e);
},
// duration is tracked in the bulk delete counters
trackDurationOfOperation(getDurationTrackerFactory(),
OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
// acquire the write capacity for the number of keys to delete and record the duration.
Duration durationToAcquireWriteCapacity = acquireWriteCapacity(keyCount);
instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
true,
durationToAcquireWriteCapacity);
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
return s3Client.deleteObjects(deleteRequest);
}));
if (!response.errors().isEmpty()) {
// one or more of the keys could not be deleted.
// log and then throw
List<S3Error> errors = response.errors();
if (LOG.isDebugEnabled()) {
LOG.debug("Partial failure of delete, {} errors", errors.size());
for (S3Error error : errors) {
LOG.debug("{}: \"{}\" - {}", error.key(), error.code(), error.message());
}
}
}
d.close();
return Tuples.pair(d.asDuration(), response);
} catch (IOException e) {
// this is part of the retry signature, nothing else.
// convert to unchecked.
throw new UncheckedIOException(e);
}
}
/**
* {@inheritDoc}.
*/
@Override
@Retries.RetryRaw
public Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(
final DeleteObjectRequest request)
throws SdkException {
String key = request.key();
blockRootDelete(key);
DurationInfo d = new DurationInfo(LOG, false, "deleting %s", key);
try {
DeleteObjectResponse response =
invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
DELETE_CONSIDERED_IDEMPOTENT,
trackDurationOfOperation(getDurationTrackerFactory(),
OBJECT_DELETE_REQUEST.getSymbol(), () -> {
incrementStatistic(OBJECT_DELETE_OBJECTS);
// We try to acquire write capacity just before delete call.
Duration durationToAcquireWriteCapacity = acquireWriteCapacity(1);
instrumentation.recordDuration(STORE_IO_RATE_LIMITED,
true, durationToAcquireWriteCapacity);
return s3Client.deleteObject(request);
}));
d.close();
return Tuples.pair(d.asDuration(), Optional.of(response));
} catch (AwsServiceException ase) {
// 404 errors get swallowed; this can be raised by
// third party stores (GCS).
if (!isObjectNotFound(ase)) {
throw ase;
}
d.close();
return Tuples.pair(d.asDuration(), Optional.empty());
} catch (IOException e) {
// this is part of the retry signature, nothing else.
// convert to unchecked.
throw new UncheckedIOException(e);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Factory for creating store contexts.
*/
@InterfaceAudience.Private
public interface StoreContextFactory {
/**
* Build an immutable store context, including picking
* up the current audit span.
* @return the store context.
*/
StoreContext createStoreContext();
}

View File

@ -324,6 +324,7 @@ They have also been updated to return V2 SDK classes.
public interface S3AInternals {
S3Client getAmazonS3V2Client(String reason);
S3AStore getStore();
@Retries.RetryTranslated
@AuditEntryPoint
String getBucketLocation() throws IOException;

View File

@ -59,7 +59,7 @@ To make most efficient use of S3, care is needed.
The S3A FileSystem supports implementation of vectored read api using which
a client can provide a list of file ranges to read returning a future read
object associated with each range. For full api specification please see
[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html).
[FSDataInputStream](../../../../../../hadoop-common-project/hadoop-common/target/site/filesystem/fsdatainputstream.html).
The following properties can be configured to optimise vectored reads based
on the client requirements.
@ -94,6 +94,86 @@ on the client requirements.
</property>
```
## <a name="bulkdelete"></a> Improving delete performance through the bulk delete API
For the bulk delete API specification, see [BulkDelete](../../../../../../hadoop-common-project/hadoop-common/target/site/filesystem/bulkdelete.html) in the filesystem specification.
The S3A client exports this API.
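A minimal sketch of direct use of this API, assuming an `S3AFileSystem` instance and a
list of object paths all under a common base path; the class and method names here are
illustrative only.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public final class BulkDeleteExample {

  private BulkDeleteExample() {
  }

  /**
   * Delete a batch of objects under {@code base} in a single request.
   * The batch must be no larger than the store's page size.
   * @return the paths which could not be deleted, with their error text.
   */
  public static List<Map.Entry<Path, String>> deleteBatch(
      final S3AFileSystem fs, final Path base, final List<Path> paths)
      throws IOException {
    try (BulkDelete bulkDelete = fs.createBulkDelete(base)) {
      if (paths.size() > bulkDelete.pageSize()) {
        throw new IllegalArgumentException(
            "Batch of " + paths.size() + " paths exceeds the page size "
                + bulkDelete.pageSize());
      }
      // every path must be absolute and under the base path
      return bulkDelete.bulkDelete(paths);
    }
  }
}
```

Any paths in the batch which do not reference objects (for example, directories) are
simply left alone; the implementation notes below cover this.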
### S3A Implementation of Bulk Delete.
If multi-object delete is enabled (`fs.s3a.multiobjectdelete.enable` = true), as
it is by default, then the page size is limited to that defined in
`fs.s3a.bulk.delete.page.size`, which MUST be less than or equal to 1000.
* The entire list of paths to delete is aggregated into a single bulk delete request,
issued to the store.
* Provided the caller has the correct permissions, every entry in the list
will, if the path references an object, cause that object to be deleted.
* If the path does not reference an object: the path will not be deleted
"This is for deleting objects, not directories"
* No probes for the existence of parent directories will take place; no
parent directory markers will be created.
"If you need parent directories, call mkdir() yourself"
* The list of failed keys in the `DeleteObjectsResponse` response
is converted into paths and returned along with their error messages.
* Network and other IO errors are raised as exceptions.
If multi-object delete is disabled (or the list is of size 1)
* A single `DELETE` call is issued
* Any `AccessDeniedException` raised is converted to a result in the error list.
* Any 404 response from a (non-AWS) store will be ignored.
* Network and other IO errors are raised as exceptions.
Because there are no probes to ensure the call does not overwrite a directory,
or to see if a parent directory marker needs to be created,
this API is still faster than issuing a normal `FileSystem.delete(path)` call.
That is: all the overhead normally undertaken to preserve the POSIX filesystem model is omitted.
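The same operations are also exposed through the static helpers in
`org.apache.hadoop.io.wrappedio.WrappedIO`, which take plain `FileSystem`, `Path` and
collection arguments; they are the entry points used by the contract tests in this patch.
A sketch of their use, assuming the helpers return the same list of (path, error) entries
as the `BulkDelete` API itself; the wrapper class here is illustrative:

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.wrappedio.WrappedIO;

public final class WrappedBulkDelete {

  private WrappedBulkDelete() {
  }

  /**
   * Delete one page of paths under {@code base}.
   * The list must be no longer than the store's page size.
   * @return paths which could not be deleted, with their error text.
   */
  public static List<Map.Entry<Path, String>> deletePage(
      final FileSystem fs, final Path base, final List<Path> paths)
      throws IOException {
    final int pageSize = WrappedIO.bulkDelete_PageSize(fs, base);
    if (paths.size() > pageSize) {
      throw new IllegalArgumentException(
          "Too many paths for one page: " + paths.size() + " > " + pageSize);
    }
    return WrappedIO.bulkDelete_delete(fs, base, paths);
  }
}
```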
### S3 Scalability and Performance
Every entry in a bulk delete request counts as one write operation
against AWS S3 storage.
With the default write rate under a prefix on AWS S3 Standard storage
restricted to 3,500 writes/second, it is very easy to overload
the store by issuing a few bulk delete requests simultaneously.
* If throttling is triggered then all clients interacting with
the store may observe performance issues.
* The write quota applies even for paths which do not exist.
* The S3A client *may* perform rate throttling as well as page size limiting.
What does that mean? It means that attempting to issue multiple
bulk delete calls in parallel can be counterproductive.
When overloaded, the S3 store returns a 503 throttle response.
This triggers the client to back off and retry the request.
However, the repeated request will still include the same number of objects and
*so generate the same load*.
This can lead to a pathological situation where the repeated requests will
never be satisfied because the request itself is sufficient to overload the store.
See [HADOOP-16823. Large DeleteObject requests are their own Thundering Herd](https://issues.apache.org/jira/browse/HADOOP-16823)
for an example of where this did actually surface in production.
This is why the default page size of S3A clients is 250 paths, not the store limit of 1000 entries.
It is also why the S3A delete/rename operations do not attempt to do massive parallel deletions;
instead, bulk delete requests are queued for a single blocking thread to issue.
Consider a similar design.
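A sketch of one such design follows: the paths are split into pages and every page is
submitted to a single-threaded executor, so at most one bulk delete request is in flight
against the store at any time. The class is illustrative, not part of this patch.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public final class QueuedBulkDeleter {

  private QueuedBulkDeleter() {
  }

  /**
   * Split the paths into pages and submit each page to a single-threaded
   * executor, so the requests reach the store strictly one at a time.
   * @return paths which could not be deleted, with their error text.
   */
  public static List<Map.Entry<Path, String>> deleteQueued(
      final S3AFileSystem fs, final Path base, final List<Path> paths)
      throws IOException, InterruptedException, ExecutionException {
    final ExecutorService deleter = Executors.newSingleThreadExecutor();
    final List<Map.Entry<Path, String>> failures = new ArrayList<>();
    try (BulkDelete bulkDelete = fs.createBulkDelete(base)) {
      final int pageSize = bulkDelete.pageSize();
      final List<Future<List<Map.Entry<Path, String>>>> outcomes = new ArrayList<>();
      for (int i = 0; i < paths.size(); i += pageSize) {
        final List<Path> page =
            paths.subList(i, Math.min(i + pageSize, paths.size()));
        // queued; the single worker thread issues the requests sequentially
        outcomes.add(deleter.submit(() -> bulkDelete.bulkDelete(page)));
      }
      for (Future<List<Map.Entry<Path, String>>> outcome : outcomes) {
        failures.addAll(outcome.get());
      }
    } finally {
      deleter.shutdownNow();
    }
    return failures;
  }
}
```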
When working with versioned S3 buckets, every path deleted will add a tombstone marker
to the store at that location, even if there was no object at that path.
While this has no negative performance impact on the bulk delete call,
it will slow down list requests subsequently made against that path.
That is: bulk delete requests of paths which do not exist will hurt future queries.
Avoid this. Note also that the TPC-DS benchmarks do not create the right load to make these
performance problems observable, but they can surface in production.
* Configure buckets to have a limited number of days for tombstones to be preserved.
* Do not delete paths which you know reference nonexistent files or directories.
## <a name="fadvise"></a> Improving data input performance through fadvise
The S3A Filesystem client supports the notion of input policies, similar

View File

@ -0,0 +1,230 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.s3a;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.statistics.MeanStatistic;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED_DURATION;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MEAN;
import static org.apache.hadoop.io.wrappedio.WrappedIO.bulkDelete_delete;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Contract tests for bulk delete operation for S3A Implementation.
*/
@RunWith(Parameterized.class)
public class ITestS3AContractBulkDelete extends AbstractContractBulkDeleteTest {
private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractBulkDelete.class);
/**
* Delete Page size: {@value}.
* This is the default page size for bulk delete operation for this contract test.
* All the tests in this class should pass number of paths equal to or less than
* this page size during the bulk delete operation.
*/
private static final int DELETE_PAGE_SIZE = 20;
private final boolean enableMultiObjectDelete;
@Parameterized.Parameters(name = "enableMultiObjectDelete = {0}")
public static Iterable<Object[]> enableMultiObjectDelete() {
return Arrays.asList(new Object[][]{
{true},
{false}
});
}
public ITestS3AContractBulkDelete(boolean enableMultiObjectDelete) {
this.enableMultiObjectDelete = enableMultiObjectDelete;
}
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
conf = propagateBucketOptions(conf, getTestBucketName(conf));
if (enableMultiObjectDelete) {
// if multi-object delete is disabled, skip the test.
skipIfNotEnabled(conf, Constants.ENABLE_MULTI_DELETE,
"Bulk delete is explicitly disabled for this bucket");
}
S3ATestUtils.removeBaseAndBucketOverrides(conf,
Constants.BULK_DELETE_PAGE_SIZE);
conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, DELETE_PAGE_SIZE);
conf.setBoolean(Constants.ENABLE_MULTI_DELETE, enableMultiObjectDelete);
return conf;
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(createConfiguration());
}
@Override
protected int getExpectedPageSize() {
if (!enableMultiObjectDelete) {
// if multi-object delete is disabled, page size should be 1.
return 1;
}
return DELETE_PAGE_SIZE;
}
@Override
public void validatePageSize() throws Exception {
Assertions.assertThat(pageSize)
.describedAs("Page size should match the configured page size")
.isEqualTo(getExpectedPageSize());
}
@Test
public void testBulkDeleteZeroPageSizePrecondition() throws Exception {
if (!enableMultiObjectDelete) {
// if multi-object delete is disabled, skip this test as
// page size is always 1.
skip("Multi-object delete is disabled");
}
Configuration conf = getContract().getConf();
conf.setInt(Constants.BULK_DELETE_PAGE_SIZE, 0);
Path testPath = path(getMethodName());
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
intercept(IllegalArgumentException.class,
() -> fs.createBulkDelete(testPath));
}
}
@Test
public void testPageSizeWhenMultiObjectsDisabled() throws Exception {
Configuration conf = getContract().getConf();
conf.setBoolean(Constants.ENABLE_MULTI_DELETE, false);
Path testPath = path(getMethodName());
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
BulkDelete bulkDelete = fs.createBulkDelete(testPath);
Assertions.assertThat(bulkDelete.pageSize())
.describedAs("Page size should be 1 when multi-object delete is disabled")
.isEqualTo(1);
}
}
@Override
public void testDeletePathsDirectory() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
paths.add(dirPath);
Path filePath = new Path(dirPath, "file");
touch(fs, filePath);
if (enableMultiObjectDelete) {
// Adding more paths only if multi-object delete is enabled.
paths.add(filePath);
}
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
// During the bulk delete operation, the directories are not deleted in S3A.
assertIsDirectory(dirPath);
}
@Test
public void testBulkDeleteParentDirectoryWithDirectories() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
Path subDir = new Path(dirPath, "subdir");
fs.mkdirs(subDir);
// adding parent directory to the list of paths.
paths.add(dirPath);
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
// During the bulk delete operation, the directories are not deleted in S3A.
assertIsDirectory(dirPath);
assertIsDirectory(subDir);
}
@Test
public void testBulkDeleteParentDirectoryWithFiles() throws Exception {
List<Path> paths = new ArrayList<>();
Path dirPath = new Path(basePath, "dir");
fs.mkdirs(dirPath);
Path file = new Path(dirPath, "file");
touch(fs, file);
// adding parent directory to the list of paths.
paths.add(dirPath);
assertSuccessfulBulkDelete(bulkDelete_delete(getFileSystem(), basePath, paths));
// During the bulk delete operation,
// the directories are not deleted in S3A.
assertIsDirectory(dirPath);
}
@Test
public void testRateLimiting() throws Exception {
if (!enableMultiObjectDelete) {
skip("Multi-object delete is disabled so hard to trigger rate limiting");
}
Configuration conf = getContract().getConf();
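// use a very low IO rate limit so that repeated bulk delete calls must wait for capacity.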
conf.setInt(Constants.S3A_IO_RATE_LIMIT, 5);
Path basePath = path(getMethodName());
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
createFiles(fs, basePath, 1, 20, 0);
FileStatus[] fileStatuses = fs.listStatus(basePath);
List<Path> paths = Arrays.stream(fileStatuses)
.map(FileStatus::getPath)
.collect(toList());
pageSizePreconditionForTest(paths.size());
BulkDelete bulkDelete = fs.createBulkDelete(basePath);
bulkDelete.bulkDelete(paths);
MeanStatistic meanStatisticBefore = lookupMeanStatistic(fs.getIOStatistics(),
STORE_IO_RATE_LIMITED_DURATION + SUFFIX_MEAN);
Assertions.assertThat(meanStatisticBefore.mean())
.describedAs("Rate limiting should not have happened during first delete call")
.isEqualTo(0.0);
bulkDelete.bulkDelete(paths);
bulkDelete.bulkDelete(paths);
bulkDelete.bulkDelete(paths);
MeanStatistic meanStatisticAfter = lookupMeanStatistic(fs.getIOStatistics(),
STORE_IO_RATE_LIMITED_DURATION + SUFFIX_MEAN);
Assertions.assertThat(meanStatisticAfter.mean())
.describedAs("Rate limiting should have happened during multiple delete calls")
.isGreaterThan(0.0);
}
}
}

View File

@ -35,8 +35,7 @@
/**
* Abstract base class for S3A unit tests using a mock S3 client and a null
* metadata store.
* Abstract base class for S3A unit tests using a mock S3 client.
*/
public abstract class AbstractS3AMockTest {

View File

@ -61,9 +61,8 @@ public boolean deleteOnExit(Path f) throws IOException {
// processDeleteOnExit.
@Override
protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOException {
boolean result = super.deleteWithoutCloseCheck(f, recursive);
deleteOnDnExitCount--;
return result;
return true;
}
}

View File

@ -23,7 +23,11 @@
import java.io.IOException;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
@ -35,9 +39,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@ -55,7 +60,10 @@
import org.apache.hadoop.fs.s3a.impl.InstantiationIOException;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
import org.apache.hadoop.io.wrappedio.WrappedIO;
import static org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest.assertSuccessfulBulkDelete;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Constants.S3EXPRESS_CREATE_SESSION;
@ -702,6 +710,122 @@ public void testPartialDeleteSingleDelete() throws Throwable {
executePartialDelete(createAssumedRoleConfig(), true);
}
@Test
public void testBulkDeleteOnReadOnlyAccess() throws Throwable {
describe("Bulk delete with part of the child tree read only");
executeBulkDeleteOnReadOnlyFiles(createAssumedRoleConfig());
}
@Test
public void testBulkDeleteWithReadWriteAccess() throws Throwable {
describe("Bulk delete with read write access");
executeBulkDeleteOnSomeReadOnlyFiles(createAssumedRoleConfig());
}
/**
* Execute bulk delete on read only files and some read write files.
*/
private void executeBulkDeleteOnReadOnlyFiles(Configuration assumedRoleConfig) throws Exception {
Path destDir = methodPath();
Path readOnlyDir = new Path(destDir, "readonlyDir");
// the full FS
S3AFileSystem fs = getFileSystem();
WrappedIO.bulkDelete_delete(fs, destDir, new ArrayList<>());
bindReadOnlyRolePolicy(assumedRoleConfig, readOnlyDir);
roleFS = (S3AFileSystem) destDir.getFileSystem(assumedRoleConfig);
int bulkDeletePageSize = WrappedIO.bulkDelete_PageSize(roleFS, destDir);
int range = bulkDeletePageSize == 1 ? bulkDeletePageSize : 10;
touchFiles(fs, readOnlyDir, range);
touchFiles(roleFS, destDir, range);
FileStatus[] fileStatuses = roleFS.listStatus(readOnlyDir);
List<Path> pathsToDelete = Arrays.stream(fileStatuses)
.map(FileStatus::getPath)
.collect(Collectors.toList());
// bulk delete in the read only FS should fail.
BulkDelete bulkDelete = roleFS.createBulkDelete(readOnlyDir);
assertAccessDeniedForEachPath(bulkDelete.bulkDelete(pathsToDelete));
BulkDelete bulkDelete2 = roleFS.createBulkDelete(destDir);
assertAccessDeniedForEachPath(bulkDelete2.bulkDelete(pathsToDelete));
// delete the files in the original FS should succeed.
BulkDelete bulkDelete3 = fs.createBulkDelete(readOnlyDir);
assertSuccessfulBulkDelete(bulkDelete3.bulkDelete(pathsToDelete));
FileStatus[] fileStatusesUnderDestDir = roleFS.listStatus(destDir);
List<Path> pathsToDeleteUnderDestDir = Arrays.stream(fileStatusesUnderDestDir)
.map(FileStatus::getPath)
.collect(Collectors.toList());
BulkDelete bulkDelete4 = fs.createBulkDelete(destDir);
assertSuccessfulBulkDelete(bulkDelete4.bulkDelete(pathsToDeleteUnderDestDir));
}
/**
* Execute bulk delete on some read only files and some read write files.
*/
private void executeBulkDeleteOnSomeReadOnlyFiles(Configuration assumedRoleConfig)
throws IOException {
Path destDir = methodPath();
Path readOnlyDir = new Path(destDir, "readonlyDir");
bindReadOnlyRolePolicy(assumedRoleConfig, readOnlyDir);
roleFS = (S3AFileSystem) destDir.getFileSystem(assumedRoleConfig);
S3AFileSystem fs = getFileSystem();
if (WrappedIO.bulkDelete_PageSize(fs, destDir) == 1) {
String msg = "Skipping as this test requires more than one path to be deleted in bulk";
LOG.debug(msg);
skip(msg);
}
WrappedIO.bulkDelete_delete(fs, destDir, new ArrayList<>());
// creating 5 files in the read only dir.
int readOnlyRange = 5;
int readWriteRange = 3;
touchFiles(fs, readOnlyDir, readOnlyRange);
// creating 3 files in the base destination dir.
touchFiles(roleFS, destDir, readWriteRange);
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = roleFS.listFiles(destDir, true);
List<Path> pathsToDelete2 = new ArrayList<>();
while (locatedFileStatusRemoteIterator.hasNext()) {
pathsToDelete2.add(locatedFileStatusRemoteIterator.next().getPath());
}
Assertions.assertThat(pathsToDelete2.size())
.describedAs("Number of paths to delete in base destination dir")
.isEqualTo(readOnlyRange + readWriteRange);
BulkDelete bulkDelete5 = roleFS.createBulkDelete(destDir);
List<Map.Entry<Path, String>> entries = bulkDelete5.bulkDelete(pathsToDelete2);
Assertions.assertThat(entries.size())
.describedAs("Number of error entries in bulk delete result")
.isEqualTo(readOnlyRange);
assertAccessDeniedForEachPath(entries);
// delete the files in the original FS should succeed.
BulkDelete bulkDelete6 = fs.createBulkDelete(destDir);
assertSuccessfulBulkDelete(bulkDelete6.bulkDelete(pathsToDelete2));
}
/**
* Bind a read only role policy to a directory to the FS conf.
*/
private static void bindReadOnlyRolePolicy(Configuration assumedRoleConfig,
Path readOnlyDir)
throws JsonProcessingException {
bindRolePolicyStatements(assumedRoleConfig, STATEMENT_ALLOW_KMS_RW,
statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
new Statement(Effects.Deny)
.addActions(S3_PATH_WRITE_OPERATIONS)
.addResources(directory(readOnlyDir))
);
}
/**
* Validate delete results for each path in the list
* has access denied error.
*/
private void assertAccessDeniedForEachPath(List<Map.Entry<Path, String>> entries) {
for (Map.Entry<Path, String> entry : entries) {
Assertions.assertThat(entry.getValue())
.describedAs("Error message for path %s is %s", entry.getKey(), entry.getValue())
.contains("AccessDenied");
}
}
/**
* Have a directory with full R/W permissions, but then remove
* write access underneath, and try to delete it.
@ -719,12 +843,7 @@ public void executePartialDelete(final Configuration conf,
S3AFileSystem fs = getFileSystem();
fs.delete(destDir, true);
bindRolePolicyStatements(conf, STATEMENT_ALLOW_KMS_RW,
statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
new Statement(Effects.Deny)
.addActions(S3_PATH_WRITE_OPERATIONS)
.addResources(directory(readOnlyDir))
);
bindReadOnlyRolePolicy(conf, readOnlyDir);
roleFS = (S3AFileSystem) destDir.getFileSystem(conf);
int range = 10;

View File

@ -831,4 +831,6 @@ protected void delete(Path path, boolean recursive) throws IOException {
timer.end("time to delete %s", path);
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractBulkDeleteTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
public class ITestAbfsContractBulkDelete extends AbstractContractBulkDeleteTest {
private final boolean isSecure;
private final ABFSContractTestBinding binding;
public ITestAbfsContractBulkDelete() throws Exception {
binding = new ABFSContractTestBinding();
this.isSecure = binding.isSecureMode();
}
@Override
public void setup() throws Exception {
binding.setup();
super.setup();
}
@Override
protected Configuration createConfiguration() {
return binding.getRawConfiguration();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
return new AbfsFileSystemContract(conf, isSecure);
}
}

View File

@ -26,6 +26,7 @@ log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG
log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE
log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG
log4j.logger.org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation=DEBUG
# after here: turn off log messages from other parts of the system
# which only clutter test reports.