diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/LeakReporter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/LeakReporter.java
new file mode 100644
index 0000000000..824852d052
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/LeakReporter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.util.functional.RunnableRaisingIOE;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A class to report leaks of streams.
+ *
+ * It is created when the owning object is constructed, and closed
+ * during that object's finalization.
+ * A predicate should be supplied for the {@link #isOpen} probe to check
+ * whether the resource is still open, along with an operation to
+ * actually close the target.
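+ * <p>
+ * A sketch of typical wiring, based on how {@code S3AInputStream}
+ * uses this class elsewhere in this patch:
+ * <pre>{@code
+ * this.leakReporter = new LeakReporter(
+ *   "Stream not closed while reading " + uri,
+ *   this::isStreamOpen,
+ *   () -> abortInFinalizer());
+ * }</pre>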
+ */
+public class LeakReporter implements Closeable {
+
+ /**
+ * Name of logger used to report leaks: {@value}.
+ */
+ public static final String RESOURCE_LEAKS_LOG_NAME = "org.apache.hadoop.fs.resource.leaks";
+
+ /**
+ * Special log for leaked streams.
+ */
+ private static final Logger LEAK_LOG =
+ LoggerFactory.getLogger(RESOURCE_LEAKS_LOG_NAME);
+
+ /**
+ * Format string used to build the thread information: {@value}.
+ */
+ @VisibleForTesting
+ static final String THREAD_FORMAT =
+ "; thread: %s; id: %d";
+
+ /**
+ * Re-entrancy check.
+ */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /**
+ * Predicate to check if the resource is open.
+ */
+ private final BooleanSupplier isOpen;
+
+ /**
+ * Action to close the resource.
+ */
+ private final RunnableRaisingIOE closeAction;
+
+ /**
+ * Stack trace of object creation; used to
+ * report unclosed streams in finalize().
+ */
+ private final IOException leakException;
+
+ /**
+ * Constructor.
+ *
+ * Validates the parameters and builds the exception,
+ * appending "; thread: " plus the thread name and ID to its message.
+ * @param message error message
+ * @param isOpen open predicate
+ * @param closeAction action to close
+ */
+ public LeakReporter(
+ final String message,
+ final BooleanSupplier isOpen,
+ final RunnableRaisingIOE closeAction) {
+ this.isOpen = requireNonNull(isOpen);
+ this.closeAction = requireNonNull(closeAction);
+ // build the leak exception eagerly.
+ // This includes the error string to print, so as to avoid
+ // constructing objects in finalize().
+ this.leakException = new IOException(message
+ + String.format(THREAD_FORMAT,
+ Thread.currentThread().getName(),
+ Thread.currentThread().getId()));
+ }
+
+ /**
+ * Close the resource.
+ */
+ @Override
+ public void close() {
+ try {
+ if (!closed.getAndSet(true) && isOpen.getAsBoolean()) {
+ // log a warning with the creation stack
+ LEAK_LOG.warn(leakException.getMessage());
+ // The creation stack is logged at INFO, so that
+ // it is possible to configure logging to print
+ // the name of files left open, without printing
+ // the stacks. This is better for production use.
+
+ LEAK_LOG.info("stack", leakException);
+ closeAction.apply();
+ }
+ } catch (Exception e) {
+ LEAK_LOG.info("executing leak cleanup actions", e);
+ }
+ }
+
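+ /**
+ * Get the exception which records the creation stack trace.
+ * @return the leak exception
+ */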
+ public IOException getLeakException() {
+ return leakException;
+ }
+
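+ /**
+ * Has this reporter been closed?
+ * @return true once {@link #close()} has been invoked
+ */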
+ public boolean isClosed() {
+ return closed.get();
+ }
+
+ @Override
+ public String toString() {
+ return "LeakReporter{" +
+ "closed=" + closed.get() +
+ '}';
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 85b82287e3..57c7757213 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -40,6 +40,14 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Evolving
public final class StreamStatisticNames {
+ /**
+ * Count of stream leaks from an application which
+ * is not cleaning up correctly.
+ * Value: {@value}.
+ */
+ public static final String STREAM_LEAKS =
+ "stream_leaks";
+
/**
* Count of times the TCP stream was aborted.
* Value: {@value}.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RunnableRaisingIOE.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RunnableRaisingIOE.java
new file mode 100644
index 0000000000..0edb06e996
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/RunnableRaisingIOE.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.functional;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+/**
+ * Runnable interface whose {@link #apply()} method may raise
+ * an IOE.
+ * The implementation of {@link Runnable#run} invokes this
+ * and converts any raised IOE into an {@link UncheckedIOException}.
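+ * <p>
+ * A lambda sketch, where {@code out} is some illustrative
+ * {@code Closeable} field:
+ * {@code RunnableRaisingIOE cleanup = () -> out.close();}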
+ */
+@FunctionalInterface
+public interface RunnableRaisingIOE extends Runnable {
+
+ /**
+ * Apply the operation.
+ * @throws IOException Any IO failure
+ */
+ void apply() throws IOException;
+
+ @Override
+ default void run() {
+ try {
+ apply();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestLeakReporter.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestLeakReporter.java
new file mode 100644
index 0000000000..c691a2577f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestLeakReporter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.impl.LeakReporter.THREAD_FORMAT;
+import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
+public final class TestLeakReporter extends AbstractHadoopTestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestLeakReporter.class);
+
+ /**
+ * Count of close calls.
+ */
+ private final AtomicInteger closeCount = new AtomicInteger();
+
+ /**
+ * Big test: creates a reporter, then closes it.
+ * Verifies that the error message and stack trace are printed when
+ * the resource is open, and that the close callback was invoked.
+ *
+ * After the first invocation, a second invocation is ignored.
+ */
+ @Test
+ public void testLeakInvocation() throws Throwable {
+
+ final String message = "<message>";
+ final LeakReporter reporter = new LeakReporter(message,
+ () -> true,
+ this::closed);
+
+ // store the old thread name and change it,
+ // so the log test can verify that the old thread name is printed.
+ String oldName = Thread.currentThread().getName();
+ Thread.currentThread().setName("thread");
+ // Capture the logs
+ GenericTestUtils.LogCapturer logs =
+ captureLogs(LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
+ expectClose(reporter, 1);
+
+ // check the log
+ logs.stopCapturing();
+ final String output = logs.getOutput();
+ LOG.info("output of leak log is {}", output);
+
+ final String threadInfo = String.format(THREAD_FORMAT,
+ oldName,
+ Thread.currentThread().getId());
+ // log auditing
+ Assertions.assertThat(output)
+ .describedAs("output from the logs")
+ .contains("WARN")
+ .contains(message)
+ .contains(Thread.currentThread().getName())
+ .contains(threadInfo)
+ .contains("TestLeakReporter.testLeakInvocation")
+ .contains("INFO")
+ .contains("stack");
+
+ // no reentrancy
+ expectClose(reporter, 1);
+ }
+
+ /**
+ * Expect the close operation to result in
+ * the close count having the expected value.
+ * @param reporter leak reporter
+ * @param expected expected value after the close
+ */
+ private void expectClose(final LeakReporter reporter, final int expected) {
+ reporter.close();
+ assertCloseCount(expected);
+ }
+
+ /**
+ * Close operation: increments the counter.
+ */
+ private void closed() {
+ closeCount.incrementAndGet();
+ }
+
+ /**
+ * When the resource is already closed, no leak cleanup takes place.
+ */
+ @Test
+ public void testLeakSkipped() throws Throwable {
+
+ final LeakReporter reporter = new LeakReporter("<message>",
+ () -> false,
+ this::closed);
+ expectClose(reporter, 0);
+ }
+
+ /**
+ * If the probe raises an exception, the exception is swallowed
+ * and the close action is never invoked.
+ */
+ @Test
+ public void testProbeFailureSwallowed() throws Throwable {
+ final LeakReporter reporter = new LeakReporter("<message>",
+ this::raiseNPE,
+ this::closed);
+ expectClose(reporter, 0);
+ }
+
+ /**
+ * Any exception raised in the close action is swallowed.
+ */
+ @Test
+ public void testCloseActionSwallowed() throws Throwable {
+ final LeakReporter reporter = new LeakReporter("<message>",
+ () -> true,
+ this::raiseNPE);
+ reporter.close();
+
+ Assertions.assertThat(reporter.isClosed())
+ .describedAs("reporter closed)")
+ .isTrue();
+ }
+
+ /**
+ * Always raises an NPE.
+ * @return never
+ */
+ private boolean raiseNPE() {
+ throw new NullPointerException("oops");
+ }
+
+ /**
+ * Assert that the value of {@link #closeCount} is as expected.
+ * @param ex expected.
+ */
+ private void assertCloseCount(final int ex) {
+ Assertions.assertThat(closeCount.get())
+ .describedAs("close count")
+ .isEqualTo(ex);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9846953911..c0e530cb5c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -160,6 +160,7 @@ import org.apache.hadoop.fs.statistics.FileSystemStatisticNames;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.fs.store.audit.AuditEntryPoint;
@@ -5587,6 +5588,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
case AWS_S3_ACCESS_GRANTS_ENABLED:
return s3AccessGrantsEnabled;
+ // stream leak detection.
+ case StreamStatisticNames.STREAM_LEAKS:
+ return !prefetchEnabled;
+
default:
// is it a performance flag?
if (performanceFlags.hasCapability(capability)) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 147cd7567a..c620ca042d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;
@@ -39,6 +40,8 @@ import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.impl.LeakReporter;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -116,6 +119,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/
private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AInputStream.class);
+
/**
* Atomic boolean variable to stop all ongoing vectored read operation
* for this input stream. This will be set to true when the stream is
@@ -159,8 +165,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
private final Optional<Long> fileLength;
private final String uri;
- private static final Logger LOG =
- LoggerFactory.getLogger(S3AInputStream.class);
+
private final S3AInputStreamStatistics streamStatistics;
private S3AInputPolicy inputPolicy;
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
@@ -202,6 +207,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
/** Aggregator used to aggregate per thread IOStatistics. */
private final IOStatisticsAggregator threadIOStatistics;
+ /**
+ * Leak reporter: used to report and abort
+ * unclosed streams in finalize().
+ */
+ private final LeakReporter leakReporter;
+
/**
* Create the stream.
* This does not attempt to open it; that is only done on the first
@@ -242,6 +253,60 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
this.boundedThreadPool = boundedThreadPool;
this.vectoredIOContext = context.getVectoredIOContext();
this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
+ // build the leak reporter
+ this.leakReporter = new LeakReporter(
+ "Stream not closed while reading " + uri,
+ this::isStreamOpen,
+ () -> abortInFinalizer());
+ }
+
+ /**
+ * Finalizer.
+ *
+ * Verify that the inner stream is closed.
+ *
+ * If it is not, it means streams are being leaked in application code.
+ * Log a warning, including the stack trace of the caller,
+ * then abort the stream.
+ *
+ * This does not attempt to invoke {@link #close()} as that is
+ * a more complex operation, and this method is being executed
+ * during a GC finalization phase.
+ *
+ * Applications MUST close their streams; this is a defensive
+ * operation to return http connections and warn the end users
+ * that their applications are at risk of running out of connections.
+ *
+ * {@inheritDoc}
+ */
+ @Override
+ protected void finalize() throws Throwable {
+ leakReporter.close();
+ super.finalize();
+ }
+
+ /**
+ * Probe for stream being open.
+ * Not synchronized; the flag is volatile.
+ * @return true if the stream is still open.
+ */
+ private boolean isStreamOpen() {
+ return !closed;
+ }
+
+ /**
+ * Brute force stream close; invoked by {@link LeakReporter}.
+ * All exceptions raised are ignored.
+ */
+ private void abortInFinalizer() {
+ try {
+ // stream was leaked: update statistic
+ streamStatistics.streamLeaked();
+ // abort the stream. This merges statistics into the filesystem.
+ closeStream("finalize()", true, true).get();
+ } catch (InterruptedException | ExecutionException ignored) {
+ /* ignore this failure; cleanup is best-effort */
+ }
}
/**
@@ -710,7 +775,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead;
CompletableFuture<Boolean> operation;
- SDKStreamDrainer drainer = new SDKStreamDrainer(
+ SDKStreamDrainer<ResponseInputStream<GetObjectResponse>> drainer = new SDKStreamDrainer<>(
uri,
wrappedStream,
shouldAbort,
@@ -1357,6 +1422,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
switch (toLowerCase(capability)) {
case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.IOSTATISTICS_CONTEXT:
+ case StreamStatisticNames.STREAM_LEAKS:
case StreamCapabilities.READAHEAD:
case StreamCapabilities.UNBUFFER:
case StreamCapabilities.VECTOREDIO:
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index e3bef9f470..b84f19fcd8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -862,6 +862,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
this.filesystemStatistics = filesystemStatistics;
IOStatisticsStore st = iostatisticsStore()
.withCounters(
+ StreamStatisticNames.STREAM_LEAKS,
StreamStatisticNames.STREAM_READ_ABORTED,
StreamStatisticNames.STREAM_READ_BYTES_DISCARDED_ABORT,
StreamStatisticNames.STREAM_READ_CLOSED,
@@ -1126,6 +1127,15 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
merge(true);
}
+ /**
+ * Stream was leaked.
+ */
+ public void streamLeaked() {
+ increment(StreamStatisticNames.STREAM_LEAKS);
+ // merge as if closed.
+ merge(true);
+ }
+
/**
* {@inheritDoc}.
* As well as incrementing the {@code STREAM_READ_SEEK_POLICY_CHANGED}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 0bcdb29330..07133a48e8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -312,6 +312,10 @@ public enum Statistic {
StoreStatisticNames.OBJECT_PUT_BYTES_PENDING,
"number of bytes queued for upload/being actively uploaded",
TYPE_GAUGE),
+ STREAM_LEAKS(
+ StreamStatisticNames.STREAM_LEAKS,
+ "Streams detected as not closed safely",
+ TYPE_COUNTER),
STREAM_READ_ABORTED(
StreamStatisticNames.STREAM_READ_ABORTED,
"Count of times the TCP stream was aborted",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 69f2c279dd..1e28a27e70 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -210,4 +210,8 @@ public interface S3AInputStreamStatistics extends AutoCloseable,
*/
DurationTracker initiateInnerStreamClose(boolean abort);
+ /**
+ * Stream was leaked.
+ */
+ default void streamLeaked() {}
}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
index 6fa37750de..aebf5761bd 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/connecting.md
@@ -485,6 +485,90 @@ If `storediag` doesn't connect to your S3 store, *nothing else will*.
Based on the experience of people who field support calls, here are
some of the main connectivity issues which cause problems.
+### Connection pool overloaded
+
+If more connections are needed than the HTTP connection pool holds,
+then worker threads will block until one is freed.
+
+If the wait exceeds the time set in `fs.s3a.connection.acquisition.timeout`,
+the operation will fail with `Timeout waiting for connection from pool`.
+
+This may be retried, but time has been lost, which results in slower operations.
+If queries suddenly get slower as the number of active operations increases,
+then this is a possible cause.
+
+Fixes:
+
+Increase the value of `fs.s3a.connection.maximum`.
+This is the general fix on query engines such as Apache Spark and Apache Impala,
+which run many worker threads simultaneously and do not keep files open past
+the duration of a single task within a larger query.
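+
+A sketch of setting these options programmatically before creating the
+filesystem; the option names appear above, while the values and the
+bucket URI here are purely illustrative:
+
+```java
+Configuration conf = new Configuration();
+conf.setInt("fs.s3a.connection.maximum", 200);
+conf.set("fs.s3a.connection.acquisition.timeout", "60s");
+FileSystem fs = FileSystem.newInstance(new URI("s3a://bucket/"), conf);
+```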
+
+It can also surface with applications which deliberately keep files open
+for extended periods.
+These should ideally call `unbuffer()` on the input streams.
+This will free up the connection until another read operation is invoked, yet
+the stream will still re-open faster than a full `open(Path)` call.
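+
+A minimal sketch of the `unbuffer()` pattern; `fs`, `path` and the
+buffer are illustrative:
+
+```java
+FSDataInputStream in = fs.open(path);
+byte[] buffer = new byte[1024];
+in.read(buffer);
+// free the pooled HTTP connection; the next read() re-opens the object stream
+in.unbuffer();
+```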
+
+Applications may also be "leaking" http connections by failing to
+`close()` them. This is potentially fatal: eventually the connection pool
+can get exhausted, at which point the program will no longer work.
+
+This can only be fixed in the application code: it is _not_ a bug in
+the S3A filesystem.
+
+1. Applications MUST call `close()` on an input stream when the contents of
+ the file are no longer needed.
+2. If long-lived applications eventually fail with unrecoverable
+ `ApiCallTimeout` exceptions, they are not doing so.
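+
+The simplest way to guarantee closure is try-with-resources; a sketch,
+with `fs` and `path` illustrative:
+
+```java
+byte[] buffer = new byte[1024];
+try (FSDataInputStream in = fs.open(path)) {
+ // close() is invoked automatically, even if the read fails
+ in.read(buffer);
+}
+```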
+
+To aid in identifying the location of these leaks, when a JVM garbage
+collection releases an unreferenced `S3AInputStream` instance,
+it will log at `WARN` level that it has not been closed,
+listing the file URL, and the name and ID of the thread
+which opened the file.
+The stack trace of the `open()` call will be logged at `INFO`.
+
+```
+2024-11-13 12:48:24,537 [Finalizer] WARN resource.leaks (LeakReporter.java:close(114)) - Stream not closed while reading s3a://bucket/test/testFinalizer; thread: JUnit-testFinalizer; id: 11
+2024-11-13 12:48:24,537 [Finalizer] INFO resource.leaks (LeakReporter.java:close(120)) - stack
+java.io.IOException: Stream not closed while reading s3a://bucket/test/testFinalizer; thread: JUnit-testFinalizer; id: 11
+ at org.apache.hadoop.fs.impl.LeakReporter.<init>(LeakReporter.java:101)
+ at org.apache.hadoop.fs.s3a.S3AInputStream.<init>(S3AInputStream.java:257)
+ at org.apache.hadoop.fs.s3a.S3AFileSystem.executeOpen(S3AFileSystem.java:1891)
+ at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1841)
+ at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:997)
+ at org.apache.hadoop.fs.s3a.ITestS3AInputStreamLeakage.testFinalizer(ITestS3AInputStreamLeakage.java:99)
+```
+
+It will also `abort()` the HTTP connection, freeing up space in the connection pool.
+This automated cleanup is _not_ a substitute for applications correctly closing
+input streams: it only happens during garbage collection, and that may not be
+rapid enough to prevent an application running out of connections.
+
+It is possible to stop these warning messages from being logged
+by restricting the log `org.apache.hadoop.fs.resource.leaks` to
+only log at `ERROR` or above.
+This will also disable leak reporting for _all_ other resources whose
+leaks are detected through this logger.
+
+```properties
+log4j.logger.org.apache.hadoop.fs.resource.leaks=ERROR
+```
+
+To disable the stack traces while retaining the URI/thread information, set the log level to `WARN`
+
+```properties
+log4j.logger.org.apache.hadoop.fs.resource.leaks=WARN
+```
+
+This is better for production deployments: leakages are reported, but
+the stack traces, which are only of relevance to application developers,
+are omitted.
+
+Finally, note that the filesystem and thread context IOStatistics counter `stream_leaks` is updated;
+if these statistics are collected, then the existence of leakages can be detected.
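+
+A sketch of probing a filesystem instance for this counter, using the
+`IOStatistics` API already shown in this patch (`fs` is illustrative):
+
+```java
+IOStatistics stats = fs.getIOStatistics();
+Long leaks = stats.counters().get("stream_leaks");
+if (leaks != null && leaks > 0) {
+ // streams were reclaimed by GC while still open
+}
+```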
+
### Inconsistent configuration across a cluster
All hosts in the cluster need to have the configuration secrets;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
new file mode 100644
index 0000000000..4b871c6a19
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.lang.ref.WeakReference;
+import java.time.Duration;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS;
+import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+
+/**
+ * Test Stream leakage.
+ */
+public class ITestS3AInputStreamLeakage extends AbstractS3ATestBase {
+
+ /**
+ * How big a file to create?
+ */
+ public static final int FILE_SIZE = 1024;
+
+ public static final byte[] DATASET = dataset(FILE_SIZE, '0', 10);
+
+ /**
+ * Time to wait after a GC/finalize is triggered before looking at the log.
+ */
+ public static final long GC_DELAY = Duration.ofSeconds(1).toMillis();
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ assume("Stream leak detection not avaialable",
+ getFileSystem().hasCapability(STREAM_LEAKS));
+ }
+
+ /**
+ * This test forces a GC of an open file then verifies that the
+ * log contains the error message.
+ *
+ * Care is needed here to ensure that no strong references are held to the
+ * stream, otherwise: no GC.
+ *
+ * It also assumes that {@code System.gc()} will do enough of a treewalk to
+ * prepare the stream for garbage collection (a weak ref is used to verify
+ * that it was removed as a reference), and that
+ * {@code System.runFinalization()} will then
+ * invoke the finalization.
+ *
+ * The finalize code runs its own thread "Finalizer"; this is async enough
+ * that assertions on log entries only work if there is a pause after
+ * finalization is triggered and the log is reviewed.
+ *
+ * The stream leak counter of the FileSystem is also updated; this
+ * is verified.
+ *
+ * Note: if the stream under test is not an S3AInputStream (i.e. it is a
+ * prefetching one), this test is skipped. If/when the prefetching stream
+ * adds the same code, this check can be removed.
+ */
+ @Test
+ public void testFinalizer() throws Throwable {
+ Path path = methodPath();
+ final S3AFileSystem fs = getFileSystem();
+
+ ContractTestUtils.createFile(fs, path, true, DATASET);
+
+ // DO NOT use try-with-resources; this
+ // test MUST be able to remove all references
+ // to the stream
+ FSDataInputStream in = fs.open(path);
+
+ try {
+ Assertions.assertThat(in.hasCapability(STREAM_LEAKS))
+ .describedAs("Stream leak detection not supported in: " + in.getClass())
+ .isTrue();
+
+ Assertions.assertThat(in.read())
+ .describedAs("first byte read from %s", in)
+ .isEqualTo(DATASET[0]);
+
+ // check that the inner object stream is open
+ Assertions.assertThat(((S3AInputStream) in.getWrappedStream()).isObjectStreamOpen())
+ .describedAs("stream http connection status")
+ .isTrue();
+ // weak reference to track GC progress
+ WeakReference<S3AInputStream> wrs =
+ new WeakReference<>((S3AInputStream) in.getWrappedStream());
+
+ // Capture the logs
+ GenericTestUtils.LogCapturer logs =
+ captureLogs(LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
+
+ LOG.info("captured log");
+
+ // remove strong reference to the stream
+ in = null;
+ // force the gc.
+ System.gc();
+ // make sure the GC removed the S3AInputStream.
+ Assertions.assertThat(wrs.get())
+ .describedAs("weak stream reference wasn't GC'd")
+ .isNull();
+
+ // finalize
+ System.runFinalization();
+
+ // finalize is async, so add a brief wait for it to be called.
+ // without this the log may or may not be empty
+ Thread.sleep(GC_DELAY);
+ LOG.info("end of log");
+
+ // check the log
+ logs.stopCapturing();
+ final String output = logs.getOutput();
+ LOG.info("output of leak log is {}", output);
+ Assertions.assertThat(output)
+ .describedAs("output from the logs during GC")
+ .contains("drain or abort reason finalize()") // stream release
+ .contains(path.toUri().toString()) // path
+ .contains(Thread.currentThread().getName()) // thread
+ .contains("testFinalizer"); // stack
+
+ // verify that leakages are added to the FS statistics
+ assertThatStatisticCounter(fs.getIOStatistics(), STREAM_LEAKS)
+ .isEqualTo(1);
+ } finally {
+ if (in != null) {
+ IOUtils.cleanupWithLogger(LOG, in);
+ }
+ }
+ }
+}