HADOOP-18996. S3A to provide full support for S3 Express One Zone (#6308)

This adds broad support for Amazon S3 Express One Zone to the S3A connector,
in particular, resilience in other parts of the codebase to LIST operations
returning paths under which only in-progress uploads are taking place.

hadoop-common and hadoop-mapreduce treewalking routines all cope with this;
distcp is left alone.

There are still some outstanding followup issues, and we expect more to surface
with extended use.

Contains HADOOP-18955. AWS SDK v2: add path capability probe "fs.s3a.capability.aws.v2"
* lets us probe for AWS SDK version
* bucket-info reports it
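
To illustrate (not part of the patch itself): downstream code can check this
through the standard PathCapabilities probe. A minimal sketch; the bucket URI is
a placeholder and exception handling is omitted:

  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
  if (fs.hasPathCapability(new Path("/"), "fs.s3a.capability.aws.v2")) {
    // this S3A build ships with the AWS v2 SDK
  }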

Contains HADOOP-18961 S3A: add s3guard command "bucket"

hadoop s3guard bucket -create -region us-west-2 -zone usw2-az2 \
  s3a://stevel--usw2-az2--x-s3/

* requires -zone if bucket is zonal
* rejects it if not
* rejects zonal bucket suffixes if endpoint is not aws (safety feature)
* imperfect, but a functional starting point.

New path capability "fs.s3a.capability.zonal.storage"
* Used in tests to determine whether pending uploads manifest as paths in listings
* cli tests can probe for this
* bucket-info reports it
* some tests disable/change assertions as appropriate
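
Illustrative, abridged bucket-info output showing the capability section added by
this change; exact capability names and values depend on the store being probed:

  hadoop s3guard bucket-info s3a://stevel--usw2-az2--x-s3/
  ...
  Store Capabilities
      fs.s3a.capability.aws.v2 true
      fs.s3a.capability.s3express.storage true
      fs.capability.directory.listing.inconsistent true
  ...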

----

Shell commands fail on S3Express buckets if there are pending uploads.

New path capability in hadoop-common
   "fs.capability.directory.listing.inconsistent"

1. S3AFS returns true on a S3 Express bucket
2. FileUtil.maybeIgnoreMissingDirectory(fs, path, fnfe)
   decides whether to swallow the exception or not.
3. This is used in: Shell, FileInputFormat, LocatedFileStatusFetcher
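
The caller-side pattern, as a minimal sketch based on the new FileUtil method
(process() is a placeholder for whatever the treewalk does with each entry):

  try {
    for (FileStatus status : fs.listStatus(dir)) {
      process(status);
    }
  } catch (FileNotFoundException e) {
    // rethrows unless the FS declares that listings may be inconsistent
    FileUtil.maybeIgnoreMissingDirectory(fs, dir, e);
  }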

Fixes with tests
* fs -ls -R
* fs -du
* fs -df
* fs -find
* S3AFS.getContentSummary() (maybe...should discuss)
* mapred LocatedFileStatusFetcher
* Globber: HADOOP-15478 already fixed this when dealing with
   S3 inconsistencies
* FileInputFormat

S3Express CreateSession request is permitted outside audit spans.

S3 bulk delete calls now request the store to return the list of deleted objects
if the RequestFactoryImpl log is set to TRACE:
log4j.logger.org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl=TRACE

Test Changes
 * ITestS3AMiscOperations removes all tests which require unencrypted
   buckets. AWS S3 defaults to SSE-S3 everywhere.
 * ITestBucketTool to test new tool without actually creating new
   buckets.
 * S3ATestUtils adds methods to skip test suites/cases if the store is/is not
   S3Express
 * Cutting the "is this an S3Express bucket?" logic down to a check for the
   trailing --x-s3 string, not worrying about AZ naming logic. Commented out
   the relevant tests.
 * ITestTreewalkProblems validated against standard and S3Express stores

Outstanding

 * Distcp: tests show it fails. Proposed: release notes.

---

x-amz-checksum header not found when signing S3Express messages

This modifies the custom signer in ITestCustomSigner to be a subclass
of AwsS3V4Signer with a goal of preventing signing problems with
S3 Express stores.

----

RemoteFileChanged renaming multipart file

Maps 412 status code to RemoteFileChangedException

Modifies huge file tests
-Adds a check on etag match for stat vs list
-ITestS3AHugeFilesByteBufferBlocks renames parent dirs, rather than
 files, to replicate distcp better.
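
Sketch of the stat-vs-list etag check, assuming the returned statuses implement
org.apache.hadoop.fs.EtagSource (as S3AFileStatus does); variable names are
illustrative only:

  String statEtag = ((EtagSource) fs.getFileStatus(hugefile)).getEtag();
  for (FileStatus st : fs.listStatus(hugefile.getParent())) {
    if (st.getPath().equals(hugefile)) {
      Assertions.assertThat(((EtagSource) st).getEtag())
          .describedAs("etag from list vs getFileStatus")
          .isEqualTo(statEtag);
    }
  }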

----

S3Express custom signing cannot handle bulk delete

Copy the custom signer into the production JAR, to enable downstream testing.

Extend ITestCustomSigner to cover more filesystem operations
- PUT
- POST
- COPY
- LIST
- Bulk delete through delete() and rename()
- list + abort multipart uploads

Suite is parameterized on bulk delete enabled/disabled.

To use the new signer for a full test run:

<property>
  <name>fs.s3a.custom.signers</name>
  <value>CustomSdkSigner:org.apache.hadoop.fs.s3a.auth.CustomSdkSigner</value>
</property>

<property>
  <name>fs.s3a.s3.signing-algorithm</name>
  <value>CustomSdkSigner</value>
</property>
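
With those properties in the test configuration (typically
hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml), the hadoop-aws
integration tests can then be run in the usual way, for example (flags as per the
module's testing guide; treat as an illustrative invocation and adjust to your setup):

  mvn -T 1C verify -Dparallel-tests -DtestsThreadCount=8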
Steve Loughran 2023-12-01 14:16:33 +00:00 committed by GitHub
parent 071f850841
commit e221231e81
44 changed files with 2033 additions and 315 deletions

View File

@ -170,4 +170,15 @@ private CommonPathCapabilities() {
*/
public static final String LEASE_RECOVERABLE = "fs.capability.lease.recoverable";
/**
* Is this a store where parent directory listings are potentially inconsistent with
* direct list/getFileStatus calls?
* This can happen with Amazon S3 Express One Zone Storage when there are pending
* uploads under a path.
* Application code can use this flag to decide whether or not to treat
* FileNotFoundExceptions on treewalk as errors or something to downgrade.
* Value: {@value}.
*/
public static final String DIRECTORY_LISTING_INCONSISTENT =
"fs.capability.directory.listing.inconsistent";
}

View File

@ -77,6 +77,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
@ -2079,4 +2080,32 @@ public static void rename(FileSystem srcFs, Path src, Path dst,
final Options.Rename... options) throws IOException {
srcFs.rename(src, dst, options);
}
/**
* Method to call after a FNFE has been raised on a treewalk, so as to
* decide whether to throw the exception (default), or, if the FS
* supports inconsistent directory listings, to log and ignore it.
* If this returns then the caller should ignore the failure and continue.
* @param fs filesystem
* @param path path
* @param e exception caught
* @throws FileNotFoundException the exception passed in, if rethrown.
*/
public static void maybeIgnoreMissingDirectory(FileSystem fs,
Path path,
FileNotFoundException e) throws FileNotFoundException {
final boolean b;
try {
b = !fs.hasPathCapability(path, DIRECTORY_LISTING_INCONSISTENT);
} catch (IOException ex) {
// something went wrong; rethrow the existing exception
e.addSuppressed(ex);
throw e;
}
if (b) {
throw e;
}
LOG.info("Ignoring missing directory {}", path);
LOG.debug("Directory missing", e);
}
}

View File

@ -38,6 +38,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.FileUtil.maybeIgnoreMissingDirectory;
import static org.apache.hadoop.util.functional.RemoteIterators.cleanupRemoteIterator;
/**
@ -448,12 +449,16 @@ protected void postProcessPath(PathData item) throws IOException {
protected void recursePath(PathData item) throws IOException {
try {
depth++;
if (isSorted()) {
// use the non-iterative method for listing because explicit sorting is
// required. Iterators not guaranteed to return sorted elements
processPaths(item, item.getDirectoryContents());
} else {
processPaths(item, item.getDirectoryContentsIterator());
try {
if (isSorted()) {
// use the non-iterative method for listing because explicit sorting is
// required. Iterators not guaranteed to return sorted elements
processPaths(item, item.getDirectoryContents());
} else {
processPaths(item, item.getDirectoryContentsIterator());
}
} catch (FileNotFoundException e) {
maybeIgnoreMissingDirectory(item.fs, item.path, e);
}
} finally {
depth--;

View File

@ -26,6 +26,7 @@
import org.junit.Assert;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Shell;
import org.junit.After;
import org.junit.Before;
@ -129,7 +130,7 @@ public void testCreateFileWithNullName() throws IOException {
}
@Test
public void testCreateExistingFile() throws IOException {
public void testCreateExistingFile() throws Exception {
String fileName = "testCreateExistingFile";
Path testPath = qualifiedPath(fileName, fc2);
@ -140,15 +141,11 @@ public void testCreateExistingFile() throws IOException {
createFile(fc1, testPath);
// Create same file with fc1
try {
createFile(fc2, testPath);
Assert.fail("Create existing file should throw an IOException.");
} catch (IOException e) {
// expected
}
LambdaTestUtils.intercept(IOException.class, () ->
createFile(fc2, testPath));
// Ensure fc2 has the created file
Assert.assertTrue(exists(fc2, testPath));
fc2.getFileStatus(testPath);
}
@Test
@ -167,7 +164,7 @@ public void testCreateFileInNonExistingDirectory() throws IOException {
Assert.assertTrue(isDir(fc2, testPath.getParent()));
Assert.assertEquals("testCreateFileInNonExistingDirectory",
testPath.getParent().getName());
Assert.assertTrue(exists(fc2, testPath));
fc2.getFileStatus(testPath);
}
@ -216,10 +213,11 @@ public void testCreateDirectory() throws IOException {
// TestCase - Create multiple directories
String dirNames[] = {
"createTest/testDir", "createTest/test Dir",
"deleteTest/test*Dir", "deleteTest/test#Dir",
"deleteTest/test1234", "deleteTest/test_DIr",
"deleteTest/1234Test", "deleteTest/test)Dir",
"deleteTest/()&^%$#@!~_+}{><?", " ", "^ " };
"createTest/test*Dir", "createTest/test#Dir",
"createTest/test1234", "createTest/test_DIr",
"createTest/1234Test", "createTest/test)Dir",
"createTest/()&^%$#@!~_+}{><?",
"createTest/ ", "createTest/^ " };
for (String f : dirNames) {
if (!isTestableFileNameOnPlatform(f)) {
@ -237,6 +235,10 @@ public void testCreateDirectory() throws IOException {
Assert.assertTrue(exists(fc2, testPath));
Assert.assertTrue(isDir(fc2, testPath));
}
// delete the parent directory and verify that the dir no longer exists
final Path parent = qualifiedPath("createTest", fc2);
fc2.delete(parent, true);
Assert.assertFalse(exists(fc2, parent));
}
@ -392,7 +394,9 @@ public void testDeleteDirectory() throws IOException {
"deleteTest/test*Dir", "deleteTest/test#Dir",
"deleteTest/test1234", "deleteTest/1234Test",
"deleteTest/test)Dir", "deleteTest/test_DIr",
"deleteTest/()&^%$#@!~_+}{><?", " ", "^ " };
"deleteTest/()&^%$#@!~_+}{><?",
"deleteTest/ ",
"deleteTest/^ " };
for (String f : dirNames) {
if (!isTestableFileNameOnPlatform(f)) {

View File

@ -29,7 +29,6 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -100,22 +99,18 @@ public void testRmEmptyRootDirNonRecursive() throws Throwable {
final FileStatus[] originalChildren = listChildren(fs, root);
LambdaTestUtils.eventually(
OBJECTSTORE_RETRY_TIMEOUT,
new Callable<Void>() {
@Override
public Void call() throws Exception {
FileStatus[] deleted = deleteChildren(fs, root, true);
FileStatus[] children = listChildren(fs, root);
if (children.length > 0) {
fail(String.format(
"After %d attempts: listing after rm /* not empty"
+ "\n%s\n%s\n%s",
iterations.incrementAndGet(),
dumpStats("final", children),
() -> {
iterations.incrementAndGet();
FileStatus[] deleted = deleteChildren(fs, root, true);
FileStatus[] children = listChildren(fs, root);
Assertions.assertThat(children)
.describedAs("After %d attempts: listing after rm /* not empty"
+ "\ndeleted: %s\n: original %s",
iterations.get(),
dumpStats("deleted", deleted),
dumpStats("original", originalChildren)));
}
return null;
}
dumpStats("original", originalChildren))
.isEmpty();
return null;
},
new LambdaTestUtils.ProportionalRetryInterval(50, 1000));
// then try to delete the empty one

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathCapabilities;
@ -34,6 +35,7 @@
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.util.functional.FutureIO;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.AssumptionViolatedException;
import org.slf4j.Logger;
@ -62,6 +64,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
/**
* Utilities used across test cases.
@ -1507,19 +1510,39 @@ public static FileStatus getFileStatusEventually(FileSystem fs, Path path,
*/
public static TreeScanResults treeWalk(FileSystem fs, Path path)
throws IOException {
TreeScanResults dirsAndFiles = new TreeScanResults();
return treeWalk(fs, fs.getFileStatus(path), new TreeScanResults());
}
FileStatus[] statuses = fs.listStatus(path);
for (FileStatus status : statuses) {
LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : "");
/**
* Recursively list all entries, with a depth first traversal of the
* directory tree.
* @param dir status of the dir to scan
* @return the scan results
* @throws IOException IO problems
* @throws FileNotFoundException if the dir is not found and this FS does not
* have the listing inconsistency path capability/flag.
*/
private static TreeScanResults treeWalk(FileSystem fs,
FileStatus dir,
TreeScanResults results) throws IOException {
Path path = dir.getPath();
try {
foreach(fs.listStatusIterator(path), (status) -> {
LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : "");
if (status.isDirectory()) {
treeWalk(fs, status, results);
} else {
results.add(status);
}
});
// and ourselves
results.add(dir);
} catch (FileNotFoundException fnfe) {
FileUtil.maybeIgnoreMissingDirectory(fs, path, fnfe);
}
for (FileStatus status : statuses) {
dirsAndFiles.add(status);
if (status.isDirectory()) {
dirsAndFiles.add(treeWalk(fs, status.getPath()));
}
}
return dirsAndFiles;
return results;
}
/**
@ -1917,17 +1940,16 @@ public void assertEquivalent(TreeScanResults that) {
public void assertFieldsEquivalent(String fieldname,
TreeScanResults that,
List<Path> ours, List<Path> theirs) {
String ourList = pathsToString(ours);
String theirList = pathsToString(theirs);
assertFalse("Duplicate " + fieldname + " in " + this
+": " + ourList,
containsDuplicates(ours));
assertFalse("Duplicate " + fieldname + " in other " + that
+ ": " + theirList,
containsDuplicates(theirs));
assertTrue(fieldname + " mismatch: between " + ourList
+ " and " + theirList,
collectionsEquivalent(ours, theirs));
Assertions.assertThat(ours).
describedAs("list of %s", fieldname)
.doesNotHaveDuplicates();
Assertions.assertThat(theirs).
describedAs("list of %s in %s", fieldname, that)
.doesNotHaveDuplicates();
Assertions.assertThat(ours)
.describedAs("Elements of %s", fieldname)
.containsExactlyInAnyOrderElementsOf(theirs);
}
public List<Path> getFiles() {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
@ -56,6 +57,7 @@
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.FileUtil.maybeIgnoreMissingDirectory;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
@ -304,24 +306,28 @@ public Result call() throws Exception {
Result result = new Result();
result.fs = fs;
LOG.debug("ProcessInputDirCallable {}", fileStatus);
if (fileStatus.isDirectory()) {
RemoteIterator<LocatedFileStatus> iter = fs
.listLocatedStatus(fileStatus.getPath());
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (recursive && stat.isDirectory()) {
result.dirsNeedingRecursiveCalls.add(stat);
} else {
result.locatedFileStatuses.add(org.apache.hadoop.mapreduce.lib.
input.FileInputFormat.shrinkStatus(stat));
try {
if (fileStatus.isDirectory()) {
RemoteIterator<LocatedFileStatus> iter = fs
.listLocatedStatus(fileStatus.getPath());
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (recursive && stat.isDirectory()) {
result.dirsNeedingRecursiveCalls.add(stat);
} else {
result.locatedFileStatuses.add(org.apache.hadoop.mapreduce.lib.
input.FileInputFormat.shrinkStatus(stat));
}
}
}
// aggregate any stats
result.stats = retrieveIOStatistics(iter);
} else {
result.locatedFileStatuses.add(fileStatus);
}
// aggregate any stats
result.stats = retrieveIOStatistics(iter);
} else {
result.locatedFileStatuses.add(fileStatus);
} catch (FileNotFoundException e) {
maybeIgnoreMissingDirectory(fs, fileStatus.getPath(), e);
}
return result;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.lib.input;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@ -48,6 +49,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.FileUtil.maybeIgnoreMissingDirectory;
/**
* A base class for file-based {@link InputFormat}s.
*
@ -356,16 +359,26 @@ private List<FileStatus> singleThreadedListStatus(JobContext job, Path[] dirs,
protected void addInputPathRecursively(List<FileStatus> result,
FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
result.add(shrinkStatus(stat));
// FNFE exceptions are caught whether raised in the list call,
// or in the hasNext() or next() calls, where async reporting
// may take place.
try {
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
if (inputFilter.accept(stat.getPath())) {
if (stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
result.add(shrinkStatus(stat));
}
}
}
} catch (FileNotFoundException e) {
// unless the store is capable of list inconsistencies, rethrow.
// because this is recursive, the caller method may also end up catching
// and rethrowing, which is slightly inefficient but harmless.
maybeIgnoreMissingDirectory(fs, path, e);
}
}

View File

@ -1462,9 +1462,18 @@ private Constants() {
/**
* Stream supports multipart uploads to the given path.
*/
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED =
public static final String STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED =
"fs.s3a.capability.multipart.uploads.enabled";
/**
* Stream supports multipart uploads to the given path.
* This name is wrong, but it has shipped so must be
* retained.
*/
@Deprecated
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED
= STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
/**
* Prefetch max blocks count config.
* Value = {@value}
@ -1509,4 +1518,11 @@ private Constants() {
* Value: {@value}.
*/
public static final boolean OPTIMIZED_COPY_FROM_LOCAL_DEFAULT = true;
/**
* Is this a v2 SDK build? value {@value}.
*/
public static final String STORE_CAPABILITY_AWS_V2 =
"fs.s3a.capability.aws.v2";
}

View File

@ -259,7 +259,7 @@ public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
}
S3ListRequest request = createListObjectsRequest(key, "/", span);
LOG.debug("listStatus: doing listObjects for directory {}", key);
LOG.debug("listStatus: doing listObjects for directory \"{}\"", key);
// return the results obtained from s3.
return createFileStatusListingIterator(

View File

@ -220,6 +220,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Invoker.*;
@ -250,6 +251,8 @@
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.checkNoS3Guard;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_CONTINUE_LIST_REQUEST;
@ -289,11 +292,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
private URI uri;
private Path workingDir;
private String username;
private S3Client s3Client;
/** Async client is used for transfer manager and s3 select. */
private S3AsyncClient s3AsyncClient;
// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
@ -470,6 +478,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean optimizedCopyFromLocal;
/**
* Is this an S3 Express store?
*/
private boolean s3ExpressStore;
/**
* Store endpoint from configuration info or access point ARN.
*/
private String endpoint;
/**
* Region from configuration info or access point ARN.
*/
private String configuredRegion;
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@ -581,9 +604,23 @@ public void initialize(URI name, Configuration originalConf)
//check but do not store the block size
longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
// should the delete also purge uploads.
// determine and cache the endpoints
endpoint = accessPoint == null
? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
: accessPoint.getEndpoint();
configuredRegion = accessPoint == null
? conf.getTrimmed(AWS_REGION)
: accessPoint.getRegion();
// is this an S3Express store?
s3ExpressStore = isS3ExpressStore(bucket, endpoint);
// should the delete also purge uploads?
// happens if explicitly enabled, or if the store is S3Express storage.
dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT);
s3ExpressStore);
this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
long prefetchBlockSizeLong =
@ -994,14 +1031,6 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
String endpoint = accessPoint == null
? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
: accessPoint.getEndpoint();
String configuredRegion = accessPoint == null
? conf.getTrimmed(AWS_REGION)
: accessPoint.getRegion();
S3ClientFactory.S3ClientCreationParameters parameters =
new S3ClientFactory.S3ClientCreationParameters()
.withCredentialSet(credentials)
@ -1444,7 +1473,19 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
public boolean isMultipartCopyEnabled() {
return S3AFileSystem.this.isMultipartUploadEnabled;
}
}
@Override
public long abortMultipartUploads(final Path path) throws IOException {
final String prefix = pathToKey(path);
try (AuditSpan span = createSpan("object_multipart_bulk_abort", prefix, null)) {
return S3AFileSystem.this.abortMultipartUploadsUnderPrefix(
createStoreContext(),
span,
prefix);
}
}
} // end S3AInternals
/**
* Get the input policy for this FS instance.
@ -2486,19 +2527,34 @@ public RemoteIterator<S3AFileStatus> listObjects(
@Retries.RetryTranslated
public long abortMultipartUploadsUnderPrefix(String prefix)
throws IOException {
getAuditSpan().activate();
// this deactivates the audit span somehow
final RemoteIterator<MultipartUpload> uploads =
S3AFileSystem.this.listUploadsUnderPrefix(storeContext, prefix);
// so reactivate it.
getAuditSpan().activate();
return foreach(uploads, upload ->
invoker.retry("Aborting multipart commit", upload.key(), true, () ->
S3AFileSystem.this.abortMultipartUpload(upload)));
return S3AFileSystem.this.abortMultipartUploadsUnderPrefix(storeContext, auditSpan, prefix);
}
} // end OperationCallbacksImpl
/**
* Abort multipart uploads under a prefix.
* @param storeContext store context
* @param span span for the operations
* @param prefix prefix for uploads to abort
* @return a count of aborts
* @throws IOException trouble; FileNotFoundExceptions are swallowed.
*/
private long abortMultipartUploadsUnderPrefix(StoreContext storeContext,
AuditSpan span,
String prefix) throws IOException {
span.activate();
// this deactivates the audit span somehow
final RemoteIterator<MultipartUpload> uploads =
listUploadsUnderPrefix(storeContext, prefix);
// so reactivate it.
span.activate();
return foreach(uploads, upload ->
invoker.retry("Aborting multipart commit", upload.key(), true, () ->
abortMultipartUpload(upload)));
}
/**
* Callbacks from {@link Listing}.
* Auditing: the listing object is long-lived; the audit span
@ -3330,7 +3386,7 @@ private void removeKeysS3(
LOG.debug("Initiating delete operation for {} objects",
keysToDelete.size());
for (ObjectIdentifier objectIdentifier : keysToDelete) {
LOG.debug(" {} {}", objectIdentifier.key(),
LOG.debug(" \"{}\" {}", objectIdentifier.key(),
objectIdentifier.versionId() != null ? objectIdentifier.versionId() : "");
}
}
@ -5403,10 +5459,24 @@ public boolean hasPathCapability(final Path path, final String capability)
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
return true;
// multi object delete flag
case ENABLE_MULTI_DELETE:
return enableMultiObjectsDelete;
// Do directory operations purge uploads.
case DIRECTORY_OPERATIONS_PURGE_UPLOADS:
return dirOperationsPurgeUploads;
// this is a v2 sdk release.
case STORE_CAPABILITY_AWS_V2:
return true;
// is this store S3 Express?
// if so, note that directory listings may be inconsistent
case STORE_CAPABILITY_S3_EXPRESS_STORAGE:
case DIRECTORY_LISTING_INCONSISTENT:
return s3ExpressStore;
// etags are available in listings, but they
// are not consistent across renames.
// therefore, only availability is declared

View File

@ -121,4 +121,14 @@ public interface S3AInternals {
* @return true if the transfer manager is used to copy files.
*/
boolean isMultipartCopyEnabled();
/**
* Abort multipart uploads under a path.
* @param path path to abort uploads under.
* @return a count of aborts
* @throws IOException trouble; FileNotFoundExceptions are swallowed.
*/
@AuditEntryPoint
@Retries.RetryTranslated
long abortMultipartUploads(Path path) throws IOException;
}

View File

@ -272,6 +272,11 @@ public static IOException translateException(@Nullable String operation,
}
break;
// Caused by duplicate create bucket call.
case SC_409_CONFLICT:
ioe = new AWSBadRequestException(message, ase);
break;
// this also surfaces sometimes and is considered to
// be ~ a not found exception.
case SC_410_GONE:
@ -282,12 +287,18 @@ public static IOException translateException(@Nullable String operation,
// errors which stores can return from requests which
// the store does not support.
case SC_405_METHOD_NOT_ALLOWED:
case SC_412_PRECONDITION_FAILED:
case SC_415_UNSUPPORTED_MEDIA_TYPE:
case SC_501_NOT_IMPLEMENTED:
ioe = new AWSUnsupportedFeatureException(message, s3Exception);
break;
// precondition failure: the object is there, but the precondition
// (e.g. etag) didn't match. Assume remote file change during
// rename or status passed in to openfile had an etag which didn't match.
case SC_412_PRECONDITION_FAILED:
ioe = new RemoteFileChangedException(path, message, "", ase);
break;
// out of range. This may happen if an object is overwritten with
// a shorter one while it is being read.
case SC_416_RANGE_NOT_SATISFIABLE:
@ -865,7 +876,7 @@ static String lookupPassword(Configuration conf, String key, String defVal)
*/
public static String stringify(S3Object s3Object) {
StringBuilder builder = new StringBuilder(s3Object.key().length() + 100);
builder.append(s3Object.key()).append(' ');
builder.append("\"").append(s3Object.key()).append("\" ");
builder.append("size=").append(s3Object.size());
return builder.toString();
}
@ -1648,4 +1659,5 @@ public String toString() {
public static String formatRange(long rangeStart, long rangeEnd) {
return String.format("bytes=%d-%d", rangeStart, rangeEnd);
}
}

View File

@ -24,6 +24,7 @@
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateSessionRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
@ -196,7 +197,8 @@ private RequestInfo writing(final String verb,
isRequestNotAlwaysInSpan(final Object request) {
return request instanceof UploadPartCopyRequest
|| request instanceof CompleteMultipartUploadRequest
|| request instanceof GetBucketLocationRequest;
|| request instanceof GetBucketLocationRequest
|| request instanceof CreateSessionRequest;
}
/**

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
import software.amazon.awssdk.auth.signer.internal.AbstractAwsS3V4Signer;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.signer.Signer;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.security.UserGroupInformation;
/**
* This class is for testing the SDK's signing: it
* can be declared as the signer class in the configuration
* and then the full test suite run with it.
* Derived from the inner class of {@code ITestCustomSigner}.
* <pre>
* fs.s3a.custom.signers=CustomSdkSigner:org.apache.hadoop.fs.s3a.auth.CustomSdkSigner
*
* fs.s3a.s3.signing-algorithm=CustomSdkSigner
* </pre>
*/
public class CustomSdkSigner extends AbstractAwsS3V4Signer implements Signer {
private static final Logger LOG = LoggerFactory
.getLogger(CustomSdkSigner.class);
private static final AtomicInteger INSTANTIATION_COUNT =
new AtomicInteger(0);
private static final AtomicInteger INVOCATION_COUNT =
new AtomicInteger(0);
/**
* Signer for all S3 requests.
*/
private final AwsS3V4Signer s3Signer = AwsS3V4Signer.create();
/**
* Signer for other services.
*/
private final Aws4Signer aws4Signer = Aws4Signer.create();
public CustomSdkSigner() {
int c = INSTANTIATION_COUNT.incrementAndGet();
LOG.info("Creating Signer #{}", c);
}
/**
* Method to sign the incoming request with credentials.
* <p>
* NOTE: In case of Client-side encryption, we do a "Generate Key" POST
* request to AWSKMS service rather than S3, this was causing the test to
* break. When this request happens, we have the endpoint in form of
* "kms.[REGION].amazonaws.com", and bucket-name becomes "kms". We can't
* use AWSS3V4Signer for AWSKMS service as it contains a header
* "x-amz-content-sha256:UNSIGNED-PAYLOAD", which returns a 400 bad
* request because the signature calculated by the service doesn't match
* what we sent.
* @param request the request to sign.
* @param executionAttributes request executionAttributes which contain the credentials.
*/
@Override
public SdkHttpFullRequest sign(SdkHttpFullRequest request,
ExecutionAttributes executionAttributes) {
int c = INVOCATION_COUNT.incrementAndGet();
String host = request.host();
LOG.debug("Signing request #{} against {}: class {}",
c, host, request.getClass());
String bucketName = parseBucketFromHost(host);
if (bucketName.equals("kms")) {
return aws4Signer.sign(request, executionAttributes);
} else {
return s3Signer.sign(request, executionAttributes);
}
}
/**
* Parse the bucket name from the host.
* @param host hostname
* @return the parsed bucket name; "kms" if this is KMS signing.
*/
static String parseBucketFromHost(String host) {
String[] hostBits = host.split("\\.");
String bucketName = hostBits[0];
String service = hostBits[1];
if (bucketName.equals("kms")) {
return bucketName;
}
if (service.contains("s3-accesspoint") || service.contains("s3-outposts")
|| service.contains("s3-object-lambda")) {
// If AccessPoint then bucketName is of format `accessPoint-accountId`;
String[] accessPointBits = bucketName.split("-");
String accountId = accessPointBits[accessPointBits.length - 1];
// Extract the access point name from bucket name. eg: if bucket name is
// test-custom-signer-<accountId>, get the access point name test-custom-signer by removing
// -<accountId> from the bucket name.
String accessPointName =
bucketName.substring(0, bucketName.length() - (accountId.length() + 1));
Arn arn = Arn.builder()
.accountId(accountId)
.partition("aws")
.region(hostBits[2])
.resource("accesspoint" + "/" + accessPointName)
.service("s3").build();
bucketName = arn.toString();
}
return bucketName;
}
public static int getInstantiationCount() {
return INSTANTIATION_COUNT.get();
}
public static int getInvocationCount() {
return INVOCATION_COUNT.get();
}
public static String description() {
return "CustomSigner{"
+ "invocations=" + INVOCATION_COUNT.get()
+ ", instantiations=" + INSTANTIATION_COUNT.get()
+ "}";
}
public static class Initializer implements AwsSignerInitializer {
@Override
public void registerStore(
final String bucketName,
final Configuration storeConf,
final DelegationTokenProvider dtProvider,
final UserGroupInformation storeUgi) {
LOG.debug("Registering store for bucket {}", bucketName);
}
@Override
public void unregisterStore(final String bucketName,
final Configuration storeConf,
final DelegationTokenProvider dtProvider,
final UserGroupInformation storeUgi) {
LOG.debug("Unregistering store for bucket {}", bucketName);
}
}
}

View File

@ -263,7 +263,7 @@ protected void deleteDirectoryTree(final Path path,
final CompletableFuture<Long> abortUploads;
if (dirOperationsPurgeUploads) {
final StoreContext sc = getStoreContext();
final String key = sc.pathToKey(path) + "/";
final String key = dirKey;
LOG.debug("All uploads under {} will be deleted", key);
abortUploads = submit(sc.getExecutor(), sc.getActiveAuditSpan(), () ->
callbacks.abortMultipartUploadsUnderPrefix(key));
@ -439,14 +439,16 @@ private void asyncDeleteAction(
.filter(e -> e.isDirMarker)
.map(e -> e.objectIdentifier)
.collect(Collectors.toList());
LOG.debug("Deleting of {} directory markers", dirs.size());
// This is invoked with deleteFakeDir.
Invoker.once("Remove S3 Dir Markers",
status.getPath().toString(),
() -> callbacks.removeKeys(
dirs,
true
));
if (!dirs.isEmpty()) {
LOG.debug("Deleting {} directory markers", dirs.size());
// This is invoked with deleteFakeDir.
Invoker.once("Remove S3 Dir Markers",
status.getPath().toString(),
() -> callbacks.removeKeys(
dirs,
true
));
}
}
}
}

View File

@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -30,7 +31,24 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.s3a.Constants;
import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT;
import static org.apache.hadoop.fs.CommonPathCapabilities.ETAGS_AVAILABLE;
import static org.apache.hadoop.fs.CommonPathCapabilities.FS_CHECKSUMS;
import static org.apache.hadoop.fs.CommonPathCapabilities.FS_MULTIPART_UPLOADER;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER;
/**
* Internal constants private only to the S3A codebase.
@ -142,6 +160,9 @@ private InternalConstants() {
/** 405 status code: Method Not Allowed. */
public static final int SC_405_METHOD_NOT_ALLOWED = 405;
/** 409 status code: Conflict. Example: creating a bucket twice. */
public static final int SC_409_CONFLICT = 409;
/** 410 status code: Gone. */
public static final int SC_410_GONE = 410;
@ -239,6 +260,30 @@ private InternalConstants() {
*/
public static final Set<String> CREATE_FILE_KEYS =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(Constants.FS_S3A_CREATE_PERFORMANCE)));
new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE)));
/**
* Dynamic Path capabilities to be evaluated
* in the BucketInfo tool.
*/
public static final List<String> S3A_DYNAMIC_CAPABILITIES =
Collections.unmodifiableList(Arrays.asList(
ETAGS_AVAILABLE,
FS_CHECKSUMS,
FS_MULTIPART_UPLOADER,
DIRECTORY_LISTING_INCONSISTENT,
// s3 specific
STORE_CAPABILITY_AWS_V2,
STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP,
STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE,
STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE,
STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP,
STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE,
STORE_CAPABILITY_MAGIC_COMMITTER,
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED,
STORE_CAPABILITY_S3_EXPRESS_STORAGE,
FS_S3A_CREATE_PERFORMANCE_ENABLED,
DIRECTORY_OPERATIONS_PURGE_UPLOADS,
ENABLE_MULTI_DELETE));
}

View File

@ -97,6 +97,17 @@ public static void bindSSLChannelMode(Configuration conf,
}
}
/**
* Is this an AWS endpoint? looks at end of FQDN.
* @param endpoint endpoint
* @return true if the endpoint matches the requirements for an aws endpoint.
*/
public static boolean isAwsEndpoint(final String endpoint) {
return (endpoint.isEmpty()
|| endpoint.endsWith(".amazonaws.com")
|| endpoint.endsWith(".amazonaws.com.cn"));
}
/**
* Interface used to bind to the socket factory, allows the code which
* works with the shaded AWS libraries to exist in their own class.

View File

@ -644,7 +644,7 @@ public DeleteObjectsRequest.Builder newBulkDeleteRequestBuilder(
return prepareRequest(DeleteObjectsRequest
.builder()
.bucket(bucket)
.delete(d -> d.objects(keysToDelete).quiet(true)));
.delete(d -> d.objects(keysToDelete).quiet(!LOG.isTraceEnabled())));
}
@Override

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.isAwsEndpoint;
/**
* Anything needed to support Amazon S3 Express One Zone Storage.
* These have bucket names like {@code s3a://bucket--usw2-az2--x-s3/}
*/
public final class S3ExpressStorage {
/**
* Is this S3Express storage? value {@value}.
*/
public static final String STORE_CAPABILITY_S3_EXPRESS_STORAGE =
"fs.s3a.capability.s3express.storage";
/**
* What is the official product name? used for error messages and logging: {@value}.
*/
public static final String PRODUCT_NAME = "Amazon S3 Express One Zone Storage";
private S3ExpressStorage() {
}
/**
* Minimum length of the zone-plus-suffix portion of an S3Express bucket name.
*/
private static final int SUFFIX_LENGTH = "--usw2-az2--x-s3".length();
public static final int ZONE_LENGTH = "usw2-az2".length();
/**
* Suffix of S3Express storage bucket names.
*/
public static final String S3EXPRESS_STORE_SUFFIX = "--x-s3";
/**
* Is a bucket an S3Express store?
* This may get confused against third party stores, so takes the endpoint
* and only supports aws endpoints round the world.
* @param bucket bucket to probe
* @param endpoint endpoint string.
* @return true if the store is S3 Express.
*/
public static boolean isS3ExpressStore(String bucket, final String endpoint) {
return isAwsEndpoint(endpoint) && hasS3ExpressSuffix(bucket);
}
/**
* Check for a bucket name match; does not look at the endpoint.
* @param bucket bucket to probe.
* @return true if the suffix is present
*/
public static boolean hasS3ExpressSuffix(final String bucket) {
return bucket.endsWith(S3EXPRESS_STORE_SUFFIX);
}
}

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
import org.apache.hadoop.fs.s3a.select.SelectTool;
import org.apache.hadoop.fs.s3a.tools.BucketTool;
import org.apache.hadoop.fs.s3a.tools.MarkerTool;
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.fs.statistics.IOStatistics;
@ -74,6 +75,7 @@
import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.FILESYSTEM_TEMP_PATH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.S3A_DYNAMIC_CAPABILITIES;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.MULTIPART_UPLOAD_ABORTED;
@ -117,6 +119,7 @@ public abstract class S3GuardTool extends Configured implements Tool,
" [command] [OPTIONS] [s3a://BUCKET]\n\n" +
"Commands: \n" +
"\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" +
"\t" + BucketTool.NAME + " - " + BucketTool.PURPOSE + "\n" +
"\t" + MarkerTool.MARKERS + " - " + MarkerTool.PURPOSE + "\n" +
"\t" + SelectTool.NAME + " - " + SelectTool.PURPOSE + "\n" +
"\t" + Uploads.NAME + " - " + Uploads.PURPOSE + "\n";
@ -164,8 +167,19 @@ public abstract class S3GuardTool extends Configured implements Tool,
* @param opts any boolean options to support
*/
protected S3GuardTool(Configuration conf, String... opts) {
this(conf, 0, Integer.MAX_VALUE, opts);
}
/**
* Construct an S3Guard tool with HDFS configuration.
* @param conf Configuration.
* @param min min number of args
* @param max max number of args
* @param opts any boolean options to support
*/
protected S3GuardTool(Configuration conf, int min, int max, String... opts) {
super(conf);
commandFormat = new CommandFormat(0, Integer.MAX_VALUE, opts);
commandFormat = new CommandFormat(min, max, opts);
}
/**
@ -208,7 +222,7 @@ long ageOptionsToMsec() {
return cliDelta;
}
protected void addAgeOptions() {
protected final void addAgeOptions() {
CommandFormat format = getCommandFormat();
format.addOptionWithValue(DAYS_FLAG);
format.addOptionWithValue(HOURS_FLAG);
@ -240,6 +254,22 @@ protected List<String> parseArgs(String[] args) {
return getCommandFormat().parse(args, 1);
}
/**
* Process the arguments.
* @param args raw args
* @return process arg list.
* @throws ExitUtil.ExitException if there's an unknown option.
*/
protected List<String> parseArgsWithErrorReporting(final String[] args)
throws ExitUtil.ExitException {
try {
return parseArgs(args);
} catch (CommandFormat.UnknownOptionException e) {
errorln(getUsage());
throw new ExitUtil.ExitException(EXIT_USAGE, e.getMessage(), e);
}
}
protected S3AFileSystem getFilesystem() {
return filesystem;
}
@ -273,7 +303,7 @@ protected void resetBindings() {
filesystem = null;
}
protected CommandFormat getCommandFormat() {
protected final CommandFormat getCommandFormat() {
return commandFormat;
}
@ -397,6 +427,7 @@ public int run(String[] args, PrintStream out)
Configuration conf = fs.getConf();
URI fsUri = fs.getUri();
println(out, "Filesystem %s", fsUri);
final Path root = new Path("/");
try {
println(out, "Location: %s", fs.getBucketLocation());
} catch (IOException e) {
@ -524,6 +555,13 @@ public int run(String[] args, PrintStream out)
processMarkerOption(out, fs,
getCommandFormat().getOptValue(MARKERS_FLAG));
// and check for capabilitities
println(out, "%nStore Capabilities");
for (String capability : S3A_DYNAMIC_CAPABILITIES) {
out.printf("\t%s %s%n", capability,
fs.hasPathCapability(root, capability));
}
println(out, "");
// and finally flush the output and report a success.
out.flush();
return SUCCESS;
@ -584,7 +622,7 @@ private String printOption(PrintStream out,
/**
* Command to list / abort pending multipart uploads.
*/
static class Uploads extends S3GuardTool {
static final class Uploads extends S3GuardTool {
public static final String NAME = "uploads";
public static final String ABORT = "abort";
public static final String LIST = "list";
@ -736,7 +774,7 @@ private boolean olderThan(MultipartUpload u, long msec) {
return ageDate.compareTo(Date.from(u.initiated())) >= 0;
}
private void processArgs(List<String> args, PrintStream out)
protected void processArgs(List<String> args, PrintStream out)
throws IOException {
CommandFormat commands = getCommandFormat();
String err = "Can only specify one of -" + LIST + ", " +
@ -947,9 +985,13 @@ public static int run(Configuration conf, String... args) throws
throw s3guardUnsupported();
}
switch (subCommand) {
case BucketInfo.NAME:
command = new BucketInfo(conf);
break;
case BucketTool.NAME:
command = new BucketTool(conf);
break;
case MarkerTool.MARKERS:
command = new MarkerTool(conf);
break;

View File

@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.tools;
import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.BucketType;
import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DataRedundancy;
import software.amazon.awssdk.services.s3.model.LocationInfo;
import software.amazon.awssdk.services.s3.model.LocationType;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool;
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ExitUtil;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.PRODUCT_NAME;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.Invoker.once;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.isAwsEndpoint;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.hasS3ExpressSuffix;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_BAD_CONFIGURATION;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_NOT_ACCEPTABLE;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_USAGE;
/**
* Bucket operations, e.g. create/delete/probe.
*/
public final class BucketTool extends S3GuardTool {
private static final Logger LOG = LoggerFactory.getLogger(BucketTool.class);
/**
* Name of this tool: {@value}.
*/
public static final String NAME = "bucket";
/**
* Purpose of this tool: {@value}.
*/
public static final String PURPOSE =
"View and manipulate S3 buckets";
/**
* create command.
*/
public static final String CREATE = "create";
/**
* region {@value}.
*/
public static final String OPT_REGION = "region";
/**
* endpoint {@value}.
*/
public static final String OPT_ENDPOINT = "endpoint";
/**
* Zone for a store.
*/
public static final String OPT_ZONE = "zone";
/**
* Error message if -zone is set but the name doesn't match.
* Value {@value}.
*/
static final String UNSUPPORTED_ZONE_ARG =
"The -zone option is only supported for " + PRODUCT_NAME;
/**
* Error message if the bucket is S3 Express but -zone wasn't set.
* Value {@value}.
*/
static final String NO_ZONE_SUPPLIED = "Required option -zone missing for "
+ PRODUCT_NAME + " bucket";
/**
* Error Message logged/thrown when the tool could not start as
* the bucket probe was not disabled and the probe (inevitably)
* failed.
*/
public static final String PROBE_FAILURE =
"Initialization failed because the bucket existence probe"
+ S3A_BUCKET_PROBE + " was not disabled. Check core-site settings.";
public BucketTool(final Configuration conf) {
super(conf, 1, 1,
CREATE);
CommandFormat format = getCommandFormat();
format.addOptionWithValue(OPT_REGION);
format.addOptionWithValue(OPT_ENDPOINT);
format.addOptionWithValue(OPT_ZONE);
}
public String getUsage() {
return "bucket "
+ "-" + CREATE + " "
+ "[-" + OPT_ENDPOINT + " <endpoint>] "
+ "[-" + OPT_REGION + " <region>] "
+ "[-" + OPT_ZONE + " <zone>] "
+ " <s3a-URL>";
}
public String getName() {
return NAME;
}
private Optional<String> getOptionalString(String key) {
String value = getCommandFormat().getOptValue(key);
return isNotEmpty(value) ? Optional.of(value) : Optional.empty();
}
@VisibleForTesting
int exec(final String...args) throws Exception {
return run(args, System.out);
}
@Override
public int run(final String[] args, final PrintStream out)
throws Exception, ExitUtil.ExitException {
LOG.debug("Supplied arguments: {}", String.join(", ", args));
final List<String> parsedArgs = parseArgsWithErrorReporting(args);
CommandFormat command = getCommandFormat();
boolean create = command.getOpt(CREATE);
Optional<String> endpoint = getOptionalString(OPT_ENDPOINT);
Optional<String> region = getOptionalString(OPT_REGION);
Optional<String> zone = getOptionalString(OPT_ZONE);
final String bucketPath = parsedArgs.get(0);
final Path source = new Path(bucketPath);
URI fsURI = source.toUri();
String bucket = fsURI.getHost();
println(out, "Filesystem %s", fsURI);
if (!"s3a".equals(fsURI.getScheme())) {
throw new ExitUtil.ExitException(EXIT_USAGE, "Filesystem is not S3A URL: " + fsURI);
}
println(out, "Options region=%s endpoint=%s zone=%s s3a://%s",
region.orElse("(unset)"),
endpoint.orElse("(unset)"),
zone.orElse("(unset)"),
bucket);
if (!create) {
errorln(getUsage());
println(out, "Supplied arguments: ["
+ String.join(", ", parsedArgs)
+ "]");
throw new ExitUtil.ExitException(EXIT_USAGE,
"required option not found: -create");
}
final Configuration conf = getConf();
removeBucketOverrides(bucket, conf,
S3A_BUCKET_PROBE,
REJECT_OUT_OF_SPAN_OPERATIONS,
AWS_REGION,
ENDPOINT);
// stop any S3 calls taking place against a bucket which
// may not exist
String bucketPrefix = "fs.s3a.bucket." + bucket + '.';
conf.setInt(bucketPrefix + S3A_BUCKET_PROBE, 0);
conf.setBoolean(bucketPrefix + REJECT_OUT_OF_SPAN_OPERATIONS, false);
// propagate an option
BiFunction<String, Optional<String>, Boolean> propagate = (key, opt) ->
opt.map(v -> {
conf.set(key, v);
LOG.info("{} = {}", key, v);
return true;
}).orElse(false);
propagate.apply(AWS_REGION, region);
propagate.apply(ENDPOINT, endpoint);
// fail fast on third party store
if (hasS3ExpressSuffix(bucket) && !isAwsEndpoint(endpoint.orElse(""))) {
throw new ExitUtil.ExitException(EXIT_NOT_ACCEPTABLE, UNSUPPORTED_ZONE_ARG);
}
S3AFileSystem fs;
try {
fs = (S3AFileSystem) FileSystem.newInstance(fsURI, conf);
} catch (FileNotFoundException e) {
// this happens if somehow the probe wasn't disabled.
errorln(PROBE_FAILURE);
throw new ExitUtil.ExitException(EXIT_BAD_CONFIGURATION, PROBE_FAILURE);
}
try {
// now build the configuration
final CreateBucketConfiguration.Builder builder = CreateBucketConfiguration.builder();
if (fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE)) {
// S3 Express store requires a zone and some other settings
final String az = zone.orElseThrow(() ->
new ExitUtil.ExitException(EXIT_USAGE, NO_ZONE_SUPPLIED + bucket));
builder.location(LocationInfo.builder()
.type(LocationType.AVAILABILITY_ZONE).name(az).build())
.bucket(software.amazon.awssdk.services.s3.model.BucketInfo.builder()
.type(BucketType.DIRECTORY)
.dataRedundancy(DataRedundancy.SINGLE_AVAILABILITY_ZONE).build());
} else {
if (zone.isPresent()) {
throw new ExitUtil.ExitException(EXIT_USAGE, UNSUPPORTED_ZONE_ARG + " not " + bucket);
}
region.ifPresent(builder::locationConstraint);
}
final CreateBucketRequest request = CreateBucketRequest.builder()
.bucket(bucket)
.createBucketConfiguration(builder.build())
.build();
println(out, "Creating bucket %s", bucket);
final S3Client s3Client = fs.getS3AInternals().getAmazonS3Client(NAME);
try (DurationInfo ignored = new DurationInfo(LOG,
"Create %sbucket %s in region %s",
(fs.hasPathCapability(new Path("/"),
STORE_CAPABILITY_S3_EXPRESS_STORAGE) ? (PRODUCT_NAME + " "): ""),
bucket, region.orElse("(unset)"))) {
once("create", source.toString(), () ->
s3Client.createBucket(request));
}
} finally {
IOUtils.closeStream(fs);
}
return 0;
}
/**
* Remove any values from a bucket.
* @param bucket bucket whose overrides are to be removed. Can be null/empty
* @param conf config
* @param options list of fs.s3a options to remove
*/
public static void removeBucketOverrides(final String bucket,
final Configuration conf,
final String... options) {
if (StringUtils.isEmpty(bucket)) {
return;
}
final String bucketPrefix = "fs.s3a.bucket." + bucket + '.';
for (String option : options) {
final String stripped = option.substring("fs.s3a.".length());
String target = bucketPrefix + stripped;
String v = conf.get(target);
if (v != null) {
conf.unset(target);
}
String extended = bucketPrefix + option;
if (conf.get(extended) != null) {
conf.unset(extended);
}
}
}
}
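For orientation, a minimal sketch (not part of the patch; the bucket name and endpoint values are illustrative) of the per-bucket override convention which `removeBucketOverrides()` unsets:

```java
import org.apache.hadoop.conf.Configuration;

public class BucketOverrideSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // base option
    conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
    // per-bucket override: fs.s3a.endpoint becomes fs.s3a.bucket.<bucket>.endpoint
    conf.set("fs.s3a.bucket.example-bucket.endpoint", "s3.eu-west-1.amazonaws.com");
    // removeBucketOverrides("example-bucket", conf, ENDPOINT) unsets exactly that
    // bucket-qualified key; this line does the same by hand:
    conf.unset("fs.s3a.bucket.example-bucket.endpoint");
    // the base value now applies again
    System.out.println(conf.get("fs.s3a.endpoint")); // s3.amazonaws.com
  }
}
```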

View File

@ -237,13 +237,7 @@ public void resetBindings() {
public int run(final String[] args, final PrintStream stream)
throws ExitUtil.ExitException, Exception {
this.out = stream;
final List<String> parsedArgs;
try {
parsedArgs = parseArgs(args);
} catch (CommandFormat.UnknownOptionException e) {
errorln(getUsage());
throw new ExitUtil.ExitException(EXIT_USAGE, e.getMessage(), e);
}
final List<String> parsedArgs = parseArgsWithErrorReporting(args);
if (parsedArgs.size() != 1) {
errorln(getUsage());
println(out, "Supplied arguments: ["

View File

@ -288,6 +288,16 @@ public interface S3AInternals {
HeadObjectResponse getObjectMetadata(Path path) throws IOException;
AWSCredentialProviderList shareCredentials(final String purpose);
@AuditEntryPoint
@Retries.RetryTranslated
HeadBucketResponse getBucketMetadata() throws IOException;
boolean isMultipartCopyEnabled();
@AuditEntryPoint
@Retries.RetryTranslated
long abortMultipartUploads(Path path) throws IOException;
}
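A hedged usage sketch of the two audited probes added to `S3AInternals` above; the bucket URI and path are illustrative, not from the patch:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public class S3AInternalsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (S3AFileSystem fs = (S3AFileSystem) FileSystem.newInstance(
        URI.create("s3a://example-bucket/"), conf)) {
      // HEAD the bucket; raises an IOException if it is unreachable
      System.out.println(fs.getS3AInternals().getBucketMetadata());
      // abort every pending multipart upload under the path; returns the count
      long aborted = fs.getS3AInternals().abortMultipartUploads(new Path("/tables/"));
      System.out.println("aborted uploads: " + aborted);
    }
  }
}
```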

View File

@ -41,9 +41,18 @@ The features which may be unavailable include:
* List API to use (`fs.s3a.list.version = 1`)
* Bucket lifecycle rules to clean up pending uploads.
## Configuring s3a to connect to a third party store
### Disabling Change Detection
The (default) etag-based change detection logic expects stores to provide an Etag header in HEAD/GET requests,
and to support it as a precondition in subsequent GET and COPY calls.
If a store does not do this, disable the checks.
```xml
<property>
<name>fs.s3a.change.detection.mode</name>
<value>none</value>
</property>
```
## Connecting to a third party object store over HTTPS
The core setting for a third party store is to change the endpoint in `fs.s3a.endpoint`.
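As a programmatic illustration (the endpoint URL, bucket name and class are placeholders; the equivalent `<property>` settings behave the same way):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ThirdPartyEndpointSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // point the connector at the third party store
    conf.set("fs.s3a.endpoint", "https://storeendpoint.example.com");
    // many third party stores are deployed for path-style access
    conf.setBoolean("fs.s3a.path.style.access", true);
    try (FileSystem fs = FileSystem.newInstance(URI.create("s3a://example-bucket/"), conf)) {
      System.out.println(fs.getFileStatus(new Path("/")));
    }
  }
}
```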

View File

@ -49,6 +49,7 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
skipIfEncryptionTestsDisabled(conf);
S3ATestUtils.disableFilesystemCaching(conf);
patchConfigurationEncryptionSettings(conf);
return conf;

View File

@ -131,7 +131,7 @@ public void testEndpoint() throws Exception {
S3ATestConstants.CONFIGURATION_TEST_ENDPOINT, "");
if (endpoint.isEmpty()) {
LOG.warn("Custom endpoint test skipped as " +
S3ATestConstants.CONFIGURATION_TEST_ENDPOINT + "config " +
S3ATestConstants.CONFIGURATION_TEST_ENDPOINT + " config " +
"setting was not detected");
} else {
conf.set(Constants.ENDPOINT, endpoint);

View File

@ -49,6 +49,7 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
@ -104,7 +105,6 @@ public ITestS3AEncryptionSSEC(final String name,
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
disableFilesystemCaching(conf);
String bucketName = getTestBucketName(conf);
// directory marker options
removeBaseAndBucketOverrides(bucketName, conf,

View File

@ -19,18 +19,12 @@
package org.apache.hadoop.fs.s3a;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetBucketEncryptionRequest;
import software.amazon.awssdk.services.s3.model.GetBucketEncryptionResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
@ -45,7 +39,6 @@
import org.apache.hadoop.fs.store.EtagChecksum;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
@ -55,7 +48,6 @@
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.XA_ETAG;
import static org.hamcrest.Matchers.nullValue;
/**
* Tests of the S3A FileSystem which don't have a specific home and can share
@ -153,39 +145,6 @@ Path mkFile(String name, byte[] data) throws IOException {
return f;
}
/**
* The assumption here is that 0-byte files uploaded in a single PUT
* always have the same checksum, including stores with encryption.
* This will be skipped if the bucket has S3 default encryption enabled.
* @throws Throwable on a failure
*/
@Test
public void testEmptyFileChecksums() throws Throwable {
assumeNoDefaultEncryption();
final S3AFileSystem fs = getFileSystem();
Path file1 = touchFile("file1");
EtagChecksum checksum1 = fs.getFileChecksum(file1, 0);
LOG.info("Checksum for {}: {}", file1, checksum1);
assertHasPathCapabilities(fs, file1,
CommonPathCapabilities.FS_CHECKSUMS);
assertNotNull("Null file 1 checksum", checksum1);
assertNotEquals("file 1 checksum", 0, checksum1.getLength());
assertEquals("checksums of empty files", checksum1,
fs.getFileChecksum(touchFile("file2"), 0));
Assertions.assertThat(fs.getXAttr(file1, XA_ETAG))
.describedAs("etag from xattr")
.isEqualTo(checksum1.getBytes());
}
/**
* Skip a test if we can get the default encryption on a bucket and it is
* non-null.
*/
private void assumeNoDefaultEncryption() throws IOException {
skipIfClientSideEncryption();
Assume.assumeThat(getDefaultEncryption(), nullValue());
}
/**
* Make sure that when checksums are disabled, the caller
* gets null back.
@ -226,24 +185,6 @@ public void testNonEmptyFileChecksums() throws Throwable {
.isEqualTo(checksum1.getBytes());
}
/**
* Verify that on an unencrypted store, the checksum of two non-empty
* (single PUT) files is the same if the data is the same.
* This will be skipped if the bucket has S3 default encryption enabled.
* @throws Throwable failure
*/
@Test
public void testNonEmptyFileChecksumsUnencrypted() throws Throwable {
Assume.assumeTrue(encryptionAlgorithm().equals(S3AEncryptionMethods.NONE));
assumeNoDefaultEncryption();
final S3AFileSystem fs = getFileSystem();
final EtagChecksum checksum1 =
fs.getFileChecksum(mkFile("file5", HELLO), 0);
assertNotNull("file 3 checksum", checksum1);
assertEquals("checksums", checksum1,
fs.getFileChecksum(mkFile("file6", HELLO), 0));
}
private S3AEncryptionMethods encryptionAlgorithm() {
return getFileSystem().getS3EncryptionAlgorithm();
}
@ -399,22 +340,4 @@ private static <T> T verifyNoTrailingSlash(String role, T o) {
return o;
}
/**
* Gets default encryption settings for the bucket or returns null if default
* encryption is disabled.
*/
private GetBucketEncryptionResponse getDefaultEncryption() throws IOException {
S3AFileSystem fs = getFileSystem();
S3Client s3 = getS3AInternals().getAmazonS3Client("check default encryption");
try (AuditSpan s = span()){
return Invoker.once("getBucketEncryption()",
fs.getBucket(),
() -> s3.getBucketEncryption(GetBucketEncryptionRequest.builder()
.bucket(fs.getBucket())
.build()));
} catch (FileNotFoundException | AccessDeniedException | AWSBadRequestException e) {
return null;
}
}
}

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.slf4j.Logger;
@ -35,22 +34,31 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertFileHasLength;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
/**
* Utilities for S3A multipart upload tests.
*/
public final class MultipartTestUtils {
private static final Logger LOG = LoggerFactory.getLogger(
MultipartTestUtils.class);
/**
* Target file of {@code createMagicFile()}.
*/
public static final String MAGIC_FILE_TARGET = "subdir/file.txt";
/** Not instantiated. */
private MultipartTestUtils() { }
@ -94,24 +102,10 @@ public static IdKey createPartUpload(S3AFileSystem fs, String key, int len,
/** Delete any uploads under given path (recursive). Silent on failure. */
public static void clearAnyUploads(S3AFileSystem fs, Path path) {
String key = fs.pathToKey(path);
AuditSpan span = null;
try {
RemoteIterator<MultipartUpload> uploads = fs.listUploads(key);
span = fs.createSpan("multipart", path.toString(), null);
final WriteOperationHelper helper
= fs.getWriteOperationHelper();
while (uploads.hasNext()) {
MultipartUpload upload = uploads.next();
LOG.debug("Cleaning up upload: {} {}", upload.key(),
truncatedUploadId(upload.uploadId()));
helper.abortMultipartUpload(upload.key(),
upload.uploadId(), true, LOG_EVENT);
}
fs.getS3AInternals().abortMultipartUploads(path);
} catch (IOException ioe) {
LOG.info("Ignoring exception: ", ioe);
} finally {
IOUtils.closeStream(span);
}
}
@ -165,6 +159,33 @@ private static String truncatedUploadId(String fullId) {
return fullId.substring(0, 12) + " ...";
}
/**
* Given a dir, return the name of the magic subdir.
* The naming has changed across versions; this isolates
* the changes.
* @param dir directory
* @return the magic subdir
*/
public static Path magicPath(Path dir) {
return new Path(dir, MAGIC_PATH_PREFIX + "001/");
}
/**
* Create a magic file whose final ("real") data length is greater than 0 bytes.
* @param fs filesystem
* @param dir directory
* @return the path
* @throws IOException creation failure.
*/
public static Path createMagicFile(final S3AFileSystem fs, final Path dir) throws IOException {
Path magicFile = new Path(magicPath(dir), "__base/" + MAGIC_FILE_TARGET);
createFile(fs, magicFile, true, "123".getBytes(StandardCharsets.UTF_8));
// the file exists but is a 0 byte marker file.
assertFileHasLength(fs, magicFile, 0);
return magicFile;
}
/** Struct of object key, upload ID. */
public static class IdKey {
private String key;

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.EtagSource;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
@ -37,6 +38,7 @@
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.NetworkBinding;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
@ -56,6 +58,8 @@
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.FutureIO;
@ -90,6 +94,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.test.GenericTestUtils.buildPaths;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
@ -99,6 +104,8 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.buildEncryptionSecrets;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;
import static org.junit.Assert.*;
/**
@ -982,6 +989,54 @@ public static List<Path> createDirsAndFiles(final FileSystem fs,
}
}
/**
* Given a RemoteIterator to a list of file statuses, return a list of paths.
* @param i iterator
* @return list of paths
* @param <T> type of status
* @throws IOException failure retrieving values from the iterator
*/
public static <T extends FileStatus> List<Path> toPathList(RemoteIterator<T> i)
throws IOException {
return toList(mappingRemoteIterator(i, FileStatus::getPath));
}
/**
* Expect an error code from the exception.
* @param code error code
* @param error exception
*/
public static void expectErrorCode(final int code, final ExitUtil.ExitException error) {
if (error.getExitCode() != code) {
throw error;
}
}
/**
* Require a test case to be against Amazon S3 Express store.
*/
public static void assumeS3ExpressFileSystem(final FileSystem fs) throws IOException {
assume("store is not S3 Express: " + fs.getUri(),
fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE));
}
/**
* Require a test case to be against a standard S3 store.
*/
public static void assumeNotS3ExpressFileSystem(final FileSystem fs) throws IOException {
assume("store is S3 Express: " + fs.getUri(),
!fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE));
}
/**
* Require a store to be hosted by Amazon -i.e. not a third party store.
*/
public static void assumeStoreAwsHosted(final FileSystem fs) {
assume("store is not AWS S3",
!NetworkBinding.isAwsEndpoint(fs.getConf()
.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)));
}
/**
* Helper class to do diffs of metrics.
*/
@ -1573,5 +1628,26 @@ public static boolean isCreatePerformanceEnabled(FileSystem fs)
return fs.hasPathCapability(new Path("/"), FS_S3A_CREATE_PERFORMANCE_ENABLED);
}
/**
* Is the filesystem connector bonded to S3Express storage?
* @param fs filesystem.
* @return true if the store has the relevant path capability.
* @throws IOException IO failure
*/
public static boolean isS3ExpressStorage(FileSystem fs) throws IOException {
return fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE);
}
/**
* Get an etag from a FileStatus which must implement
* the {@link EtagSource} interface -which S3AFileStatus does.
*
* @param status the status.
* @return the etag
*/
public static String etag(FileStatus status) {
Preconditions.checkArgument(status instanceof EtagSource,
"Not an EtagSource: %s", status);
return ((EtagSource) status).getEtag();
}
}
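A hedged sketch of how a test case might use the new helpers; the test class, method and assertion are hypothetical, not part of the patch:

```java
import org.junit.Test;

import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeS3ExpressFileSystem;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.isS3ExpressStorage;

public class ITestZonalStoreExample extends AbstractS3ATestBase {

  @Test
  public void testZonalOnlyAssertions() throws Throwable {
    final S3AFileSystem fs = getFileSystem();
    // skip the test case unless the target store is S3 Express
    assumeS3ExpressFileSystem(fs);
    // the path capability probe; always true after the assume above
    assertTrue("expected zonal storage", isS3ExpressStorage(fs));
  }
}
```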

View File

@ -20,14 +20,18 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.arns.Arn;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
import software.amazon.awssdk.auth.signer.internal.AbstractAwsS3V4Signer;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.signer.Signer;
import software.amazon.awssdk.http.SdkHttpFullRequest;
@ -38,8 +42,10 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
@ -48,12 +54,18 @@
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
/**
* Tests for custom Signers and SignerInitializers.
* Because the v2 SDK has had problems with bulk delete
* and custom signing, this suite is parameterized on
* bulk delete being enabled or disabled.
*/
@RunWith(Parameterized.class)
public class ITestCustomSigner extends AbstractS3ATestBase {
private static final Logger LOG = LoggerFactory
@ -62,10 +74,33 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
private static final String TEST_ID_KEY = "TEST_ID_KEY";
private static final String TEST_REGION_KEY = "TEST_REGION_KEY";
/**
* Parameterization.
*/
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{"bulk delete", true},
{"simple-delete", false},
});
}
private final boolean bulkDelete;
private final UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user1");
private final UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("user2");
private String regionName;
private String endpoint;
public ITestCustomSigner(
final String ignored,
final boolean bulkDelete) {
this.bulkDelete = bulkDelete;
}
@Override
public void setup() throws Exception {
super.setup();
@ -79,6 +114,17 @@ public void setup() throws Exception {
}
LOG.info("Determined region name to be [{}] for bucket [{}]", regionName,
fs.getBucket());
CustomSignerInitializer.reset();
}
/**
* Teardown closes all filesystems for the test UGIs.
*/
@Override
public void teardown() throws Exception {
super.teardown();
FileSystem.closeAllForUGI(ugi1);
FileSystem.closeAllForUGI(ugi2);
}
@Test
@ -86,12 +132,10 @@ public void testCustomSignerAndInitializer()
throws IOException, InterruptedException {
final Path basePath = path(getMethodName());
UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user1");
FileSystem fs1 = runMkDirAndVerify(ugi1,
FileSystem fs1 = runStoreOperationsAndVerify(ugi1,
new Path(basePath, "customsignerpath1"), "id1");
UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("user2");
FileSystem fs2 = runMkDirAndVerify(ugi2,
FileSystem fs2 = runStoreOperationsAndVerify(ugi2,
new Path(basePath, "customsignerpath2"), "id2");
Assertions.assertThat(CustomSignerInitializer.knownStores.size())
@ -104,14 +148,15 @@ public void testCustomSignerAndInitializer()
.as("Num registered stores mismatch").isEqualTo(0);
}
private FileSystem runMkDirAndVerify(UserGroupInformation ugi,
private S3AFileSystem runStoreOperationsAndVerify(UserGroupInformation ugi,
Path finalPath, String identifier)
throws IOException, InterruptedException {
Configuration conf = createTestConfig(identifier);
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> {
return ugi.doAs((PrivilegedExceptionAction<S3AFileSystem>) () -> {
int instantiationCount = CustomSigner.getInstantiationCount();
int invocationCount = CustomSigner.getInvocationCount();
FileSystem fs = finalPath.getFileSystem(conf);
S3AFileSystem fs = (S3AFileSystem)finalPath.getFileSystem(conf);
fs.mkdirs(finalPath);
Assertions.assertThat(CustomSigner.getInstantiationCount())
.as("CustomSigner Instantiation count lower than expected")
@ -130,6 +175,22 @@ private FileSystem runMkDirAndVerify(UserGroupInformation ugi,
.as("Configuration TEST_KEY mismatch in %s", CustomSigner.description())
.isEqualTo(identifier);
// now do some more operations to make sure all is good.
final Path subdir = new Path(finalPath, "year=1970/month=1/day=1");
fs.mkdirs(subdir);
final Path file1 = new Path(subdir, "file1");
ContractTestUtils.touch(fs, new Path(subdir, "file1"));
fs.listStatus(subdir);
fs.delete(file1, false);
ContractTestUtils.touch(fs, new Path(subdir, "file1"));
// create a magic file.
createMagicFile(fs, subdir);
ContentSummary summary = fs.getContentSummary(finalPath);
fs.getS3AInternals().abortMultipartUploads(subdir);
fs.rename(subdir, new Path(finalPath, "renamed"));
fs.delete(finalPath, true);
return fs;
});
}
@ -143,10 +204,15 @@ private FileSystem runMkDirAndVerify(UserGroupInformation ugi,
private Configuration createTestConfig(String identifier) {
Configuration conf = createConfiguration();
removeBaseAndBucketOverrides(conf,
CUSTOM_SIGNERS,
SIGNING_ALGORITHM_S3,
ENABLE_MULTI_DELETE);
conf.set(CUSTOM_SIGNERS,
"CustomS3Signer:" + CustomSigner.class.getName() + ":"
+ CustomSignerInitializer.class.getName());
conf.set(SIGNING_ALGORITHM_S3, "CustomS3Signer");
conf.setBoolean(ENABLE_MULTI_DELETE, bulkDelete);
conf.set(TEST_ID_KEY, identifier);
conf.set(TEST_REGION_KEY, regionName);
@ -162,7 +228,7 @@ private String determineRegion(String bucketName) throws IOException {
}
@Private
public static final class CustomSigner implements Signer {
public static final class CustomSigner extends AbstractAwsS3V4Signer implements Signer {
private static final AtomicInteger INSTANTIATION_COUNT =
@ -202,7 +268,8 @@ public SdkHttpFullRequest sign(SdkHttpFullRequest request,
try {
lastStoreValue = CustomSignerInitializer
.getStoreValue(bucketName, UserGroupInformation.getCurrentUser());
LOG.info("Store value for bucket {} is {}", bucketName, lastStoreValue);
LOG.info("Store value for bucket {} is {}", bucketName,
(lastStoreValue == null) ? "unset" : "set");
} catch (IOException e) {
throw new RuntimeException("Failed to get current Ugi " + e, e);
}
@ -216,35 +283,7 @@ public SdkHttpFullRequest sign(SdkHttpFullRequest request,
}
private String parseBucketFromHost(String host) {
String[] hostBits = host.split("\\.");
String bucketName = hostBits[0];
String service = hostBits[1];
if (bucketName.equals("kms")) {
return bucketName;
}
if (service.contains("s3-accesspoint") || service.contains("s3-outposts")
|| service.contains("s3-object-lambda")) {
// If AccessPoint then bucketName is of format `accessPoint-accountId`;
String[] accessPointBits = bucketName.split("-");
String accountId = accessPointBits[accessPointBits.length - 1];
// Extract the access point name from bucket name. eg: if bucket name is
// test-custom-signer-<accountId>, get the access point name test-custom-signer by removing
// -<accountId> from the bucket name.
String accessPointName =
bucketName.substring(0, bucketName.length() - (accountId.length() + 1));
Arn arn = Arn.builder()
.accountId(accountId)
.partition("aws")
.region(hostBits[2])
.resource("accesspoint" + "/" + accessPointName)
.service("s3").build();
bucketName = arn.toString();
}
return bucketName;
return CustomSdkSigner.parseBucketFromHost(host);
}
public static int getInstantiationCount() {
@ -287,6 +326,13 @@ public void unregisterStore(String bucketName, Configuration storeConf,
knownStores.remove(storeKey);
}
/**
* Clear the stores; invoke during test setup.
*/
public static void reset() {
knownStores.clear();
}
public static StoreValue getStoreValue(String bucketName,
UserGroupInformation ugi) {
StoreKey storeKey = new StoreKey(bucketName, ugi);

View File

@ -620,8 +620,10 @@ public void testBulkCommitFiles() throws Throwable {
commits.add(commit1);
}
assertPathDoesNotExist("destination dir", destDir);
assertPathDoesNotExist("subdirectory", subdir);
if (!isS3ExpressStorage(fs)) {
assertPathDoesNotExist("destination dir", destDir);
assertPathDoesNotExist("subdirectory", subdir);
}
LOG.info("Initiating commit operations");
try (CommitContext commitContext
= actions.createCommitContextForTesting(destDir, JOB_ID, 0)) {

View File

@ -0,0 +1,463 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import software.amazon.awssdk.services.s3.model.MultipartUpload;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.util.DistCpTestUtils;
import static org.apache.hadoop.fs.CommonPathCapabilities.DIRECTORY_LISTING_INCONSISTENT;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.assertNoUploadsAt;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.magicPath;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.toPathList;
import static org.apache.hadoop.fs.s3a.S3AUtils.HIDDEN_FILE_FILTER;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
import static org.apache.hadoop.util.ToolRunner.run;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromArray;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;
/**
* Test behavior of treewalking when there are pending
* uploads. All commands MUST work; currently the only one
* which does not is distcp. Some tests make different
* assertions about the directories found, depending on
* whether the store reports inconsistent directory listings.
*/
public class ITestTreewalkProblems extends AbstractS3ACostTest {
/**
* Exit code to expect on a shell failure.
*/
public static final int SHELL_FAILURE = 1;
/**
* Are directory listings potentially inconsistent?
*/
private boolean listingInconsistent;
@Override
public Configuration createConfiguration() {
final Configuration conf = super.createConfiguration();
removeBaseAndBucketOverrides(conf,
DIRECTORY_OPERATIONS_PURGE_UPLOADS,
MAGIC_COMMITTER_ENABLED);
conf.setBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS, true);
conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
return conf;
}
@Override
public void setup() throws Exception {
super.setup();
final S3AFileSystem fs = getFileSystem();
final Path path = methodPath();
assertHasPathCapabilities(fs, path, DIRECTORY_OPERATIONS_PURGE_UPLOADS);
listingInconsistent = fs.hasPathCapability(path, DIRECTORY_LISTING_INCONSISTENT);
clearAnyUploads(fs, path);
}
@Test
public void testListFilesDeep() throws Throwable {
final S3AFileSystem fs = getFileSystem();
final Path src = createDirWithUpload();
LOG.info("listFiles({}, true)", src);
foreach(fs.listFiles(src, true), e -> LOG.info("{}", e));
LOG.info("listFiles({}, true)", src);
foreach(fs.listLocatedStatus(src), e -> LOG.info("{}", e));
// and just verify a cleanup works
Assertions.assertThat(fs.getS3AInternals().abortMultipartUploads(src))
.describedAs("Aborted uploads under %s", src)
.isEqualTo(1);
assertNoUploadsAt(fs, src);
}
/**
* Create a directory methodPath()/src with a magic upload underneath,
* with the upload pointing at {@code src/subdir/file.txt}.
* @return the directory created
* @throws IOException creation problems
*/
private Path createDirWithUpload() throws IOException {
final S3AFileSystem fs = getFileSystem();
final Path src = new Path(methodPath(), "src");
// create a magic file; this initiates a multipart upload
// whose final destination is src/subdir/file.txt
createMagicFile(fs, src);
// delete the 0-byte magic marker objects; the pending upload itself remains
fs.delete(magicPath(src), true);
return src;
}
@Test
public void testLocatedFileStatusFetcher() throws Throwable {
describe("Validate mapreduce LocatedFileStatusFetcher");
final Path src = createDirWithUpload();
Configuration listConfig = new Configuration(getConfiguration());
listConfig.setInt(LIST_STATUS_NUM_THREADS, 2);
LocatedFileStatusFetcher fetcher =
new LocatedFileStatusFetcher(listConfig, new Path[]{src}, true, HIDDEN_FILE_FILTER, true);
Assertions.assertThat(fetcher.getFileStatuses()).hasSize(0);
}
@Test
public void testGetContentSummaryDirsAndFiles() throws Throwable {
describe("FileSystem.getContentSummary()");
final S3AFileSystem fs = getFileSystem();
final Path src = createDirWithUpload();
fs.mkdirs(new Path(src, "child"));
final Path path = methodPath();
file(new Path(path, "file"));
final int dirs = listingInconsistent ? 3 : 3;
assertContentSummary(path, dirs, 1);
}
/**
* Execute getContentSummary() down a directory tree which only
* contains a single real directory.
* This test case has been a bit inconsistent between different store
* types.
*/
@Test
public void testGetContentSummaryPendingDir() throws Throwable {
describe("FileSystem.getContentSummary() with pending dir");
assertContentSummary(createDirWithUpload(), 1, 0);
}
/**
* Make an assertions about the content summary of a path.
* @param path path to scan
* @param dirs number of directories to find.
* @param files number of files to find
* @throws IOException scanning problems
*/
private void assertContentSummary(
final Path path,
final int dirs,
final int files) throws IOException {
ContentSummary summary = getFileSystem().getContentSummary(path);
Assertions.assertThat(summary.getDirectoryCount())
.describedAs("dir count under %s of %s", path, summary)
.isEqualTo(dirs);
Assertions.assertThat(summary.getFileCount())
.describedAs("filecount count under %s of %s", path, summary)
.isEqualTo(files);
}
/**
* Execute getContentSummary() down a directory tree which only
* contains a single real directory.
*/
@Test
public void testGetContentSummaryFiles() throws Throwable {
describe("FileSystem.getContentSummary()");
final S3AFileSystem fs = getFileSystem();
final Path src = createDirWithUpload();
fs.mkdirs(new Path(src, "child"));
final Path base = methodPath();
touch(fs, new Path(base, "file"));
assertContentSummary(base, 3, 1);
}
/**
* Test all the various filesystem.list* calls.
* Bundled into one test case to reduce setup/teardown overhead.
*/
@Test
public void testListStatusOperations() throws Throwable {
describe("FileSystem liststtus calls");
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
final Path file = new Path(base, "file");
final Path dir2 = new Path(base, "dir2");
// path in a child dir
final Path childfile = new Path(dir2, "childfile");
file(childfile);
file(file);
fs.mkdirs(dir2);
Assertions.assertThat(toPathList(fs.listStatusIterator(base)))
.describedAs("listStatusIterator(%s)", base)
.contains(src, dir2, file);
Assertions.assertThat(toPathList(remoteIteratorFromArray(fs.listStatus(base))))
.describedAs("listStatus(%s)", base)
.contains(src, dir2, file);
Assertions.assertThat(toPathList(fs.listFiles(base, false)))
.describedAs("listFiles(%s, false)", base)
.containsExactly(file);
Assertions.assertThat(toPathList(fs.listFiles(base, true)))
.describedAs("listFiles(%s, true)", base)
.containsExactlyInAnyOrder(file, childfile);
Assertions.assertThat(toPathList(fs.listLocatedStatus(base, (p) -> true)))
.describedAs("listLocatedStatus(%s, filter)", base)
.contains(src, dir2, file);
Assertions.assertThat(toPathList(fs.listLocatedStatus(base)))
.describedAs("listLocatedStatus(%s)", base)
.contains(src, dir2, file);
}
@Test
public void testShellList() throws Throwable {
describe("Validate hadoop fs -ls sorted and unsorted");
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
createDirWithUpload();
fs.mkdirs(new Path(base, "dir2"));
// recursive not sorted
shell(base, "-ls", "-R", base.toUri().toString());
// recursive sorted
shell(base, "-ls", "-R", "-S", base.toUri().toString());
}
@Test
public void testShellDu() throws Throwable {
describe("Validate hadoop fs -du");
final Path base = methodPath();
createDirWithUpload();
shell(base, "-du", base.toUri().toString());
}
@Test
public void testShellDf() throws Throwable {
describe("Validate hadoop fs -df");
final Path base = methodPath();
final String p = base.toUri().toString();
shell(SHELL_FAILURE, base, "-df", p);
createDirWithUpload();
shell(base, "-df", p);
}
@Test
public void testShellFind() throws Throwable {
describe("Validate hadoop fs -ls -R");
final Path base = methodPath();
final String p = base.toUri().toString();
shell(SHELL_FAILURE, base, "-find", p, "-print");
createDirWithUpload();
shell(base, "-find", p, "-print");
}
@Test
public void testDistCp() throws Throwable {
describe("Validate distcp");
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
final Path dest = new Path(base, "dest");
file(new Path(src, "real-file"));
// distcp fails if uploads are visible
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
"-useiterator -update -delete -direct", getConfiguration());
}
@Test
public void testDistCpNoIterator() throws Throwable {
describe("Validate distcp");
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
final Path dest = new Path(base, "dest");
file(new Path(src, "real-file"));
// distcp fails if uploads are visible
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, src.toString(), dest.toString(),
"-update -delete -direct", getConfiguration());
}
/**
* CTU is also doing treewalking, though it's test only.
*/
@Test
public void testContractTestUtilsTreewalk() throws Throwable {
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
createDirWithUpload();
final ContractTestUtils.TreeScanResults treeWalk = treeWalk(fs, base);
ContractTestUtils.TreeScanResults listing =
new ContractTestUtils.TreeScanResults(fs.listFiles(base, true));
treeWalk.assertFieldsEquivalent("treewalk vs listFiles(/, true)", listing, treeWalk.getFiles(),
listing.getFiles());
}
/**
* Globber is already resilient to missing directories: a relic
* of the time when a HEAD request against an S3 object could
* leave a cached 404 in the S3 front end.
*/
@Test
public void testGlobberTreewalk() throws Throwable {
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
// this is the pending dir
final Path subdir = new Path(src, "subdir/");
final Path dest = new Path(base, "dest");
final Path monday = new Path(dest, "day=monday");
final Path realFile = file(new Path(monday, "real-file.parquet"));
assertGlob(fs, new Path(base, "*/*/*.parquet"), realFile);
if (listingInconsistent) {
assertGlob(fs, new Path(base, "*"), src, dest);
assertGlob(fs, new Path(base, "*/*"), subdir, monday);
} else {
assertGlob(fs, new Path(base, "*"), src, dest);
assertGlob(fs, new Path(base, "*/*"), monday);
}
}
private static void assertGlob(final S3AFileSystem fs,
final Path pattern,
final Path... expected)
throws IOException {
final FileStatus[] globbed = fs.globStatus(pattern,
(f) -> true);
final List<Path> paths = Arrays.stream(globbed).map(s -> s.getPath())
.collect(Collectors.toList());
Assertions.assertThat(paths)
.describedAs("glob(%s)", pattern)
.containsExactlyInAnyOrder(expected);
}
@Test
public void testFileInputFormatSplits() throws Throwable {
final S3AFileSystem fs = getFileSystem();
final Path base = methodPath();
final Path src = createDirWithUpload();
final Path dest = new Path(base, "dest");
final Path monday = new Path(dest, "day=monday");
final int count = 4;
List<Path> files = new ArrayList<>();
for (int i = 0; i < count; i++) {
files.add(file(new Path(monday, "file-0" + i + ".parquet")));
}
final JobContextImpl jobContext = new JobContextImpl(getConfiguration(), new JobID("job", 1));
final JobConf jc = (JobConf) jobContext.getConfiguration();
jc.set("mapreduce.input.fileinputformat.inputdir", base.toUri().toString());
jc.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
final TextInputFormat inputFormat = new TextInputFormat();
final List<Path> paths = inputFormat.getSplits(jobContext).stream().map(s ->
((FileSplit) s).getPath())
.collect(Collectors.toList());
Assertions.assertThat(paths)
.describedAs("input split of base directory")
.containsExactlyInAnyOrderElementsOf(files);
}
/**
* Exec a shell command; require it to succeed.
* @param base base dir
* @param command command sequence
* @throws Exception failure
*/
private void shell(final Path base, final String... command) throws Exception {
shell(0, base, command);
}
/**
* Exec a shell command; require the result to match the expected outcome.
* @param expected expected outcome
* @param base base dir
* @param command command sequence
* @throws Exception failure
*/
private void shell(int expected, final Path base, final String... command) throws Exception {
Assertions.assertThat(run(getConfiguration(), new FsShell(), command))
.describedAs("%s %s", command[0], base)
.isEqualTo(expected);
}
/**
* Assert the upload count under a dir is the expected value.
* Failure message will include the list of entries.
* @param dir dir
* @param expected expected count
* @throws IOException listing problem
*/
private void assertUploadCount(final Path dir, final int expected) throws IOException {
Assertions.assertThat(toList(listUploads(dir)))
.describedAs("uploads under %s", dir)
.hasSize(expected);
}
/**
* List uploads; use the same APIs that the directory operations use,
* so implicitly validating them.
* @param dir directory to list
* @return full list of entries
* @throws IOException listing problem
*/
private RemoteIterator<MultipartUpload> listUploads(Path dir) throws IOException {
final S3AFileSystem fs = getFileSystem();
try (AuditSpan ignored = span()) {
final StoreContext sc = fs.createStoreContext();
return fs.listUploadsUnderPrefix(sc, sc.pathToKey(dir));
}
}
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@ -32,21 +31,20 @@
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertFileHasLength;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertHasPathCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.clearAnyUploads;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.createMagicFile;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.util.functional.RemoteIterators.toList;
/**
* Test behavior of purging uploads in rename and delete.
* S3 Express test runs enable upload purging automatically; for other stores it is enabled explicitly.
*/
public class ITestUploadPurgeOnDirectoryOperations extends AbstractS3ACostTest {
@ -117,22 +115,6 @@ public void testRenameWithPendingUpload() throws Throwable {
assertUploadCount(dir, 0);
}
/**
* Create a magic file of "real" length more than 0 bytes long.
* @param fs filesystem
* @param dir directory
* @return the path
* @throws IOException creation failure.p
*/
private static Path createMagicFile(final S3AFileSystem fs, final Path dir) throws IOException {
Path magicFile = new Path(dir, MAGIC_PATH_PREFIX + "001/file.txt");
createFile(fs, magicFile, true, "123".getBytes(StandardCharsets.UTF_8));
// the file exists but is a 0 byte marker file.
assertFileHasLength(fs, magicFile, 0);
return magicFile;
}
/**
* Assert the upload count under a dir is the expected value.
* Failure message will include the list of entries.

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.isS3ExpressStore;
/**
* Test S3 Express Storage methods.
*/
public class TestS3ExpressStorage extends AbstractHadoopTestBase {
private static final String S3EXPRESS_BUCKET = "bucket--usw2-az2--x-s3";
private static final String AZ = "usw2-az2";
@Test
public void testS3ExpressStateDefaultEndpoint() {
assertS3ExpressState(S3EXPRESS_BUCKET, true, "");
assertS3ExpressState("bucket", false, "");
}
@Test
public void testS3ExpressStateAwsRegions() {
assertS3ExpressState(S3EXPRESS_BUCKET, true, "s3.amazonaws.com");
assertS3ExpressState(S3EXPRESS_BUCKET, true, "s3.cn-northwest-1.amazonaws.com.cn");
assertS3ExpressState(S3EXPRESS_BUCKET, true, "s3-fips.us-gov-east-1.amazonaws.com");
assertS3ExpressState(S3EXPRESS_BUCKET, true,
"s3-accesspoint-fips.dualstack.us-east-1.amazonaws.com");
assertS3ExpressState(S3EXPRESS_BUCKET, true, "s3.ca-central-1.amazonaws.com");
}
@Test
public void testS3ExpressStateThirdPartyStore() {
assertS3ExpressState(S3EXPRESS_BUCKET, false, "storeendpoint.example.com");
}
private void assertS3ExpressState(final String bucket, final boolean expected, String endpoint) {
Assertions.assertThat(isS3ExpressStore(bucket, endpoint))
.describedAs("isS3ExpressStore(%s) with endpoint %s", bucket, endpoint)
.isEqualTo(expected);
}
}

View File

@ -27,6 +27,7 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
@ -684,7 +685,8 @@ private void assertContainsFileUnderMarkerOnly(
}
/**
* Expect the list of status objects to match that of the paths.
* Expect the list of status objects to match that of the paths,
* without enforcing ordering of the values.
* @param statuses status object list
* @param paths ordered varargs list of paths
* @param <T> type of status objects
@ -692,20 +694,11 @@ private void assertContainsFileUnderMarkerOnly(
private <T extends FileStatus> void assertContainsExactlyStatusOfPaths(
List<T> statuses, Path... paths) {
String actual = statuses.stream()
.map(Object::toString)
.collect(Collectors.joining(";"));
String expected = Arrays.stream(paths)
.map(Object::toString)
.collect(Collectors.joining(";"));
String summary = "expected [" + expected + "]"
+ " actual = [" + actual + "]";
assertEquals("mismatch in size of listing " + summary,
paths.length, statuses.size());
for (int i = 0; i < statuses.size(); i++) {
assertEquals("Path mismatch at element " + i + " in " + summary,
paths[i], statuses.get(i).getPath());
}
final List<Path> pathList = statuses.stream()
.map(FileStatus::getPath)
.collect(Collectors.toList());
Assertions.assertThat(pathList)
.containsExactlyInAnyOrder(paths);
}
/**

View File

@ -219,8 +219,10 @@ public void testToolsNoBucket() throws Throwable {
cmdR.getName(),
S3A_THIS_BUCKET_DOES_NOT_EXIST
};
intercept(UnknownStoreException.class,
() -> cmdR.run(argsR));
intercept(UnknownStoreException.class, () -> {
final int e = cmdR.run(argsR);
return String.format("Outcome of %s on missing bucket: %d", tool, e);
});
}
}

View File

@ -18,10 +18,12 @@
package org.apache.hadoop.fs.s3a.scale;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntFunction;
@ -72,6 +74,7 @@
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
/**
* Scale test which creates a huge file.
@ -103,8 +106,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
public void setup() throws Exception {
super.setup();
scaleTestDir = new Path(getTestPath(), getTestSuiteName());
hugefile = new Path(scaleTestDir, "hugefile");
hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed");
hugefile = new Path(scaleTestDir, "src/hugefile");
hugefileRenamed = new Path(scaleTestDir, "dest/hugefile");
uploadBlockSize = uploadBlockSize();
filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
DEFAULT_HUGE_FILESIZE);
@ -490,7 +493,40 @@ public void test_030_postCreationAssertions() throws Throwable {
ContractTestUtils.assertPathExists(fs, "Huge file", hugefile);
FileStatus status = fs.getFileStatus(hugefile);
ContractTestUtils.assertIsFile(hugefile, status);
LOG.info("Huge File Status: {}", status);
assertEquals("File size in " + status, filesize, status.getLen());
// now do some etag status checks asserting they are always the same
// across listing operations.
final Path path = hugefile;
final FileStatus listStatus = listFile(hugefile);
LOG.info("List File Status: {}", listStatus);
Assertions.assertThat(listStatus.getLen())
.describedAs("List file status length %s", listStatus)
.isEqualTo(filesize);
Assertions.assertThat(etag(listStatus))
.describedAs("List file status etag %s", listStatus)
.isEqualTo(etag(status));
}
/**
* Get a filestatus by listing the parent directory.
* @param path path
* @return status
* @throws IOException failure to read, file not found
*/
private FileStatus listFile(final Path path)
throws IOException {
try {
return filteringRemoteIterator(
getFileSystem().listStatusIterator(path.getParent()),
st -> st.getPath().equals(path))
.next();
} catch (NoSuchElementException e) {
throw (FileNotFoundException)(new FileNotFoundException("Not found: " + path)
.initCause(e));
}
}
/**
@ -509,7 +545,7 @@ public void test_040_PositionedReadHugeFile() throws Throwable {
String filetype = encrypted ? "encrypted file" : "file";
describe("Positioned reads of %s %s", filetype, hugefile);
S3AFileSystem fs = getFileSystem();
FileStatus status = fs.getFileStatus(hugefile);
FileStatus status = listFile(hugefile);
long size = status.getLen();
int ops = 0;
final int bufferSize = 8192;
@ -518,7 +554,11 @@ public void test_040_PositionedReadHugeFile() throws Throwable {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF;
try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) {
try (FSDataInputStream in = fs.openFile(hugefile)
.withFileStatus(status)
.opt(FS_OPTION_OPENFILE_READ_POLICY, "random")
.opt(FS_OPTION_OPENFILE_BUFFER_SIZE, uploadBlockSize)
.build().get()) {
readAtByte0 = new ContractTestUtils.NanoTimer();
in.readFully(0, buffer);
readAtByte0.end("time to read data at start of file");
@ -661,27 +701,45 @@ public void test_100_renameHugeFile() throws Throwable {
S3AFileSystem fs = getFileSystem();
FileStatus status = fs.getFileStatus(hugefile);
long size = status.getLen();
fs.delete(hugefileRenamed, false);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
fs.rename(hugefile, hugefileRenamed);
renameFile(hugefile, hugefileRenamed);
long mb = Math.max(size / _1MB, 1);
timer.end("time to rename file of %d MB", mb);
LOG.info("Time per MB to rename = {} nS",
toHuman(timer.nanosPerOperation(mb)));
bandwidth(timer, size);
assertPathExists("renamed file", hugefileRenamed);
logFSState();
FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
assertEquals(size, destFileStatus.getLen());
// rename back
ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
fs.rename(hugefileRenamed, hugefile);
renameFile(hugefileRenamed, hugefile);
timer2.end("Renaming back");
LOG.info("Time per MB to rename = {} nS",
toHuman(timer2.nanosPerOperation(mb)));
bandwidth(timer2, size);
}
/**
* Rename a file.
* Subclasses may do this differently.
* @param src source file
* @param dest dest file
* @throws IOException IO failure
*/
protected void renameFile(final Path src,
final Path dest) throws IOException {
final S3AFileSystem fs = getFileSystem();
fs.delete(dest, false);
final boolean renamed = fs.rename(src, dest);
Assertions.assertThat(renamed)
.describedAs("rename(%s, %s)", src, dest)
.isTrue();
}
/**
* Test to verify target file encryption key.
* @throws IOException

View File

@ -18,12 +18,20 @@
package org.apache.hadoop.fs.s3a.scale;
import java.io.IOException;
import org.assertj.core.api.Assertions;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER;
/**
* Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering.
* This suite also renames the parent directory rather than
* the file itself, so validates directory rename of huge files.
*/
public class ITestS3AHugeFilesByteBufferBlocks
extends AbstractSTestS3AHugeFiles {
@ -31,4 +39,25 @@ public class ITestS3AHugeFilesByteBufferBlocks
protected String getBlockOutputBufferName() {
return FAST_UPLOAD_BYTEBUFFER;
}
/**
* Rename the parent directory, rather than the file itself.
* @param src source file
* @param dest dest file
* @throws IOException IO failure
*/
@Override
protected void renameFile(final Path src, final Path dest) throws IOException {
final S3AFileSystem fs = getFileSystem();
final Path srcDir = src.getParent();
final Path destDir = dest.getParent();
fs.delete(destDir, true);
fs.mkdirs(destDir, null);
final boolean renamed = fs.rename(srcDir, destDir);
Assertions.assertThat(renamed)
.describedAs("rename(%s, %s)", src, dest)
.isTrue();
}
}

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.tools;
import java.io.IOException;
import java.net.UnknownHostException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AWSBadRequestException;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.util.ExitUtil;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeNotS3ExpressFileSystem;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeS3ExpressFileSystem;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.expectErrorCode;
import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;
import static org.apache.hadoop.fs.s3a.tools.BucketTool.CREATE;
import static org.apache.hadoop.fs.s3a.tools.BucketTool.NO_ZONE_SUPPLIED;
import static org.apache.hadoop.fs.s3a.tools.BucketTool.OPT_ENDPOINT;
import static org.apache.hadoop.fs.s3a.tools.BucketTool.OPT_REGION;
import static org.apache.hadoop.fs.s3a.tools.BucketTool.OPT_ZONE;
import static org.apache.hadoop.fs.s3a.tools.BucketTool.UNSUPPORTED_ZONE_ARG;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_NOT_ACCEPTABLE;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_USAGE;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test {@link BucketTool}.
* This is a tricky little test as it doesn't want to create any new bucket,
* including on third party stores.
*/
public class ITestBucketTool extends AbstractS3ATestBase {
public static final String USWEST_2 = "us-west-2";
private static final Logger LOG = LoggerFactory.getLogger(ITestBucketTool.class);
/**
* Error text when the bucket already exists and is owned by you.
*/
public static final String OWNED = "BucketAlreadyOwnedByYouException";
/**
* Error text when the bucket already exists but is owned by someone else. Not *yet* used in any tests.
*/
public static final String EXISTS = "BucketAlreadyExists";
/**
* Sample S3 Express bucket name. {@value}.
*/
private static final String S3EXPRESS_BUCKET = "bucket--usw2-az2--x-s3";
/**
* Sample S3 Express bucket URI. {@value}.
*/
public static final String S3A_S3EXPRESS = "s3a://" + S3EXPRESS_BUCKET;
/**
* US west az. {@value}
*/
public static final String USWEST_AZ_2 = "usw2-az2";
private String bucketName;
private boolean s3ExpressStore;
private BucketTool bucketTool;
private String fsURI;
private String region;
private S3AFileSystem fs;
@Override
public void setup() throws Exception {
super.setup();
fs = getFileSystem();
final Configuration conf = fs.getConf();
bucketName = S3ATestUtils.getTestBucketName(conf);
fsURI = fs.getUri().toString();
s3ExpressStore = fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE);
bucketTool = new BucketTool(conf);
region = conf.get(AWS_REGION, "");
}
@Test
public void testRecreateTestBucketS3Express() throws Throwable {
assumeS3ExpressFileSystem(getFileSystem());
final IOException ex = intercept(IOException.class,
() -> bucketTool.exec("bucket", d(CREATE), d(OPT_ZONE), USWEST_AZ_2, d(OPT_REGION), region,
fsURI));
if (ex instanceof AWSBadRequestException) {
// owned error
assertExceptionContains(OWNED, ex);
} else if (ex instanceof UnknownHostException) {
// endpoint region stuff, expect the error to include the s3express endpoint
// name
assertExceptionContains("s3express-control", ex);
}
}
/**
* Try and recreate the test bucket.
* Third party stores may allow this, so skip test on them.
*/
@Test
public void testRecreateTestBucketNonS3Express() throws Throwable {
assumeNotS3ExpressFileSystem(fs);
assumeStoreAwsHosted(fs);
intercept(AWSBadRequestException.class, OWNED,
() -> bucketTool.exec("bucket", d(CREATE),
d(OPT_REGION), region,
fsURI));
}
@Test
public void testSimpleBucketWithZoneParam() throws Throwable {
expectErrorCode(EXIT_USAGE,
intercept(ExitUtil.ExitException.class, UNSUPPORTED_ZONE_ARG, () ->
bucketTool.exec("bucket", d(CREATE),
d(OPT_ZONE), USWEST_AZ_2,
"s3a://simple/")));
}
@Test
public void testS3ExpressBucketWithoutZoneParam() throws Throwable {
expectErrorCode(EXIT_USAGE,
intercept(ExitUtil.ExitException.class, NO_ZONE_SUPPLIED, () ->
bucketTool.exec("bucket", d(CREATE),
S3A_S3EXPRESS)));
}
/**
* To avoid accidentally trying to create S3 Express stores on third party
* endpoints, the tool rejects such attempts.
*/
@Test
public void testS3ExpressStorageOnlyOnAwsEndpoint() throws Throwable {
describe("The tool only supports S3 Express bucket names on aws endpoints");
expectErrorCode(EXIT_NOT_ACCEPTABLE,
intercept(ExitUtil.ExitException.class, () ->
bucketTool.exec("bucket", d(CREATE),
d(OPT_ENDPOINT), "s3.example.org",
S3A_S3EXPRESS)));
}
/**
* Add a dash to a string.
* @param s source.
* @return "-s"
*/
private String d(String s) {
return "-" + s;
}
}
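
Not part of the patch: a minimal sketch of how downstream code could run the same probe that the setup() method above uses to detect an S3 Express store, through the standard hasPathCapability() call. The helper class name S3ExpressProbe is hypothetical; the capability constant is the one already imported from S3ExpressStorage.

// Hypothetical helper, not in the patch: wraps the capability probe used in
// ITestBucketTool.setup() so callers can branch on zonal-store behaviour.
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

import static org.apache.hadoop.fs.s3a.impl.S3ExpressStorage.STORE_CAPABILITY_S3_EXPRESS_STORAGE;

public final class S3ExpressProbe {

  private S3ExpressProbe() {
  }

  /**
   * Is the store behind this filesystem an S3 Express One Zone bucket?
   * @param fs target filesystem
   * @return true if the store reports the S3 Express storage capability
   * @throws IOException failure of the probe itself
   */
  public static boolean isS3Express(S3AFileSystem fs) throws IOException {
    return fs.hasPathCapability(new Path("/"), STORE_CAPABILITY_S3_EXPRESS_STORAGE);
  }
}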

View File

@ -108,7 +108,7 @@
<property>
  <name>ireland.endpoint</name>
-  <value>s3-eu-west-1.amazonaws.com</value>
+  <value>s3.eu-west-1.amazonaws.com</value>
</property>
<property>

View File

@ -92,3 +92,7 @@ log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO
# uncomment this to trace where context entries are set
# log4j.logger.org.apache.hadoop.fs.audit.CommonAuditContext=TRACE
# uncomment this to get S3 Delete requests to return the list of deleted objects
# log4j.logger.org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl=TRACE
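
For reference, enabling that trace (so bulk delete requests ask the store to return the list of deleted objects) is just a matter of uncommenting the line in log4j.properties:

log4j.logger.org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl=TRACE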

View File

@ -24,9 +24,13 @@
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.util.ToolRunner;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -91,7 +95,9 @@ private static void assertRunDistCp(int exitCode, String src, String dst,
    optsArr[optsArr.length - 2] = src;
    optsArr[optsArr.length - 1] = dst;
-   assertEquals(exitCode,
-       ToolRunner.run(conf, distCp, optsArr));
+   Assertions.assertThat(ToolRunner.run(conf, distCp, optsArr))
+       .describedAs("Exit code of distcp %s",
+           Arrays.stream(optsArr).collect(Collectors.joining(" ")))
+       .isEqualTo(exitCode);
}
}