diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java
new file mode 100644
index 0000000000..1467738579
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.lang.ref.WeakReference;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsSource;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A weak referenced metrics source which avoids hanging on to large objects
+ * if somehow they don't get fully closed/cleaned up.
+ * The JVM may clean up all objects which are only weakly referenced whenever
+ * it does a GC, even if there is no memory pressure.
+ * To stop the wrapped source being collected while it is still in use, the
+ * caller must keep a strong reference to it somewhere.
+ */
+@InterfaceAudience.Private
+public class WeakRefMetricsSource implements MetricsSource {
+
+  /**
+   * Name to know when unregistering.
+   */
+  private final String name;
+
+  /**
+   * Underlying metrics source; typed so that {@code get()} needs no cast.
+   */
+  private final WeakReference<MetricsSource> sourceWeakReference;
+
+  /**
+   * Constructor.
+   * @param name Name to know when unregistering.
+   * @param source metrics source; must not be null.
+   */
+  public WeakRefMetricsSource(final String name, final MetricsSource source) {
+    this.name = name;
+    this.sourceWeakReference = new WeakReference<>(requireNonNull(source));
+  }
+
+  /**
+   * If the weak reference is non null, update the metrics.
+   * @param collector to contain the resulting metrics snapshot
+   * @param all if true, return all metrics even if unchanged.
+   */
+  @Override
+  public void getMetrics(final MetricsCollector collector, final boolean all) {
+    MetricsSource metricsSource = sourceWeakReference.get();
+    if (metricsSource != null) {
+      metricsSource.getMetrics(collector, all);
+    }
+  }
+
+  /**
+   * Name to know when unregistering.
+   * @return the name passed in during construction.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Get the source; this is null once the referent has been GC'd.
+   * @return the source, or null if it has been collected.
+   */
+  public MetricsSource getSource() {
+    return sourceWeakReference.get();
+  }
+
+  @Override
+  public String toString() {
+    return "WeakRefMetricsSource{" +
+        "name='" + name + '\'' +
+        ", sourceWeakReference is " +
+        (sourceWeakReference.get() == null ? "unset" : "set") +
+        '}';
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 3e6f2322d3..420a92788c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -138,7 +138,6 @@
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatistics;
-import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@@ -459,6 +458,13 @@ public void initialize(URI name, Configuration originalConf)
AuditSpan span = null;
try {
LOG.debug("Initializing S3AFileSystem for {}", bucket);
+ if (LOG.isTraceEnabled()) {
+ // log a full trace for deep diagnostics of where an object is created,
+ // for tracking down memory leak issues.
+ LOG.trace("Filesystem for {} created; fs.s3a.impl.disable.cache = {}",
+ name, originalConf.getBoolean("fs.s3a.impl.disable.cache", false),
+ new RuntimeException(super.toString()));
+ }
// clone the configuration into one with propagated bucket options
Configuration conf = propagateBucketOptions(originalConf, bucket);
// HADOOP-17894. remove references to s3a stores in JCEKS credentials.
@@ -3999,22 +4005,18 @@ public void close() throws IOException {
}
isClosed = true;
LOG.debug("Filesystem {} is closed", uri);
- if (getConf() != null) {
- String iostatisticsLoggingLevel =
- getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL,
- IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
- logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
- }
try {
super.close();
} finally {
stopAllServices();
- }
- // Log IOStatistics at debug.
- if (LOG.isDebugEnabled()) {
- // robust extract and convert to string
- LOG.debug("Statistics for {}: {}", uri,
- IOStatisticsLogging.ioStatisticsToPrettyString(getIOStatistics()));
+ // log IO statistics, including of any file deletion during
+ // superclass close
+ if (getConf() != null) {
+ String iostatisticsLoggingLevel =
+ getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL,
+ IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
+ logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
+ }
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 46568ec2a8..9d33efa9d0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
@@ -160,7 +161,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
private final DurationTrackerFactory durationTrackerFactory;
- private String metricsSourceName;
+ /**
+ * Weak reference wrapper registered in place of this instance; set to null in close().
+ */
+ private WeakRefMetricsSource metricsSourceReference;
private final MetricsRegistry registry =
new MetricsRegistry("s3aFileSystem").setContext(CONTEXT);
@@ -233,19 +237,33 @@ public S3AInstrumentation(URI name) {
new MetricDurationTrackerFactory());
}
+ /**
+ * Get the shared metrics system, creating and initializing it on demand.
+ * @return the metrics system; created under METRICS_SYSTEM_LOCK if absent.
+ */
@VisibleForTesting
- public MetricsSystem getMetricsSystem() {
+ static MetricsSystem getMetricsSystem() {
synchronized (METRICS_SYSTEM_LOCK) {
if (metricsSystem == null) {
metricsSystem = new MetricsSystemImpl();
metricsSystem.init(METRICS_SYSTEM_NAME);
+ LOG.debug("Metrics system inited {}", metricsSystem);
}
}
return metricsSystem;
}
/**
- * Register this instance as a metrics source.
+ * Probe for the shared metrics system; the field is read without locking.
+ * @return true if the metrics system is present.
+ */
+ @VisibleForTesting
+ static boolean hasMetricSystem() {
+ return metricsSystem != null;
+ }
+
+ /**
+ * Register this instance as a metrics source via a weak reference.
* @param name s3a:// URI for the associated FileSystem instance
*/
private void registerAsMetricsSource(URI name) {
@@ -257,8 +275,9 @@ private void registerAsMetricsSource(URI name) {
number = ++metricsSourceNameCounter;
}
String msName = METRICS_SOURCE_BASENAME + number;
- metricsSourceName = msName + "-" + name.getHost();
- metricsSystem.register(metricsSourceName, "", this);
+ String metricsSourceName = msName + "-" + name.getHost();
+ metricsSourceReference = new WeakRefMetricsSource(metricsSourceName, this);
+ metricsSystem.register(metricsSourceName, "", metricsSourceReference);
}
/**
@@ -680,19 +699,42 @@ public void getMetrics(MetricsCollector collector, boolean all) {
registry.snapshot(collector.addRecord(registry.info().name()), true);
}
+ /**
+ * If registered with the metrics system, return the name of the source.
+ * The field is read only once, because close() sets it to null.
+ * @return the metrics source name, or null if this instance is not registered.
+ */
+ public String getMetricSourceName() {
+ final WeakRefMetricsSource reference = metricsSourceReference;
+ // use the local copy for both the null check and the call
+ return reference != null ? reference.getName() : null;
+ }
+
public void close() {
- synchronized (METRICS_SYSTEM_LOCK) {
- // it is critical to close each quantile, as they start a scheduled
- // task in a shared thread pool.
- throttleRateQuantile.stop();
- metricsSystem.unregisterSource(metricsSourceName);
- metricsSourceActiveCounter--;
- int activeSources = metricsSourceActiveCounter;
- if (activeSources == 0) {
- LOG.debug("Shutting down metrics publisher");
- metricsSystem.publishMetricsNow();
- metricsSystem.shutdown();
- metricsSystem = null;
+ if (metricsSourceReference != null) {
+ // capture the registered source name for logging and unregistering
+ String name = metricsSourceReference.getName();
+ LOG.debug("Unregistering metrics for {}", name);
+ // then set to null so a second close() is a noop here.
+ metricsSourceReference = null;
+ synchronized (METRICS_SYSTEM_LOCK) {
+ // it is critical to close each quantile, as they start a scheduled
+ // task in a shared thread pool; do it even without a metrics system.
+ throttleRateQuantile.stop();
+
+ if (metricsSystem == null) {
+ LOG.debug("there is no metric system to unregister {} from", name);
+ return;
+ }
+ metricsSystem.unregisterSource(name);
+ metricsSourceActiveCounter--;
+ int activeSources = metricsSourceActiveCounter;
+ if (activeSources == 0) {
+ LOG.debug("Shutting down metrics publisher");
+ metricsSystem.publishMetricsNow();
+ metricsSystem.shutdown();
+ metricsSystem = null;
+ }
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
index 79772ec9da..327b0fab28 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
@@ -103,4 +103,16 @@ public void testClosedOpen() throws Exception {
() -> getFileSystem().open(path("to-open")));
}
+ @Test
+ public void testClosedInstrumentation() throws Exception {
+ // no metrics system should be live (presumably the FS under test is already closed)
+ Assertions.assertThat(S3AInstrumentation.hasMetricSystem())
+ .describedAs("S3AInstrumentation.hasMetricSystem()")
+ .isFalse();
+
+ Assertions.assertThat(getFileSystem().getIOStatistics())
+ .describedAs("iostatistics of %s", getFileSystem())
+ .isNotNull();
+ }
+
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java
new file mode 100644
index 0000000000..d8b9247008
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.net.URI;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem;
+import static org.apache.hadoop.fs.s3a.Statistic.DIRECTORIES_CREATED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test the {@link S3AInstrumentation} lifecycle, in particular how
+ * it binds to hadoop metrics through a {@link WeakRefMetricsSource}
+ * and that it will deregister itself in {@link S3AInstrumentation#close()}.
+ */
+public class TestInstrumentationLifecycle extends AbstractHadoopTestBase {
+
+ @Test
+ public void testDoubleClose() throws Throwable {
+ S3AInstrumentation instrumentation = new S3AInstrumentation(new URI("s3a://example/"));
+
+ // the metric system is created in the constructor
+ assertThat(S3AInstrumentation.hasMetricSystem())
+ .describedAs("S3AInstrumentation.hasMetricSystem()")
+ .isTrue();
+ // look up a metric while the instrumentation is still open
+ String metricName = DIRECTORIES_CREATED.getSymbol();
+ assertThat(instrumentation.lookupMetric(metricName))
+ .describedAs("lookupMetric(%s) while open", metricName)
+ .isNotNull();
+
+ MetricsSystem activeMetrics = getMetricsSystem();
+ final String metricSourceName = instrumentation.getMetricSourceName();
+ final MetricsSource source = activeMetrics.getSource(metricSourceName);
+ // verify the source is registered through a weak ref, and that the
+ // reference maps to the instance.
+ Assertions.assertThat(source)
+ .describedAs("metric source %s", metricSourceName)
+ .isNotNull()
+ .isInstanceOf(WeakRefMetricsSource.class)
+ .extracting(m -> ((WeakRefMetricsSource) m).getSource())
+ .isSameAs(instrumentation);
+
+ // this is expected to shut down the metrics system (asserted below)
+ instrumentation.close();
+
+ // iostats is still valid
+ assertThat(instrumentation.getIOStatistics())
+ .describedAs("iostats of %s", instrumentation)
+ .isNotNull();
+
+ // the metrics system has been discarded
+ assertThat(S3AInstrumentation.hasMetricSystem())
+ .describedAs("S3AInstrumentation.hasMetricSystem()")
+ .isFalse();
+
+ // metric lookup still works, so any invocation of an s3a
+ // method which still updates a metric also works
+ assertThat(instrumentation.lookupMetric(metricName))
+ .describedAs("lookupMetric(%s) when closed", metricName)
+ .isNotNull();
+
+ // the old metrics system really was discarded: asking for one now
+ // demand-creates a new instance, which must differ from the
+ // one which was active before close()
+ MetricsSystem metrics2 = getMetricsSystem();
+ assertThat(metrics2)
+ .describedAs("metric system 2")
+ .isNotSameAs(activeMetrics);
+
+ // the second close() must be a no-op
+ instrumentation.close();
+
+ // the demand-created metrics system must be untouched:
+ // the same instance is still returned
+ assertThat(getMetricsSystem())
+ .describedAs("metric system 3")
+ .isSameAs(metrics2);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
index 0ec8d52042..306a79a20a 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
@@ -53,6 +53,8 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN
# for debugging low level S3a operations, uncomment these lines
# Log all S3A classes
log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
+# when logging at trace, the stack of the initialize() call is logged
+#log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=TRACE
#log4j.logger.org.apache.hadoop.fs.s3a.S3AUtils=INFO
#log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO
log4j.logger.org.apache.hadoop.fs.s3a.SDKV2Upgrade=WARN