diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java index b898fde43c..9ec07cbe96 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonPathCapabilities.java @@ -170,4 +170,15 @@ private CommonPathCapabilities() { */ public static final String LEASE_RECOVERABLE = "fs.capability.lease.recoverable"; + /** + * Is this a store where parent directory listings are potentially inconsistent with + * direct list/getFileStatus calls? + * This can happen with Amazon S3 Express One Zone Storage when there are pending + * uploads under a path. + * Application code can use this flag to decide whether or not to treat + * FileNotFoundExceptions on treewalk as errors or something to downgrade. + * Value: {@value}. + */ + public static final String DIRECTORY_LISTING_INCONSISTENT = + "fs.capability.directory.listing.inconsistent"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java index 28799349a8..fa87bb48aa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileUtil.java @@ -77,6 +77,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; @@ -2079,4 +2080,32 @@ public static void rename(FileSystem srcFs, Path src, Path dst, final Options.Rename... options) throws IOException { srcFs.rename(src, dst, options); } + + /** + * Method to call after a FNFE has been raised on a treewalk, so as to + * decide whether to throw the exception (default), or, if the FS + * supports inconsistent directory listings, to log and ignore it. + * If this returns then the caller should ignore the failure and continue. + * @param fs filesystem + * @param path path + * @param e exception caught + * @throws FileNotFoundException the exception passed in, if rethrown. 
+ */ + public static void maybeIgnoreMissingDirectory(FileSystem fs, + Path path, + FileNotFoundException e) throws FileNotFoundException { + final boolean b; + try { + b = !fs.hasPathCapability(path, DIRECTORY_LISTING_INCONSISTENT); + } catch (IOException ex) { + // something went wrong; rethrow the existing exception + e.addSuppressed(ex); + throw e; + } + if (b) { + throw e; + } + LOG.info("Ignoring missing directory {}", path); + LOG.debug("Directory missing", e); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java index 7858238ee7..c4ce115def 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java @@ -38,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.FileUtil.maybeIgnoreMissingDirectory; import static org.apache.hadoop.util.functional.RemoteIterators.cleanupRemoteIterator; /** @@ -448,12 +449,16 @@ protected void postProcessPath(PathData item) throws IOException { protected void recursePath(PathData item) throws IOException { try { depth++; - if (isSorted()) { - // use the non-iterative method for listing because explicit sorting is - // required. Iterators not guaranteed to return sorted elements - processPaths(item, item.getDirectoryContents()); - } else { - processPaths(item, item.getDirectoryContentsIterator()); + try { + if (isSorted()) { + // use the non-iterative method for listing because explicit sorting is + // required. Iterators not guaranteed to return sorted elements + processPaths(item, item.getDirectoryContents()); + } else { + processPaths(item, item.getDirectoryContentsIterator()); + } + } catch (FileNotFoundException e) { + maybeIgnoreMissingDirectory(item.fs, item.path, e); } } finally { depth--; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextURIBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextURIBase.java index abb6d4f901..fad1e3774a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextURIBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextURIBase.java @@ -26,6 +26,7 @@ import org.junit.Assert; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Shell; import org.junit.After; import org.junit.Before; @@ -129,7 +130,7 @@ public void testCreateFileWithNullName() throws IOException { } @Test - public void testCreateExistingFile() throws IOException { + public void testCreateExistingFile() throws Exception { String fileName = "testCreateExistingFile"; Path testPath = qualifiedPath(fileName, fc2); @@ -140,15 +141,11 @@ public void testCreateExistingFile() throws IOException { createFile(fc1, testPath); // Create same file with fc1 - try { - createFile(fc2, testPath); - Assert.fail("Create existing file should throw an IOException."); - } catch (IOException e) { - // expected - } + LambdaTestUtils.intercept(IOException.class, () -> + createFile(fc2, testPath)); // Ensure fc2 has the created file - Assert.assertTrue(exists(fc2, testPath)); + fc2.getFileStatus(testPath); } @Test @@ -167,7 +164,7 @@ public void testCreateFileInNonExistingDirectory() throws IOException { 
Assert.assertTrue(isDir(fc2, testPath.getParent())); Assert.assertEquals("testCreateFileInNonExistingDirectory", testPath.getParent().getName()); - Assert.assertTrue(exists(fc2, testPath)); + fc2.getFileStatus(testPath); } @@ -216,10 +213,11 @@ public void testCreateDirectory() throws IOException { // TestCase - Create multiple directories String dirNames[] = { "createTest/testDir", "createTest/test Dir", - "deleteTest/test*Dir", "deleteTest/test#Dir", - "deleteTest/test1234", "deleteTest/test_DIr", - "deleteTest/1234Test", "deleteTest/test)Dir", - "deleteTest/()&^%$#@!~_+}{>() { - @Override - public Void call() throws Exception { - FileStatus[] deleted = deleteChildren(fs, root, true); - FileStatus[] children = listChildren(fs, root); - if (children.length > 0) { - fail(String.format( - "After %d attempts: listing after rm /* not empty" - + "\n%s\n%s\n%s", - iterations.incrementAndGet(), - dumpStats("final", children), + () -> { + iterations.incrementAndGet(); + FileStatus[] deleted = deleteChildren(fs, root, true); + FileStatus[] children = listChildren(fs, root); + Assertions.assertThat(children) + .describedAs("After %d attempts: listing after rm /* not empty" + + "\ndeleted: %s\n: original %s", + iterations.get(), dumpStats("deleted", deleted), - dumpStats("original", originalChildren))); - } - return null; - } + dumpStats("original", originalChildren)) + .isEmpty(); + return null; }, new LambdaTestUtils.ProportionalRetryInterval(50, 1000)); // then try to delete the empty one diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index bbccbfbc16..70a5e2de53 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathCapabilities; @@ -34,6 +35,7 @@ import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.util.functional.FutureIO; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.AssumptionViolatedException; import org.slf4j.Logger; @@ -62,6 +64,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; +import static org.apache.hadoop.util.functional.RemoteIterators.foreach; /** * Utilities used across test cases. @@ -1507,19 +1510,39 @@ public static FileStatus getFileStatusEventually(FileSystem fs, Path path, */ public static TreeScanResults treeWalk(FileSystem fs, Path path) throws IOException { - TreeScanResults dirsAndFiles = new TreeScanResults(); + return treeWalk(fs, fs.getFileStatus(path), new TreeScanResults()); + } - FileStatus[] statuses = fs.listStatus(path); - for (FileStatus status : statuses) { - LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : ""); + /** + * Recursively list all entries, with a depth first traversal of the + * directory tree. 
+ * @param dir status of the dir to scan + * @return the scan results + * @throws IOException IO problems + * @throws FileNotFoundException if the dir is not found and this FS does not + * have the listing inconsistency path capability/flag. + */ + private static TreeScanResults treeWalk(FileSystem fs, + FileStatus dir, + TreeScanResults results) throws IOException { + + Path path = dir.getPath(); + + try { + foreach(fs.listStatusIterator(path), (status) -> { + LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : ""); + if (status.isDirectory()) { + treeWalk(fs, status, results); + } else { + results.add(status); + } + }); + // and ourselves + results.add(dir); + } catch (FileNotFoundException fnfe) { + FileUtil.maybeIgnoreMissingDirectory(fs, path, fnfe); } - for (FileStatus status : statuses) { - dirsAndFiles.add(status); - if (status.isDirectory()) { - dirsAndFiles.add(treeWalk(fs, status.getPath())); - } - } - return dirsAndFiles; + return results; } /** @@ -1917,17 +1940,16 @@ public void assertEquivalent(TreeScanResults that) { public void assertFieldsEquivalent(String fieldname, TreeScanResults that, List ours, List theirs) { - String ourList = pathsToString(ours); - String theirList = pathsToString(theirs); - assertFalse("Duplicate " + fieldname + " in " + this - +": " + ourList, - containsDuplicates(ours)); - assertFalse("Duplicate " + fieldname + " in other " + that - + ": " + theirList, - containsDuplicates(theirs)); - assertTrue(fieldname + " mismatch: between " + ourList - + " and " + theirList, - collectionsEquivalent(ours, theirs)); + Assertions.assertThat(ours). + describedAs("list of %s", fieldname) + .doesNotHaveDuplicates(); + Assertions.assertThat(theirs). + describedAs("list of %s in %s", fieldname, that) + .doesNotHaveDuplicates(); + Assertions.assertThat(ours) + .describedAs("Elements of %s", fieldname) + .containsExactlyInAnyOrderElementsOf(theirs); + } public List getFiles() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java index 07b3c76f03..1464fbaeba 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapred; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.LinkedList; import java.util.List; @@ -56,6 +57,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors; +import static org.apache.hadoop.fs.FileUtil.maybeIgnoreMissingDirectory; import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; @@ -304,24 +306,28 @@ public Result call() throws Exception { Result result = new Result(); result.fs = fs; LOG.debug("ProcessInputDirCallable {}", fileStatus); - if (fileStatus.isDirectory()) { - RemoteIterator iter = fs - .listLocatedStatus(fileStatus.getPath()); - while (iter.hasNext()) { - LocatedFileStatus stat = iter.next(); - if (inputFilter.accept(stat.getPath())) { - if (recursive && stat.isDirectory()) { - 
result.dirsNeedingRecursiveCalls.add(stat); - } else { - result.locatedFileStatuses.add(org.apache.hadoop.mapreduce.lib. - input.FileInputFormat.shrinkStatus(stat)); + try { + if (fileStatus.isDirectory()) { + RemoteIterator iter = fs + .listLocatedStatus(fileStatus.getPath()); + while (iter.hasNext()) { + LocatedFileStatus stat = iter.next(); + if (inputFilter.accept(stat.getPath())) { + if (recursive && stat.isDirectory()) { + result.dirsNeedingRecursiveCalls.add(stat); + } else { + result.locatedFileStatuses.add(org.apache.hadoop.mapreduce.lib. + input.FileInputFormat.shrinkStatus(stat)); + } } } + // aggregate any stats + result.stats = retrieveIOStatistics(iter); + } else { + result.locatedFileStatuses.add(fileStatus); } - // aggregate any stats - result.stats = retrieveIOStatistics(iter); - } else { - result.locatedFileStatuses.add(fileStatus); + } catch (FileNotFoundException e) { + maybeIgnoreMissingDirectory(fs, fileStatus.getPath(), e); } return result; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java index 5b0f88f868..e74c3fa813 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.lib.input; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -48,6 +49,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.FileUtil.maybeIgnoreMissingDirectory; + /** * A base class for file-based {@link InputFormat}s. * @@ -356,16 +359,26 @@ private List singleThreadedListStatus(JobContext job, Path[] dirs, protected void addInputPathRecursively(List result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException { - RemoteIterator iter = fs.listLocatedStatus(path); - while (iter.hasNext()) { - LocatedFileStatus stat = iter.next(); - if (inputFilter.accept(stat.getPath())) { - if (stat.isDirectory()) { - addInputPathRecursively(result, fs, stat.getPath(), inputFilter); - } else { - result.add(shrinkStatus(stat)); + // FNFE exceptions are caught whether raised in the list call, + // or in the hasNext() or next() calls, where async reporting + // may take place. + try { + RemoteIterator iter = fs.listLocatedStatus(path); + while (iter.hasNext()) { + LocatedFileStatus stat = iter.next(); + if (inputFilter.accept(stat.getPath())) { + if (stat.isDirectory()) { + addInputPathRecursively(result, fs, stat.getPath(), inputFilter); + } else { + result.add(shrinkStatus(stat)); + } } } + } catch (FileNotFoundException e) { + // unless the store is capabile of list inconsistencies, rethrow. + // because this is recursive, the caller method may also end up catching + // and rethrowing, which is slighly inefficient but harmless. 
+ maybeIgnoreMissingDirectory(fs, path, e); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index ba2321d3e9..13fa9eb23b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1462,9 +1462,18 @@ private Constants() { /** * Stream supports multipart uploads to the given path. */ - public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED = + public static final String STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED = "fs.s3a.capability.multipart.uploads.enabled"; + /** + * Stream supports multipart uploads to the given path. + * This name is wrong, but it has shipped so must be + * retained. + */ + @Deprecated + public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED + = STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED; + /** * Prefetch max blocks count config. * Value = {@value} @@ -1509,4 +1518,11 @@ private Constants() { * Value: {@value}. */ public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true; + + /** + * Is this a v2 SDK build? value {@value}. + */ + public static final String STORE_CAPABILITY_AWS_V2 = + "fs.s3a.capability.aws.v2"; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 490deaaab0..e0868a2e13 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -259,7 +259,7 @@ public RemoteIterator getLocatedFileStatusIteratorForDir( } S3ListRequest request = createListObjectsRequest(key, "/", span); - LOG.debug("listStatus: doing listObjects for directory {}", key); + LOG.debug("listStatus: doing listObjects for directory \"{}\"", key); // return the results obtained from s3. 
return createFileStatusListingIterator( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 96c4f6268b..a4a09ad956 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -220,6 +220,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; +import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.Invoker.*; @@ -250,6 +251,8 @@ import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup; +import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE; +import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore; import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.checkNoS3Guard; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_CONTINUE_LIST_REQUEST; @@ -289,11 +292,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024; private URI uri; + private Path workingDir; + private String username; + private S3Client s3Client; + /** Async client is used for transfer manager and s3 select. */ private S3AsyncClient s3AsyncClient; + // initial callback policy is fail-once; it's there just to assist // some mock tests and other codepaths trying to call the low level // APIs on an uninitialized filesystem. @@ -470,6 +478,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private boolean optimizedCopyFromLocal; + /** + * Is this an S3 Express store? + */ + private boolean s3ExpressStore; + + /** + * Store endpoint from configuration info or access point ARN. + */ + private String endpoint; + + /** + * Region from configuration info or access point ARN. + */ + private String configuredRegion; + /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -581,9 +604,23 @@ public void initialize(URI name, Configuration originalConf) //check but do not store the block size longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); - // should the delete also purge uploads. + + // determine and cache the endpoints + endpoint = accessPoint == null + ? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT) + : accessPoint.getEndpoint(); + + configuredRegion = accessPoint == null + ? conf.getTrimmed(AWS_REGION) + : accessPoint.getRegion(); + + // is this an S3Express store? + s3ExpressStore = isS3ExpressStore(bucket, endpoint); + + // should the delete also purge uploads? + // happens if explicitly enabled, or if the store is S3Express storage. 
dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, - DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT); + s3ExpressStore); this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT); long prefetchBlockSizeLong = @@ -994,14 +1031,6 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException { S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, S3ClientFactory.class); - String endpoint = accessPoint == null - ? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT) - : accessPoint.getEndpoint(); - - String configuredRegion = accessPoint == null - ? conf.getTrimmed(AWS_REGION) - : accessPoint.getRegion(); - S3ClientFactory.S3ClientCreationParameters parameters = new S3ClientFactory.S3ClientCreationParameters() .withCredentialSet(credentials) @@ -1444,7 +1473,19 @@ public AWSCredentialProviderList shareCredentials(final String purpose) { public boolean isMultipartCopyEnabled() { return S3AFileSystem.this.isMultipartUploadEnabled; } - } + + @Override + public long abortMultipartUploads(final Path path) throws IOException { + final String prefix = pathToKey(path); + try (AuditSpan span = createSpan("object_multipart_bulk_abort", prefix, null)) { + return S3AFileSystem.this.abortMultipartUploadsUnderPrefix( + createStoreContext(), + span, + prefix); + } + } + + } // end S3AInternals /** * Get the input policy for this FS instance. @@ -2486,19 +2527,34 @@ public RemoteIterator listObjects( @Retries.RetryTranslated public long abortMultipartUploadsUnderPrefix(String prefix) throws IOException { - getAuditSpan().activate(); - // this deactivates the audit span somehow - final RemoteIterator uploads = - S3AFileSystem.this.listUploadsUnderPrefix(storeContext, prefix); - // so reactivate it. - getAuditSpan().activate(); - return foreach(uploads, upload -> - invoker.retry("Aborting multipart commit", upload.key(), true, () -> - S3AFileSystem.this.abortMultipartUpload(upload))); + return S3AFileSystem.this.abortMultipartUploadsUnderPrefix(storeContext, auditSpan, prefix); } } // end OperationCallbacksImpl + /** + * Abort multipart uploads under a prefix. + * @param storeContext store context + * @param span span for the operations + * @param prefix prefix for uploads to abort + * @return a count of aborts + * @throws IOException trouble; FileNotFoundExceptions are swallowed. + */ + private long abortMultipartUploadsUnderPrefix(StoreContext storeContext, + AuditSpan span, + String prefix) throws IOException { + + span.activate(); + // this deactivates the audit span somehow + final RemoteIterator uploads = + listUploadsUnderPrefix(storeContext, prefix); + // so reactivate it. + span.activate(); + return foreach(uploads, upload -> + invoker.retry("Aborting multipart commit", upload.key(), true, () -> + abortMultipartUpload(upload))); + } + /** * Callbacks from {@link Listing}. * Auditing: the listing object is long-lived; the audit span @@ -3330,7 +3386,7 @@ private void removeKeysS3( LOG.debug("Initiating delete operation for {} objects", keysToDelete.size()); for (ObjectIdentifier objectIdentifier : keysToDelete) { - LOG.debug(" {} {}", objectIdentifier.key(), + LOG.debug(" \"{}\" {}", objectIdentifier.key(), objectIdentifier.versionId() != null ? 
objectIdentifier.versionId() : ""); } } @@ -5403,10 +5459,24 @@ public boolean hasPathCapability(final Path path, final String capability) case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE: return true; + // multi object delete flag + case ENABLE_MULTI_DELETE: + return enableMultiObjectsDelete; + // Do directory operations purge uploads. case DIRECTORY_OPERATIONS_PURGE_UPLOADS: return dirOperationsPurgeUploads; + // this is a v2 sdk release. + case STORE_CAPABILITY_AWS_V2: + return true; + + // is this store S3 Express? + // if so, note that directory listings may be inconsistent + case STORE_CAPABILITY_S3_EXPRESS_STORAGE: + case DIRECTORY_LISTING_INCONSISTENT: + return s3ExpressStore; + // etags are avaialable in listings, but they // are not consistent across renames. // therefore, only availability is declared diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java index 18d6c1af58..b411606856 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInternals.java @@ -121,4 +121,14 @@ public interface S3AInternals { * @return true if the transfer manager is used to copy files. */ boolean isMultipartCopyEnabled(); + + /** + * Abort multipart uploads under a path. + * @param path path to abort uploads under. + * @return a count of aborts + * @throws IOException trouble; FileNotFoundExceptions are swallowed. + */ + @AuditEntryPoint + @Retries.RetryTranslated + long abortMultipartUploads(Path path) throws IOException; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 0fb352154c..96cbc9bdbf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -272,6 +272,11 @@ public static IOException translateException(@Nullable String operation, } break; + // Caused by duplicate create bucket call. + case SC_409_CONFLICT: + ioe = new AWSBadRequestException(message, ase); + break; + // this also surfaces sometimes and is considered to // be ~ a not found exception. case SC_410_GONE: @@ -282,12 +287,18 @@ public static IOException translateException(@Nullable String operation, // errors which stores can return from requests which // the store does not support. case SC_405_METHOD_NOT_ALLOWED: - case SC_412_PRECONDITION_FAILED: case SC_415_UNSUPPORTED_MEDIA_TYPE: case SC_501_NOT_IMPLEMENTED: ioe = new AWSUnsupportedFeatureException(message, s3Exception); break; + // precondition failure: the object is there, but the precondition + // (e.g. etag) didn't match. Assume remote file change during + // rename or status passed in to openfile had an etag which didn't match. + case SC_412_PRECONDITION_FAILED: + ioe = new RemoteFileChangedException(path, message, "", ase); + break; + // out of range. This may happen if an object is overwritten with // a shorter one while it is being read. 
case SC_416_RANGE_NOT_SATISFIABLE: @@ -865,7 +876,7 @@ static String lookupPassword(Configuration conf, String key, String defVal) */ public static String stringify(S3Object s3Object) { StringBuilder builder = new StringBuilder(s3Object.key().length() + 100); - builder.append(s3Object.key()).append(' '); + builder.append("\"").append(s3Object.key()).append("\" "); builder.append("size=").append(s3Object.size()); return builder.toString(); } @@ -1648,4 +1659,5 @@ public String toString() { public static String formatRange(long rangeStart, long rangeEnd) { return String.format("bytes=%d-%d", rangeStart, rangeEnd); } + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index 8a24a4e14d..3cb8d97532 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -24,6 +24,7 @@ import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; +import software.amazon.awssdk.services.s3.model.CreateSessionRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest; @@ -196,7 +197,8 @@ private RequestInfo writing(final String verb, isRequestNotAlwaysInSpan(final Object request) { return request instanceof UploadPartCopyRequest || request instanceof CompleteMultipartUploadRequest - || request instanceof GetBucketLocationRequest; + || request instanceof GetBucketLocationRequest + || request instanceof CreateSessionRequest; } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java new file mode 100644 index 0000000000..b378602165 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/CustomSdkSigner.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.auth; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.arns.Arn; +import software.amazon.awssdk.auth.signer.Aws4Signer; +import software.amazon.awssdk.auth.signer.AwsS3V4Signer; +import software.amazon.awssdk.auth.signer.internal.AbstractAwsS3V4Signer; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.signer.Signer; +import software.amazon.awssdk.http.SdkHttpFullRequest; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * This class is for testing the SDK's signing: it + * can be declared as the signer class in the configuration + * and then the full test suite run with it. + * Derived from the inner class of {@code ITestCustomSigner}. + *
+ * fs.s3a.custom.signers=CustomSdkSigner:org.apache.hadoop.fs.s3a.auth.CustomSdkSigner
+ *
+ * fs.s3a.s3.signing-algorithm=CustomSdkSigner
+ * 
+ */ +public class CustomSdkSigner extends AbstractAwsS3V4Signer implements Signer { + + private static final Logger LOG = LoggerFactory + .getLogger(CustomSdkSigner.class); + + private static final AtomicInteger INSTANTIATION_COUNT = + new AtomicInteger(0); + private static final AtomicInteger INVOCATION_COUNT = + new AtomicInteger(0); + + /** + * Signer for all S3 requests. + */ + private final AwsS3V4Signer s3Signer = AwsS3V4Signer.create(); + + /** + * Signer for other services. + */ + private final Aws4Signer aws4Signer = Aws4Signer.create(); + + + public CustomSdkSigner() { + int c = INSTANTIATION_COUNT.incrementAndGet(); + LOG.info("Creating Signer #{}", c); + } + + + /** + * Method to sign the incoming request with credentials. + *

+ * NOTE: In case of Client-side encryption, we do a "Generate Key" POST + * request to AWSKMS service rather than S3, this was causing the test to + * break. When this request happens, we have the endpoint in form of + * "kms.[REGION].amazonaws.com", and bucket-name becomes "kms". We can't + * use AWSS3V4Signer for AWSKMS service as it contains a header + * "x-amz-content-sha256:UNSIGNED-PAYLOAD", which returns a 400 bad + * request because the signature calculated by the service doesn't match + * what we sent. + * @param request the request to sign. + * @param executionAttributes request executionAttributes which contain the credentials. + */ + @Override + public SdkHttpFullRequest sign(SdkHttpFullRequest request, + ExecutionAttributes executionAttributes) { + int c = INVOCATION_COUNT.incrementAndGet(); + + String host = request.host(); + LOG.debug("Signing request #{} against {}: class {}", + c, host, request.getClass()); + String bucketName = parseBucketFromHost(host); + if (bucketName.equals("kms")) { + return aws4Signer.sign(request, executionAttributes); + } else { + return s3Signer.sign(request, executionAttributes); + } + } + + /** + * Parse the bucket name from the host. + * @param host hostname + * @return the parsed bucket name; if "kms" is KMS signing. + */ + static String parseBucketFromHost(String host) { + String[] hostBits = host.split("\\."); + String bucketName = hostBits[0]; + String service = hostBits[1]; + + if (bucketName.equals("kms")) { + return bucketName; + } + + if (service.contains("s3-accesspoint") || service.contains("s3-outposts") + || service.contains("s3-object-lambda")) { + // If AccessPoint then bucketName is of format `accessPoint-accountId`; + String[] accessPointBits = bucketName.split("-"); + String accountId = accessPointBits[accessPointBits.length - 1]; + // Extract the access point name from bucket name. eg: if bucket name is + // test-custom-signer-, get the access point name test-custom-signer by removing + // - from the bucket name. 
+ String accessPointName = + bucketName.substring(0, bucketName.length() - (accountId.length() + 1)); + Arn arn = Arn.builder() + .accountId(accountId) + .partition("aws") + .region(hostBits[2]) + .resource("accesspoint" + "/" + accessPointName) + .service("s3").build(); + + bucketName = arn.toString(); + } + + return bucketName; + } + + public static int getInstantiationCount() { + return INSTANTIATION_COUNT.get(); + } + + public static int getInvocationCount() { + return INVOCATION_COUNT.get(); + } + + public static String description() { + return "CustomSigner{" + + "invocations=" + INVOCATION_COUNT.get() + + ", instantiations=" + INSTANTIATION_COUNT.get() + + "}"; + } + + public static class Initializer implements AwsSignerInitializer { + + @Override + public void registerStore( + final String bucketName, + final Configuration storeConf, + final DelegationTokenProvider dtProvider, + final UserGroupInformation storeUgi) { + + LOG.debug("Registering store for bucket {}", bucketName); + } + + @Override + public void unregisterStore(final String bucketName, + final Configuration storeConf, + final DelegationTokenProvider dtProvider, + final UserGroupInformation storeUgi) { + LOG.debug("Unregistering store for bucket {}", bucketName); + } + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java index 11e73aeb75..534aeb10c8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java @@ -263,7 +263,7 @@ protected void deleteDirectoryTree(final Path path, final CompletableFuture abortUploads; if (dirOperationsPurgeUploads) { final StoreContext sc = getStoreContext(); - final String key = sc.pathToKey(path) + "/"; + final String key = dirKey; LOG.debug("All uploads under {} will be deleted", key); abortUploads = submit(sc.getExecutor(), sc.getActiveAuditSpan(), () -> callbacks.abortMultipartUploadsUnderPrefix(key)); @@ -439,14 +439,16 @@ private void asyncDeleteAction( .filter(e -> e.isDirMarker) .map(e -> e.objectIdentifier) .collect(Collectors.toList()); - LOG.debug("Deleting of {} directory markers", dirs.size()); - // This is invoked with deleteFakeDir. - Invoker.once("Remove S3 Dir Markers", - status.getPath().toString(), - () -> callbacks.removeKeys( - dirs, - true - )); + if (!dirs.isEmpty()) { + LOG.debug("Deleting {} directory markers", dirs.size()); + // This is invoked with deleteFakeDir. 
+ Invoker.once("Remove S3 Dir Markers", + status.getPath().toString(), + () -> callbacks.removeKeys( + dirs, + true + )); + } } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java index f17295d0f5..cd78350a5d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -30,7 +31,24 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.s3a.Constants; +import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT; +import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE; +import static org.apache.hadoop.fs.CommonPathCapabilities.FS_CHECKSUMS; +import static org.apache.hadoop.fs.CommonPathCapabilities.FS_MULTIPART_UPLOADER; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; +import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; +import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE; +import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2; +import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP; +import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER; /** * Internal constants private only to the S3A codebase. @@ -142,6 +160,9 @@ private InternalConstants() { /** 405 status code: Method Not Allowed. */ public static final int SC_405_METHOD_NOT_ALLOWED = 405; + /** 409 status code: Conflict. Example: creating a bucket twice. */ + public static final int SC_409_CONFLICT = 409; + /** 410 status code: Gone. */ public static final int SC_410_GONE = 410; @@ -239,6 +260,30 @@ private InternalConstants() { */ public static final Set CREATE_FILE_KEYS = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(Constants.FS_S3A_CREATE_PERFORMANCE))); + new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE))); + /** + * Dynamic Path capabilities to be evaluated + * in the BucketInfo tool. 
+ */ + public static final List S3A_DYNAMIC_CAPABILITIES = + Collections.unmodifiableList(Arrays.asList( + ETAGS_AVAILABLE, + FS_CHECKSUMS, + FS_MULTIPART_UPLOADER, + DIRECTORY_LISTING_INCONSISTENT, + + // s3 specific + STORE_CAPABILITY_AWS_V2, + STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP, + STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE, + STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE, + STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP, + STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE, + STORE_CAPABILITY_MAGIC_COMMITTER, + STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED, + STORE_CAPABILITY_S3_EXPRESS_STORAGE, + FS_S3A_CREATE_PERFORMANCE_ENABLED, + DIRECTORY_OPERATIONS_PURGE_UPLOADS, + ENABLE_MULTI_DELETE)); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/NetworkBinding.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/NetworkBinding.java index 34b4049b06..910723f909 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/NetworkBinding.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/NetworkBinding.java @@ -97,6 +97,17 @@ public static void bindSSLChannelMode(Configuration conf, } } + /** + * Is this an AWS endpoint? looks at end of FQDN. + * @param endpoint endpoint + * @return true if the endpoint matches the requirements for an aws endpoint. + */ + public static boolean isAwsEndpoint(final String endpoint) { + return (endpoint.isEmpty() + || endpoint.endsWith(".amazonaws.com") + || endpoint.endsWith(".amazonaws.com.cn")); + } + /** * Interface used to bind to the socket factory, allows the code which * works with the shaded AWS libraries to exist in their own class. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index b441bda521..17a7189ae2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -644,7 +644,7 @@ public DeleteObjectsRequest.Builder newBulkDeleteRequestBuilder( return prepareRequest(DeleteObjectsRequest .builder() .bucket(bucket) - .delete(d -> d.objects(keysToDelete).quiet(true))); + .delete(d -> d.objects(keysToDelete).quiet(!LOG.isTraceEnabled()))); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3ExpressStorage.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3ExpressStorage.java new file mode 100644 index 0000000000..fde845c5d0 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3ExpressStorage.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.isAwsEndpoint; + +/** + * Anything needed to support Amazon S3 Express One Zone Storage. + * These have bucket names like {@code s3a://bucket--usw2-az2--x-s3/} + */ +public final class S3ExpressStorage { + + /** + * Is this S3Express storage? value {@value}. + */ + public static final String STORE_CAPABILITY_S3_EXPRESS_STORAGE = + "fs.s3a.capability.s3express.storage"; + + /** + * What is the official product name? used for error messages and logging: {@value}. + */ + public static final String PRODUCT_NAME = "Amazon S3 Express One Zone Storage"; + + private S3ExpressStorage() { + } + + /** + * Minimum length of a region. + */ + private static final int SUFFIX_LENGTH = "--usw2-az2--x-s3".length(); + + public static final int ZONE_LENGTH = "usw2-az2".length(); + + /** + * Suffix of S3Express storage bucket names.. + */ + public static final String S3EXPRESS_STORE_SUFFIX = "--x-s3"; + + /** + * Is a bucket an S3Express store? + * This may get confused against third party stores, so takes the endpoint + * and only supports aws endpoints round the world. + * @param bucket bucket to probe + * @param endpoint endpoint string. + * @return true if the store is S3 Express. + */ + public static boolean isS3ExpressStore(String bucket, final String endpoint) { + return isAwsEndpoint(endpoint) && hasS3ExpressSuffix(bucket); + } + + /** + * Check for a bucket name matching -does not look at endpoint. + * @param bucket bucket to probe. + * @return true if the suffix is present + */ + public static boolean hasS3ExpressSuffix(final String bucket) { + return bucket.endsWith(S3EXPRESS_STORE_SUFFIX); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java index ea1ea90848..41251d190c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java @@ -58,6 +58,7 @@ import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy; import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl; import org.apache.hadoop.fs.s3a.select.SelectTool; +import org.apache.hadoop.fs.s3a.tools.BucketTool; import org.apache.hadoop.fs.s3a.tools.MarkerTool; import org.apache.hadoop.fs.shell.CommandFormat; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -74,6 +75,7 @@ import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.FILESYSTEM_TEMP_PATH; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.S3A_DYNAMIC_CAPABILITIES; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.MULTIPART_UPLOAD_ABORTED; @@ -117,6 +119,7 @@ public abstract class S3GuardTool extends Configured implements Tool, " [command] [OPTIONS] [s3a://BUCKET]\n\n" + "Commands: \n" + "\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" + + "\t" + BucketTool.NAME + " - " + BucketTool.PURPOSE + "\n" + "\t" + MarkerTool.MARKERS + " - " + 
MarkerTool.PURPOSE + "\n" + "\t" + SelectTool.NAME + " - " + SelectTool.PURPOSE + "\n" + "\t" + Uploads.NAME + " - " + Uploads.PURPOSE + "\n"; @@ -164,8 +167,19 @@ public abstract class S3GuardTool extends Configured implements Tool, * @param opts any boolean options to support */ protected S3GuardTool(Configuration conf, String... opts) { + this(conf, 0, Integer.MAX_VALUE, opts); + } + + /** + * Constructor a S3Guard tool with HDFS configuration. + * @param conf Configuration. + * @param min min number of args + * @param max max number of args + * @param opts any boolean options to support + */ + protected S3GuardTool(Configuration conf, int min, int max, String... opts) { super(conf); - commandFormat = new CommandFormat(0, Integer.MAX_VALUE, opts); + commandFormat = new CommandFormat(min, max, opts); } /** @@ -208,7 +222,7 @@ long ageOptionsToMsec() { return cliDelta; } - protected void addAgeOptions() { + protected final void addAgeOptions() { CommandFormat format = getCommandFormat(); format.addOptionWithValue(DAYS_FLAG); format.addOptionWithValue(HOURS_FLAG); @@ -240,6 +254,22 @@ protected List parseArgs(String[] args) { return getCommandFormat().parse(args, 1); } + /** + * Process the arguments. + * @param args raw args + * @return process arg list. + * @throws ExitUtil.ExitException if there's an unknown option. + */ + protected List parseArgsWithErrorReporting(final String[] args) + throws ExitUtil.ExitException { + try { + return parseArgs(args); + } catch (CommandFormat.UnknownOptionException e) { + errorln(getUsage()); + throw new ExitUtil.ExitException(EXIT_USAGE, e.getMessage(), e); + } + } + protected S3AFileSystem getFilesystem() { return filesystem; } @@ -273,7 +303,7 @@ protected void resetBindings() { filesystem = null; } - protected CommandFormat getCommandFormat() { + protected final CommandFormat getCommandFormat() { return commandFormat; } @@ -397,6 +427,7 @@ public int run(String[] args, PrintStream out) Configuration conf = fs.getConf(); URI fsUri = fs.getUri(); println(out, "Filesystem %s", fsUri); + final Path root = new Path("/"); try { println(out, "Location: %s", fs.getBucketLocation()); } catch (IOException e) { @@ -524,6 +555,13 @@ public int run(String[] args, PrintStream out) processMarkerOption(out, fs, getCommandFormat().getOptValue(MARKERS_FLAG)); + // and check for capabilitities + println(out, "%nStore Capabilities"); + for (String capability : S3A_DYNAMIC_CAPABILITIES) { + out.printf("\t%s %s%n", capability, + fs.hasPathCapability(root, capability)); + } + println(out, ""); // and finally flush the output and report a success. out.flush(); return SUCCESS; @@ -584,7 +622,7 @@ private String printOption(PrintStream out, /** * Command to list / abort pending multipart uploads. */ - static class Uploads extends S3GuardTool { + static final class Uploads extends S3GuardTool { public static final String NAME = "uploads"; public static final String ABORT = "abort"; public static final String LIST = "list"; @@ -736,7 +774,7 @@ private boolean olderThan(MultipartUpload u, long msec) { return ageDate.compareTo(Date.from(u.initiated())) >= 0; } - private void processArgs(List args, PrintStream out) + protected void processArgs(List args, PrintStream out) throws IOException { CommandFormat commands = getCommandFormat(); String err = "Can only specify one of -" + LIST + ", " + @@ -947,9 +985,13 @@ public static int run(Configuration conf, String... 
args) throws throw s3guardUnsupported(); } switch (subCommand) { + case BucketInfo.NAME: command = new BucketInfo(conf); break; + case BucketTool.NAME: + command = new BucketTool(conf); + break; case MarkerTool.MARKERS: command = new MarkerTool(conf); break; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/BucketTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/BucketTool.java new file mode 100644 index 0000000000..6bfcbcf776 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/BucketTool.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.tools; + +import java.io.FileNotFoundException; +import java.io.PrintStream; +import java.net.URI; +import java.util.List; +import java.util.Optional; +import java.util.function.BiFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.BucketType; +import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.DataRedundancy; +import software.amazon.awssdk.services.s3.model.LocationInfo; +import software.amazon.awssdk.services.s3.model.LocationType; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool; +import org.apache.hadoop.fs.shell.CommandFormat; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.ExitUtil; + +import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; +import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE; +import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.PRODUCT_NAME; +import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE; +import static org.apache.hadoop.fs.s3a.Invoker.once; +import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS; +import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.isAwsEndpoint; +import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.hasS3ExpressSuffix; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_BAD_CONFIGURATION; +import static 
org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_NOT_ACCEPTABLE; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_USAGE; + +/** + * Bucket operations, e.g. create/delete/probe. + */ +public final class BucketTool extends S3GuardTool { + + private static final Logger LOG = LoggerFactory.getLogger(BucketTool.class); + + /** + * Name of this tool: {@value}. + */ + public static final String NAME = "bucket"; + + /** + * Purpose of this tool: {@value}. + */ + public static final String PURPOSE = + "View and manipulate S3 buckets"; + + /** + * create command. + */ + public static final String CREATE = "create"; + + /** + * region {@value}. + */ + public static final String OPT_REGION = "region"; + + /** + * endpoint {@value}. + */ + public static final String OPT_ENDPOINT = "endpoint"; + + /** + * Zone for a store. + */ + public static final String OPT_ZONE = "zone"; + + /** + * Error message if -zone is set but the name doesn't match. + * Value {@value}. + */ + static final String UNSUPPORTED_ZONE_ARG = + "The -zone option is only supported for " + PRODUCT_NAME; + + /** + * Error message if the bucket is S3 Express but -zone wasn't set. + * Value {@value}. + */ + static final String NO_ZONE_SUPPLIED = "Required option -zone missing for " + + PRODUCT_NAME + " bucket"; + + /** + * Error Message logged/thrown when the tool could not start as + * the bucket probe was not disabled and the probe (inevitably) + * failed. + */ + public static final String PROBE_FAILURE = + "Initialization failed because the bucket existence probe" + + S3A_BUCKET_PROBE + " was not disabled. Check core-site settings."; + + public BucketTool(final Configuration conf) { + super(conf, 1, 1, + CREATE); + CommandFormat format = getCommandFormat(); + format.addOptionWithValue(OPT_REGION); + format.addOptionWithValue(OPT_ENDPOINT); + format.addOptionWithValue(OPT_ZONE); + } + + public String getUsage() { + return "bucket " + + "-" + CREATE + " " + + "[-" + OPT_ENDPOINT + " ] " + + "[-" + OPT_REGION + " ] " + + "[-" + OPT_ZONE + " ] " + + " "; + } + + public String getName() { + return NAME; + } + + private Optional getOptionalString(String key) { + String value = getCommandFormat().getOptValue(key); + return isNotEmpty(value) ? 
Optional.of(value) : Optional.empty(); + } + + @VisibleForTesting + int exec(final String...args) throws Exception { + return run(args, System.out); + } + + @Override + public int run(final String[] args, final PrintStream out) + throws Exception, ExitUtil.ExitException { + + LOG.debug("Supplied arguments: {}", String.join(", ", args)); + final List parsedArgs = parseArgsWithErrorReporting(args); + + CommandFormat command = getCommandFormat(); + boolean create = command.getOpt(CREATE); + + Optional endpoint = getOptionalString(OPT_ENDPOINT); + Optional region = getOptionalString(OPT_REGION); + Optional zone = getOptionalString(OPT_ZONE); + + + final String bucketPath = parsedArgs.get(0); + final Path source = new Path(bucketPath); + URI fsURI = source.toUri(); + String bucket = fsURI.getHost(); + + println(out, "Filesystem %s", fsURI); + if (!"s3a".equals(fsURI.getScheme())) { + throw new ExitUtil.ExitException(EXIT_USAGE, "Filesystem is not S3A URL: " + fsURI); + } + println(out, "Options region=%s endpoint=%s zone=%s s3a://%s", + region.orElse("(unset)"), + endpoint.orElse("(unset)"), + zone.orElse("(unset)"), + bucket); + if (!create) { + errorln(getUsage()); + println(out, "Supplied arguments: [" + + String.join(", ", parsedArgs) + + "]"); + throw new ExitUtil.ExitException(EXIT_USAGE, + "required option not found: -create"); + } + + final Configuration conf = getConf(); + + removeBucketOverrides(bucket, conf, + S3A_BUCKET_PROBE, + REJECT_OUT_OF_SPAN_OPERATIONS, + AWS_REGION, + ENDPOINT); + // stop any S3 calls taking place against a bucket which + // may not exist + String bucketPrefix = "fs.s3a.bucket." + bucket + '.'; + conf.setInt(bucketPrefix + S3A_BUCKET_PROBE, 0); + conf.setBoolean(bucketPrefix + REJECT_OUT_OF_SPAN_OPERATIONS, false); + // propagate an option + BiFunction, Boolean> propagate = (key, opt) -> + opt.map(v -> { + conf.set(key, v); + LOG.info("{} = {}", key, v); + return true; + }).orElse(false); + + + propagate.apply(AWS_REGION, region); + propagate.apply(ENDPOINT, endpoint); + + // fail fast on third party store + if (hasS3ExpressSuffix(bucket) && !isAwsEndpoint(endpoint.orElse(""))) { + throw new ExitUtil.ExitException(EXIT_NOT_ACCEPTABLE, UNSUPPORTED_ZONE_ARG); + } + S3AFileSystem fs; + try { + fs = (S3AFileSystem) FileSystem.newInstance(fsURI, conf); + } catch (FileNotFoundException e) { + // this happens if somehow the probe wasn't disabled. 
+ errorln(PROBE_FAILURE); + throw new ExitUtil.ExitException(EXIT_BAD_CONFIGURATION, PROBE_FAILURE); + } + + try { + + // now build the configuration + final CreateBucketConfiguration.Builder builder = CreateBucketConfiguration.builder(); + + if (fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE)) { + // S3 Express store requires a zone and some other other settings + final String az = zone.orElseThrow(() -> + new ExitUtil.ExitException(EXIT_USAGE, NO_ZONE_SUPPLIED + bucket)); + builder.location(LocationInfo.builder() + .type(LocationType.AVAILABILITY_ZONE).name(az).build()) + .bucket(software.amazon.awssdk.services.s3.model.BucketInfo.builder() + .type(BucketType.DIRECTORY) + .dataRedundancy(DataRedundancy.SINGLE_AVAILABILITY_ZONE).build()); + + } else { + if (zone.isPresent()) { + throw new ExitUtil.ExitException(EXIT_USAGE, UNSUPPORTED_ZONE_ARG + " not " + bucket); + } + region.ifPresent(builder::locationConstraint); + } + + final CreateBucketRequest request = CreateBucketRequest.builder() + .bucket(bucket) + .createBucketConfiguration(builder.build()) + .build(); + + println(out, "Creating bucket %s", bucket); + + final S3Client s3Client = fs.getS3AInternals().getAmazonS3Client(NAME); + try (DurationInfo ignored = new DurationInfo(LOG, + "Create %sbucket %s in region %s", + (fs.hasPathCapability(new Path("/"), + STORE_CAPABILITY_S3_EXPRESS_STORAGE) ? (PRODUCT_NAME + " "): ""), + bucket, region.orElse("(unset)"))) { + once("create", source.toString(), () -> + s3Client.createBucket(request)); + } + } finally { + IOUtils.closeStream(fs); + } + + return 0; + } + + /** + * Remove any values from a bucket. + * @param bucket bucket whose overrides are to be removed. Can be null/empty + * @param conf config + * @param options list of fs.s3a options to remove + */ + public static void removeBucketOverrides(final String bucket, + final Configuration conf, + final String... options) { + + if (StringUtils.isEmpty(bucket)) { + return; + } + final String bucketPrefix = "fs.s3a.bucket." 
+ bucket + '.'; + for (String option : options) { + final String stripped = option.substring("fs.s3a.".length()); + String target = bucketPrefix + stripped; + String v = conf.get(target); + if (v != null) { + conf.unset(target); + } + String extended = bucketPrefix + option; + if (conf.get(extended) != null) { + conf.unset(extended); + } + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java index f54ab18650..b2ff63b398 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java @@ -237,13 +237,7 @@ public void resetBindings() { public int run(final String[] args, final PrintStream stream) throws ExitUtil.ExitException, Exception { this.out = stream; - final List parsedArgs; - try { - parsedArgs = parseArgs(args); - } catch (CommandFormat.UnknownOptionException e) { - errorln(getUsage()); - throw new ExitUtil.ExitException(EXIT_USAGE, e.getMessage(), e); - } + final List parsedArgs = parseArgsWithErrorReporting(args); if (parsedArgs.size() != 1) { errorln(getUsage()); println(out, "Supplied arguments: [" diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md index 138e060155..8a3fd16ec1 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/aws_sdk_upgrade.md @@ -288,6 +288,16 @@ public interface S3AInternals { HeadObjectResponse getObjectMetadata(Path path) throws IOException; AWSCredentialProviderList shareCredentials(final String purpose); + + @AuditEntryPoint + @Retries.RetryTranslated + HeadBucketResponse getBucketMetadata() throws IOException; + + boolean isMultipartCopyEnabled(); + + @AuditEntryPoint + @Retries.RetryTranslated + long abortMultipartUploads(Path path) throws IOException; } ``` diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md index 0216e46014..1aa6e83b11 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/third_party_stores.md @@ -41,9 +41,18 @@ The features which may be unavailable include: * List API to use (`fs.s3a.list.version = 1`) * Bucket lifecycle rules to clean up pending uploads. -## Configuring s3a to connect to a third party store +### Disabling Change Detection +The (default) etag-based change detection logic expects stores to provide an Etag header in HEAD/GET requests, +and to support it as a precondition in subsequent GET and COPY calls. +If a store does not do this, disable the checks. +```xml + + fs.s3a.change.detection.mode + none + +``` ## Connecting to a third party object store over HTTPS The core setting for a third party store is to change the endpoint in `fs.s3a.endpoint`. 
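As a minimal sketch (the endpoint URL below is a placeholder, not a value taken from this patch), the relevant `core-site.xml` entries for a third party store might look like:

```xml
<!-- illustrative third-party store settings; substitute the store's real HTTPS endpoint -->
<property>
  <name>fs.s3a.endpoint</name>
  <value>https://storeendpoint.example.com</value>
</property>
<property>
  <name>fs.s3a.path.style.access</name>
  <value>true</value>
</property>
```

Path-style access is shown on the assumption that the store does not support virtual-host-style bucket addressing; omit it if the store does.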
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java index 6df4f7593c..3a3d82d94f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java @@ -49,6 +49,7 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase { @Override protected Configuration createConfiguration() { Configuration conf = super.createConfiguration(); + skipIfEncryptionTestsDisabled(conf); S3ATestUtils.disableFilesystemCaching(conf); patchConfigurationEncryptionSettings(conf); return conf; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java index d9c847d4b2..8787fca431 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java @@ -131,7 +131,7 @@ public void testEndpoint() throws Exception { S3ATestConstants.CONFIGURATION_TEST_ENDPOINT, ""); if (endpoint.isEmpty()) { LOG.warn("Custom endpoint test skipped as " + - S3ATestConstants.CONFIGURATION_TEST_ENDPOINT + "config " + + S3ATestConstants.CONFIGURATION_TEST_ENDPOINT + " config " + "setting was not detected"); } else { conf.set(Constants.ENDPOINT, endpoint); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java index 45b0c6c206..321f831c0a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java @@ -49,6 +49,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -104,7 +105,6 @@ public ITestS3AEncryptionSSEC(final String name, @Override protected Configuration createConfiguration() { Configuration conf = super.createConfiguration(); - disableFilesystemCaching(conf); String bucketName = getTestBucketName(conf); // directory marker options removeBaseAndBucketOverrides(bucketName, conf, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java index 66a211181b..a375044add 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java @@ -19,18 +19,12 @@ package org.apache.hadoop.fs.s3a; import java.io.ByteArrayInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.nio.charset.StandardCharsets; -import java.nio.file.AccessDeniedException; -import software.amazon.awssdk.services.s3.S3Client; -import 
software.amazon.awssdk.services.s3.model.GetBucketEncryptionRequest; -import software.amazon.awssdk.services.s3.model.GetBucketEncryptionResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import org.assertj.core.api.Assertions; -import org.junit.Assume; import org.junit.Test; @@ -45,7 +39,6 @@ import org.apache.hadoop.fs.store.EtagChecksum; import org.apache.hadoop.test.LambdaTestUtils; -import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; @@ -55,7 +48,6 @@ import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_ETAG; -import static org.hamcrest.Matchers.nullValue; /** * Tests of the S3A FileSystem which don't have a specific home and can share @@ -153,39 +145,6 @@ Path mkFile(String name, byte[] data) throws IOException { return f; } - /** - * The assumption here is that 0-byte files uploaded in a single PUT - * always have the same checksum, including stores with encryption. - * This will be skipped if the bucket has S3 default encryption enabled. - * @throws Throwable on a failure - */ - @Test - public void testEmptyFileChecksums() throws Throwable { - assumeNoDefaultEncryption(); - final S3AFileSystem fs = getFileSystem(); - Path file1 = touchFile("file1"); - EtagChecksum checksum1 = fs.getFileChecksum(file1, 0); - LOG.info("Checksum for {}: {}", file1, checksum1); - assertHasPathCapabilities(fs, file1, - CommonPathCapabilities.FS_CHECKSUMS); - assertNotNull("Null file 1 checksum", checksum1); - assertNotEquals("file 1 checksum", 0, checksum1.getLength()); - assertEquals("checksums of empty files", checksum1, - fs.getFileChecksum(touchFile("file2"), 0)); - Assertions.assertThat(fs.getXAttr(file1, XA_ETAG)) - .describedAs("etag from xattr") - .isEqualTo(checksum1.getBytes()); - } - - /** - * Skip a test if we can get the default encryption on a bucket and it is - * non-null. - */ - private void assumeNoDefaultEncryption() throws IOException { - skipIfClientSideEncryption(); - Assume.assumeThat(getDefaultEncryption(), nullValue()); - } - /** * Make sure that when checksums are disabled, the caller * gets null back. @@ -226,24 +185,6 @@ public void testNonEmptyFileChecksums() throws Throwable { .isEqualTo(checksum1.getBytes()); } - /** - * Verify that on an unencrypted store, the checksum of two non-empty - * (single PUT) files is the same if the data is the same. - * This will be skipped if the bucket has S3 default encryption enabled. 
- * @throws Throwable failure - */ - @Test - public void testNonEmptyFileChecksumsUnencrypted() throws Throwable { - Assume.assumeTrue(encryptionAlgorithm().equals(S3AEncryptionMethods.NONE)); - assumeNoDefaultEncryption(); - final S3AFileSystem fs = getFileSystem(); - final EtagChecksum checksum1 = - fs.getFileChecksum(mkFile("file5", HELLO), 0); - assertNotNull("file 3 checksum", checksum1); - assertEquals("checksums", checksum1, - fs.getFileChecksum(mkFile("file6", HELLO), 0)); - } - private S3AEncryptionMethods encryptionAlgorithm() { return getFileSystem().getS3EncryptionAlgorithm(); } @@ -399,22 +340,4 @@ private static T verifyNoTrailingSlash(String role, T o) { return o; } - /** - * Gets default encryption settings for the bucket or returns null if default - * encryption is disabled. - */ - private GetBucketEncryptionResponse getDefaultEncryption() throws IOException { - S3AFileSystem fs = getFileSystem(); - S3Client s3 = getS3AInternals().getAmazonS3Client("check default encryption"); - try (AuditSpan s = span()){ - return Invoker.once("getBucketEncryption()", - fs.getBucket(), - () -> s3.getBucketEncryption(GetBucketEncryptionRequest.builder() - .bucket(fs.getBucket()) - .build())); - } catch (FileNotFoundException | AccessDeniedException | AWSBadRequestException e) { - return null; - } - } - } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java index 3f6870be46..20e5c12ec7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.store.audit.AuditSpan; -import org.apache.hadoop.io.IOUtils; import org.junit.Assert; import org.slf4j.Logger; @@ -35,22 +34,31 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertFileHasLength; +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; -import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX; /** * Utilities for S3A multipart upload tests. */ public final class MultipartTestUtils { + private static final Logger LOG = LoggerFactory.getLogger( MultipartTestUtils.class); + /** + * Target file of {@code createMagicFile()}. + */ + public static final String MAGIC_FILE_TARGET = "subdir/file.txt"; + /** Not instantiated. */ private MultipartTestUtils() { } @@ -94,24 +102,10 @@ public static IdKey createPartUpload(S3AFileSystem fs, String key, int len, /** Delete any uploads under given path (recursive). Silent on failure. 
*/ public static void clearAnyUploads(S3AFileSystem fs, Path path) { - String key = fs.pathToKey(path); - AuditSpan span = null; try { - RemoteIterator uploads = fs.listUploads(key); - span = fs.createSpan("multipart", path.toString(), null); - final WriteOperationHelper helper - = fs.getWriteOperationHelper(); - while (uploads.hasNext()) { - MultipartUpload upload = uploads.next(); - LOG.debug("Cleaning up upload: {} {}", upload.key(), - truncatedUploadId(upload.uploadId())); - helper.abortMultipartUpload(upload.key(), - upload.uploadId(), true, LOG_EVENT); - } + fs.getS3AInternals().abortMultipartUploads(path); } catch (IOException ioe) { LOG.info("Ignoring exception: ", ioe); - } finally { - IOUtils.closeStream(span); } } @@ -165,6 +159,33 @@ private static String truncatedUploadId(String fullId) { return fullId.substring(0, 12) + " ..."; } + /** + * Given a dir, return the name of the magic subdir. + * The naming has changed across versions; this isolates + * the changes. + * @param dir directory + * @return the magic subdir + */ + public static Path magicPath(Path dir) { + return new Path(dir, MAGIC_PATH_PREFIX + "001/"); + } + + /** + * Create a magic file of "real" length more than 0 bytes long. + * @param fs filesystem + * @param dir directory + * @return the path + * @throws IOException creation failure.p + */ + public static Path createMagicFile(final S3AFileSystem fs, final Path dir) throws IOException { + Path magicFile = new Path(magicPath(dir), "__base/" + MAGIC_FILE_TARGET); + createFile(fs, magicFile, true, "123".getBytes(StandardCharsets.UTF_8)); + + // the file exists but is a 0 byte marker file. + assertFileHasLength(fs, magicFile, 0); + return magicFile; + } + /** Struct of object key, upload ID. */ public static class IdKey { private String key; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 10bb4a8032..8425a17922 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.EtagSource; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileContext; @@ -37,6 +38,7 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ContextAccessors; +import org.apache.hadoop.fs.s3a.impl.NetworkBinding; import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; @@ -56,6 +58,8 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.functional.CallableRaisingIOE; import org.apache.hadoop.util.functional.FutureIO; @@ -90,6 +94,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static 
org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; +import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE; import static org.apache.hadoop.test.GenericTestUtils.buildPaths; import static org.apache.hadoop.util.Preconditions.checkNotNull; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH; @@ -99,6 +104,8 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3AUtils.buildEncryptionSecrets; import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator; +import static org.apache.hadoop.util.functional.RemoteIterators.toList; import static org.junit.Assert.*; /** @@ -982,6 +989,54 @@ public static List createDirsAndFiles(final FileSystem fs, } } + /** + * Given a RemoteIterator to a list of file statuses, return a list of paths. + * @param i iterator + * @return list of paths + * @param type of status + * @throws IOException failure retrieving values from the iterator + */ + public static List toPathList(RemoteIterator i) + throws IOException { + return toList(mappingRemoteIterator(i, FileStatus::getPath)); + } + + /** + * Expect an error code from the exception. + * @param code error code + * @param error exception + */ + public static void expectErrorCode(final int code, final ExitUtil.ExitException error) { + if (error.getExitCode() != code) { + throw error; + } + } + + /** + * Require a test case to be against Amazon S3 Express store. + */ + public static void assumeS3ExpressFileSystem(final FileSystem fs) throws IOException { + assume("store is not S3 Express: " + fs.getUri(), + fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE)); + } + + /** + * Require a test case to be against a standard S3 store. + */ + public static void assumeNotS3ExpressFileSystem(final FileSystem fs) throws IOException { + assume("store is S3 Express: " + fs.getUri(), + !fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE)); + } + + /** + * Require a store to be hosted by Amazon -i.e. not a third party store. + */ + public static void assumeStoreAwsHosted(final FileSystem fs) { + assume("store is not AWS S3", + !NetworkBinding.isAwsEndpoint(fs.getConf() + .getTrimmed(ENDPOINT, DEFAULT_ENDPOINT))); + } + /** * Helper class to do diffs of metrics. */ @@ -1573,5 +1628,26 @@ public static boolean isCreatePerformanceEnabled(FileSystem fs) return fs.hasPathCapability(new Path("/"), FS_S3A_CREATE_PERFORMANCE_ENABLED); } + /** + * Is the filesystem connector bonded to S3Express storage? + * @param fs filesystem. + * @return true if the store has the relevant path capability. + * @throws IOException IO failure + */ + public static boolean isS3ExpressStorage(FileSystem fs) throws IOException { + return fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE); + } + /** + * Get an etag from a FileStatus which must implement + * the {@link EtagSource} interface -which S3AFileStatus does. + * + * @param status the status. 
+ * @return the etag + */ + public static String etag(FileStatus status) { + Preconditions.checkArgument(status instanceof EtagSource, + "Not an EtagSource: %s", status); + return ((EtagSource) status).getEtag(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java index fe48cd8911..58bb2a5e49 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java @@ -20,14 +20,18 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; -import software.amazon.awssdk.arns.Arn; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import software.amazon.awssdk.auth.signer.Aws4Signer; import software.amazon.awssdk.auth.signer.AwsS3V4Signer; +import software.amazon.awssdk.auth.signer.internal.AbstractAwsS3V4Signer; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.signer.Signer; import software.amazon.awssdk.http.SdkHttpFullRequest; @@ -38,8 +42,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; @@ -48,12 +54,18 @@ import org.apache.hadoop.security.UserGroupInformation; import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS; +import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE; import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; /** * Tests for custom Signers and SignerInitializers. + * Because the v2 sdk has had some problems with bulk delete + * and custom signing, this suite is parameterized. */ +@RunWith(Parameterized.class) public class ITestCustomSigner extends AbstractS3ATestBase { private static final Logger LOG = LoggerFactory @@ -62,10 +74,33 @@ public class ITestCustomSigner extends AbstractS3ATestBase { private static final String TEST_ID_KEY = "TEST_ID_KEY"; private static final String TEST_REGION_KEY = "TEST_REGION_KEY"; + /** + * Parameterization. 
+ */ + @Parameterized.Parameters(name = "{0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {"bulk delete", true}, + {"simple-delete", false}, + }); + } + + private final boolean bulkDelete; + + private final UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user1"); + + private final UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("user2"); + private String regionName; private String endpoint; + public ITestCustomSigner( + final String ignored, + final boolean bulkDelete) { + this.bulkDelete = bulkDelete; + } + @Override public void setup() throws Exception { super.setup(); @@ -79,6 +114,17 @@ public void setup() throws Exception { } LOG.info("Determined region name to be [{}] for bucket [{}]", regionName, fs.getBucket()); + CustomSignerInitializer.reset(); + } + + /** + * Teardown closes all filesystems for the test UGIs. + */ + @Override + public void teardown() throws Exception { + super.teardown(); + FileSystem.closeAllForUGI(ugi1); + FileSystem.closeAllForUGI(ugi2); } @Test @@ -86,12 +132,10 @@ public void testCustomSignerAndInitializer() throws IOException, InterruptedException { final Path basePath = path(getMethodName()); - UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user1"); - FileSystem fs1 = runMkDirAndVerify(ugi1, + FileSystem fs1 = runStoreOperationsAndVerify(ugi1, new Path(basePath, "customsignerpath1"), "id1"); - UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("user2"); - FileSystem fs2 = runMkDirAndVerify(ugi2, + FileSystem fs2 = runStoreOperationsAndVerify(ugi2, new Path(basePath, "customsignerpath2"), "id2"); Assertions.assertThat(CustomSignerInitializer.knownStores.size()) @@ -104,14 +148,15 @@ public void testCustomSignerAndInitializer() .as("Num registered stores mismatch").isEqualTo(0); } - private FileSystem runMkDirAndVerify(UserGroupInformation ugi, + private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi, Path finalPath, String identifier) throws IOException, InterruptedException { Configuration conf = createTestConfig(identifier); - return ugi.doAs((PrivilegedExceptionAction) () -> { + return ugi.doAs((PrivilegedExceptionAction) () -> { int instantiationCount = CustomSigner.getInstantiationCount(); int invocationCount = CustomSigner.getInvocationCount(); - FileSystem fs = finalPath.getFileSystem(conf); + S3AFileSystem fs = (S3AFileSystem)finalPath.getFileSystem(conf); + fs.mkdirs(finalPath); Assertions.assertThat(CustomSigner.getInstantiationCount()) .as("CustomSigner Instantiation count lower than expected") @@ -130,6 +175,22 @@ private FileSystem runMkDirAndVerify(UserGroupInformation ugi, .as("Configuration TEST_KEY mismatch in %s", CustomSigner.description()) .isEqualTo(identifier); + // now do some more operations to make sure all is good. + final Path subdir = new Path(finalPath, "year=1970/month=1/day=1"); + fs.mkdirs(subdir); + + final Path file1 = new Path(subdir, "file1"); + ContractTestUtils.touch(fs, new Path(subdir, "file1")); + fs.listStatus(subdir); + fs.delete(file1, false); + ContractTestUtils.touch(fs, new Path(subdir, "file1")); + + // create a magic file. 
+ createMagicFile(fs, subdir); + ContentSummary summary = fs.getContentSummary(finalPath); + fs.getS3AInternals().abortMultipartUploads(subdir); + fs.rename(subdir, new Path(finalPath, "renamed")); + fs.delete(finalPath, true); return fs; }); } @@ -143,10 +204,15 @@ private FileSystem runMkDirAndVerify(UserGroupInformation ugi, private Configuration createTestConfig(String identifier) { Configuration conf = createConfiguration(); + removeBaseAndBucketOverrides(conf, + CUSTOM_SIGNERS, + SIGNING_ALGORITHM_S3, + ENABLE_MULTI_DELETE); conf.set(CUSTOM_SIGNERS, "CustomS3Signer:" + CustomSigner.class.getName() + ":" + CustomSignerInitializer.class.getName()); conf.set(SIGNING_ALGORITHM_S3, "CustomS3Signer"); + conf.setBoolean(ENABLE_MULTI_DELETE, bulkDelete); conf.set(TEST_ID_KEY, identifier); conf.set(TEST_REGION_KEY, regionName); @@ -162,7 +228,7 @@ private String determineRegion(String bucketName) throws IOException { } @Private - public static final class CustomSigner implements Signer { + public static final class CustomSigner extends AbstractAwsS3V4Signer implements Signer { private static final AtomicInteger INSTANTIATION_COUNT = @@ -202,7 +268,8 @@ public SdkHttpFullRequest sign(SdkHttpFullRequest request, try { lastStoreValue = CustomSignerInitializer .getStoreValue(bucketName, UserGroupInformation.getCurrentUser()); - LOG.info("Store value for bucket {} is {}", bucketName, lastStoreValue); + LOG.info("Store value for bucket {} is {}", bucketName, + (lastStoreValue == null) ? "unset" : "set"); } catch (IOException e) { throw new RuntimeException("Failed to get current Ugi " + e, e); } @@ -216,35 +283,7 @@ public SdkHttpFullRequest sign(SdkHttpFullRequest request, } private String parseBucketFromHost(String host) { - String[] hostBits = host.split("\\."); - String bucketName = hostBits[0]; - String service = hostBits[1]; - - if (bucketName.equals("kms")) { - return bucketName; - } - - if (service.contains("s3-accesspoint") || service.contains("s3-outposts") - || service.contains("s3-object-lambda")) { - // If AccessPoint then bucketName is of format `accessPoint-accountId`; - String[] accessPointBits = bucketName.split("-"); - String accountId = accessPointBits[accessPointBits.length - 1]; - // Extract the access point name from bucket name. eg: if bucket name is - // test-custom-signer-, get the access point name test-custom-signer by removing - // - from the bucket name. - String accessPointName = - bucketName.substring(0, bucketName.length() - (accountId.length() + 1)); - Arn arn = Arn.builder() - .accountId(accountId) - .partition("aws") - .region(hostBits[2]) - .resource("accesspoint" + "/" + accessPointName) - .service("s3").build(); - - bucketName = arn.toString(); - } - - return bucketName; + return CustomSdkSigner.parseBucketFromHost(host); } public static int getInstantiationCount() { @@ -287,6 +326,13 @@ public void unregisterStore(String bucketName, Configuration storeConf, knownStores.remove(storeKey); } + /** + * Clear the stores; invoke during test setup. 
+ */ + public static void reset() { + knownStores.clear(); + } + public static StoreValue getStoreValue(String bucketName, UserGroupInformation ugi) { StoreKey storeKey = new StoreKey(bucketName, ugi); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java index 7499e10da0..9e0bdd2cd3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java @@ -620,8 +620,10 @@ public void testBulkCommitFiles() throws Throwable { commits.add(commit1); } - assertPathDoesNotExist("destination dir", destDir); - assertPathDoesNotExist("subdirectory", subdir); + if (!isS3ExpressStorage(fs)) { + assertPathDoesNotExist("destination dir", destDir); + assertPathDoesNotExist("subdirectory", subdir); + } LOG.info("Initiating commit operations"); try (CommitContext commitContext = actions.createCommitContextForTesting(destDir, JOB_ID, 0)) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java new file mode 100644 index 0000000000..286a92f789 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestTreewalkProblems.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import software.amazon.awssdk.services.s3.model.MultipartUpload; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; +import org.apache.hadoop.fs.store.audit.AuditSpan; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.LocatedFileStatusFetcher; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.tools.DistCpConstants; +import org.apache.hadoop.tools.util.DistCpTestUtils; + +import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities; +import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; +import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk; +import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.assertNoUploadsAt; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.magicPath; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.toPathList; +import static org.apache.hadoop.fs.s3a.S3AUtils.HIDDEN_FILE_FILTER; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; +import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS; +import static org.apache.hadoop.util.ToolRunner.run; +import static org.apache.hadoop.util.functional.RemoteIterators.foreach; +import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromArray; +import static org.apache.hadoop.util.functional.RemoteIterators.toList; + +/** + * Test behavior of treewalking when there are pending + * uploads. All commands MUST work. + * Currently, the only one which doesn't is distcp; + * some tests do have different assertions about directories + * found. + */ +public class ITestTreewalkProblems extends AbstractS3ACostTest { + + /** + * Exit code to expect on a shell failure. + */ + public static final int SHELL_FAILURE = 1; + + /** + * Are directory listings potentially inconsistent? 
+ */ + private boolean listingInconsistent; + + @Override + public Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + removeBaseAndBucketOverrides(conf, + DIRECTORY_OPERATIONS_PURGE_UPLOADS, + MAGIC_COMMITTER_ENABLED); + conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true); + conf.setBoolean(MAGIC_COMMITTER_ENABLED, true); + return conf; + } + + @Override + public void setup() throws Exception { + super.setup(); + final S3AFileSystem fs = getFileSystem(); + final Path path = methodPath(); + assertHasPathCapabilities(fs, path, DIRECTORY_OPERATIONS_PURGE_UPLOADS); + listingInconsistent = fs.hasPathCapability(path, DIRECTORY_LISTING_INCONSISTENT); + clearAnyUploads(fs, path); + } + + @Test + public void testListFilesDeep() throws Throwable { + final S3AFileSystem fs = getFileSystem(); + final Path src = createDirWithUpload(); + + LOG.info("listFiles({}, true)", src); + foreach(fs.listFiles(src, true), e -> LOG.info("{}", e)); + + LOG.info("listFiles({}, true)", src); + foreach(fs.listLocatedStatus(src), e -> LOG.info("{}", e)); + + // and just verify a cleanup works + Assertions.assertThat(fs.getS3AInternals().abortMultipartUploads(src)) + .describedAs("Aborted uploads under %s", src) + .isEqualTo(1); + assertNoUploadsAt(fs, src); + } + + /** + * Create a directory methodPath()/src with a magic upload underneath, + * with the upload pointing at {@code src/subdir/file.txt}. + * @return the directory created + * @throws IOException creation problems + */ + private Path createDirWithUpload() throws IOException { + final S3AFileSystem fs = getFileSystem(); + final Path src = new Path(methodPath(), "src"); + // create a magic file. + createMagicFile(fs, src); + fs.delete(magicPath(src), true); + return src; + } + + @Test + public void testLocatedFileStatusFetcher() throws Throwable { + describe("Validate mapreduce LocatedFileStatusFetcher"); + + final Path src = createDirWithUpload(); + + Configuration listConfig = new Configuration(getConfiguration()); + listConfig.setInt(LIST_STATUS_NUM_THREADS, 2); + + LocatedFileStatusFetcher fetcher = + new LocatedFileStatusFetcher(listConfig, new Path[]{src}, true, HIDDEN_FILE_FILTER, true); + Assertions.assertThat(fetcher.getFileStatuses()).hasSize(0); + } + + @Test + public void testGetContentSummaryDirsAndFiles() throws Throwable { + describe("FileSystem.getContentSummary()"); + final S3AFileSystem fs = getFileSystem(); + final Path src = createDirWithUpload(); + fs.mkdirs(new Path(src, "child")); + final Path path = methodPath(); + file(new Path(path, "file")); + final int dirs = listingInconsistent ? 3 : 3; + assertContentSummary(path, dirs, 1); + } + + /** + * Execute getContentSummary() down a directory tree which only + * contains a single real directory. + * This test case has been a bit inconsistent between different store + * types. + */ + @Test + public void testGetContentSummaryPendingDir() throws Throwable { + describe("FileSystem.getContentSummary() with pending dir"); + assertContentSummary(createDirWithUpload(), 1, 0); + } + + /** + * Make an assertions about the content summary of a path. + * @param path path to scan + * @param dirs number of directories to find. 
+ * @param files number of files to find + * @throws IOException scanning problems + */ + private void assertContentSummary( + final Path path, + final int dirs, + final int files) throws IOException { + ContentSummary summary = getFileSystem().getContentSummary(path); + Assertions.assertThat(summary.getDirectoryCount()) + .describedAs("dir count under %s of %s", path, summary) + .isEqualTo(dirs); + Assertions.assertThat(summary.getFileCount()) + .describedAs("filecount count under %s of %s", path, summary) + .isEqualTo(files); + } + + /** + * Execute getContentSummary() down a directory tree which only + * contains a single real directory. + */ + @Test + public void testGetContentSummaryFiles() throws Throwable { + describe("FileSystem.getContentSummary()"); + final S3AFileSystem fs = getFileSystem(); + final Path src = createDirWithUpload(); + fs.mkdirs(new Path(src, "child")); + final Path base = methodPath(); + touch(fs, new Path(base, "file")); + assertContentSummary(base, 3, 1); + } + + /** + * Test all the various filesystem.list* calls. + * Bundled into one test case to reduce setup/teardown overhead. + */ + @Test + public void testListStatusOperations() throws Throwable { + describe("FileSystem liststtus calls"); + final S3AFileSystem fs = getFileSystem(); + final Path base = methodPath(); + final Path src = createDirWithUpload(); + final Path file = new Path(base, "file"); + final Path dir2 = new Path(base, "dir2"); + // path in a child dir + final Path childfile = new Path(dir2, "childfile"); + file(childfile); + file(file); + fs.mkdirs(dir2); + + Assertions.assertThat(toPathList(fs.listStatusIterator(base))) + .describedAs("listStatusIterator(%s)", base) + .contains(src, dir2, file); + + Assertions.assertThat(toPathList(remoteIteratorFromArray(fs.listStatus(base)))) + .describedAs("listStatusIterator(%s, false)", false) + .contains(src, dir2, file); + + Assertions.assertThat(toPathList(fs.listFiles(base, false))) + .describedAs("listfiles(%s, false)", false) + .containsExactly(file); + + Assertions.assertThat(toPathList(fs.listFiles(base, true))) + .describedAs("listfiles(%s, true)", false) + .containsExactlyInAnyOrder(file, childfile); + + Assertions.assertThat(toPathList(fs.listLocatedStatus(base, (p) -> true))) + .describedAs("listLocatedStatus(%s, true)", false) + .contains(src, dir2, file); + Assertions.assertThat(toPathList(fs.listLocatedStatus(base))) + .describedAs("listLocatedStatus(%s, true)", false) + .contains(src, dir2, file); + } + + @Test + public void testShellList() throws Throwable { + describe("Validate hadoop fs -ls sorted and unsorted"); + final S3AFileSystem fs = getFileSystem(); + final Path base = methodPath(); + createDirWithUpload(); + fs.mkdirs(new Path(base, "dir2")); + // recursive not sorted + shell(base, "-ls", "-R", base.toUri().toString()); + + // recursive sorted + shell(base, "-ls", "-R", "-S", base.toUri().toString()); + } + + @Test + public void testShellDu() throws Throwable { + describe("Validate hadoop fs -du"); + final Path base = methodPath(); + createDirWithUpload(); + + shell(base, "-du", base.toUri().toString()); + } + + @Test + public void testShellDf() throws Throwable { + describe("Validate hadoop fs -df"); + final Path base = methodPath(); + + final String p = base.toUri().toString(); + shell(SHELL_FAILURE, base, "-df", p); + createDirWithUpload(); + + shell(base, "-df", p); + } + + @Test + public void testShellFind() throws Throwable { + describe("Validate hadoop fs -ls -R"); + final Path base = methodPath(); + final String p = 
base.toUri().toString(); + shell(SHELL_FAILURE, base, "-find", p, "-print"); + createDirWithUpload(); + shell(base, "-find", p, "-print"); + } + + @Test + public void testDistCp() throws Throwable { + describe("Validate distcp"); + final S3AFileSystem fs = getFileSystem(); + final Path base = methodPath(); + final Path src = createDirWithUpload(); + final Path dest = new Path(base, "dest"); + file(new Path(src, "real-file")); + // distcp fails if uploads are visible + DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(), + "-useiterator -update -delete -direct", getConfiguration()); + } + + @Test + public void testDistCpNoIterator() throws Throwable { + describe("Validate distcp"); + final S3AFileSystem fs = getFileSystem(); + final Path base = methodPath(); + final Path src = createDirWithUpload(); + final Path dest = new Path(base, "dest"); + file(new Path(src, "real-file")); + + // distcp fails if uploads are visible + DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(), + "-update -delete -direct", getConfiguration()); + } + + /** + * CTU is also doing treewalking, though it's test only. + */ + @Test + public void testContractTestUtilsTreewalk() throws Throwable { + final S3AFileSystem fs = getFileSystem(); + final Path base = methodPath(); + createDirWithUpload(); + final ContractTestUtils.TreeScanResults treeWalk = treeWalk(fs, base); + + ContractTestUtils.TreeScanResults listing = + new ContractTestUtils.TreeScanResults(fs.listFiles(base, true)); + treeWalk.assertFieldsEquivalent("treewalk vs listFiles(/, true)", listing, treeWalk.getFiles(), + listing.getFiles()); + } + + /** + * Globber is already resilient to missing directories; a relic + * of the time when HEAD requests on s3 objects could leave the + * 404 in S3 front end cache. + */ + @Test + public void testGlobberTreewalk() throws Throwable { + final S3AFileSystem fs = getFileSystem(); + final Path base = methodPath(); + final Path src = createDirWithUpload(); + // this is the pending dir + final Path subdir = new Path(src, "subdir/"); + final Path dest = new Path(base, "dest"); + final Path monday = new Path(dest, "day=monday"); + final Path realFile = file(new Path(monday, "real-file.parquet")); + assertGlob(fs, new Path(base, "*/*/*.parquet"), realFile); + if (listingInconsistent) { + assertGlob(fs, new Path(base, "*"), src, dest); + assertGlob(fs, new Path(base, "*/*"), subdir, monday); + } else { + assertGlob(fs, new Path(base, "*"), src, dest); + assertGlob(fs, new Path(base, "*/*"), monday); + } + } + + private static void assertGlob(final S3AFileSystem fs, + final Path pattern, + final Path... 
expected) + throws IOException { + final FileStatus[] globbed = fs.globStatus(pattern, + (f) -> true); + final List paths = Arrays.stream(globbed).map(s -> s.getPath()) + .collect(Collectors.toList()); + Assertions.assertThat(paths) + .describedAs("glob(%s)", pattern) + .containsExactlyInAnyOrder(expected); + } + + @Test + public void testFileInputFormatSplits() throws Throwable { + final S3AFileSystem fs = getFileSystem(); + final Path base = methodPath(); + final Path src = createDirWithUpload(); + final Path dest = new Path(base, "dest"); + final Path monday = new Path(dest, "day=monday"); + final int count = 4; + List files = new ArrayList<>(); + for (int i = 0; i < count; i++) { + files.add(file(new Path(monday, "file-0" + i + ".parquet"))); + } + final JobContextImpl jobContext = new JobContextImpl(getConfiguration(), new JobID("job", 1)); + final JobConf jc = (JobConf) jobContext.getConfiguration(); + jc.set("mapreduce.input.fileinputformat.inputdir", base.toUri().toString()); + jc.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true); + final TextInputFormat inputFormat = new TextInputFormat(); + final List paths = inputFormat.getSplits(jobContext).stream().map(s -> + ((FileSplit) s).getPath()) + .collect(Collectors.toList()); + + Assertions.assertThat(paths) + .describedAs("input split of base directory") + .containsExactlyInAnyOrderElementsOf(files); + + + } + + /** + * Exec a shell command; require it to succeed. + * @param base base dir + * @param command command sequence + * @throws Exception failure + */ + + private void shell(final Path base, final String... command) throws Exception { + shell(0, base, command); + } + + /** + * Exec a shell command; require the result to match the expected outcome. + * @param expected expected outcome + * @param base base dir + * @param command command sequence + * @throws Exception failure + */ + + private void shell(int expected, final Path base, final String... command) throws Exception { + Assertions.assertThat(run(getConfiguration(), new FsShell(), command)) + .describedAs("%s %s", command[0], base) + .isEqualTo(expected); + } + + /** + * Assert the upload count under a dir is the expected value. + * Failure message will include the list of entries. + * @param dir dir + * @param expected expected count + * @throws IOException listing problem + */ + private void assertUploadCount(final Path dir, final int expected) throws IOException { + Assertions.assertThat(toList(listUploads(dir))) + .describedAs("uploads under %s", dir) + .hasSize(expected); + } + + /** + * List uploads; use the same APIs that the directory operations use, + * so implicitly validating them. 
+ * @param dir directory to list + * @return full list of entries + * @throws IOException listing problem + */ + private RemoteIterator listUploads(Path dir) throws IOException { + final S3AFileSystem fs = getFileSystem(); + try (AuditSpan ignored = span()) { + final StoreContext sc = fs.createStoreContext(); + return fs.listUploadsUnderPrefix(sc, sc.pathToKey(dir)); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java index 9e07027375..80a44e22b8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestUploadPurgeOnDirectoryOperations.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.s3a.impl; import java.io.IOException; -import java.nio.charset.StandardCharsets; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -32,21 +31,20 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.store.audit.AuditSpan; -import static org.apache.hadoop.fs.contract.ContractTestUtils.assertFileHasLength; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities; -import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS; import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads; +import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX; import static org.apache.hadoop.util.functional.RemoteIterators.toList; /** * Test behavior of purging uploads in rename and delete. + * S3 Express tests automatically set this; it is explicitly set for the rest. */ public class ITestUploadPurgeOnDirectoryOperations extends AbstractS3ACostTest { @@ -117,22 +115,6 @@ public void testRenameWithPendingUpload() throws Throwable { assertUploadCount(dir, 0); } - /** - * Create a magic file of "real" length more than 0 bytes long. - * @param fs filesystem - * @param dir directory - * @return the path - * @throws IOException creation failure.p - */ - private static Path createMagicFile(final S3AFileSystem fs, final Path dir) throws IOException { - Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "001/file.txt"); - createFile(fs, magicFile, true, "123".getBytes(StandardCharsets.UTF_8)); - - // the file exists but is a 0 byte marker file. - assertFileHasLength(fs, magicFile, 0); - return magicFile; - } - /** * Assert the upload count under a dir is the expected value. * Failure message will include the list of entries. 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3ExpressStorage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3ExpressStorage.java new file mode 100644 index 0000000000..2d5d69d2c9 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3ExpressStorage.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.test.AbstractHadoopTestBase; + +import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore; + +/** + * Test S3 Express Storage methods. + */ +public class TestS3ExpressStorage extends AbstractHadoopTestBase { + + private static final String S3EXPRESS_BUCKET = "bucket--usw2-az2--x-s3"; + private static final String AZ = "usw2-az2"; + + @Test + public void testS3ExpressStateDefaultEndpoint() { + assertS3ExpressState(S3EXPRESS_BUCKET, true, ""); + assertS3ExpressState("bucket", false, ""); + } + + @Test + public void testS3ExpressStateAwsRegions() { + assertS3ExpressState(S3EXPRESS_BUCKET, true, "s3.amazonaws.com"); + assertS3ExpressState(S3EXPRESS_BUCKET, true, "s3.cn-northwest-1.amazonaws.com.cn"); + assertS3ExpressState(S3EXPRESS_BUCKET, true, "s3-fips.us-gov-east-1.amazonaws.com"); + assertS3ExpressState(S3EXPRESS_BUCKET, true, + "s3-accesspoint-fips.dualstack.us-east-1.amazonaws.com"); + assertS3ExpressState(S3EXPRESS_BUCKET, true, "s3.ca-central-1.amazonaws.com"); + } + + @Test + public void testS3ExpressStateThirdPartyStore() { + assertS3ExpressState(S3EXPRESS_BUCKET, false, "storeendpoint.example.com"); + } + + private void assertS3ExpressState(final String bucket, final boolean expected, String endpoint) { + Assertions.assertThat(isS3ExpressStore(bucket, endpoint)) + .describedAs("isS3ExpressStore(%s) with endpoint %s", bucket, endpoint) + .isEqualTo(expected); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestDirectoryMarkerListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestDirectoryMarkerListing.java index 8dbf5baaa6..e001300474 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestDirectoryMarkerListing.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestDirectoryMarkerListing.java @@ -27,6 +27,7 @@ import java.util.concurrent.Callable; import java.util.stream.Collectors; +import org.assertj.core.api.Assertions; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; @@ 
-684,7 +685,8 @@ private void assertContainsFileUnderMarkerOnly( } /** - * Expect the list of status objects to match that of the paths. + * Expect the list of status objects to match that of the paths, + * without enforcing ordering of the values. * @param statuses status object list * @param paths ordered varargs list of paths * @param <T> type of status objects @@ -692,20 +694,11 @@ private void assertContainsFileUnderMarkerOnly( private <T extends FileStatus> void assertContainsExactlyStatusOfPaths( List<T> statuses, Path... paths) { - String actual = statuses.stream() - .map(Object::toString) - .collect(Collectors.joining(";")); - String expected = Arrays.stream(paths) - .map(Object::toString) - .collect(Collectors.joining(";")); - String summary = "expected [" + expected + "]" - + " actual = [" + actual + "]"; - assertEquals("mismatch in size of listing " + summary, - paths.length, statuses.size()); - for (int i = 0; i < statuses.size(); i++) { - assertEquals("Path mismatch at element " + i + " in " + summary, - paths[i], statuses.get(i).getPath()); - } + final List<Path> pathList = statuses.stream() + .map(FileStatus::getPath) + .collect(Collectors.toList()); + Assertions.assertThat(pathList) + .containsExactlyInAnyOrder(paths); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java index 95f22644c9..fd7a528a5d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java @@ -219,8 +219,10 @@ public void testToolsNoBucket() throws Throwable { cmdR.getName(), S3A_THIS_BUCKET_DOES_NOT_EXIST }; - intercept(UnknownStoreException.class, - () -> cmdR.run(argsR)); + intercept(UnknownStoreException.class, () -> { + final int e = cmdR.run(argsR); + return String.format("Outcome of %s on missing bucket: %d", tool, e); + }); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 75c6efbe2a..9cf3c220d1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -18,10 +18,12 @@ package org.apache.hadoop.fs.s3a.scale; +import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.IntFunction; @@ -72,6 +74,7 @@ import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS; +import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator; /** * Scale test which creates a huge file.
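The reworked intercept() call in the AbstractS3GuardToolTestBase hunk above leans on a LambdaTestUtils detail that is easy to miss: when the callable completes without raising the expected exception, the value it returns is included in the generated assertion failure. Returning a descriptive string therefore upgrades a bare "exception not raised" failure into one that reports what the command actually did. A hedged sketch of the same idiom; the fs and deletedDir names here are hypothetical, not from this patch:

    // if listStatus() unexpectedly succeeds, the returned string (and so the
    // number of entries found) appears in the intercept() failure message
    intercept(FileNotFoundException.class, () ->
        "unexpected listing of " + deletedDir + ": "
            + fs.listStatus(deletedDir).length + " entries");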
@@ -103,8 +106,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase { public void setup() throws Exception { super.setup(); scaleTestDir = new Path(getTestPath(), getTestSuiteName()); - hugefile = new Path(scaleTestDir, "hugefile"); - hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed"); + hugefile = new Path(scaleTestDir, "src/hugefile"); + hugefileRenamed = new Path(scaleTestDir, "dest/hugefile"); uploadBlockSize = uploadBlockSize(); filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE, DEFAULT_HUGE_FILESIZE); @@ -490,7 +493,40 @@ public void test_030_postCreationAssertions() throws Throwable { ContractTestUtils.assertPathExists(fs, "Huge file", hugefile); FileStatus status = fs.getFileStatus(hugefile); ContractTestUtils.assertIsFile(hugefile, status); + LOG.info("Huge File Status: {}", status); assertEquals("File size in " + status, filesize, status.getLen()); + + // now do some etag status checks asserting they are always the same + // across listing operations. + final Path path = hugefile; + final FileStatus listStatus = listFile(hugefile); + LOG.info("List File Status: {}", listStatus); + + Assertions.assertThat(listStatus.getLen()) + .describedAs("List file status length %s", listStatus) + .isEqualTo(filesize); + Assertions.assertThat(etag(listStatus)) + .describedAs("List file status etag %s", listStatus) + .isEqualTo(etag(status)); + } + + /** + * Get a filestatus by listing the parent directory. + * @param path path + * @return status + * @throws IOException failure to read, file not found + */ + private FileStatus listFile(final Path path) + throws IOException { + try { + return filteringRemoteIterator( + getFileSystem().listStatusIterator(path.getParent()), + st -> st.getPath().equals(path)) + .next(); + } catch (NoSuchElementException e) { + throw (FileNotFoundException)(new FileNotFoundException("Not found: " + path) + .initCause(e)); + } } /** @@ -509,7 +545,7 @@ public void test_040_PositionedReadHugeFile() throws Throwable { String filetype = encrypted ? 
"encrypted file" : "file"; describe("Positioned reads of %s %s", filetype, hugefile); S3AFileSystem fs = getFileSystem(); - FileStatus status = fs.getFileStatus(hugefile); + FileStatus status = listFile(hugefile); long size = status.getLen(); int ops = 0; final int bufferSize = 8192; @@ -518,7 +554,11 @@ public void test_040_PositionedReadHugeFile() throws Throwable { ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF; - try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) { + try (FSDataInputStream in = fs.openFile(hugefile) + .withFileStatus(status) + .opt(FS_OPTION_OPENFILE_READ_POLICY, "random") + .opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize) + .build().get()) { readAtByte0 = new ContractTestUtils.NanoTimer(); in.readFully(0, buffer); readAtByte0.end("time to read data at start of file"); @@ -661,27 +701,45 @@ public void test_100_renameHugeFile() throws Throwable { S3AFileSystem fs = getFileSystem(); FileStatus status = fs.getFileStatus(hugefile); long size = status.getLen(); - fs.delete(hugefileRenamed, false); ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - fs.rename(hugefile, hugefileRenamed); + renameFile(hugefile, hugefileRenamed); long mb = Math.max(size / _1MB, 1); timer.end("time to rename file of %d MB", mb); LOG.info("Time per MB to rename = {} nS", toHuman(timer.nanosPerOperation(mb))); bandwidth(timer, size); + assertPathExists("renamed file", hugefileRenamed); logFSState(); FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed); assertEquals(size, destFileStatus.getLen()); // rename back ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer(); - fs.rename(hugefileRenamed, hugefile); + renameFile(hugefileRenamed, hugefile); + timer2.end("Renaming back"); LOG.info("Time per MB to rename = {} nS", toHuman(timer2.nanosPerOperation(mb))); bandwidth(timer2, size); } + /** + * Rename a file. + * Subclasses may do this differently. + * @param src source file + * @param dest dest file + * @throws IOException IO failure + */ + protected void renameFile(final Path src, + final Path dest) throws IOException { + final S3AFileSystem fs = getFileSystem(); + fs.delete(dest, false); + final boolean renamed = fs.rename(src, dest); + Assertions.assertThat(renamed) + .describedAs("rename(%s, %s)", src, dest) + .isTrue(); + } + /** * Test to verify target file encryption key. * @throws IOException diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java index b1323c49a8..1e74d715b8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java @@ -18,12 +18,20 @@ package org.apache.hadoop.fs.s3a.scale; +import java.io.IOException; + +import org.assertj.core.api.Assertions; + +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileSystem; import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER; /** * Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering. + * This also renames by parent directory, so validates parent + * dir renaming of huge files. 
*/ public class ITestS3AHugeFilesByteBufferBlocks extends AbstractSTestS3AHugeFiles { @@ -31,4 +39,25 @@ public class ITestS3AHugeFilesByteBufferBlocks protected String getBlockOutputBufferName() { return FAST_UPLOAD_BYTEBUFFER; } + + /** + * Rename the parent directory, rather than the file itself. + * @param src source file + * @param dest dest file + * @throws IOException + */ + @Override + protected void renameFile(final Path src, final Path dest) throws IOException { + + final S3AFileSystem fs = getFileSystem(); + + final Path srcDir = src.getParent(); + final Path destDir = dest.getParent(); + fs.delete(destDir, true); + fs.mkdirs(destDir, null); + final boolean renamed = fs.rename(srcDir, destDir); + Assertions.assertThat(renamed) + .describedAs("rename(%s, %s)", src, dest) + .isTrue(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/tools/ITestBucketTool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/tools/ITestBucketTool.java new file mode 100644 index 0000000000..3025646d23 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/tools/ITestBucketTool.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.tools; + +import java.io.IOException; +import java.net.UnknownHostException; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AWSBadRequestException; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.util.ExitUtil; + +import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeS3ExpressFileSystem; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.expectErrorCode; +import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE; +import static org.apache.hadoop.fs.s3a.tools.BucketTool.CREATE; +import static org.apache.hadoop.fs.s3a.tools.BucketTool.NO_ZONE_SUPPLIED; +import static org.apache.hadoop.fs.s3a.tools.BucketTool.OPT_ENDPOINT; +import static org.apache.hadoop.fs.s3a.tools.BucketTool.OPT_REGION; +import static org.apache.hadoop.fs.s3a.tools.BucketTool.OPT_ZONE; +import static org.apache.hadoop.fs.s3a.tools.BucketTool.UNSUPPORTED_ZONE_ARG; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_NOT_ACCEPTABLE; +import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_USAGE; +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * Test {@link BucketTool}. + * This is a tricky little test as it doesn't want to create any new bucket, + * including on third party stores. + */ +public class ITestBucketTool extends AbstractS3ATestBase { + + public static final String USWEST_2 = "us-west-2"; + + private static final Logger LOG = LoggerFactory.getLogger(ITestBucketTool.class); + + /** + * This is yours. + */ + public static final String OWNED = "BucketAlreadyOwnedByYouException"; + + /** + * This is not yours. Not *yet* used in any tests. + */ + public static final String EXISTS = "BucketAlreadyExists"; + + /** + * Sample S3 Express bucket name. {@value}. + */ + private static final String S3EXPRESS_BUCKET = "bucket--usw2-az2--x-s3"; + + /** + * Sample S3 Express bucket URI. {@value}. + */ + public static final String S3A_S3EXPRESS = "s3a://" + S3EXPRESS_BUCKET; + + /** + * US west az. 
{@value} + */ + public static final String USWEST_AZ_2 = "usw2-az2"; + + private String bucketName; + + private boolean s3ExpressStore; + + private BucketTool bucketTool; + + private String fsURI; + + private String region; + + private S3AFileSystem fs; + + @Override + public void setup() throws Exception { + super.setup(); + fs = getFileSystem(); + final Configuration conf = fs.getConf(); + bucketName = S3ATestUtils.getTestBucketName(conf); + fsURI = fs.getUri().toString(); + s3ExpressStore = fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE); + bucketTool = new BucketTool(conf); + region = conf.get(AWS_REGION, ""); + } + + @Test + public void testRecreateTestBucketS3Express() throws Throwable { + assumeS3ExpressFileSystem(getFileSystem()); + final IOException ex = intercept(IOException.class, + () -> bucketTool.exec("bucket", d(CREATE), d(OPT_ZONE), USWEST_AZ_2, d(OPT_REGION), region, + fsURI)); + if (ex instanceof AWSBadRequestException) { + // owned error + assertExceptionContains(OWNED, ex); + } else if (ex instanceof UnknownHostException) { + // endpoint region stuff, expect the error to include the s3express endpoint + // name + assertExceptionContains("s3express-control", ex); + } + } + + /** + * Try and recreate the test bucket. + * Third party stores may allow this, so skip test on them. + */ + @Test + public void testRecreateTestBucketNonS3Express() throws Throwable { + assumeNotS3ExpressFileSystem(fs); + assumeStoreAwsHosted(fs); + intercept(AWSBadRequestException.class, OWNED, + () -> bucketTool.exec("bucket", d(CREATE), + d(OPT_REGION), region, + fsURI)); + } + + @Test + public void testSimpleBucketWithZoneParam() throws Throwable { + expectErrorCode(EXIT_USAGE, + intercept(ExitUtil.ExitException.class, UNSUPPORTED_ZONE_ARG, () -> + bucketTool.exec("bucket", d(CREATE), + d(OPT_ZONE), USWEST_AZ_2, + "s3a://simple/"))); + } + + @Test + public void testS3ExpressBucketWithoutZoneParam() throws Throwable { + expectErrorCode(EXIT_USAGE, + intercept(ExitUtil.ExitException.class, NO_ZONE_SUPPLIED, () -> + bucketTool.exec("bucket", d(CREATE), + S3A_S3EXPRESS))); + } + + /** + * To avoid accidentally trying to create S3 Express stores on third party + * endpoints, the tool rejects such attempts. + */ + @Test + public void testS3ExpressStorageOnlyOnAwsEndpoint() throws Throwable { + describe("The tool only supports S3 Express bucket names on aws endpoints"); + expectErrorCode(EXIT_NOT_ACCEPTABLE, + intercept(ExitUtil.ExitException.class, () -> + bucketTool.exec("bucket", d(CREATE), + d(OPT_ENDPOINT), "s3.example.org", + S3A_S3EXPRESS))); + } + + /** + * Add a dash to a string. + * @param s source. 
+ * @return "-s" + */ + private String d(String s) { + return "-" + s; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml index e6c3eeed3d..f871369ed5 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml +++ b/hadoop-tools/hadoop-aws/src/test/resources/core-site.xml @@ -108,7 +108,7 @@ <name>ireland.endpoint</name> - <value>s3-eu-west-1.amazonaws.com</value> + <value>s3.eu-west-1.amazonaws.com</value> diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 15c3b571b1..25247aaaab 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -92,3 +92,7 @@ log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO # uncomment this to trace where context entries are set # log4j.logger.org.apache.hadoop.fs.audit.CommonAuditContext=TRACE + +# uncomment this to get S3 Delete requests to return the list of deleted objects +# log4j.logger.org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl=TRACE + diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/DistCpTestUtils.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/DistCpTestUtils.java index 624f7d5a0e..15211e261e 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/DistCpTestUtils.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/DistCpTestUtils.java @@ -24,9 +24,13 @@ import org.apache.hadoop.tools.DistCp; import org.apache.hadoop.util.ToolRunner; +import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.stream.Collectors; + +import org.assertj.core.api.Assertions; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -91,7 +95,9 @@ private static void assertRunDistCp(int exitCode, String src, String dst, optsArr[optsArr.length - 2] = src; optsArr[optsArr.length - 1] = dst; - assertEquals(exitCode, - ToolRunner.run(conf, distCp, optsArr)); + Assertions.assertThat(ToolRunner.run(conf, distCp, optsArr)) + .describedAs("Exit code of distcp %s", + Arrays.stream(optsArr).collect(Collectors.joining(" "))) + .isEqualTo(exitCode); } }
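The DistCpTestUtils change above replaces a bare assertEquals on the exit code with an AssertJ assertion whose description carries the full argument list, so a failing run reports the exact distcp command line rather than two unlabelled integers. A hedged sketch of the same pattern applied to any Hadoop Tool; the conf, tool and args variables are illustrative only:

    // run the tool and assert on its exit code, reporting the command line on failure
    int ret = ToolRunner.run(conf, tool, args);
    Assertions.assertThat(ret)
        .describedAs("exit code of %s %s",
            tool.getClass().getSimpleName(), String.join(" ", args))
        .isEqualTo(0);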