HADOOP-18112: Implement paging during multi object delete. (#4045)

S3 rejects multi-object delete requests containing more than 1000 keys,
failing them with a MalformedXML error. This change therefore pages the
requests, bounding the number of keys sent in any single call. The page
size can be configured via "fs.s3a.bulk.delete.page.size".

Contributed by: Mukund Thakur
Mukund Thakur, 2022-03-11 13:05:45 +05:30 (committed by GitHub)
parent ed65aa2324
commit 672e380c4f
17 changed files with 273 additions and 232 deletions
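In practice the new behaviour is driven by a single configuration knob. A minimal sketch of overriding it (the bucket URI and the value 500 are illustrative; the default of 250 and the 1000-key upper bound come from this patch):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    Configuration conf = new Configuration();
    // Values above InternalConstants.MAX_ENTRIES_TO_DELETE (the S3 hard
    // limit of 1000 keys per bulk delete) are rejected at initialize() time.
    conf.setInt("fs.s3a.bulk.delete.page.size", 500);
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);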


@@ -232,4 +232,28 @@ private static <T> boolean addAll(Collection<T> addTo,
     return addAll(addTo, elementsToAdd.iterator());
   }
 
+  /**
+   * Returns consecutive sub-lists of a list, each of the same size
+   * (the final list may be smaller).
+   * @param originalList original big list.
+   * @param pageSize desired size of each sublist ( last one
+   *                 may be smaller)
+   * @return a list of sub lists.
+   */
+  public static <T> List<List<T>> partition(List<T> originalList, int pageSize) {
+    Preconditions.checkArgument(originalList != null && originalList.size() > 0,
+        "Invalid original list");
+    Preconditions.checkArgument(pageSize > 0, "Page size should " +
+        "be greater than 0 for performing partition");
+    List<List<T>> result = new ArrayList<>();
+    int i = 0;
+    while (i < originalList.size()) {
+      result.add(originalList.subList(i,
+          Math.min(i + pageSize, originalList.size())));
+      i = i + pageSize;
+    }
+    return result;
+  }
 }
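A quick usage sketch of the new helper (the list contents are illustrative). Because it is built on List.subList(), each page is a view of the original list rather than a copy:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.util.Lists;

    List<String> keys = Arrays.asList("k1", "k2", "k3", "k4", "k5");
    List<List<String>> pages = Lists.partition(keys, 2);
    // Prints [k1, k2], [k3, k4], [k5]: two full pages plus a smaller tail.
    for (List<String> page : pages) {
      System.out.println(page);
    }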


@@ -18,9 +18,11 @@
 package org.apache.hadoop.util;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -79,6 +81,48 @@ public void testItrLinkedLists() {
     Assert.assertEquals(4, list.size());
   }
 
+  @Test
+  public void testListsPartition() {
+    List<String> list = new ArrayList<>();
+    list.add("a");
+    list.add("b");
+    list.add("c");
+    list.add("d");
+    list.add("e");
+    List<List<String>> res = Lists.
+        partition(list, 2);
+    Assertions.assertThat(res)
+        .describedAs("Number of partitions post partition")
+        .hasSize(3);
+    Assertions.assertThat(res.get(0))
+        .describedAs("Number of elements in first partition")
+        .hasSize(2);
+    Assertions.assertThat(res.get(2))
+        .describedAs("Number of elements in last partition")
+        .hasSize(1);
+
+    List<List<String>> res2 = Lists.
+        partition(list, 1);
+    Assertions.assertThat(res2)
+        .describedAs("Number of partitions post partition")
+        .hasSize(5);
+    Assertions.assertThat(res2.get(0))
+        .describedAs("Number of elements in first partition")
+        .hasSize(1);
+    Assertions.assertThat(res2.get(4))
+        .describedAs("Number of elements in last partition")
+        .hasSize(1);
+
+    List<List<String>> res3 = Lists.
+        partition(list, 6);
+    Assertions.assertThat(res3)
+        .describedAs("Number of partitions post partition")
+        .hasSize(1);
+    Assertions.assertThat(res3.get(0))
+        .describedAs("Number of elements in first partition")
+        .hasSize(5);
+  }
+
   @Test
   public void testArrayListWithSize() {
     List<String> list = Lists.newArrayListWithCapacity(3);


@@ -135,6 +135,7 @@
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.LambdaUtils;
+import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
@@ -225,6 +226,7 @@
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;
 
 /**
@@ -550,6 +552,8 @@ public void initialize(URI name, Configuration originalConf)
       pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
           BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
+      checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
+          "page size out of range: %s", pageSize);
       listing = new Listing(listingOperationCallbacks, createStoreContext());
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
@@ -2026,14 +2030,12 @@ public CopyResult copyFile(final String srcKey,
     }
 
     @Override
-    public DeleteObjectsResult removeKeys(
+    public void removeKeys(
         final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-        final boolean deleteFakeDir,
-        final boolean quiet)
+        final boolean deleteFakeDir)
         throws MultiObjectDeleteException, AmazonClientException, IOException {
       auditSpan.activate();
-      return S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir,
-          quiet);
+      S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir);
     }
 
     @Override
@@ -2818,10 +2820,6 @@ public void incrementPutProgressStatistics(String key, long bytes) {
    * @param keysToDelete collection of keys to delete on the s3-backend.
    * if empty, no request is made of the object store.
    * @param deleteFakeDir indicates whether this is for deleting fake dirs
-   * @param quiet should a bulk query be quiet, or should its result list
-   * all deleted keys?
-   * @return the deletion result if a multi object delete was invoked
-   * and it returned without a failure.
    * @throws InvalidRequestException if the request was rejected due to
    * a mistaken attempt to delete the root directory.
    * @throws MultiObjectDeleteException one or more of the keys could not
@@ -2831,10 +2829,9 @@ public void incrementPutProgressStatistics(String key, long bytes) {
    * @throws AmazonClientException other amazon-layer failure.
    */
   @Retries.RetryRaw
-  private DeleteObjectsResult removeKeysS3(
+  private void removeKeysS3(
       List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      boolean deleteFakeDir,
-      boolean quiet)
+      boolean deleteFakeDir)
       throws MultiObjectDeleteException, AmazonClientException,
       IOException {
     if (LOG.isDebugEnabled()) {
@@ -2847,16 +2844,28 @@ private DeleteObjectsResult removeKeysS3(
     }
     if (keysToDelete.isEmpty()) {
       // exit fast if there are no keys to delete
-      return null;
+      return;
     }
     for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
       blockRootDelete(keyVersion.getKey());
     }
-    DeleteObjectsResult result = null;
     try {
       if (enableMultiObjectsDelete) {
-        result = deleteObjects(
-            getRequestFactory().newBulkDeleteRequest(keysToDelete, quiet));
+        if (keysToDelete.size() <= pageSize) {
+          deleteObjects(getRequestFactory()
+              .newBulkDeleteRequest(keysToDelete));
+        } else {
+          // Multi object deletion of more than 1000 keys is not supported
+          // by s3. So we are paging the keys by page size.
+          LOG.debug("Partitioning the keys to delete as it is more than " +
+              "page size. Number of keys: {}, Page size: {}",
+              keysToDelete.size(), pageSize);
+          for (List<DeleteObjectsRequest.KeyVersion> batchOfKeysToDelete :
+              Lists.partition(keysToDelete, pageSize)) {
+            deleteObjects(getRequestFactory()
+                .newBulkDeleteRequest(batchOfKeysToDelete));
+          }
+        }
       } else {
         for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
           deleteObject(keyVersion.getKey());
@@ -2872,7 +2881,6 @@ private DeleteObjectsResult removeKeysS3(
       throw ex;
     }
     noteDeleted(keysToDelete.size(), deleteFakeDir);
-    return result;
   }
 
   /**
@@ -2889,7 +2897,7 @@ private void noteDeleted(final int count, final boolean deleteFakeDir) {
   }
 
   /**
-   * Invoke {@link #removeKeysS3(List, boolean, boolean)}.
+   * Invoke {@link #removeKeysS3(List, boolean)}.
    * If a {@code MultiObjectDeleteException} is raised, the
    * relevant statistics are updated.
    *
@@ -2910,35 +2918,9 @@ public void removeKeys(
       final boolean deleteFakeDir)
       throws MultiObjectDeleteException, AmazonClientException,
       IOException {
-    removeKeys(keysToDelete, deleteFakeDir,
-        true);
-  }
-
-  /**
-   * Invoke {@link #removeKeysS3(List, boolean, boolean)}.
-   * @param keysToDelete collection of keys to delete on the s3-backend.
-   * if empty, no request is made of the object store.
-   * @param deleteFakeDir indicates whether this is for deleting fake dirs.
-   * @param quiet should a bulk query be quiet, or should its result list
-   * all deleted keys
-   * @return the deletion result if a multi object delete was invoked
-   * and it returned without a failure, else null.
-   * @throws InvalidRequestException if the request was rejected due to
-   * a mistaken attempt to delete the root directory.
-   * @throws MultiObjectDeleteException one or more of the keys could not
-   * be deleted in a multiple object delete operation.
-   * @throws AmazonClientException amazon-layer failure.
-   * @throws IOException other IO Exception.
-   */
-  @Retries.RetryRaw
-  private DeleteObjectsResult removeKeys(
-      final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      final boolean deleteFakeDir,
-      final boolean quiet)
-      throws MultiObjectDeleteException, AmazonClientException, IOException {
     try (DurationInfo ignored = new DurationInfo(LOG, false,
         "Deleting %d keys", keysToDelete.size())) {
-      return removeKeysS3(keysToDelete, deleteFakeDir, quiet);
+      removeKeysS3(keysToDelete, deleteFakeDir);
     }
   }


@@ -291,12 +291,9 @@ ListObjectsV2Request newListObjectsV2Request(String key,
 
   /**
    * Bulk delete request.
    * @param keysToDelete list of keys to delete.
-   * @param quiet should a bulk query be quiet, or should its result list
-   * all deleted keys?
    * @return the request
    */
   DeleteObjectsRequest newBulkDeleteRequest(
-      List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      boolean quiet);
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete);
 }


@@ -25,7 +25,6 @@
 import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
@@ -365,8 +364,7 @@ private CompletableFuture<Void> submitDelete(
         callableWithinAuditSpan(
             getAuditSpan(), () -> {
               asyncDeleteAction(
-                  keyList,
-                  LOG.isDebugEnabled());
+                  keyList);
               return null;
             }));
   }
@@ -376,20 +374,16 @@
    * the keys from S3 and paths from S3Guard.
    *
    * @param keyList keys to delete.
-   * @param auditDeletedKeys should the results be audited and undeleted
    * entries logged?
    * @throws IOException failure
    */
   @Retries.RetryTranslated
   private void asyncDeleteAction(
-      final List<DeleteEntry> keyList,
-      final boolean auditDeletedKeys)
+      final List<DeleteEntry> keyList)
       throws IOException {
-    List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
     try (DurationInfo ignored =
         new DurationInfo(LOG, false,
             "Delete page of %d keys", keyList.size())) {
-      DeleteObjectsResult result;
       if (!keyList.isEmpty()) {
         // first delete the files.
         List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
@@ -397,15 +391,12 @@ private void asyncDeleteAction(
             .map(e -> e.keyVersion)
             .collect(Collectors.toList());
         LOG.debug("Deleting of {} file objects", files.size());
-        result = Invoker.once("Remove S3 Files",
+        Invoker.once("Remove S3 Files",
             status.getPath().toString(),
             () -> callbacks.removeKeys(
                 files,
-                false,
-                !auditDeletedKeys));
-        if (result != null) {
-          deletedObjects.addAll(result.getDeletedObjects());
-        }
+                false
+                ));
         // now the dirs
         List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
             .filter(e -> e.isDirMarker)
@@ -413,32 +404,12 @@ private void asyncDeleteAction(
             .collect(Collectors.toList());
         LOG.debug("Deleting of {} directory markers", dirs.size());
         // This is invoked with deleteFakeDir.
-        result = Invoker.once("Remove S3 Dir Markers",
+        Invoker.once("Remove S3 Dir Markers",
             status.getPath().toString(),
             () -> callbacks.removeKeys(
                 dirs,
-                true,
-                !auditDeletedKeys));
-        if (result != null) {
-          deletedObjects.addAll(result.getDeletedObjects());
-        }
-      }
-      if (auditDeletedKeys) {
-        // audit the deleted keys
-        if (deletedObjects.size() != keyList.size()) {
-          // size mismatch
-          LOG.warn("Size mismatch in deletion operation. "
-              + "Expected count of deleted files: {}; "
-              + "actual: {}",
-              keyList.size(), deletedObjects.size());
-          // strip out the deleted keys
-          for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
-            keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
-          }
-          for (DeleteEntry kv : keyList) {
-            LOG.debug("{}", kv.getKey());
-          }
-        }
+                true
+                ));
       }
     }
   }


@@ -24,7 +24,6 @@
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.transfer.model.CopyResult;
 
@@ -138,10 +137,6 @@ CopyResult copyFile(String srcKey,
    * @param keysToDelete collection of keys to delete on the s3-backend.
    * if empty, no request is made of the object store.
    * @param deleteFakeDir indicates whether this is for deleting fake dirs.
-   * @param quiet should a bulk query be quiet, or should its result list
-   * all deleted keys
-   * @return the deletion result if a multi object delete was invoked
-   * and it returned without a failure, else null.
    * @throws InvalidRequestException if the request was rejected due to
    * a mistaken attempt to delete the root directory.
    * @throws MultiObjectDeleteException one or more of the keys could not
@@ -150,10 +145,9 @@ CopyResult copyFile(String srcKey,
    * @throws IOException other IO Exception.
    */
   @Retries.RetryRaw
-  DeleteObjectsResult removeKeys(
+  void removeKeys(
       List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      boolean deleteFakeDir,
-      boolean quiet)
+      boolean deleteFakeDir)
       throws MultiObjectDeleteException, AmazonClientException,
       IOException;


@@ -49,6 +49,7 @@
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.RENAME_PARALLEL_LIMIT;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
 
 /**
  * A parallelized rename operation.
@@ -155,6 +156,9 @@ public RenameOperation(
     this.destKey = destKey;
     this.destStatus = destStatus;
     this.callbacks = callbacks;
+    checkArgument(pageSize > 0
+        && pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
+        "page size out of range: %s", pageSize);
     this.pageSize = pageSize;
   }
@@ -586,8 +590,8 @@ private void removeSourceObjects(
         sourcePath.toString(), () ->
             callbacks.removeKeys(
                 keys,
-                false,
-                true));
+                false
+                ));
   }
 
   /**


@@ -575,12 +575,11 @@ public DeleteObjectRequest newDeleteObjectRequest(String key) {
 
   @Override
   public DeleteObjectsRequest newBulkDeleteRequest(
-      List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      boolean quiet) {
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete) {
     return prepareRequest(
         new DeleteObjectsRequest(bucket)
             .withKeys(keysToDelete)
-            .withQuiet(quiet));
+            .withQuiet(true));
   }
 
   @Override


@@ -817,7 +817,7 @@ pages, suffix(pages),
           end);
       once("Remove S3 Keys",
           tracker.getBasePath().toString(), () ->
-              operations.removeKeys(page, true, false));
+              operations.removeKeys(page, true));
       summary.deleteRequests++;
       // and move to the start of the next page
       start = end;


@@ -23,7 +23,6 @@
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 
 import org.apache.hadoop.fs.InvalidRequestException;
@@ -58,10 +57,7 @@ RemoteIterator<S3AFileStatus> listObjects(
    * @param keysToDelete collection of keys to delete on the s3-backend.
    * if empty, no request is made of the object store.
    * @param deleteFakeDir indicates whether this is for deleting fake dirs.
-   * @param quiet should a bulk query be quiet, or should its result list
    * all deleted keys
-   * @return the deletion result if a multi object delete was invoked
-   * and it returned without a failure, else null.
    * @throws InvalidRequestException if the request was rejected due to
    * a mistaken attempt to delete the root directory.
    * @throws MultiObjectDeleteException one or more of the keys could not
@@ -70,10 +66,9 @@ RemoteIterator<S3AFileStatus> listObjects(
    * @throws IOException other IO Exception.
    */
   @Retries.RetryMixed
-  DeleteObjectsResult removeKeys(
+  void removeKeys(
       List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      boolean deleteFakeDir,
-      boolean quiet)
+      boolean deleteFakeDir)
       throws MultiObjectDeleteException, AmazonClientException,
       IOException;


@@ -23,7 +23,6 @@
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 
 import org.apache.hadoop.fs.Path;
@@ -55,13 +54,12 @@ public RemoteIterator<S3AFileStatus> listObjects(final Path path,
   }
 
   @Override
-  public DeleteObjectsResult removeKeys(
+  public void removeKeys(
       final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      final boolean deleteFakeDir,
-      final boolean quiet)
+      final boolean deleteFakeDir)
       throws MultiObjectDeleteException, AmazonClientException, IOException {
-    return operationCallbacks.removeKeys(keysToDelete, deleteFakeDir,
-        quiet);
+    operationCallbacks.removeKeys(keysToDelete, deleteFakeDir
+        );
   }
 
 }


@@ -20,10 +20,13 @@
 
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import org.assertj.core.api.Assertions;
 import org.junit.Assume;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.statistics.StoreStatisticNames;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
@@ -37,9 +40,11 @@
 import java.nio.file.AccessDeniedException;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.failIf;
-import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.*;
 import static org.apache.hadoop.test.LambdaTestUtils.*;
+import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
+import static org.apache.hadoop.util.functional.RemoteIterators.toList;
 
 /**
  * ITest for failure handling, primarily multipart deletion.
@@ -72,6 +77,37 @@ public void testMultiObjectDeleteNoFile() throws Throwable {
     removeKeys(getFileSystem(), "ITestS3AFailureHandling/missingFile");
   }
 
+  /**
+   * See HADOOP-18112.
+   */
+  @Test
+  public void testMultiObjectDeleteLargeNumKeys() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path("largeDir");
+    mkdirs(path);
+    createFiles(fs, path, 1, 1005, 0);
+    RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
+        fs.listFiles(path, false);
+    List<String> keys = toList(mappingRemoteIterator(locatedFileStatusRemoteIterator,
+        locatedFileStatus -> fs.pathToKey(locatedFileStatus.getPath())));
+    // After implementation of paging during multi object deletion,
+    // no exception is encountered.
+    Long bulkDeleteReqBefore = getNumberOfBulkDeleteRequestsMadeTillNow(fs);
+    try (AuditSpan span = span()) {
+      fs.removeKeys(buildDeleteRequest(keys.toArray(new String[0])), false);
+    }
+    Long bulkDeleteReqAfter = getNumberOfBulkDeleteRequestsMadeTillNow(fs);
+    // number of delete requests is 5 as we have default page size of 250.
+    Assertions.assertThat(bulkDeleteReqAfter - bulkDeleteReqBefore)
+        .describedAs("Number of batched bulk delete requests")
+        .isEqualTo(5);
+  }
+
+  private Long getNumberOfBulkDeleteRequestsMadeTillNow(S3AFileSystem fs) {
+    return fs.getIOStatistics().counters()
+        .get(StoreStatisticNames.OBJECT_BULK_DELETE_REQUEST);
+  }
+
   private void removeKeys(S3AFileSystem fileSystem, String... keys)
       throws IOException {
     try (AuditSpan span = span()) {
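The expected count in the assertion above is plain ceiling division: 1005 files are created, so 1005 keys are deleted at the patch's default page size of 250 (a sketch; it assumes no other bulk deletes run inside the measured span):

    // ceil(1005 / 250) = 5 paged bulk delete requests
    int requests = (1005 + 250 - 1) / 250;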


@@ -52,7 +52,11 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
@@ -70,6 +74,7 @@
 import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -78,6 +83,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
+import static org.apache.hadoop.test.GenericTestUtils.buildPaths;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
@@ -95,8 +104,23 @@
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public final class S3ATestUtils {
+
   private static final Logger LOG = LoggerFactory.getLogger(
       S3ATestUtils.class);
+
+  /** Many threads for scale performance: {@value}. */
+  public static final int EXECUTOR_THREAD_COUNT = 64;
+
+  /**
+   * For submitting work.
+   */
+  private static final ListeningExecutorService EXECUTOR =
+      MoreExecutors.listeningDecorator(
+          BlockingThreadPoolExecutorService.newInstance(
+              EXECUTOR_THREAD_COUNT,
+              EXECUTOR_THREAD_COUNT * 2,
+              30, TimeUnit.SECONDS,
+              "test-operations"));
 
   /**
    * Value to set a system property to (in maven) to declare that
@@ -821,6 +845,87 @@ public static StoreContext createMockStoreContext(
         .build();
   }
 
+  /**
+   * Write the text to a file asynchronously. Logs the operation duration.
+   * @param fs filesystem
+   * @param path path
+   * @return future to the patch created.
+   */
+  private static CompletableFuture<Path> put(FileSystem fs,
+      Path path, String text) {
+    return submit(EXECUTOR, () -> {
+      try (DurationInfo ignore =
+          new DurationInfo(LOG, false, "Creating %s", path)) {
+        createFile(fs, path, true, text.getBytes(Charsets.UTF_8));
+        return path;
+      }
+    });
+  }
+
+  /**
+   * Build a set of files in a directory tree.
+   * @param fs filesystem
+   * @param destDir destination
+   * @param depth file depth
+   * @param fileCount number of files to create.
+   * @param dirCount number of dirs to create at each level
+   * @return the list of files created.
+   */
+  public static List<Path> createFiles(final FileSystem fs,
+      final Path destDir,
+      final int depth,
+      final int fileCount,
+      final int dirCount) throws IOException {
+    return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
+        new ArrayList<>(fileCount),
+        new ArrayList<>(dirCount));
+  }
+
+  /**
+   * Build a set of files in a directory tree.
+   * @param fs filesystem
+   * @param destDir destination
+   * @param depth file depth
+   * @param fileCount number of files to create.
+   * @param dirCount number of dirs to create at each level
+   * @param paths [out] list of file paths created
+   * @param dirs [out] list of directory paths created.
+   * @return the list of files created.
+   */
+  public static List<Path> createDirsAndFiles(final FileSystem fs,
+      final Path destDir,
+      final int depth,
+      final int fileCount,
+      final int dirCount,
+      final List<Path> paths,
+      final List<Path> dirs) throws IOException {
+    buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
+    List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
+        + dirs.size());
+
+    // create directories. With dir marker retention, that adds more entries
+    // to cause deletion issues
+    try (DurationInfo ignore =
+        new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
+      for (Path path : dirs) {
+        futures.add(submit(EXECUTOR, () -> {
+          fs.mkdirs(path);
+          return path;
+        }));
+      }
+      waitForCompletion(futures);
+    }
+
+    try (DurationInfo ignore =
+        new DurationInfo(LOG, "Creating %d files", paths.size())) {
+      for (Path path : paths) {
+        futures.add(put(fs, path, path.getName()));
+      }
+      waitForCompletion(futures);
+      return paths;
+    }
+  }
+
   /**
    * Helper class to do diffs of metrics.
    */


@@ -26,14 +26,9 @@
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
-import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,13 +37,11 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.DurationInfo;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
@@ -69,13 +62,10 @@
 import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.forbidden;
 import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.newAssumedRoleConfig;
 import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DELEGATION_TOKEN_BINDING;
-import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
-import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
-import static org.apache.hadoop.test.GenericTestUtils.buildPaths;
 import static org.apache.hadoop.test.LambdaTestUtils.eval;
 
 /**
@@ -112,20 +102,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
   private static final Statement STATEMENT_ALL_BUCKET_READ_ACCESS
       = statement(true, S3_ALL_BUCKETS, S3_BUCKET_READ_OPERATIONS);
 
-  /** Many threads for scale performance: {@value}. */
-  public static final int EXECUTOR_THREAD_COUNT = 64;
-
-  /**
-   * For submitting work.
-   */
-  private static final ListeningExecutorService EXECUTOR =
-      MoreExecutors.listeningDecorator(
-          BlockingThreadPoolExecutorService.newInstance(
-              EXECUTOR_THREAD_COUNT,
-              EXECUTOR_THREAD_COUNT * 2,
-              30, TimeUnit.SECONDS,
-              "test-operations"));
 
   /**
    * The number of files in a non-scaled test.
@@ -742,87 +718,6 @@ private Set<Path> listFilesUnderPath(Path path, boolean recursive)
     return files;
   }
 
-  /**
-   * Write the text to a file asynchronously. Logs the operation duration.
-   * @param fs filesystem
-   * @param path path
-   * @return future to the patch created.
-   */
-  private static CompletableFuture<Path> put(FileSystem fs,
-      Path path, String text) {
-    return submit(EXECUTOR, () -> {
-      try (DurationInfo ignore =
-          new DurationInfo(LOG, false, "Creating %s", path)) {
-        createFile(fs, path, true, text.getBytes(Charsets.UTF_8));
-        return path;
-      }
-    });
-  }
-
-  /**
-   * Build a set of files in a directory tree.
-   * @param fs filesystem
-   * @param destDir destination
-   * @param depth file depth
-   * @param fileCount number of files to create.
-   * @param dirCount number of dirs to create at each level
-   * @return the list of files created.
-   */
-  public static List<Path> createFiles(final FileSystem fs,
-      final Path destDir,
-      final int depth,
-      final int fileCount,
-      final int dirCount) throws IOException {
-    return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
-        new ArrayList<Path>(fileCount),
-        new ArrayList<Path>(dirCount));
-  }
-
-  /**
-   * Build a set of files in a directory tree.
-   * @param fs filesystem
-   * @param destDir destination
-   * @param depth file depth
-   * @param fileCount number of files to create.
-   * @param dirCount number of dirs to create at each level
-   * @param paths [out] list of file paths created
-   * @param dirs [out] list of directory paths created.
-   * @return the list of files created.
-   */
-  public static List<Path> createDirsAndFiles(final FileSystem fs,
-      final Path destDir,
-      final int depth,
-      final int fileCount,
-      final int dirCount,
-      final List<Path> paths,
-      final List<Path> dirs) throws IOException {
-    buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
-    List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
-        + dirs.size());
-
-    // create directories. With dir marker retention, that adds more entries
-    // to cause deletion issues
-    try (DurationInfo ignore =
-        new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
-      for (Path path : dirs) {
-        futures.add(submit(EXECUTOR, () -> {
-          fs.mkdirs(path);
-          return path;
-        }));
-      }
-      waitForCompletion(futures);
-    }
-
-    try (DurationInfo ignore =
-        new DurationInfo(LOG, "Creating %d files", paths.size())) {
-      for (Path path : paths) {
-        futures.add(put(fs, path, path.getName()));
-      }
-      waitForCompletion(futures);
-      return paths;
-    }
-  }
-
   /**
    * Verifies that s3:DeleteObjectVersion is not required for rename.
    * <p></p>


@@ -164,7 +164,7 @@ private void createFactoryObjects(RequestFactory factory) {
         new ArrayList<>()));
     a(factory.newCopyObjectRequest(path, path2, md));
     a(factory.newDeleteObjectRequest(path));
-    a(factory.newBulkDeleteRequest(new ArrayList<>(), true));
+    a(factory.newBulkDeleteRequest(new ArrayList<>()));
     a(factory.newDirectoryMarkerRequest(path));
     a(factory.newGetObjectRequest(path));
     a(factory.newGetObjectMetadataRequest(path));


@@ -38,7 +38,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
 import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
-import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
 import static org.apache.hadoop.test.GenericTestUtils.filenameOfIndex;
 
 /**


@@ -23,7 +23,6 @@
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.transfer.model.CopyResult;
 
@@ -99,13 +98,11 @@ public CopyResult copyFile(
   }
 
   @Override
-  public DeleteObjectsResult removeKeys(
+  public void removeKeys(
       List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-      boolean deleteFakeDir,
-      boolean quiet)
+      boolean deleteFakeDir)
       throws MultiObjectDeleteException, AmazonClientException,
       IOException {
-    return null;
   }
 
   @Override