HADOOP-17244. S3A directory delete tombstones dir markers prematurely. (#2310)
This fixes the S3Guard/Directory Marker Retention integration so that when
fs.s3a.directory.marker.retention=keep, failures during multipart delete
are handled correctly, as are incremental deletes during directory tree
operations.

In both cases, when a directory marker with children is deleted from S3,
the directory entry in S3Guard is not deleted, because it is still critical
to representing the structure of the store.

Contributed by Steve Loughran.

Change-Id: I4ca133a23ea582cd42ec35dbf2dc85b286297d2f
parent 4bb9d593da
commit 4687c25389
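For context, a minimal sketch (not part of this patch) of the configuration option the commit message refers to. The property name and the `keep` value come from the message above; the class, bucket name, and surrounding setup are assumed purely for illustration.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical illustration of enabling directory marker retention on the
// S3A connector before opening the filesystem. The bucket name is made up.
public class MarkerRetentionSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Keep directory markers instead of deleting them as children are created.
    conf.set("fs.s3a.directory.marker.retention", "keep");
    try (FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf)) {
      fs.mkdirs(new Path("/data/subdir"));
    }
  }
}
```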
@@ -265,7 +265,7 @@ public void testRecursiveRootListing() throws IOException {
         fs.listFiles(root, true));
     describe("verifying consistency with treewalk's files");
     ContractTestUtils.TreeScanResults treeWalk = treeWalk(fs, root);
-    treeWalk.assertFieldsEquivalent("files", listing,
+    treeWalk.assertFieldsEquivalent("treewalk vs listFiles(/, true)", listing,
         treeWalk.getFiles(),
         listing.getFiles());
   }
@@ -1576,7 +1576,7 @@ public void deleteObjectAtPath(final Path path,

   @Override
   @Retries.RetryTranslated
-  public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+  public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
       final Path path,
      final S3AFileStatus status,
      final boolean collectTombstones,
@@ -2081,6 +2081,7 @@ protected void deleteObject(String key)
         DELETE_CONSIDERED_IDEMPOTENT,
         ()-> {
           incrementStatistic(OBJECT_DELETE_REQUESTS);
+          incrementStatistic(OBJECT_DELETE_OBJECTS);
           s3.deleteObject(bucket, key);
           return null;
         });
@@ -2127,9 +2128,14 @@ private void blockRootDelete(String key) throws InvalidRequestException {
   }

   /**
-   * Perform a bulk object delete operation.
+   * Perform a bulk object delete operation against S3; leaves S3Guard
+   * alone.
    * Increments the {@code OBJECT_DELETE_REQUESTS} and write
-   * operation statistics.
+   * operation statistics
+   * <p></p>
+   * {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
+   * of objects deleted in the request.
+   * <p></p>
    * Retry policy: retry untranslated; delete considered idempotent.
    * If the request is throttled, this is logged in the throttle statistics,
    * with the counter set to the number of keys, rather than the number
@@ -2150,9 +2156,10 @@ private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
     incrementWriteOperations();
     BulkDeleteRetryHandler retryHandler =
         new BulkDeleteRetryHandler(createStoreContext());
+    int keyCount = deleteRequest.getKeys().size();
     try(DurationInfo ignored =
             new DurationInfo(LOG, false, "DELETE %d keys",
-                deleteRequest.getKeys().size())) {
+                keyCount)) {
       return invoker.retryUntranslated("delete",
           DELETE_CONSIDERED_IDEMPOTENT,
           (text, e, r, i) -> {
@@ -2161,6 +2168,7 @@ private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
           },
           () -> {
             incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
+            incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
            return s3.deleteObjects(deleteRequest);
          });
     } catch (MultiObjectDeleteException e) {
@@ -2550,8 +2558,8 @@ DeleteObjectsResult removeKeys(
       // entries so we only process these failures on "real" deletes.
       Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>> results =
           new MultiObjectDeleteSupport(createStoreContext(), operationState)
-              .processDeleteFailure(ex, keysToDelete);
+              .processDeleteFailure(ex, keysToDelete, new ArrayList<Path>());
-      undeletedObjectsOnFailure.addAll(results.getMiddle());
+      undeletedObjectsOnFailure.addAll(results.getLeft());
       }
       throw ex;
     } catch (AmazonClientException | IOException ex) {
@@ -156,6 +156,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
       INVOCATION_RENAME,
       OBJECT_COPY_REQUESTS,
       OBJECT_DELETE_REQUESTS,
+      OBJECT_DELETE_OBJECTS,
       OBJECT_LIST_REQUESTS,
       OBJECT_CONTINUE_LIST_REQUESTS,
       OBJECT_METADATA_REQUESTS,
@@ -85,6 +85,8 @@ public enum Statistic {
       "Calls of rename()"),
   OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
   OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
+  OBJECT_DELETE_OBJECTS("object_delete_objects",
+      "Objects deleted in delete requests"),
   OBJECT_LIST_REQUESTS("object_list_requests",
       "Number of object listings made"),
   OBJECT_CONTINUE_LIST_REQUESTS("object_continue_list_requests",
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;

 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsResult;
@@ -152,10 +153,13 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
   /**
    * List of keys built up for the next delete batch.
    */
-  private List<DeleteObjectsRequest.KeyVersion> keys;
+  private List<DeleteEntry> keys;

   /**
-   * List of paths built up for deletion.
+   * List of paths built up for incremental deletion on tree delete.
+   * At the end of the entire delete the full tree is scanned in S3Guard
+   * and tombstones added. For this reason this list of paths <i>must not</i>
+   * include directory markers, as that will break the scan.
    */
   private List<Path> paths;

@@ -279,7 +283,7 @@ public Boolean execute() throws IOException {
       LOG.debug("deleting simple file {}", path);
       deleteObjectAtPath(path, key, true);
     }
-    LOG.debug("Deleted {} files", filesDeleted);
+    LOG.debug("Deleted {} objects", filesDeleted);
     return true;
   }

@@ -323,7 +327,7 @@ protected void deleteDirectoryTree(final Path path,
       // list files including any under tombstones through S3Guard
       LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
       final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
-          callbacks.listFilesAndEmptyDirectories(path, status,
+          callbacks.listFilesAndDirectoryMarkers(path, status,
              false, true);

       // iterate through and delete. The next() call will block when a new S3
@@ -359,7 +363,10 @@ protected void deleteDirectoryTree(final Path path,
         while (objects.hasNext()) {
           // get the next entry in the listing.
           extraFilesDeleted++;
-          queueForDeletion(deletionKey(objects.next()), null);
+          S3AFileStatus next = objects.next();
+          LOG.debug("Found Unlisted entry {}", next);
+          queueForDeletion(deletionKey(next), null,
+              next.isDirectory());
         }
         if (extraFilesDeleted > 0) {
           LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
@@ -402,7 +409,7 @@ private String deletionKey(final S3AFileStatus stat) {
    */
   private void queueForDeletion(
       final S3AFileStatus stat) throws IOException {
-    queueForDeletion(deletionKey(stat), stat.getPath());
+    queueForDeletion(deletionKey(stat), stat.getPath(), stat.isDirectory());
   }

   /**
@@ -413,14 +420,18 @@ private void queueForDeletion(
    *
    * @param key key to delete
    * @param deletePath nullable path of the key
+   * @param isDirMarker is the entry a directory?
    * @throws IOException failure of the previous batch of deletions.
    */
   private void queueForDeletion(final String key,
-      @Nullable final Path deletePath) throws IOException {
+      @Nullable final Path deletePath,
+      boolean isDirMarker) throws IOException {
     LOG.debug("Adding object to delete: \"{}\"", key);
-    keys.add(new DeleteObjectsRequest.KeyVersion(key));
+    keys.add(new DeleteEntry(key, isDirMarker));
     if (deletePath != null) {
-      paths.add(deletePath);
+      if (!isDirMarker) {
+        paths.add(deletePath);
+      }
     }

     if (keys.size() == pageSize) {
@@ -484,7 +495,7 @@ private void deleteObjectAtPath(
    * @return the submitted future or null
    */
   private CompletableFuture<Void> submitDelete(
-      final List<DeleteObjectsRequest.KeyVersion> keyList,
+      final List<DeleteEntry> keyList,
       final List<Path> pathList) {

     if (keyList.isEmpty() && pathList.isEmpty()) {
@@ -514,31 +525,62 @@ private CompletableFuture<Void> submitDelete(
   @Retries.RetryTranslated
   private void asyncDeleteAction(
       final BulkOperationState state,
-      final List<DeleteObjectsRequest.KeyVersion> keyList,
+      final List<DeleteEntry> keyList,
       final List<Path> pathList,
       final boolean auditDeletedKeys)
       throws IOException {
+    List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
     try (DurationInfo ignored =
-            new DurationInfo(LOG, false, "Delete page of keys")) {
+            new DurationInfo(LOG, false,
+                "Delete page of %d keys", keyList.size())) {
       DeleteObjectsResult result = null;
       List<Path> undeletedObjects = new ArrayList<>();
       if (!keyList.isEmpty()) {
-        result = Invoker.once("Remove S3 Keys",
+        // first delete the files.
+        List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
+            .filter(e -> !e.isDirMarker)
+            .map(e -> e.keyVersion)
+            .collect(Collectors.toList());
+        LOG.debug("Deleting of {} file objects", files.size());
+        result = Invoker.once("Remove S3 Files",
            status.getPath().toString(),
            () -> callbacks.removeKeys(
-                keyList,
+                files,
                false,
                undeletedObjects,
                state,
                !auditDeletedKeys));
+        if (result != null) {
+          deletedObjects.addAll(result.getDeletedObjects());
+        }
+        // now the dirs
+        List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
+            .filter(e -> e.isDirMarker)
+            .map(e -> e.keyVersion)
+            .collect(Collectors.toList());
+        LOG.debug("Deleting of {} directory markers", dirs.size());
+        // This is invoked with deleteFakeDir = true, so
+        // S3Guard is not updated.
+        result = Invoker.once("Remove S3 Dir Markers",
+            status.getPath().toString(),
+            () -> callbacks.removeKeys(
+                dirs,
+                true,
+                undeletedObjects,
+                state,
+                !auditDeletedKeys));
+        if (result != null) {
+          deletedObjects.addAll(result.getDeletedObjects());
+        }
       }
       if (!pathList.isEmpty()) {
+        // delete file paths only. This stops tombstones
+        // being added until the final directory cleanup
+        // (HADOOP-17244)
        metadataStore.deletePaths(pathList, state);
       }
-      if (auditDeletedKeys && result != null) {
+      if (auditDeletedKeys) {
        // audit the deleted keys
-        List<DeleteObjectsResult.DeletedObject> deletedObjects =
-            result.getDeletedObjects();
        if (deletedObjects.size() != keyList.size()) {
          // size mismatch
          LOG.warn("Size mismatch in deletion operation. "
@@ -549,7 +591,7 @@ private void asyncDeleteAction(
          for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
            keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
          }
-          for (DeleteObjectsRequest.KeyVersion kv : keyList) {
+          for (DeleteEntry kv : keyList) {
            LOG.debug("{}", kv.getKey());
          }
        }
@@ -557,5 +599,31 @@ private void asyncDeleteAction(
       }
     }
   }
+
+  /**
+   * Deletion entry; dir marker state is tracked to control S3Guard
+   * update policy.
+   */
+  private static final class DeleteEntry {
+    private final DeleteObjectsRequest.KeyVersion keyVersion;
+
+    private final boolean isDirMarker;
+
+    private DeleteEntry(final String key, final boolean isDirMarker) {
+      this.keyVersion = new DeleteObjectsRequest.KeyVersion(key);
+      this.isDirMarker = isDirMarker;
+    }
+
+    public String getKey() {
+      return keyVersion.getKey();
+    }
+
+    @Override
+    public String toString() {
+      return "DeleteEntry{" +
+          "key='" + getKey() + '\'' +
+          ", isDirMarker=" + isDirMarker +
+          '}';
+    }
+  }
+
 }
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Function;
 import java.util.stream.Collectors;

@@ -37,8 +38,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AWSS3IOException;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;

 import static com.google.common.base.Preconditions.checkNotNull;

@@ -84,15 +87,25 @@ public MultiObjectDeleteSupport(final StoreContext context,
   public static IOException translateDeleteException(
       final String message,
       final MultiObjectDeleteException deleteException) {
+    List<MultiObjectDeleteException.DeleteError> errors
+        = deleteException.getErrors();
+    LOG.warn("Bulk delete operation failed to delete all objects;"
+            + " failure count = {}",
+        errors.size());
     final StringBuilder result = new StringBuilder(
-        deleteException.getErrors().size() * 256);
+        errors.size() * 256);
     result.append(message).append(": ");
     String exitCode = "";
     for (MultiObjectDeleteException.DeleteError error :
         deleteException.getErrors()) {
       String code = error.getCode();
-      result.append(String.format("%s: %s: %s%n", code, error.getKey(),
-          error.getMessage()));
+      String item = String.format("%s: %s%s: %s%n", code, error.getKey(),
+          (error.getVersionId() != null
+              ? (" (" + error.getVersionId() + ")")
+              : ""),
+          error.getMessage());
+      LOG.warn(item);
+      result.append(item);
       if (exitCode.isEmpty() || ACCESS_DENIED.equals(code)) {
         exitCode = code;
       }
@@ -113,7 +126,7 @@ public static IOException translateDeleteException(
    * @param keysToDelete the keys in the delete request
    * @return tuple of (undeleted, deleted) paths.
    */
-  public Pair<List<Path>, List<Path>> splitUndeletedKeys(
+  public Pair<List<KeyPath>, List<KeyPath>> splitUndeletedKeys(
       final MultiObjectDeleteException deleteException,
       final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete) {
     LOG.debug("Processing delete failure; keys to delete count = {};"
@@ -122,11 +135,11 @@ public Pair<List<Path>, List<Path>> splitUndeletedKeys(
         deleteException.getErrors().size(),
         deleteException.getDeletedObjects().size());
     // convert the collection of keys being deleted into paths
-    final List<Path> pathsBeingDeleted = keysToPaths(keysToDelete);
+    final List<KeyPath> pathsBeingDeleted = keysToKeyPaths(keysToDelete);
-    // Take this is list of paths
+    // Take this ist of paths
     // extract all undeleted entries contained in the exception and
-    // then removes them from the original list.
+    // then remove them from the original list.
-    List<Path> undeleted = removeUndeletedPaths(deleteException,
+    List<KeyPath> undeleted = removeUndeletedPaths(deleteException,
         pathsBeingDeleted,
         getStoreContext()::keyToPath);
     return Pair.of(undeleted, pathsBeingDeleted);
@@ -139,7 +152,17 @@ public Pair<List<Path>, List<Path>> splitUndeletedKeys(
    */
   public List<Path> keysToPaths(
       final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete) {
-    return convertToPaths(keysToDelete,
+    return toPathList(keysToKeyPaths(keysToDelete));
+  }
+
+  /**
+   * Given a list of delete requests, convert them all to keypaths.
+   * @param keysToDelete list of keys for the delete operation.
+   * @return list of keypath entries
+   */
+  public List<KeyPath> keysToKeyPaths(
+      final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete) {
+    return convertToKeyPaths(keysToDelete,
        getStoreContext()::keyToPath);
   }

@@ -149,13 +172,17 @@ public List<Path> keysToPaths(
    * @param qualifier path qualifier
    * @return the paths.
    */
-  public static List<Path> convertToPaths(
+  public static List<KeyPath> convertToKeyPaths(
       final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete,
       final Function<String, Path> qualifier) {
-    return keysToDelete.stream()
-        .map((keyVersion) ->
-            qualifier.apply(keyVersion.getKey()))
-        .collect(Collectors.toList());
+    List<KeyPath> l = new ArrayList<>(keysToDelete.size());
+    for (DeleteObjectsRequest.KeyVersion kv : keysToDelete) {
+      String key = kv.getKey();
+      Path p = qualifier.apply(key);
+      boolean isDir = key.endsWith("/");
+      l.add(new KeyPath(key, p, isDir));
+    }
+    return l;
   }

   /**
@@ -164,27 +191,59 @@ public static List<Path> convertToPaths(
    * and the original list of files to delete declares to have been deleted.
    * @param deleteException the delete exception.
    * @param keysToDelete collection of keys which had been requested.
+   * @param retainedMarkers list built up of retained markers.
    * @return a tuple of (undeleted, deleted, failures)
    */
   public Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
       processDeleteFailure(
       final MultiObjectDeleteException deleteException,
-      final List<DeleteObjectsRequest.KeyVersion> keysToDelete) {
+      final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+      final List<Path> retainedMarkers) {
     final MetadataStore metadataStore =
         checkNotNull(getStoreContext().getMetadataStore(),
             "context metadatastore");
     final List<Pair<Path, IOException>> failures = new ArrayList<>();
-    final Pair<List<Path>, List<Path>> outcome =
+    final Pair<List<KeyPath>, List<KeyPath>> outcome =
         splitUndeletedKeys(deleteException, keysToDelete);
-    List<Path> deleted = outcome.getRight();
-    List<Path> undeleted = outcome.getLeft();
-    // delete the paths but recover
-    // TODO: handle the case where a parent path is deleted but not a child.
-    // TODO: in a fake object delete, we don't actually want to delete
-    // metastore entries
-    deleted.forEach(path -> {
-      try {
-        metadataStore.delete(path, operationState);
+    List<KeyPath> deleted = outcome.getRight();
+    List<Path> deletedPaths = new ArrayList<>();
+    List<KeyPath> undeleted = outcome.getLeft();
+    retainedMarkers.clear();
+    List<Path> undeletedPaths = toPathList((List<KeyPath>) undeleted);
+    // sort shorter keys first,
+    // so that if the left key is longer than the first it is considered
+    // smaller, so appears in the list first.
+    // thus when we look for a dir being empty, we know it holds
+    deleted.sort((l, r) -> r.getKey().length() - l.getKey().length());
+
+    // now go through and delete from S3Guard all paths listed in
+    // the result which are either files or directories with
+    // no children.
+    deleted.forEach(kp -> {
+      Path path = kp.getPath();
+      try{
+        boolean toDelete = true;
+        if (kp.isDirectoryMarker()) {
+          // its a dir marker, which could be an empty dir
+          // (which is then tombstoned), or a non-empty dir, which
+          // is not tombstoned.
+          // for this to be handled, we have to have removed children
+          // from the store first, which relies on the sort
+          PathMetadata pmentry = metadataStore.get(path, true);
+          if (pmentry != null && !pmentry.isDeleted()) {
+            toDelete = pmentry.getFileStatus().isEmptyDirectory()
+                == Tristate.TRUE;
+          } else {
+            toDelete = false;
+          }
+        }
+        if (toDelete) {
+          LOG.debug("Removing deleted object from S3Guard Store {}", path);
+          metadataStore.delete(path, operationState);
+        } else {
+          LOG.debug("Retaining S3Guard directory entry {}", path);
+          retainedMarkers.add(path);
+        }
       } catch (IOException e) {
         // trouble: we failed to delete the far end entry
         // try with the next one.
@@ -192,11 +251,25 @@ public static List<Path> convertToPaths(
         LOG.warn("Failed to update S3Guard store with deletion of {}", path);
         failures.add(Pair.of(path, e));
       }
+      // irrespective of the S3Guard outcome, it is declared as deleted, as
+      // it is no longer in the S3 store.
+      deletedPaths.add(path);
     });
     if (LOG.isDebugEnabled()) {
       undeleted.forEach(p -> LOG.debug("Deleted {}", p));
     }
-    return Triple.of(undeleted, deleted, failures);
+    return Triple.of(undeletedPaths, deletedPaths, failures);
+  }
+
+  /**
+   * Given a list of keypaths, convert to a list of paths.
+   * @param keyPaths source list
+   * @return a listg of paths
+   */
+  public static List<Path> toPathList(final List<KeyPath> keyPaths) {
+    return keyPaths.stream()
+        .map(KeyPath::getPath)
+        .collect(Collectors.toList());
   }

   /**
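An aside on the sort introduced in `processDeleteFailure` above: ordering the deleted entries by descending key length means child entries (longer keys) are removed from the S3Guard table before their parent directory marker is examined, so the emptiness check on the marker sees an already-pruned store. A standalone sketch of that ordering, with made-up keys, purely for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KeyOrderSketch {
  public static void main(String[] args) {
    // Hypothetical keys from one delete page: a marker plus two children.
    List<String> keys = new ArrayList<>(
        Arrays.asList("dir/", "dir/file1", "dir/sub/file2"));
    // Same comparator shape as the patch: longer keys sort first.
    keys.sort((l, r) -> r.length() - l.length());
    // Prints [dir/sub/file2, dir/file1, dir/]: children before the marker.
    System.out.println(keys);
  }
}
```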
@ -211,8 +284,31 @@ public static List<Path> convertToPaths(
|
|||||||
public static List<Path> extractUndeletedPaths(
|
public static List<Path> extractUndeletedPaths(
|
||||||
final MultiObjectDeleteException deleteException,
|
final MultiObjectDeleteException deleteException,
|
||||||
final Function<String, Path> qualifierFn) {
|
final Function<String, Path> qualifierFn) {
|
||||||
return deleteException.getErrors().stream()
|
return toPathList(extractUndeletedKeyPaths(deleteException, qualifierFn));
|
||||||
.map((e) -> qualifierFn.apply(e.getKey()))
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a list of undeleted paths from a {@code MultiObjectDeleteException}.
|
||||||
|
* Outside of unit tests, the qualifier function should be
|
||||||
|
* {@link S3AFileSystem#keyToQualifiedPath(String)}.
|
||||||
|
* @param deleteException the delete exception.
|
||||||
|
* @param qualifierFn function to qualify paths
|
||||||
|
* @return the possibly empty list of paths.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static List<KeyPath> extractUndeletedKeyPaths(
|
||||||
|
final MultiObjectDeleteException deleteException,
|
||||||
|
final Function<String, Path> qualifierFn) {
|
||||||
|
|
||||||
|
List<MultiObjectDeleteException.DeleteError> errors
|
||||||
|
= deleteException.getErrors();
|
||||||
|
return errors.stream()
|
||||||
|
.map((error) -> {
|
||||||
|
String key = error.getKey();
|
||||||
|
Path path = qualifierFn.apply(key);
|
||||||
|
boolean isDir = key.endsWith("/");
|
||||||
|
return new KeyPath(key, path, isDir);
|
||||||
|
})
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -227,12 +323,17 @@ public static List<Path> extractUndeletedPaths(
    * @return the list of undeleted entries
    */
   @VisibleForTesting
-  static List<Path> removeUndeletedPaths(
+  static List<KeyPath> removeUndeletedPaths(
       final MultiObjectDeleteException deleteException,
-      final Collection<Path> pathsBeingDeleted,
+      final Collection<KeyPath> pathsBeingDeleted,
       final Function<String, Path> qualifier) {
-    List<Path> undeleted = extractUndeletedPaths(deleteException, qualifier);
-    pathsBeingDeleted.removeAll(undeleted);
+    // get the undeleted values
+    List<KeyPath> undeleted = extractUndeletedKeyPaths(deleteException,
+        qualifier);
+    // and remove them from the undeleted list, matching on key
+    for (KeyPath undel : undeleted) {
+      pathsBeingDeleted.removeIf(kp -> kp.getPath().equals(undel.getPath()));
+    }
     return undeleted;
   }

@@ -247,4 +348,70 @@ public List<Path> processDeleteFailureGenericException(Exception ex,
       final List<DeleteObjectsRequest.KeyVersion> keysToDelete) {
     return keysToPaths(keysToDelete);
   }
+
+  /**
+   * Representation of a (key, path) which couldn't be deleted;
+   * the dir marker flag is inferred from the key suffix.
+   * <p>
+   * Added because Pairs of Lists of Triples was just too complex
+   * for Java code.
+   * </p>
+   */
+  public static final class KeyPath {
+    /** Key in bucket. */
+    private final String key;
+    /** Full path. */
+    private final Path path;
+    /** Is this a directory marker? */
+    private final boolean directoryMarker;
+
+    public KeyPath(final String key,
+        final Path path,
+        final boolean directoryMarker) {
+      this.key = key;
+      this.path = path;
+      this.directoryMarker = directoryMarker;
+    }
+
+    public String getKey() {
+      return key;
+    }
+
+    public Path getPath() {
+      return path;
+    }
+
+    public boolean isDirectoryMarker() {
+      return directoryMarker;
+    }
+
+    @Override
+    public String toString() {
+      return "KeyPath{" +
+          "key='" + key + '\'' +
+          ", path=" + path +
+          ", directoryMarker=" + directoryMarker +
+          '}';
+    }
+
+    /**
+     * Equals test is on key alone.
+     */
+    @Override
+    public boolean equals(final Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      KeyPath keyPath = (KeyPath) o;
+      return key.equals(keyPath.key);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(key);
+    }
+  }
 }
@@ -105,7 +105,7 @@ void deleteObjectAtPath(Path path,
       throws IOException;

   /**
-   * Recursive list of files and empty directories.
+   * Recursive list of files and directory markers.
    *
    * @param path path to list from
    * @param status optional status of path to list.
@@ -115,7 +115,7 @@ void deleteObjectAtPath(Path path,
    * @throws IOException failure
    */
   @Retries.RetryTranslated
-  RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+  RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
      Path path,
      S3AFileStatus status,
      boolean collectTombstones,
@@ -211,14 +211,23 @@ private void completeActiveCopies(String reason) throws IOException {
    *   Only queuing objects here whose copy operation has
    *   been submitted and so is in that thread pool.
    *   </li>
+   *   <li>
+   *   If a path is supplied, then after the delete is executed
+   *   (and completes) the rename tracker from S3Guard will be
+   *   told of its deletion. Do not set this for directory
+   *   markers with children, as it may mistakenly add
+   *   tombstones into the table.
+   *   </li>
    * </ol>
    * This method must only be called from the primary thread.
-   * @param path path to the object
+   * @param path path to the object.
    * @param key key of the object.
    */
   private void queueToDelete(Path path, String key) {
     LOG.debug("Queueing to delete {}", path);
-    pathsToDelete.add(path);
+    if (path != null) {
+      pathsToDelete.add(path);
+    }
     keysToDelete.add(new DeleteObjectsRequest.KeyVersion(key));
   }

@@ -234,7 +243,9 @@ private void queueToDelete(Path path, String key) {
    */
   private void queueToDelete(
       List<DirMarkerTracker.Marker> markersToDelete) {
-    markersToDelete.forEach(this::queueToDelete);
+    markersToDelete.forEach(m -> queueToDelete(
+        null,
+        m.getKey()));
   }

   /**
@@ -397,6 +408,7 @@ protected void recursiveDirectoryRename() throws IOException {
           destStatus.getPath());
       // Although the dir marker policy doesn't always need to do this,
       // it's simplest just to be consistent here.
+      // note: updates the metastore as well a S3.
       callbacks.deleteObjectAtPath(destStatus.getPath(), dstKey, false, null);
     }

@@ -408,7 +420,7 @@ protected void recursiveDirectoryRename() throws IOException {
         false);

     final RemoteIterator<S3ALocatedFileStatus> iterator =
-        callbacks.listFilesAndEmptyDirectories(parentPath,
+        callbacks.listFilesAndDirectoryMarkers(parentPath,
            sourceStatus,
            true,
            true);
@@ -717,7 +717,7 @@ public DDBPathMetadata get(Path path) throws IOException {
   public DDBPathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
       throws IOException {
     checkPath(path);
-    LOG.debug("Get from table {} in region {}: {}. wantEmptyDirectory={}",
+    LOG.debug("Get from table {} in region {}: {} ; wantEmptyDirectory={}",
         tableName, region, path, wantEmptyDirectoryFlag);
     DDBPathMetadata result = innerGet(path, wantEmptyDirectoryFlag);
     LOG.debug("result of get {} is: {}", path, result);
@@ -12,7 +12,11 @@
   limitations under the License. See accompanying LICENSE file.
 -->

-# Controlling the S3A Directory Marker Behavior
+# Experimental: Controlling the S3A Directory Marker Behavior
+
+This document discusses an experimental feature of the S3A
+connector since Hadoop 3.3.1: the ability to retain directory
+marker objects above paths containing files or subdirectories.

 ## <a name="compatibility"></a> Critical: this is not backwards compatible!

@@ -26,15 +30,40 @@ Versions of Hadoop which are incompatible with other marker retention policies,
 as of August 2020.

 -------------------------------------------------------
-| Branch | Compatible Since | Future Fix Planned? |
+| Branch | Compatible Since | Supported |
 |------------|------------------|---------------------|
-| Hadoop 2.x | | NO |
-| Hadoop 3.0 | | NO |
-| Hadoop 3.1 | check | Yes |
-| Hadoop 3.2 | check | Yes |
+| Hadoop 2.x | n/a | WONTFIX |
+| Hadoop 3.0 | check | Read-only |
+| Hadoop 3.1 | check | Read-only |
+| Hadoop 3.2 | check | Read-only |
 | Hadoop 3.3 | 3.3.1 | Done |
 -------------------------------------------------------

+*WONTFIX*
+
+The Hadoop branch-2 line will *not* be patched.
+
+*Read-only*
+
+These branches have read-only compatibility.
+
+* They may list directories with directory markers, and correctly identify when
+such directories have child entries.
+* They will open files under directories with such markers.
+
+However, they have limitations when writing/deleting directories.
+
+Specifically: S3Guard tables may not be correctly updated in
+all conditions, especially on the partial failure of delete
+operations. Specifically: they may mistakenly add a tombstone in
+the dynamoDB table and so future directory/directory tree listings
+will consider the directory to be nonexistent.
+
+_It is not safe for Hadoop releases before Hadoop 3.3.1 to write
+to S3 buckets which have directory markers when S3Guard is enabled_
+
+## Verifying read compatibility.
+
 The `s3guard bucket-info` tool [can be used to verify support](#bucket-info).
 This allows for a command line check of compatibility, including
 in scripts.
@@ -49,6 +78,7 @@ It is only safe change the directory marker policy if the following
 (including backing up) an S3 bucket.
 2. You know all applications which read data from the bucket are compatible.

+
 ### <a name="backups"></a> Applications backing up data.

 It is not enough to have a version of Apache Hadoop which is compatible, any
@@ -240,7 +270,7 @@ can switch to the higher-performance mode for those specific directories.
 Only the default setting, `fs.s3a.directory.marker.retention = delete` is compatible with
 every shipping Hadoop releases.

-## <a name="authoritative"></a> Directory Markers and S3Guard
+## <a name="s3guard"></a> Directory Markers and S3Guard

 Applications which interact with S3A in S3A clients with S3Guard enabled still
 create and delete markers. There's no attempt to skip operations, such as by having
@@ -256,6 +286,28 @@ then an S3A connector with a retention policy of fs.s3a.directory.marker.retent
 only use in managed applications where all clients are using the same version of
 hadoop, and configured consistently.

+After the directory marker feature [HADOOP-13230](https://issues.apache.org/jira/browse/HADOOP-13230)
+was added, issues related to S3Guard integration surfaced:
+
+1. The incremental update of the S3Guard table was inserting tombstones
+over directories as the markers were deleted, hiding files underneath.
+This happened during directory `rename()` and `delete()`.
+1. The update of the S3Guard table after a partial failure of a bulk delete
+operation would insert tombstones in S3Guard records of successfully
+deleted markers, irrespective of the directory status.
+
+Issue #1 is unique to Hadoop branch 3.3; however issue #2 is a critical
+part of the S3Guard consistency handling.
+
+Both issues have been fixed in Hadoop 3.3.x,
+in [HADOOP-17244](https://issues.apache.org/jira/browse/HADOOP-17244)
+
+Issue #2, delete failure handling, is not easily backported and is
+not likely to be backported.
+
+Accordingly: Hadoop releases with read-only compatibility must not be used
+to rename or delete directories where markers are retained *when S3Guard is enabled.*
+
 ## <a name="bucket-info"></a> Verifying marker policy with `s3guard bucket-info`

 The `bucket-info` command has been enhanced to support verification from the command
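A hedged sketch of how a client application might confirm at run time whether a path sits on a client/bucket combination that retains directory markers, before attempting directory renames or deletes from an older release. `hasPathCapability` is a standard Hadoop 3.3 `FileSystem` probe; the capability string and bucket name below are assumptions for illustration and should be checked against the release in use.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MarkerPolicyProbe {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/data");   // made-up bucket
    FileSystem fs = path.getFileSystem(new Configuration());
    // Assumed capability name; verify it against the Hadoop release in use.
    boolean keepsMarkers = fs.hasPathCapability(path,
        "fs.s3a.capability.directory.marker.policy.keep");
    System.out.println("Directory markers retained: " + keepsMarkers);
  }
}
```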
@@ -21,6 +21,7 @@
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
 import org.junit.Assume;

 import org.apache.hadoop.conf.Configuration;
@@ -141,13 +142,14 @@ public void testMultiObjectDeleteNoPermissions() throws Throwable {
     Path markerPath = fs.keyToQualifiedPath(marker);
     keys.add(new DeleteObjectsRequest.KeyVersion(marker));

-    Pair<List<Path>, List<Path>> pair =
+    Pair<List<KeyPath>, List<KeyPath>> pair =
        new MultiObjectDeleteSupport(fs.createStoreContext(), null)
        .splitUndeletedKeys(ex, keys);
-    assertEquals(undeleted, pair.getLeft());
-    List<Path> right = pair.getRight();
-    assertEquals("Wrong size for " + join(right), 1, right.size());
-    assertEquals(markerPath, right.get(0));
+    assertEquals(undeleted, toPathList(pair.getLeft()));
+    List<KeyPath> right = pair.getRight();
+    Assertions.assertThat(right)
+        .hasSize(1);
+    assertEquals(markerPath, right.get(0).getPath());
   }

   /**
@@ -32,6 +32,7 @@
 import org.assertj.core.api.Assertions;
 import org.junit.Test;

+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
@@ -286,4 +287,27 @@ public int read() {
     s3.putObject(putObjectRequest);
   }

+  @Test
+  public void testDirMarkerDelete() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    assumeFilesystemHasMetadatastore(getFileSystem());
+    Path baseDir = methodPath();
+    Path subFile = new Path(baseDir, "subdir/file.txt");
+    // adds the s3guard entry
+    fs.mkdirs(baseDir);
+    touch(fs, subFile);
+    // PUT a marker
+    createEmptyObject(fs, fs.pathToKey(baseDir) + "/");
+    fs.delete(baseDir, true);
+    assertPathDoesNotExist("Should have been deleted", baseDir);
+
+    // now create the dir again
+    fs.mkdirs(baseDir);
+    FileStatus fileStatus = fs.getFileStatus(baseDir);
+    Assertions.assertThat(fileStatus)
+        .matches(FileStatus::isDirectory, "Not a directory");
+    Assertions.assertThat(fs.listStatus(baseDir))
+        .describedAs("listing of %s", baseDir)
+        .isEmpty();
+  }
 }
@@ -37,19 +37,13 @@

 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
-import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
-import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
-import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
-import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities;
-import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -61,13 +55,7 @@
 import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.ReflectionUtils;

-import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
-import com.amazonaws.services.s3.model.MultiObjectDeleteException;
-import com.amazonaws.services.s3.transfer.model.CopyResult;
-import javax.annotation.Nullable;
 import org.hamcrest.core.Is;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -82,8 +70,6 @@
 import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -1024,8 +1010,14 @@ public String toString() {
      * @param expected expected value.
      */
     public void assertDiffEquals(String message, long expected) {
-      Assert.assertEquals(message + ": " + statistic.getSymbol(),
-          expected, diff());
+      String text = message + ": " + statistic.getSymbol();
+      long diff = diff();
+      if (expected != diff) {
+        // Log in error ensures that the details appear in the test output
+        LOG.error(text + " expected {}, actual {}", expected, diff);
+      }
+      Assert.assertEquals(text,
+          expected, diff);
     }

     /**
@@ -1540,292 +1532,4 @@ public static S3AFileStatus innerGetFileStatus(
probes);
}

-public static class MinimalOperationCallbacks
-implements OperationCallbacks {
-@Override
-public S3ObjectAttributes createObjectAttributes(
-Path path,
-String eTag,
-String versionId,
-long len) {
-return null;
-}
-
-@Override
-public S3ObjectAttributes createObjectAttributes(
-S3AFileStatus fileStatus) {
-return null;
-}
-
-@Override
-public S3AReadOpContext createReadContext(
-FileStatus fileStatus) {
-return null;
-}
-
-@Override
-public void finishRename(
-Path sourceRenamed,
-Path destCreated)
-throws IOException {
-
-}
-
-@Override
-public void deleteObjectAtPath(
-Path path,
-String key,
-boolean isFile,
-BulkOperationState operationState)
-throws IOException {
-
-}
-
-@Override
-public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
-Path path,
-S3AFileStatus status,
-boolean collectTombstones,
-boolean includeSelf)
-throws IOException {
-return null;
-}
-
-@Override
-public CopyResult copyFile(
-String srcKey,
-String destKey,
-S3ObjectAttributes srcAttributes,
-S3AReadOpContext readContext)
-throws IOException {
-return null;
-}
-
-@Override
-public DeleteObjectsResult removeKeys(
-List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-boolean deleteFakeDir,
-List<Path> undeletedObjectsOnFailure,
-BulkOperationState operationState,
-boolean quiet)
-throws MultiObjectDeleteException, AmazonClientException,
-IOException {
-return null;
-}
-
-@Override
-public boolean allowAuthoritative(Path p) {
-return false;
-}
-
-@Override
-public RemoteIterator<S3AFileStatus> listObjects(
-Path path,
-String key)
-throws IOException {
-return null;
-}
-}
-
-/**
-* MetadataStore which tracks what is deleted and added.
-*/
-public static class OperationTrackingStore implements MetadataStore {
-
-private final List<Path> deleted = new ArrayList<>();
-
-private final List<Path> created = new ArrayList<>();
-
-@Override
-public void initialize(final FileSystem fs,
-ITtlTimeProvider ttlTimeProvider) {
-}
-
-@Override
-public void initialize(final Configuration conf,
-ITtlTimeProvider ttlTimeProvider) {
-}
-
-@Override
-public void forgetMetadata(final Path path) {
-}
-
-@Override
-public PathMetadata get(final Path path) {
-return null;
-}
-
-@Override
-public PathMetadata get(final Path path,
-final boolean wantEmptyDirectoryFlag) {
-return null;
-}
-
-@Override
-public DirListingMetadata listChildren(final Path path) {
-return null;
-}
-
-@Override
-public void put(final PathMetadata meta) {
-put(meta, null);
-}
-
-@Override
-public void put(final PathMetadata meta,
-final BulkOperationState operationState) {
-created.add(meta.getFileStatus().getPath());
-}
-
-@Override
-public void put(final Collection<? extends PathMetadata> metas,
-final BulkOperationState operationState) {
-metas.stream().forEach(meta -> put(meta, null));
-}
-
-@Override
-public void put(final DirListingMetadata meta,
-final List<Path> unchangedEntries,
-final BulkOperationState operationState) {
-created.add(meta.getPath());
-}
-
-@Override
-public void destroy() {
-}
-
-@Override
-public void delete(final Path path,
-final BulkOperationState operationState) {
-deleted.add(path);
-}
-
-@Override
-public void deletePaths(final Collection<Path> paths,
-@Nullable final BulkOperationState operationState)
-throws IOException {
-deleted.addAll(paths);
-}
-
-@Override
-public void deleteSubtree(final Path path,
-final BulkOperationState operationState) {
-
-}
-
-@Override
-public void move(@Nullable final Collection<Path> pathsToDelete,
-@Nullable final Collection<PathMetadata> pathsToCreate,
-@Nullable final BulkOperationState operationState) {
-}
-
-@Override
-public void prune(final PruneMode pruneMode, final long cutoff) {
-}
-
-@Override
-public long prune(final PruneMode pruneMode,
-final long cutoff,
-final String keyPrefix) {
-return 0;
-}
-
-@Override
-public BulkOperationState initiateBulkWrite(
-final BulkOperationState.OperationType operation,
-final Path dest) {
-return null;
-}
-
-@Override
-public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
-}
-
-@Override
-public Map<String, String> getDiagnostics() {
-return null;
-}
-
-@Override
-public void updateParameters(final Map<String, String> parameters) {
-}
-
-@Override
-public void close() {
-}
-
-public List<Path> getDeleted() {
-return deleted;
-}
-
-public List<Path> getCreated() {
-return created;
-}
-
-@Override
-public RenameTracker initiateRenameOperation(
-final StoreContext storeContext,
-final Path source,
-final S3AFileStatus sourceStatus,
-final Path dest) {
-throw new UnsupportedOperationException("unsupported");
-}
-
-@Override
-public void addAncestors(final Path qualifiedPath,
-@Nullable final BulkOperationState operationState) {
-
-}
-}
-
-public static class MinimalListingOperationCallbacks
-implements ListingOperationCallbacks {
-@Override
-public CompletableFuture<S3ListResult> listObjectsAsync(
-S3ListRequest request)
-throws IOException {
-return null;
-}
-
-@Override
-public CompletableFuture<S3ListResult> continueListObjectsAsync(
-S3ListRequest request,
-S3ListResult prevResult)
-throws IOException {
-return null;
-}
-
-@Override
-public S3ALocatedFileStatus toLocatedFileStatus(
-S3AFileStatus status) throws IOException {
-return null;
-}
-
-@Override
-public S3ListRequest createListObjectsRequest(
-String key,
-String delimiter) {
-return null;
-}
-
-@Override
-public long getDefaultBlockSize(Path path) {
-return 0;
-}
-
-@Override
-public int getMaxKeys() {
-return 0;
-}
-
-@Override
-public ITtlTimeProvider getUpdatedTtlTimeProvider() {
-return null;
-}
-
-@Override
-public boolean allowAuthoritative(Path p) {
-return false;
-}
-}
}
@@ -42,7 +42,6 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -58,6 +57,7 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.Statistic.FILES_DELETE_REJECTED;
+import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_OBJECTS;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_REQUESTS;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.Effects;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.Statement;
@@ -72,6 +72,7 @@
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.extractUndeletedPaths;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
+import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -331,27 +332,37 @@ protected Configuration createConfiguration() {
removeBucketOverrides(bucketName, conf,
MAX_THREADS,
MAXIMUM_CONNECTIONS,
-S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY);
+S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
+DIRECTORY_MARKER_POLICY,
+BULK_DELETE_PAGE_SIZE);
conf.setInt(MAX_THREADS, EXECUTOR_THREAD_COUNT);
conf.setInt(MAXIMUM_CONNECTIONS, EXECUTOR_THREAD_COUNT * 2);
// turn off prune delays, so as to stop scale tests creating
// so much cruft that future CLI prune commands take forever
conf.setInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, 0);
+// use the keep policy to ensure that surplus markers exist
+// to complicate failures
+conf.set(DIRECTORY_MARKER_POLICY, DIRECTORY_MARKER_POLICY_KEEP);
+// set the delete page size to its maximum to ensure that all
+// entries are included in the same large delete, even on
+// scale runs. This is needed for assertions on the result.
+conf.setInt(BULK_DELETE_PAGE_SIZE, 1_000);
return conf;
}

/**
* Create a unique path, which includes method name,
-* multidelete flag and a random UUID.
+* multidelete flag and a timestamp.
* @return a string to use for paths.
* @throws IOException path creation failure.
*/
private Path uniquePath() throws IOException {
+long now = System.currentTimeMillis();
return path(
-String.format("%s-%s-%04d",
+String.format("%s-%s-%06d.%03d",
getMethodName(),
multiDelete ? "multi" : "single",
-System.currentTimeMillis() % 10000));
+now / 1000, now % 1000));
}

/**
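Editorial note: the configuration above relies on the bulk delete page size being large enough that an entire test tree fits in one DeleteObjects request, so per-request metric assertions hold. A minimal standalone sketch of that paging arithmetic (hypothetical class name, not part of this patch):

// Sketch only: requests needed for keyCount keys at pageSize keys per request.
public final class BulkDeletePaging {
  private BulkDeletePaging() {
  }

  static int deleteRequests(int keyCount, int pageSize) {
    // ceiling division: a partially filled final page still costs a request
    return (keyCount + pageSize - 1) / pageSize;
  }

  public static void main(String[] args) {
    System.out.println(deleteRequests(640, 1_000));  // 1: whole tree in one request
    System.out.println(deleteRequests(2_500, 1_000)); // 3: "one request" assertions would fail
  }
}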
@@ -477,8 +488,11 @@ public void testRenameDirFailsInDelete() throws Throwable {

// create a set of files
// this is done in parallel as it is 10x faster on a long-haul test run.
-List<Path> createdFiles = createFiles(fs, readOnlyDir, dirDepth, fileCount,
-dirCount);
+List<Path> dirs = new ArrayList<>(dirCount);
+List<Path> createdFiles = createDirsAndFiles(fs, readOnlyDir, dirDepth,
+fileCount, dirCount,
+new ArrayList<>(fileCount),
+dirs);
// are they all there?
int expectedFileCount = createdFiles.size();
assertFileCount("files ready to rename", roleFS,
@@ -495,26 +509,36 @@ public void testRenameDirFailsInDelete() throws Throwable {
MultiObjectDeleteException.class, deniedException);
final List<Path> undeleted
= extractUndeletedPaths(mde, fs::keyToQualifiedPath);

+List<Path> expectedUndeletedFiles = new ArrayList<>(createdFiles);
+if (getFileSystem().getDirectoryMarkerPolicy()
+.keepDirectoryMarkers(readOnlyDir)) {
+// directory markers are being retained,
+// so will also be in the list of undeleted files
+expectedUndeletedFiles.addAll(dirs);
+}
Assertions.assertThat(undeleted)
.as("files which could not be deleted")
-.hasSize(expectedFileCount)
-.containsAll(createdFiles)
-.containsExactlyInAnyOrderElementsOf(createdFiles);
+.containsExactlyInAnyOrderElementsOf(expectedUndeletedFiles);
}
LOG.info("Result of renaming read-only files is as expected",
deniedException);
assertFileCount("files in the source directory", roleFS,
readOnlyDir, expectedFileCount);
// now lets look at the destination.
-// even with S3Guard on, we expect the destination to match that of our
+// even with S3Guard on, we expect the destination to match that of
// the remote state.
// the test will exist
describe("Verify destination directory exists");
-FileStatus st = roleFS.getFileStatus(writableDir);
-assertTrue("Not a directory: " + st,
-st.isDirectory());
+assertIsDirectory(writableDir);
assertFileCount("files in the dest directory", roleFS,
writableDir, expectedFileCount);
+// all directories in the source tree must still exist,
+// which for S3Guard means no tombstone markers were added
+LOG.info("Verifying all directories still exist");
+for (Path dir : dirs) {
+assertIsDirectory(dir);
+}
}

@Test
@@ -611,9 +635,14 @@ public void testPartialDirDelete() throws Throwable {

// the full FS
S3AFileSystem fs = getFileSystem();
+StoreContext storeContext = fs.createStoreContext();

-List<Path> readOnlyFiles = createFiles(fs, readOnlyDir,
-dirDepth, fileCount, dirCount);
+List<Path> dirs = new ArrayList<>(dirCount);
+List<Path> readOnlyFiles = createDirsAndFiles(
+fs, readOnlyDir, dirDepth,
+fileCount, dirCount,
+new ArrayList<>(fileCount),
+dirs);
List<Path> deletableFiles = createFiles(fs,
writableDir, dirDepth, fileCount, dirCount);

@@ -625,20 +654,31 @@ public void testPartialDirDelete() throws Throwable {
readOnlyFiles.stream(),
deletableFiles.stream())
.collect(Collectors.toList());
+List<MultiObjectDeleteSupport.KeyPath> keyPaths = allFiles.stream()
+.map(path ->
+new MultiObjectDeleteSupport.KeyPath(
+storeContext.pathToKey(path),
+path,
+false))
+.collect(Collectors.toList());

// this set can be deleted by the role FS
MetricDiff rejectionCount = new MetricDiff(roleFS, FILES_DELETE_REJECTED);
MetricDiff deleteVerbCount = new MetricDiff(roleFS, OBJECT_DELETE_REQUESTS);
+MetricDiff deleteObjectCount = new MetricDiff(roleFS,
+OBJECT_DELETE_OBJECTS);

describe("Trying to delete read only directory");
AccessDeniedException ex = expectDeleteForbidden(readOnlyDir);
if (multiDelete) {
// multi-delete status checks
extractCause(MultiObjectDeleteException.class, ex);
+deleteVerbCount.assertDiffEquals("Wrong delete request count", 1);
+deleteObjectCount.assertDiffEquals("Number of keys in delete request",
+readOnlyFiles.size());
rejectionCount.assertDiffEquals("Wrong rejection count",
readOnlyFiles.size());
-deleteVerbCount.assertDiffEquals("Wrong delete count", 1);
-reset(rejectionCount, deleteVerbCount);
+reset(rejectionCount, deleteVerbCount, deleteObjectCount);
}
// all the files are still there? (avoid in scale test due to cost)
if (!scaleTest) {
@@ -649,16 +689,20 @@ public void testPartialDirDelete() throws Throwable {
ex = expectDeleteForbidden(basePath);
if (multiDelete) {
// multi-delete status checks
-extractCause(MultiObjectDeleteException.class, ex);
deleteVerbCount.assertDiffEquals("Wrong delete count", 1);
MultiObjectDeleteException mde = extractCause(
MultiObjectDeleteException.class, ex);
-final List<Path> undeleted
-= removeUndeletedPaths(mde, allFiles, fs::keyToQualifiedPath);
+List<MultiObjectDeleteSupport.KeyPath> undeletedKeyPaths =
+removeUndeletedPaths(mde, keyPaths, storeContext::keyToPath);
+final List<Path> undeleted = toPathList(
+undeletedKeyPaths);
+deleteObjectCount.assertDiffEquals(
+"Wrong count of objects in delete request",
+allFiles.size());
Assertions.assertThat(undeleted)
.as("files which could not be deleted")
.containsExactlyInAnyOrderElementsOf(readOnlyFiles);
-Assertions.assertThat(allFiles)
+Assertions.assertThat(toPathList(keyPaths))
.as("files which were deleted")
.containsExactlyInAnyOrderElementsOf(deletableFiles);
rejectionCount.assertDiffEquals("Wrong rejection count",
@@ -677,7 +721,26 @@ public void testPartialDirDelete() throws Throwable {

Assertions.assertThat(readOnlyListing)
.as("ReadOnly directory " + directoryList)
-.containsAll(readOnlyFiles);
+.containsExactlyInAnyOrderElementsOf(readOnlyFiles);
+}
+
+/**
+* Verifies the logic of handling directory markers in
+* delete operations, specifically:
+* <ol>
+* <li>all markers above empty directories MUST be deleted</li>
+* <li>all markers above non-empty directories MUST NOT be deleted</li>
+* </ol>
+* As the delete list may include subdirectories, we need to work up from
+* the bottom of the list of deleted files before probing the parents,
+* that being done by a s3guard get(path, need-empty-directory) call.
+* <p></p>
+* This is pretty sensitive code.
+*/
+@Test
+public void testSubdirDeleteFailures() throws Throwable {
+describe("Multiobject delete handling of directory markers");
+assume("Multiobject delete only", multiDelete);
}

/**
@@ -771,7 +834,7 @@ private static CompletableFuture<Path> put(FileSystem fs,
}

/**
-* Parallel-touch a set of files in the destination directory.
+* Build a set of files in a directory tree.
* @param fs filesystem
* @param destDir destination
* @param depth file depth
@@ -784,12 +847,48 @@ public static List<Path> createFiles(final FileSystem fs,
final int depth,
final int fileCount,
final int dirCount) throws IOException {
-List<CompletableFuture<Path>> futures = new ArrayList<>(fileCount);
-List<Path> paths = new ArrayList<>(fileCount);
-List<Path> dirs = new ArrayList<>(fileCount);
+return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
+new ArrayList<Path>(fileCount),
+new ArrayList<Path>(dirCount));
+}
+
+/**
+* Build a set of files in a directory tree.
+* @param fs filesystem
+* @param destDir destination
+* @param depth file depth
+* @param fileCount number of files to create.
+* @param dirCount number of dirs to create at each level
+* @param paths [out] list of file paths created
+* @param dirs [out] list of directory paths created.
+* @return the list of files created.
+*/
+public static List<Path> createDirsAndFiles(final FileSystem fs,
+final Path destDir,
+final int depth,
+final int fileCount,
+final int dirCount,
+final List<Path> paths,
+final List<Path> dirs) throws IOException {
buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
+List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
++ dirs.size());
+
+// create directories. With dir marker retention, that adds more entries
+// to cause deletion issues
try (DurationInfo ignore =
-new DurationInfo(LOG, "Creating %d files", fileCount)) {
+new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
+for (Path path : dirs) {
+futures.add(submit(EXECUTOR, () ->{
+fs.mkdirs(path);
+return path;
+}));
+}
+waitForCompletion(futures);
+}
+
+try (DurationInfo ignore =
+new DurationInfo(LOG, "Creating %d files", paths.size())) {
for (Path path : paths) {
futures.add(put(fs, path, path.getName()));
}
@@ -20,6 +20,7 @@

import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -36,9 +37,11 @@
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;

import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.ACCESS_DENIED;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
+import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList;
import static org.junit.Assert.assertEquals;

/**
@@ -56,36 +59,42 @@ private static Path qualifyKey(String k) {
return new Path("s3a://bucket/" + k);
}

+private static String toKey(Path path) {
+return path.toUri().getPath();
+}
+
@Before
public void setUp() throws Exception {
context = S3ATestUtils.createMockStoreContext(true,
-new S3ATestUtils.OperationTrackingStore(), CONTEXT_ACCESSORS);
+new OperationTrackingStore(), CONTEXT_ACCESSORS);
}

@Test
public void testDeleteExtraction() {
-List<Path> src = pathList("a", "a/b", "a/c");
-List<Path> rejected = pathList("a/b");
+List<MultiObjectDeleteSupport.KeyPath> src = pathList("a", "a/b", "a/c");
+List<MultiObjectDeleteSupport.KeyPath> rejected = pathList("a/b");
MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
rejected);
-List<Path> undeleted = removeUndeletedPaths(ex, src,
-TestPartialDeleteFailures::qualifyKey);
+List<MultiObjectDeleteSupport.KeyPath> undeleted =
+removeUndeletedPaths(ex, src,
+TestPartialDeleteFailures::qualifyKey);
assertEquals("mismatch of rejected and undeleted entries",
rejected, undeleted);
}

@Test
public void testSplitKeysFromResults() throws Throwable {
-List<Path> src = pathList("a", "a/b", "a/c");
-List<Path> rejected = pathList("a/b");
-List<DeleteObjectsRequest.KeyVersion> keys = keysToDelete(src);
+List<MultiObjectDeleteSupport.KeyPath> src = pathList("a", "a/b", "a/c");
+List<MultiObjectDeleteSupport.KeyPath> rejected = pathList("a/b");
+List<DeleteObjectsRequest.KeyVersion> keys = keysToDelete(toPathList(src));
MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
rejected);
-Pair<List<Path>, List<Path>> pair =
+Pair<List<MultiObjectDeleteSupport.KeyPath>,
+List<MultiObjectDeleteSupport.KeyPath>> pair =
new MultiObjectDeleteSupport(context, null)
.splitUndeletedKeys(ex, keys);
-List<Path> undeleted = pair.getLeft();
-List<Path> deleted = pair.getRight();
+List<MultiObjectDeleteSupport.KeyPath> undeleted = pair.getLeft();
+List<MultiObjectDeleteSupport.KeyPath> deleted = pair.getRight();
assertEquals(rejected, undeleted);
// now check the deleted list to verify that it is valid
src.remove(rejected.get(0));
@@ -97,9 +106,12 @@ public void testSplitKeysFromResults() throws Throwable {
* @param paths paths to qualify and then convert to a lst.
* @return same paths as a list.
*/
-private List<Path> pathList(String... paths) {
+private List<MultiObjectDeleteSupport.KeyPath> pathList(String... paths) {
return Arrays.stream(paths)
-.map(TestPartialDeleteFailures::qualifyKey)
+.map(k->
+new MultiObjectDeleteSupport.KeyPath(k,
+qualifyKey(k),
+k.endsWith("/")))
.collect(Collectors.toList());
}

@@ -111,12 +123,13 @@ private List<Path> pathList(String... paths) {
*/
private MultiObjectDeleteException createDeleteException(
final String code,
-final List<Path> rejected) {
+final List<MultiObjectDeleteSupport.KeyPath> rejected) {
List<MultiObjectDeleteException.DeleteError> errors = rejected.stream()
-.map((p) -> {
+.map((kp) -> {
+Path p = kp.getPath();
MultiObjectDeleteException.DeleteError e
= new MultiObjectDeleteException.DeleteError();
-e.setKey(p.toUri().getPath());
+e.setKey(kp.getKey());
e.setCode(code);
e.setMessage("forbidden");
return e;
@@ -125,14 +138,33 @@ private MultiObjectDeleteException createDeleteException(
}

/**
-* From a list of paths, build up the list of keys for a delete request.
+* From a list of paths, build up the list of KeyVersion records
+* for a delete request.
+* All the entries will be files (i.e. no trailing /)
* @param paths path list
* @return a key list suitable for a delete request.
*/
public static List<DeleteObjectsRequest.KeyVersion> keysToDelete(
List<Path> paths) {
return paths.stream()
-.map((p) -> p.toUri().getPath())
+.map(p -> {
+String uripath = p.toUri().getPath();
+return uripath.substring(1);
+})
+.map(DeleteObjectsRequest.KeyVersion::new)
+.collect(Collectors.toList());
+}
+
+/**
+* From a list of keys, build up the list of keys for a delete request.
+* If a key has a trailing /, that will be retained, so it will be
+* considered a directory during multi-object delete failure handling
+* @param keys key list
+* @return a key list suitable for a delete request.
+*/
+public static List<DeleteObjectsRequest.KeyVersion> toDeleteRequests(
+List<String> keys) {
+return keys.stream()
.map(DeleteObjectsRequest.KeyVersion::new)
.collect(Collectors.toList());
}
@@ -143,23 +175,33 @@ public static List<DeleteObjectsRequest.KeyVersion> keysToDelete(
*/
@Test
public void testProcessDeleteFailure() throws Throwable {
-Path pathA = qualifyKey("/a");
-Path pathAB = qualifyKey("/a/b");
-Path pathAC = qualifyKey("/a/c");
+String keyA = "/a/";
+String keyAB = "/a/b";
+String keyAC = "/a/c";
+Path pathA = qualifyKey(keyA);
+Path pathAB = qualifyKey(keyAB);
+Path pathAC = qualifyKey(keyAC);
+List<String> srcKeys = Lists.newArrayList(keyA, keyAB, keyAC);
List<Path> src = Lists.newArrayList(pathA, pathAB, pathAC);
-List<DeleteObjectsRequest.KeyVersion> keyList = keysToDelete(src);
+List<DeleteObjectsRequest.KeyVersion> keyList = toDeleteRequests(srcKeys);
List<Path> deleteForbidden = Lists.newArrayList(pathAB);
final List<Path> deleteAllowed = Lists.newArrayList(pathA, pathAC);
+List<MultiObjectDeleteSupport.KeyPath> forbiddenKP =
+Lists.newArrayList(
+new MultiObjectDeleteSupport.KeyPath(keyAB, pathAB, true));
MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
-deleteForbidden);
-S3ATestUtils.OperationTrackingStore store
-= new S3ATestUtils.OperationTrackingStore();
+forbiddenKP);
+OperationTrackingStore store
+= new OperationTrackingStore();
StoreContext storeContext = S3ATestUtils
.createMockStoreContext(true, store, CONTEXT_ACCESSORS);
MultiObjectDeleteSupport deleteSupport
= new MultiObjectDeleteSupport(storeContext, null);
+List<Path> retainedMarkers = new ArrayList<>();
Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
-triple = deleteSupport.processDeleteFailure(ex, keyList);
+triple = deleteSupport.processDeleteFailure(ex,
+keyList,
+retainedMarkers);
Assertions.assertThat(triple.getRight())
.as("failure list")
.isEmpty();
@@ -173,6 +215,14 @@ public void testProcessDeleteFailure() throws Throwable {
as("undeleted store entries")
.containsAll(deleteForbidden)
.doesNotContainAnyElementsOf(deleteAllowed);
+// because dir marker retention is on, we expect at least one retained
+// marker
+Assertions.assertThat(retainedMarkers).
+as("Retained Markers")
+.containsExactly(pathA);
+Assertions.assertThat(store.getDeleted()).
+as("List of tombstoned records")
+.doesNotContain(pathA);
}

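Editorial note: the tests above classify entries with MultiObjectDeleteSupport.KeyPath, where a key ending in "/" marks a directory marker. A standalone sketch of that rule (hypothetical names, not taken from the patch):

// Sketch only: how the trailing "/" convention separates markers from files.
import java.util.Arrays;
import java.util.List;

public final class MarkerKeyCheck {
  private MarkerKeyCheck() {
  }

  static boolean isDirectoryMarker(String key) {
    // S3A directory markers are zero-byte objects whose key ends with "/"
    return key.endsWith("/");
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("a/", "a/b", "a/c");
    keys.forEach(k ->
        System.out.println(k + " -> " + (isDirectoryMarker(k) ? "marker" : "file")));
  }
}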
@@ -111,7 +111,10 @@ public Configuration createConfiguration() {
keepMarkers
? DIRECTORY_MARKER_POLICY_KEEP
: DIRECTORY_MARKER_POLICY_DELETE);
-conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
+if (isGuarded()) {
+conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_DYNAMO);
+conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
+}
disableFilesystemCaching(conf);
return conf;
}
@@ -148,6 +151,7 @@ public void setup() throws Exception {
INVOCATION_COPY_FROM_LOCAL_FILE,
OBJECT_COPY_REQUESTS,
OBJECT_DELETE_REQUESTS,
+OBJECT_DELETE_OBJECTS,
OBJECT_LIST_REQUESTS,
OBJECT_METADATA_REQUESTS,
OBJECT_PUT_BYTES,
@ -19,16 +19,20 @@
|
|||||||
package org.apache.hadoop.fs.s3a.performance;
|
package org.apache.hadoop.fs.s3a.performance;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
import org.apache.hadoop.fs.s3a.S3AFileStatus;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
import org.apache.hadoop.fs.s3a.Tristate;
|
import org.apache.hadoop.fs.s3a.Tristate;
|
||||||
@ -37,6 +41,7 @@
|
|||||||
import static org.apache.hadoop.fs.s3a.Statistic.*;
|
import static org.apache.hadoop.fs.s3a.Statistic.*;
|
||||||
import static org.apache.hadoop.fs.s3a.performance.OperationCost.*;
|
import static org.apache.hadoop.fs.s3a.performance.OperationCost.*;
|
||||||
import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
|
import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use metrics to assert about the cost of file API calls.
|
* Use metrics to assert about the cost of file API calls.
|
||||||
@ -58,7 +63,9 @@ public static Collection<Object[]> params() {
|
|||||||
{"raw-keep-markers", false, true, false},
|
{"raw-keep-markers", false, true, false},
|
||||||
{"raw-delete-markers", false, false, false},
|
{"raw-delete-markers", false, false, false},
|
||||||
{"nonauth-keep-markers", true, true, false},
|
{"nonauth-keep-markers", true, true, false},
|
||||||
{"auth-delete-markers", true, false, true}
|
{"nonauth-delete-markers", true, false, false},
|
||||||
|
{"auth-delete-markers", true, false, true},
|
||||||
|
{"auth-keep-markers", true, true, true}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,7 +152,7 @@ public void testDeleteFileInDir() throws Throwable {
|
|||||||
boolean rawAndDeleting = isRaw() && isDeleting();
|
boolean rawAndDeleting = isRaw() && isDeleting();
|
||||||
verifyMetrics(() -> {
|
verifyMetrics(() -> {
|
||||||
fs.delete(file1, false);
|
fs.delete(file1, false);
|
||||||
return "after fs.delete(file1simpleFile) " + getMetricSummary();
|
return "after fs.delete(file1) " + getMetricSummary();
|
||||||
},
|
},
|
||||||
// delete file. For keeping: that's it
|
// delete file. For keeping: that's it
|
||||||
probe(rawAndKeeping, OBJECT_METADATA_REQUESTS,
|
probe(rawAndKeeping, OBJECT_METADATA_REQUESTS,
|
||||||
@ -173,7 +180,24 @@ public void testDeleteFileInDir() throws Throwable {
|
|||||||
public void testDirMarkersSubdir() throws Throwable {
|
public void testDirMarkersSubdir() throws Throwable {
|
||||||
describe("verify cost of deep subdir creation");
|
describe("verify cost of deep subdir creation");
|
||||||
|
|
||||||
Path subDir = new Path(methodPath(), "1/2/3/4/5/6");
|
Path methodPath = methodPath();
|
||||||
|
Path parent = new Path(methodPath, "parent");
|
||||||
|
Path subDir = new Path(parent, "1/2/3/4/5/6");
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
// this creates a peer of the parent dir, so ensures
|
||||||
|
// that when parent dir is deleted, no markers need to
|
||||||
|
// be recreated...that complicates all the metrics which
|
||||||
|
// are measured
|
||||||
|
Path sibling = new Path(methodPath, "sibling");
|
||||||
|
ContractTestUtils.touch(fs, sibling);
|
||||||
|
|
||||||
|
int dirsCreated = 2;
|
||||||
|
fs.delete(parent, true);
|
||||||
|
|
||||||
|
LOG.info("creating parent dir {}", parent);
|
||||||
|
fs.mkdirs(parent);
|
||||||
|
|
||||||
|
LOG.info("creating sub directory {}", subDir);
|
||||||
// one dir created, possibly a parent removed
|
// one dir created, possibly a parent removed
|
||||||
verifyMetrics(() -> {
|
verifyMetrics(() -> {
|
||||||
mkdirs(subDir);
|
mkdirs(subDir);
|
||||||
@ -187,6 +211,47 @@ public void testDirMarkersSubdir() throws Throwable {
|
|||||||
// delete all possible fake dirs above the subdirectory
|
// delete all possible fake dirs above the subdirectory
|
||||||
withWhenDeleting(FAKE_DIRECTORIES_DELETED,
|
withWhenDeleting(FAKE_DIRECTORIES_DELETED,
|
||||||
directoriesInPath(subDir) - 1));
|
directoriesInPath(subDir) - 1));
|
||||||
|
|
||||||
|
int dirDeleteRequests = 1;
|
||||||
|
int fileDeleteRequests = 0;
|
||||||
|
int totalDeleteRequests = dirDeleteRequests + fileDeleteRequests;
|
||||||
|
|
||||||
|
LOG.info("About to delete {}", parent);
|
||||||
|
// now delete the deep tree.
|
||||||
|
verifyMetrics(() -> {
|
||||||
|
fs.delete(parent, true);
|
||||||
|
return "deleting parent dir " + parent + " " + getMetricSummary();
|
||||||
|
},
|
||||||
|
|
||||||
|
// two directory markers will be deleted in a single request
|
||||||
|
with(OBJECT_DELETE_REQUESTS, totalDeleteRequests),
|
||||||
|
// keeping: the parent dir marker needs deletion alongside
|
||||||
|
// the subdir one.
|
||||||
|
withWhenKeeping(OBJECT_DELETE_OBJECTS, dirsCreated),
|
||||||
|
|
||||||
|
// deleting: only the marker at the bottom needs deleting
|
||||||
|
withWhenDeleting(OBJECT_DELETE_OBJECTS, 1));
|
||||||
|
|
||||||
|
// followup with list calls to make sure all is clear.
|
||||||
|
verifyNoListing(parent);
|
||||||
|
verifyNoListing(subDir);
|
||||||
|
// now reinstate the directory, which in HADOOP-17244 hitting problems
|
||||||
|
fs.mkdirs(parent);
|
||||||
|
FileStatus[] children = fs.listStatus(parent);
|
||||||
|
Assertions.assertThat(children)
|
||||||
|
.describedAs("Children of %s", parent)
|
||||||
|
.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List a path, verify that there are no direct child entries.
|
||||||
|
* @param path path to scan
|
||||||
|
*/
|
||||||
|
protected void verifyNoListing(final Path path) throws Exception {
|
||||||
|
intercept(FileNotFoundException.class, () -> {
|
||||||
|
FileStatus[] statuses = getFileSystem().listStatus(path);
|
||||||
|
return Arrays.deepToString(statuses);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
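Editorial note: the metric expectations above hinge on the marker policy: with markers kept, both the parent and subdir markers still exist and go in the one delete request; with markers deleted, only the bottom marker remains. A standalone sketch of that accounting (hypothetical helper, not part of this patch):

// Sketch only: markers expected in the single delete request for a parent/subdir pair.
public final class MarkerDeleteCost {
  private MarkerDeleteCost() {
  }

  static int expectedDeletedMarkers(boolean keepingMarkers, int dirsCreated) {
    // keeping: every marker created is still present and must be deleted;
    // deleting: only the marker at the bottom of the tree remains.
    return keepingMarkers ? dirsCreated : 1;
  }

  public static void main(String[] args) {
    System.out.println(expectedDeletedMarkers(true, 2));  // 2
    System.out.println(expectedDeletedMarkers(false, 2)); // 1
  }
}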
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.fs.s3a.test;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3ListRequest;
+import org.apache.hadoop.fs.s3a.S3ListResult;
+import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+
+/**
+* Stub implementation of {@link ListingOperationCallbacks}.
+*/
+public class MinimalListingOperationCallbacks
+implements ListingOperationCallbacks {
+
+@Override
+public CompletableFuture<S3ListResult> listObjectsAsync(
+S3ListRequest request)
+throws IOException {
+return null;
+}
+
+@Override
+public CompletableFuture<S3ListResult> continueListObjectsAsync(
+S3ListRequest request,
+S3ListResult prevResult)
+throws IOException {
+return null;
+}
+
+@Override
+public S3ALocatedFileStatus toLocatedFileStatus(
+S3AFileStatus status) throws IOException {
+return null;
+}
+
+@Override
+public S3ListRequest createListObjectsRequest(
+String key,
+String delimiter) {
+return null;
+}
+
+@Override
+public long getDefaultBlockSize(Path path) {
+return 0;
+}
+
+@Override
+public int getMaxKeys() {
+return 0;
+}
+
+@Override
+public ITtlTimeProvider getUpdatedTtlTimeProvider() {
+return null;
+}
+
+@Override
+public boolean allowAuthoritative(Path p) {
+return false;
+}
+}
@@ -0,0 +1,128 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.fs.s3a.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+
+/**
+* Stub implementation of {@link OperationCallbacks}.
+*/
+public class MinimalOperationCallbacks
+implements OperationCallbacks {
+
+@Override
+public S3ObjectAttributes createObjectAttributes(
+Path path,
+String eTag,
+String versionId,
+long len) {
+return null;
+}
+
+@Override
+public S3ObjectAttributes createObjectAttributes(
+S3AFileStatus fileStatus) {
+return null;
+}
+
+@Override
+public S3AReadOpContext createReadContext(
+FileStatus fileStatus) {
+return null;
+}
+
+@Override
+public void finishRename(
+Path sourceRenamed,
+Path destCreated)
+throws IOException {
+
+}
+
+@Override
+public void deleteObjectAtPath(
+Path path,
+String key,
+boolean isFile,
+BulkOperationState operationState)
+throws IOException {
+
+}
+
+@Override
+public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
+final Path path,
+final S3AFileStatus status,
+final boolean collectTombstones,
+final boolean includeSelf) throws IOException {
+return null;
+}
+
+@Override
+public CopyResult copyFile(
+String srcKey,
+String destKey,
+S3ObjectAttributes srcAttributes,
+S3AReadOpContext readContext)
+throws IOException {
+return null;
+}
+
+@Override
+public DeleteObjectsResult removeKeys(
+List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+boolean deleteFakeDir,
+List<Path> undeletedObjectsOnFailure,
+BulkOperationState operationState,
+boolean quiet)
+throws MultiObjectDeleteException, AmazonClientException,
+IOException {
+return null;
+}
+
+@Override
+public boolean allowAuthoritative(Path p) {
+return false;
+}
+
+@Override
+public RemoteIterator<S3AFileStatus> listObjects(
+Path path,
+String key)
+throws IOException {
+return null;
+}
+}
@@ -0,0 +1,189 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.fs.s3a.test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
+
+/**
+* MetadataStore which tracks what is deleted and added.
+*/
+public class OperationTrackingStore implements MetadataStore {
+
+private final List<Path> deleted = new ArrayList<>();
+
+private final List<Path> created = new ArrayList<>();
+
+@Override
+public void initialize(final FileSystem fs,
+ITtlTimeProvider ttlTimeProvider) {
+}
+
+@Override
+public void initialize(final Configuration conf,
+ITtlTimeProvider ttlTimeProvider) {
+}
+
+@Override
+public void forgetMetadata(final Path path) {
+}
+
+@Override
+public PathMetadata get(final Path path) {
+return null;
+}
+
+@Override
+public PathMetadata get(final Path path,
+final boolean wantEmptyDirectoryFlag) {
+return null;
+}
+
+@Override
+public DirListingMetadata listChildren(final Path path) {
+return null;
+}
+
+@Override
+public void put(final PathMetadata meta) {
+put(meta, null);
+}
+
+@Override
+public void put(final PathMetadata meta,
+final BulkOperationState operationState) {
+created.add(meta.getFileStatus().getPath());
+}
+
+@Override
+public void put(final Collection<? extends PathMetadata> metas,
+final BulkOperationState operationState) {
+metas.stream().forEach(meta -> put(meta, null));
+}
+
+@Override
+public void put(final DirListingMetadata meta,
+final List<Path> unchangedEntries,
+final BulkOperationState operationState) {
+created.add(meta.getPath());
+}
+
+@Override
+public void destroy() {
+}
+
+@Override
+public void delete(final Path path,
+final BulkOperationState operationState) {
+deleted.add(path);
+}
+
+@Override
+public void deletePaths(final Collection<Path> paths,
+@Nullable final BulkOperationState operationState)
+throws IOException {
+deleted.addAll(paths);
+}
+
+@Override
+public void deleteSubtree(final Path path,
+final BulkOperationState operationState) {
+
+}
+
+@Override
+public void move(@Nullable final Collection<Path> pathsToDelete,
+@Nullable final Collection<PathMetadata> pathsToCreate,
+@Nullable final BulkOperationState operationState) {
+}
+
+@Override
+public void prune(final PruneMode pruneMode, final long cutoff) {
+}
+
+@Override
+public long prune(final PruneMode pruneMode,
+final long cutoff,
+final String keyPrefix) {
+return 0;
+}
+
+@Override
+public BulkOperationState initiateBulkWrite(
+final BulkOperationState.OperationType operation,
+final Path dest) {
+return null;
+}
+
+@Override
+public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
+}
+
+@Override
+public Map<String, String> getDiagnostics() {
+return null;
+}
+
+@Override
+public void updateParameters(final Map<String, String> parameters) {
+}
+
+@Override
+public void close() {
+}
+
+public List<Path> getDeleted() {
+return deleted;
+}
+
+public List<Path> getCreated() {
+return created;
+}
+
+@Override
+public RenameTracker initiateRenameOperation(
+final StoreContext storeContext,
+final Path source,
+final S3AFileStatus sourceStatus,
+final Path dest) {
+throw new UnsupportedOperationException("unsupported");
+}
+
+@Override
+public void addAncestors(final Path qualifiedPath,
+@Nullable final BulkOperationState operationState) {
+
+}
+}