1. The class WrappedIO has been extended with more filesystem operations
- openFile()
- PathCapabilities
- StreamCapabilities
- ByteBufferPositionedReadable
All these static methods raise UncheckedIOExceptions rather than
checked ones.
2. The adjacent class org.apache.hadoop.io.wrappedio.WrappedStatistics
provides similar access to IOStatistics/IOStatisticsContext classes
and operations.
Allows callers to:
* Get a serializable IOStatisticsSnapshot from an IOStatisticsSource or
IOStatistics instance
* Save an IOStatisticsSnapshot to file
* Convert an IOStatisticsSnapshot to JSON
* Given an object which may be an IOStatisticsSource, return an object
whose toString() value is a dynamically generated, human readable summary.
This is for logging.
* Separate getters to the different sections of IOStatistics.
* Mean values are returned as a Map.Pair<Long, Long> of (samples, sum)
from which means may be calculated.
There are examples of the dynamic bindings to these classes in:
org.apache.hadoop.io.wrappedio.impl.DynamicWrappedIO
org.apache.hadoop.io.wrappedio.impl.DynamicWrappedStatistics
These use DynMethods and other classes in the package
org.apache.hadoop.util.dynamic which are based on the
Apache Parquet equivalents.
This makes re-implementing these in that library and others
which their own fork of the classes (example: Apache Iceberg)
3. The openFile() option "fs.option.openfile.read.policy" has
added specific file format policies for the core filetypes
* avro
* columnar
* csv
* hbase
* json
* orc
* parquet
S3A chooses the appropriate sequential/random policy as a
A policy `parquet, columnar, vector, random, adaptive` will use the parquet policy for
any filesystem aware of it, falling back to the first entry in the list which
the specific version of the filesystem recognizes
4. New Path capability fs.capability.virtual.block.locations
Indicates that locations are generated client side
and don't refer to real hosts.
Contributed by Steve Loughran
This is a followup to #6543 which ensures all test pass in configurations where
fs.s3a.performance.flags is set to "*" or contains "mkdirs"
Contributed by VJ Jasani
If the flag list in fs.s3a.performance.flags includes "mkdir"
then the safety check of a walk up the tree to look for a parent directory,
-done to verify a directory isn't being created under a file- are skipped.
This saves the cost of multiple list operations.
Contributed by Viraj Jasani
1. Configuration adds new method `getEnumSet()` to get a set of enum values from
a configuration string.
<E extends Enum<E>> EnumSet<E> getEnumSet(String key, Class<E> enumClass, boolean ignoreUnknown)
Whitespace is ignored, case is ignored and the value "*" is mapped to "all values of the enum".
If "ignoreUnknown" is true then when parsing, unknown values are ignored.
This is recommended for forward compatiblity with later versions.
2. This support is implemented in org.apache.hadoop.fs.s3a.impl.ConfigurationHelper -it can be used
elsewhere in the hadoop codebase.
3. A new private FlagSet class in hadoop common manages a set of enum flags.
It implements StreamCapabilities and can be probed for a specific option being set
(with a prefix)
S3A adds an option fs.s3a.performance.flags which builds a FlagSet with enum
type PerformanceFlagEnum
* which initially contains {Create, Delete, Mkdir, Open}
* the existing fs.s3a.create.performance option sets the flag "Create".
* tests which configure fs.s3a.create.performance MUST clear
fs.s3a.performance.flags in test setup.
Future performance flags are planned, with different levels of safety
and/or backwards compatibility.
Contributed by Steve Loughran
The new property fs.s3a.encryption.context allow users to specify the AWS KMS Encryption Context to be used in S3A.
The value of the encryption context is a key/value string that will be Base64 encoded and set in the parameter ssekmsEncryptionContext from the S3 client.
Contributed by Raphael Azzolini
Apache httpclient 4.5.x is the new default implementation of http connections;
this supports a large configurable pool of connections along with
the ability to limit their lifespan.
The networking library can be chosen using the configuration
option fs.azure.networking.library
The supported values are
- APACHE_HTTP_CLIENT : Use Apache HttpClient [Default]
- JDK_HTTP_URL_CONNECTION : Use JDK networking library
Important: unless the networking library is switched back to
the JDK, the apache httpcore and httpclient must be on the classpath
Contributed by Pranav Saxena
Adds new ClientManager interface/implementation which provides on-demand
creation of synchronous and asynchronous s3 clients, s3 transfer manager,
and in close() terminates these.
S3A FS is modified to
* Create a ClientManagerImpl instance and pass down to its S3Store.
* Use the same ClientManager interface against S3Store to demand-create
the services.
* Only create the async client as part of the transfer manager creation,
which will take place during the first rename() operation.
* Statistics on client creation count and duration are recorded.
+ Statistics on the time to initialize and shutdown the S3A FS are collected
in IOStatistics for reporting.
Adds to hadoop common class
LazyAtomicReference<T> implements CallableRaisingIOE<T>, Supplier<T>
and subclass
LazyAutoCloseableReference<T extends AutoCloseable>
extends LazyAtomicReference<T> implements AutoCloseable
These evaluate the Supplier<T>/CallableRaisingIOE<T> they were
constructed with on the first (successful) read of the the value.
Any exception raised during this operation will be rethrown, and on future
evaluations the same operation retried.
These classes implement the Supplier and CallableRaisingIOE
interfaces so can actually be used for to implement lazy function evaluation
as Haskell and some other functional languages do.
LazyAutoCloseableReference is AutoCloseable; its close() method will
close the inner reference if it is set
This class is used in ClientManagerImpl for the lazy S3 Cliehnt creation
and closure.
Contributed by Steve Loughran.
Speed up slow tests
* TestS3AAWSCredentialsProvider: decrease thread pool shutdown time
* TestS3AInputStreamRetry: reduce retry limit and intervals
Contributed by Steve Loughran
The new test TestAWSV2SDK scans the aws sdk bundle.jar and prints out all classes
which are unshaded, so at risk of creating classpath problems
It does not fail the test if this holds, because the current SDKs
do ship with unshaded classes; the test would always fail.
The SDK upgrade process should include inspecting the output
of this test to see if it has got worse (do a before/after check).
Once the AWS SDK does shade everything, we can have this
test fail on any regression
Contributed by Harshit Gupta
It is now possible to provide a job ID in the maven "job.id" property
hadoop-aws test runs to isolate paths under a the test bucket
under which all tests will be executed.
This will allow independent builds *in different source trees*
to test against the same bucket in parallel, and is designed for
CI testing.
Example:
mvn verify -Dparallel-tests -Droot.tests.enabled=false -Djob.id=1
mvn verify -Droot.tests.enabled=false -Djob.id=2
- Root tests must be be disabled to stop them cleaning up
the test paths of other test runs.
- Do still regularly run the root tests just to force cleanup
of the output of any interrupted test suites.
Contributed by Steve Loughran
Add support for Azure Active Directory (Azure AD) workload identities which integrate with the Kubernetes's native capabilities to federate with any external identity provider.
Contributed By: Anuj Modi
Customer-provided-keys (CPK) configs are not allowed with non-hierarchal-namespace (non-HNS) accounts for ABFS. This patch aims to prevent ABFS initialization for non-HNS accounts if CPK configs are provided.
Contributed by: Pranav Saxena
* parameterize the test run rather than do it from within the test suite.
* log what the committer factory is up to (and improve its logging)
* close all filesystems, then create the test filesystem with cache enabled.
The cache is critical, we want the fs from cache to be used when querying
filesystem properties, rather than one created from the committer jobconf,
which will have the same options as the task committer, so not actually
validate the override logic.
Contributed by Steve Loughran
WrappedIO.bulkDelete_PageSize() => bulkDelete_pageSize()
Makes it consistent with the HADOOP-19131 naming scheme.
The name needs to be fixed before invoking it through reflection,
as once that is attempted the binding won't work at run time,
though compilation will be happy.
Contributed by Steve Loughran
Adds support for metric collection at the filesystem instance level.
Metrics are pushed to the store upon the closure of a filesystem instance, encompassing all operations
that utilized that specific instance.
Collected Metrics:
- Number of successful requests without any retries.
- Count of requests that succeeded after a specified number of retries (x retries).
- Request count subjected to throttling.
- Number of requests that failed despite exhausting all retry attempts. etc.
Implementation Details:
Incorporated logic in the AbfsClient to facilitate metric pushing through an additional request.
This occurs in scenarios where no requests are sent to the backend for a defined idle period.
By implementing these enhancements, we ensure comprehensive monitoring and analysis of filesystem interactions, enabling a deeper understanding of success rates, retry scenarios, throttling instances, and exhaustive failure scenarios. Additionally, the AbfsClient logic ensures that metrics are proactively pushed even during idle periods, maintaining a continuous and accurate representation of filesystem performance.
Contributed by Anmol Asrani
Applications can create a BulkDelete instance from a
BulkDeleteSource; the BulkDelete interface provides
the pageSize(): the maximum number of entries which can be
deleted, and a bulkDelete(Collection paths)
method which can take a collection up to pageSize() long.
This is optimized for object stores with bulk delete APIs;
the S3A connector will offer the page size of
fs.s3a.bulk.delete.page.size unless bulk delete has
been disabled.
Even with a page size of 1, the S3A implementation is
more efficient than delete(path)
as there are no safety checks for the path being a directory
or probes for the need to recreate directories.
The interface BulkDeleteSource is implemented by
all FileSystem implementations, with a page size
of 1 and mapped to delete(pathToDelete, false).
This means that callers do not need to have special
case handling for object stores versus classic filesystems.
To aid use through reflection APIs, the class
org.apache.hadoop.io.wrappedio.WrappedIO
has been created with "reflection friendly" methods.
Contributed by Mukund Thakur and Steve Loughran
Improve task commit resilience everywhere
and add an option to reduce delete IO requests on
job cleanup (relevant for ABFS and HDFS).
Task Commit Resilience
----------------------
Task manifest saving is re-attempted on failure; the number of
attempts made is configurable with the option:
mapreduce.manifest.committer.manifest.save.attempts
* The default is 5.
* The minimum is 1; asking for less is ignored.
* A retry policy adds 500ms of sleep per attempt.
* Move from classic rename() to commitFile() to rename the file,
after calling getFileStatus() to get its length and possibly etag.
This becomes a rename() on gcs/hdfs anyway, but on abfs it does reach
the ResilientCommitByRename callbacks in abfs, which report on
the outcome to the caller...which is then logged at WARN.
* New statistic task_stage_save_summary_file to distinguish from
other saving operations (job success/report file).
This is only saved to the manifest on task commit retries, and
provides statistics on all previous unsuccessful attempts to save
the manifests
+ test changes to match the codepath changes, including improvements
in fault injection.
Directory size for deletion
---------------------------
New option
mapreduce.manifest.committer.cleanup.parallel.delete.base.first
This attempts an initial attempt at deleting the base dir, only falling
back to parallel deletes if there's a timeout.
This option is disabled by default; Consider enabling it for abfs to
reduce IO load. Consult the documentation for more details.
Success file printing
---------------------
The command to print a JSON _SUCCESS file from this committer and
any S3A committer is now something which can be invoked from
the mapred command:
mapred successfile <path to file>
Contributed by Steve Loughran
HADOOP-19057 switched the hadoop-aws test bucket from landsat-pds to
noaa-cors-pds
This new bucket isn't accessible if the client configuration
sets an fs.s3a.endpoint/region value other than us-east-1.
Contributed by Viraj Jasani
The description of `fs.s3a.committer.abort.pending.uploads` in the section `Concurrent Jobs writing to the same destination` is not correct. Its default value is `true`.
Contributed by Xi Chen
ABFS has a client-side throttling mechanism which works on the metrics collected
from past requests
When requests are fail due to server-side throttling it updates its
metrics and recalculates any client side backoff.
The choice of which requests should be used to compute client side
backoff interval is based on the http status code:
- Status code in 2xx range: Successful Operations should contribute.
- Status code in 3xx range: Redirection Operations should not contribute.
- Status code in 4xx range: User Errors should not contribute.
- Status code is 503: Throttling Error should contribute only if they
are due to client limits breach as follows:
* 503, Ingress Over Account Limit: Should Contribute
* 503, Egress Over Account Limit: Should Contribute
* 503, TPS Over Account Limit: Should Contribute
* 503, Other Server Throttling: Should not Contribute.
- Status code in 5xx range other than 503: Should not Contribute.
- IOException and UnknownHostExceptions: Should not Contribute.
Contributed by Anuj Modi
Clarifies behaviour of VectorIO methods with contract tests as well as
specification.
* Add precondition range checks to all implementations
* Identify and fix bug where direct buffer reads was broken
(HADOOP-19101; this surfaced in ABFS contract tests)
* Logging in VectoredReadUtils.
* TestVectoredReadUtils verifies validation logic.
* FileRangeImpl toString() improvements
* CombinedFileRange tracks bytes in range which are wanted;
toString() output logs this.
HDFS
* Add test TestHDFSContractVectoredRead
ABFS
* Add test ITestAbfsFileSystemContractVectoredRead
S3A
* checks for vector IO being stopped in all iterative
vector operations, including draining
* maps read() returning -1 to failure
* passes in file length to validation
* Error reporting to only completeExceptionally() those ranges
which had not yet read data in.
* Improved logging.
readVectored()
* made synchronized. This is only for the invocation;
the actual async retrieves are unsynchronized.
* closes input stream on invocation
* switches to random IO, so avoids keeping any long-lived connection around.
+ AbstractSTestS3AHugeFiles enhancements.
+ ADDENDUM: test fix in ITestS3AContractVectoredRead
Contains: HADOOP-19101. Vectored Read into off-heap buffer broken in fallback
implementation
Contributed by Steve Loughran
Change-Id: Ia4ed71864c595f175c275aad83a2ff5741693432
Clarifies behaviour of VectorIO methods with contract tests as well as specification.
* Add precondition range checks to all implementations
* Identify and fix bug where direct buffer reads was broken
(HADOOP-19101; this surfaced in ABFS contract tests)
* Logging in VectoredReadUtils.
* TestVectoredReadUtils verifies validation logic.
* FileRangeImpl toString() improvements
* CombinedFileRange tracks bytes in range which are wanted;
toString() output logs this.
HDFS
* Add test TestHDFSContractVectoredRead
ABFS
* Add test ITestAbfsFileSystemContractVectoredRead
S3A
* checks for vector IO being stopped in all iterative
vector operations, including draining
* maps read() returning -1 to failure
* passes in file length to validation
* Error reporting to only completeExceptionally() those ranges
which had not yet read data in.
* Improved logging.
readVectored()
* made synchronized. This is only for the invocation;
the actual async retrieves are unsynchronized.
* closes input stream on invocation
* switches to random IO, so avoids keeping any long-lived connection around.
+ AbstractSTestS3AHugeFiles enhancements.
Contains: HADOOP-19101. Vectored Read into off-heap buffer broken in fallback implementation
Contributed by Steve Loughran