From 0676495950fc7f2716a9b5ae7da622c3a8450f71 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 23 Sep 2022 09:54:31 +0100 Subject: [PATCH] HADOOP-18456. NullPointerException in ObjectListingIterator. (#4909) This problem surfaced in impala integration tests IMPALA-11592. TestLocalCatalogRetries.test_fetch_metadata_retry fails in S3 build after the change HADOOP-17461. Add thread-level IOStatistics Context The actual GC race condition came with HADOOP-18091. S3A auditing leaks memory through ThreadLocal references The fix for this is, if our hypothesis is correct, in WeakReferenceMap.create() where a strong reference to the new value is kept in a local variable *and referred to later* so that the JVM will not GC it. Along with the fix, extra assertions ensure that if the problem is not fixed, applications will fail faster/more meaningfully. Contributed by Steve Loughran. --- .../fs/impl/WeakReferenceThreadMap.java | 39 ++++++-- .../fs/statistics/IOStatisticsContext.java | 8 +- .../impl/IOStatisticsContextIntegration.java | 11 ++- .../apache/hadoop/util/WeakReferenceMap.java | 97 ++++++++++++++++--- .../hadoop/util/TestWeakReferenceMap.java | 91 ++++++++++++++++- 5 files changed, 220 insertions(+), 26 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java index 16fe0da7c5..06be20310e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java @@ -25,6 +25,8 @@ import org.apache.hadoop.util.WeakReferenceMap; +import static java.util.Objects.requireNonNull; + /** * A WeakReferenceMap for threads. 
* @param value type of the map @@ -36,30 +38,55 @@ public WeakReferenceThreadMap(final Function factory, super(factory, referenceLost); } + /** + * Get the value for the current thread, creating if needed. + * @return an instance. + */ public V getForCurrentThread() { return get(currentThreadId()); } + /** + * Remove the reference for the current thread. + * @return any reference value which existed. + */ public V removeForCurrentThread() { return remove(currentThreadId()); } + /** + * Get the current thread ID. + * @return thread ID. + */ public long currentThreadId() { return Thread.currentThread().getId(); } + /** + * Set the new value for the current thread. + * @param newVal new reference to set for the active thread. + * @return the previously set value, possibly null + */ public V setForCurrentThread(V newVal) { + requireNonNull(newVal); long id = currentThreadId(); // if the same object is already in the map, just return it. - WeakReference ref = lookup(id); - // Reference value could be set to null. Thus, ref.get() could return - // null. Should be handled accordingly while using the returned value. - if (ref != null && ref.get() == newVal) { - return ref.get(); + WeakReference existingWeakRef = lookup(id); + + // The looked up reference could be one of + // 1. null: nothing there + // 2. valid but get() == null : reference lost by GC. + // 3. different from the new value + // 4. 
the same as the old value + if (resolve(existingWeakRef) == newVal) { + // case 4: do nothing, return the new value + return newVal; + } else { + // cases 1, 2, 3: update the map and return the old value + return put(id, newVal); } - return put(id, newVal); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java index 557c57ea4d..1876a48bc5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java @@ -20,6 +20,8 @@ import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration; +import static java.util.Objects.requireNonNull; + /** * An interface defined to capture thread-level IOStatistics by using per * thread context. @@ -67,7 +69,11 @@ public interface IOStatisticsContext extends IOStatisticsSource { * @return instance of IOStatisticsContext for the context. */ static IOStatisticsContext getCurrentIOStatisticsContext() { - return IOStatisticsContextIntegration.getCurrentIOStatisticsContext(); + // the null check is just a safety check to highlight exactly where a null value would + // be returned if HADOOP-18456 has resurfaced. 
+ return requireNonNull( + IOStatisticsContextIntegration.getCurrentIOStatisticsContext(), + "Null IOStatisticsContext"); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java index 2a394e6a1c..71fdb1f17b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java @@ -100,7 +100,10 @@ private IOStatisticsContextIntegration() {} * @return an instance of IOStatisticsContext. */ private static IOStatisticsContext createNewInstance(Long key) { - return new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement()); + IOStatisticsContextImpl instance = + new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement()); + LOG.debug("Created instance {}", instance); + return instance; } /** @@ -131,9 +134,11 @@ public static void setThreadIOStatisticsContext( IOStatisticsContext statisticsContext) { if (isThreadIOStatsEnabled) { if (statisticsContext == null) { + // new value is null, so remove it ACTIVE_IOSTATS_CONTEXT.removeForCurrentThread(); - } - if (ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() != statisticsContext) { + } else { + // the setter is efficient in that it does not create a new + // reference if the context is unchanged. 
ACTIVE_IOSTATS_CONTEXT.setForCurrentThread(statisticsContext); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java index cdc85dcb7c..18d180ee47 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java @@ -28,7 +28,11 @@ import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.store.LogExactlyOnce; import static java.util.Objects.requireNonNull; @@ -52,6 +56,9 @@ @InterfaceAudience.Private public class WeakReferenceMap { + private static final Logger LOG = + LoggerFactory.getLogger(WeakReferenceMap.class); + /** * The reference map. */ @@ -79,6 +86,12 @@ public class WeakReferenceMap { */ private final AtomicLong entriesCreatedCount = new AtomicLong(); + /** + * Log to report loss of a reference during the create phase, which + * is believed to be a cause of HADOOP-18456. + */ + private final LogExactlyOnce referenceLostDuringCreation = new LogExactlyOnce(LOG); + /** * instantiate. * @param factory supplier of new instances @@ -132,35 +145,93 @@ public WeakReference lookup(K key) { * @return an instance. */ public V get(K key) { - final WeakReference current = lookup(key); - V val = resolve(current); - if (val != null) { + final WeakReference currentWeakRef = lookup(key); + // resolve it, after which if not null, we have a strong reference + V strongVal = resolve(currentWeakRef); + if (strongVal != null) { // all good. - return val; + return strongVal; } - // here, either no ref, or the value is null - if (current != null) { + // here, either currentWeakRef was null, or its reference was GC'd. 
+ if (currentWeakRef != null) { + // garbage collection removed the reference. + + // explicitly remove the weak ref from the map if it has not + // been updated by this point + // this is here just for completeness. + map.remove(key, currentWeakRef); + + // log/report the loss. noteLost(key); } + + // create a new value and add it to the map return create(key); } /** * Create a new instance under a key. + *

* The instance is created, added to the map and then the * map value retrieved. * This ensures that the reference returned is that in the map, * even if there is more than one entry being created at the same time. + * If that race does occur, it will be logged the first time it happens + * for this specific map instance. + *

+ * HADOOP-18456 highlighted the risk of a concurrent GC resulting in a null + * value being retrieved and so returned. + * To prevent this: + *

+ * <ol>
+ * <li>A strong reference is retained to the newly created instance + * in a local variable.</li>
+ * <li>That variable is used after the resolution process, to ensure + * the JVM doesn't consider it "unreachable" and so eligible for GC.</li>
+ * <li>A check is made for the resolved reference being null, and if so, + * the put() is repeated</li>
+ * </ol>
* @param key key - * @return the value + * @return the created value */ public V create(K key) { entriesCreatedCount.incrementAndGet(); - WeakReference newRef = new WeakReference<>( - requireNonNull(factory.apply(key))); - map.put(key, newRef); - return map.get(key).get(); + /* + Get a strong ref so even if a GC happens in this method the reference is not lost. + It is NOT enough to have a reference in a field, it MUST be used + so as to ensure the reference isn't optimized away prematurely. + "A reachable object is any object that can be accessed in any potential continuing + computation from any live thread." + */ + + final V strongRef = requireNonNull(factory.apply(key), + "factory returned a null instance"); + V resolvedStrongRef; + do { + WeakReference newWeakRef = new WeakReference<>(strongRef); + + // put it in the map + map.put(key, newWeakRef); + + // get it back from the map + WeakReference retrievedWeakRef = map.get(key); + // resolve that reference, handling the situation where somehow it was removed from the map + // between the put() and the get() + resolvedStrongRef = resolve(retrievedWeakRef); + if (resolvedStrongRef == null) { + referenceLostDuringCreation.warn("reference to %s lost during creation", key); + noteLost(key); + } + } while (resolvedStrongRef == null); + + // note if there was any change in the reference. + // as this forces strongRef to be kept in scope + if (strongRef != resolvedStrongRef) { + LOG.debug("Created instance for key {}: {} overwritten by {}", + key, strongRef, resolvedStrongRef); + } + + return resolvedStrongRef; } /** @@ -203,7 +274,7 @@ public boolean containsKey(K key) { * @param r reference to resolve * @return the value or null */ - private V resolve(WeakReference r) { + protected V resolve(WeakReference r) { return r == null ? 
null : r.get(); } @@ -233,7 +304,7 @@ public int prune() { * @param key key of lost reference */ private void noteLost(final K key) { - // incrment local counter + // increment local counter referenceLostCount.incrementAndGet(); // and call any notification function supplied in the constructor diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java index cf15743ea0..3203de8a96 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java @@ -20,15 +20,19 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Test; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; import org.apache.hadoop.test.AbstractHadoopTestBase; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + /** - * Test {@link WeakReferenceMap}. + * Test {@link WeakReferenceMap} and {@link WeakReferenceThreadMap}. * There's no attempt to force GC here, so the tests are * more about the basic behavior not the handling of empty references. */ @@ -125,11 +129,92 @@ public void testDemandCreateEntries() { } + /** + * It is an error to have a factory which returns null. + */ + @Test + public void testFactoryReturningNull() throws Throwable { + referenceMap = new WeakReferenceMap<>( + (k) -> null, + null); + intercept(NullPointerException.class, () -> + referenceMap.get(0)); + } + + /** + * Test the WeakReferenceThreadMap extension. 
+ */ + @Test + public void testWeakReferenceThreadMapAssignment() + throws Throwable { + + // counters foor the callbacks + final AtomicLong created = new AtomicLong(); + final AtomicLong lost = new AtomicLong(); + + WeakReferenceThreadMap threadMap = new WeakReferenceThreadMap<>( + id -> "Entry for thread ID " + id + " (" + created.incrementAndGet() + ")", + id -> lost.incrementAndGet()); + + Assertions.assertThat(threadMap.setForCurrentThread("hello")) + .describedAs("current thread map value on first set") + .isNull(); + + // second attempt returns itself + Assertions.assertThat(threadMap.setForCurrentThread("hello")) + .describedAs("current thread map value on second set") + .isEqualTo("hello"); + + // it is forbidden to explicitly set to null via the set() call. + intercept(NullPointerException.class, () -> + threadMap.setForCurrentThread(null)); + + // the map is unchanged + Assertions.assertThat(threadMap.getForCurrentThread()) + .describedAs("current thread map value") + .isEqualTo("hello"); + + // remove the value and assert what the removed entry was + Assertions.assertThat(threadMap.removeForCurrentThread()) + .describedAs("removed thread map value") + .isEqualTo("hello"); + + // remove the value again; this time the removed value is null + Assertions.assertThat(threadMap.removeForCurrentThread()) + .describedAs("removed thread map value on second call") + .isNull(); + + // lookup will return a new instance created by the factory + long c1 = created.get(); + String dynamicValue = threadMap.getForCurrentThread(); + Assertions.assertThat(dynamicValue) + .describedAs("dynamically created thread map value") + .startsWith("Entry for thread ID") + .contains("(" + (c1 + 1) + ")"); + + // and we can overwrite that + Assertions.assertThat(threadMap.setForCurrentThread("hello2")) + .describedAs("value before the thread entry is changed") + .isEqualTo(dynamicValue); + + // simulate a weak gc + long threadId = threadMap.currentThreadId(); + threadMap.put(threadId, 
null); + String updated = threadMap.getForCurrentThread(); + Assertions.assertThat(lost.get()) + .describedAs("lost count") + .isEqualTo(1); + Assertions.assertThat(updated) + .describedAs("dynamically created thread map value") + .startsWith("Entry for thread ID") + .contains("(" + (c1 + 2) + ")"); + } + /** * Assert that the value of a map entry is as expected. * Will trigger entry creation if the key is absent. * @param key key - * @param val expected valued + * @param val expected value */ private void assertMapEntryEquals(int key, String val) { Assertions.assertThat(referenceMap.get(key)) @@ -143,7 +228,7 @@ private void assertMapEntryEquals(int key, String val) { */ private void assertMapContainsKey(int key) { Assertions.assertThat(referenceMap.containsKey(key)) - .describedAs("map enty of key %d should be present", key) + .describedAs("map entry of key %d should be present", key) .isTrue(); }