diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java index 11681546e3..e188e168e5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java @@ -24,6 +24,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -69,11 +72,16 @@ * {@link #currentAuditContext()} to get the thread-local * context for the caller, which can then be manipulated. * + * For further information, especially related to memory consumption, + * read the document `auditing_architecture` in the `hadoop-aws` module. */ @InterfaceAudience.Public @InterfaceStability.Unstable public final class CommonAuditContext { + private static final Logger LOG = LoggerFactory.getLogger( + CommonAuditContext.class); + /** * Process ID; currently built from UUID and timestamp. */ @@ -92,7 +100,7 @@ public final class CommonAuditContext { * Supplier operations must themselves be thread safe. */ private final Map> evaluatedEntries = - new ConcurrentHashMap<>(); + new ConcurrentHashMap<>(1); static { // process ID is fixed. @@ -108,7 +116,7 @@ public final class CommonAuditContext { * the span is finalized. */ private static final ThreadLocal ACTIVE_CONTEXT = - ThreadLocal.withInitial(() -> createInstance()); + ThreadLocal.withInitial(CommonAuditContext::createInstance); private CommonAuditContext() { } @@ -125,11 +133,21 @@ public Supplier put(String key, String value) { /** * Put a context entry dynamically evaluated on demand. + * Important: as these supplier methods are long-lived, + * the supplier function MUST NOT be part of/refer to + * any object instance of significant memory size. + * Applications SHOULD remove references when they are + * no longer needed. + * When logged at TRACE, prints the key and stack trace of the caller, + * to allow for debugging of any problems. * @param key key * @param value new value * @return old value or null */ public Supplier put(String key, Supplier value) { + if (LOG.isTraceEnabled()) { + LOG.trace("Adding context entry {}", key, new Exception(key)); + } return evaluatedEntries.put(key, value); } @@ -138,6 +156,9 @@ public Supplier put(String key, Supplier value) { * @param key key */ public void remove(String key) { + if (LOG.isTraceEnabled()) { + LOG.trace("Remove context entry {}", key); + } evaluatedEntries.remove(key); } @@ -168,7 +189,7 @@ public void reset() { private void init() { // thread 1 is dynamic - put(PARAM_THREAD1, () -> currentThreadID()); + put(PARAM_THREAD1, CommonAuditContext::currentThreadID); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java new file mode 100644 index 0000000000..b24bef2a81 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.util.function.Consumer; +import java.util.function.Function; +import javax.annotation.Nullable; + +import org.apache.hadoop.util.WeakReferenceMap; + +/** + * A WeakReferenceMap for threads. + * @param value type of the map + */ +public class WeakReferenceThreadMap extends WeakReferenceMap { + + public WeakReferenceThreadMap(final Function factory, + @Nullable final Consumer referenceLost) { + super(factory, referenceLost); + } + + public V getForCurrentThread() { + return get(currentThreadId()); + } + + public V removeForCurrentThread() { + return remove(currentThreadId()); + } + + public long currentThreadId() { + return Thread.currentThread().getId(); + } + + public V setForCurrentThread(V newVal) { + return put(currentThreadId(), newVal); + } + +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java new file mode 100644 index 0000000000..cdc85dcb7c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import java.lang.ref.WeakReference; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import org.apache.hadoop.classification.InterfaceAudience; + +import static java.util.Objects.requireNonNull; + +/** + * A map of keys type K to objects of type V which uses weak references, + * so does lot leak memory through long-lived references + * at the expense of losing references when GC takes place.. + * + * This class is intended be used instead of ThreadLocal storage when + * references are to be cleaned up when the instance holding. + * In this use case, the key is the Long key. + * + * Concurrency. + * The class assumes that map entries are rarely contended for when writing, + * and that not blocking other threads is more important than atomicity. + * - a ConcurrentHashMap is used to map keys to weak references, with + * all its guarantees. + * - there is no automatic pruning. + * - see {@link #create(Object)} for the concurrency semantics on entry creation. + */ +@InterfaceAudience.Private +public class WeakReferenceMap { + + /** + * The reference map. + */ + private final Map> map = new ConcurrentHashMap<>(); + + /** + * Supplier of new instances. + */ + private final Function factory; + + /** + * Nullable callback when a get on a key got a weak reference back. + * The assumption is that this is for logging/stats, which is why + * no attempt is made to use the call as a supplier of a new value. + */ + private final Consumer referenceLost; + + /** + * Counter of references lost. + */ + private final AtomicLong referenceLostCount = new AtomicLong(); + + /** + * Counter of entries created. + */ + private final AtomicLong entriesCreatedCount = new AtomicLong(); + + /** + * instantiate. + * @param factory supplier of new instances + * @param referenceLost optional callback on lost references. + */ + public WeakReferenceMap( + Function factory, + @Nullable final Consumer referenceLost) { + + this.factory = requireNonNull(factory); + this.referenceLost = referenceLost; + } + + @Override + public String toString() { + return "WeakReferenceMap{" + + "size=" + size() + + ", referenceLostCount=" + referenceLostCount + + ", entriesCreatedCount=" + entriesCreatedCount + + '}'; + } + + /** + * Map size. + * @return the current map size. + */ + public int size() { + return map.size(); + } + + /** + * Clear all entries. + */ + public void clear() { + map.clear(); + } + + /** + * look up the value, returning the possibly empty weak reference + * to a value, or null if no value was found. + * @param key key to look up + * @return null if there is no entry, a weak reference if found + */ + public WeakReference lookup(K key) { + return map.get(key); + } + + /** + * Get the value, creating if needed. + * @param key key. + * @return an instance. + */ + public V get(K key) { + final WeakReference current = lookup(key); + V val = resolve(current); + if (val != null) { + // all good. + return val; + } + + // here, either no ref, or the value is null + if (current != null) { + noteLost(key); + } + return create(key); + } + + /** + * Create a new instance under a key. + * The instance is created, added to the map and then the + * map value retrieved. + * This ensures that the reference returned is that in the map, + * even if there is more than one entry being created at the same time. + * @param key key + * @return the value + */ + public V create(K key) { + entriesCreatedCount.incrementAndGet(); + WeakReference newRef = new WeakReference<>( + requireNonNull(factory.apply(key))); + map.put(key, newRef); + return map.get(key).get(); + } + + /** + * Put a value under the key. + * A null value can be put, though on a get() call + * a new entry is generated + * + * @param key key + * @param value value + * @return any old non-null reference. + */ + public V put(K key, V value) { + return resolve(map.put(key, new WeakReference<>(value))); + } + + /** + * Remove any value under the key. + * @param key key + * @return any old non-null reference. + */ + public V remove(K key) { + return resolve(map.remove(key)); + } + + /** + * Does the map have a valid reference for this object? + * no-side effects: there's no attempt to notify or cleanup + * if the reference is null. + * @param key key to look up + * @return true if there is a valid reference. + */ + public boolean containsKey(K key) { + final WeakReference current = lookup(key); + return resolve(current) != null; + } + + /** + * Given a possibly null weak reference, resolve + * its value. + * @param r reference to resolve + * @return the value or null + */ + private V resolve(WeakReference r) { + return r == null ? null : r.get(); + } + + /** + * Prune all null weak references, calling the referenceLost + * callback for each one. + * + * non-atomic and non-blocking. + * @return the number of entries pruned. + */ + public int prune() { + int count = 0; + final Iterator>> it = map.entrySet().iterator(); + while (it.hasNext()) { + final Map.Entry> next = it.next(); + if (next.getValue().get() == null) { + it.remove(); + count++; + noteLost(next.getKey()); + } + } + return count; + } + + /** + * Notify the reference lost callback. + * @param key key of lost reference + */ + private void noteLost(final K key) { + // incrment local counter + referenceLostCount.incrementAndGet(); + + // and call any notification function supplied in the constructor + if (referenceLost != null) { + referenceLost.accept(key); + } + } + + /** + * Get count of references lost as detected + * during prune() or get() calls. + * @return count of references lost + */ + public final long getReferenceLostCount() { + return referenceLostCount.get(); + } + + /** + * Get count of entries created on demand. + * @return count of entries created + */ + public final long getEntriesCreatedCount() { + return entriesCreatedCount.get(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 26126f14c5..7e2023603c 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2025,6 +2025,17 @@ + + + fs.s3a.audit.enabled + true + + Should auditing of S3A requests be enabled? + + + fs.AbstractFileSystem.wasb.impl diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java index 798841a2d6..9782eb276d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/audit/TestCommonAuditContext.java @@ -132,6 +132,15 @@ private AbstractStringAssert assertContextValue(final String key) { .describedAs("Value of context element %s", key) .isNotBlank(); } + /** + * Assert a context value is null. + * @param key key to look up + */ + private void assertContextValueIsNull(final String key) { + assertThat(context.get(key)) + .describedAs("Value of context element %s", key) + .isNull(); + } @Test public void testNoteEntryPoint() throws Throwable { @@ -158,4 +167,13 @@ private AbstractStringAssert assertGlobalEntry(final String key) { return anAssert; } + @Test + public void testAddRemove() throws Throwable { + final String key = "testAddRemove"; + assertContextValueIsNull(key); + context.put(key, key); + assertContextValue(key).isEqualTo(key); + context.remove(key); + assertContextValueIsNull(key); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java new file mode 100644 index 0000000000..cf15743ea0 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWeakReferenceMap.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import java.util.ArrayList; +import java.util.List; + +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.test.AbstractHadoopTestBase; + +/** + * Test {@link WeakReferenceMap}. + * There's no attempt to force GC here, so the tests are + * more about the basic behavior not the handling of empty references. + */ +public class TestWeakReferenceMap extends AbstractHadoopTestBase { + + public static final String FACTORY_STRING = "recreated %d"; + + /** + * The map to test. + */ + private WeakReferenceMap referenceMap; + + /** + * List of references notified of loss. + */ + private List lostReferences; + + @Before + public void setup() { + lostReferences = new ArrayList<>(); + referenceMap = new WeakReferenceMap<>( + this::factory, + this::referenceLost); + } + + /** + * Reference lost callback. + * @param key key lost + */ + private void referenceLost(Integer key) { + lostReferences.add(key); + } + + + /** + * Basic insertions and lookups of those values. + */ + @Test + public void testBasicOperationsWithValidReferences() { + + referenceMap.put(1, "1"); + referenceMap.put(2, "2"); + assertMapSize(2); + assertMapContainsKey(1); + assertMapEntryEquals(1, "1"); + assertMapEntryEquals(2, "2"); + // overwrite + referenceMap.put(1, "3"); + assertMapEntryEquals(1, "3"); + + // remove an entry + referenceMap.remove(1); + assertMapDoesNotContainKey(1); + assertMapSize(1); + + // clear the map + referenceMap.clear(); + assertMapSize(0); + } + + /** + * pruning removes null entries, leaves the others alone. + */ + @Test + public void testPruneNullEntries() { + referenceMap.put(1, "1"); + assertPruned(0); + referenceMap.put(2, null); + assertMapSize(2); + assertPruned(1); + assertMapSize(1); + assertMapDoesNotContainKey(2); + assertMapEntryEquals(1, "1"); + assertLostCount(1); + } + + /** + * Demand create entries. + */ + @Test + public void testDemandCreateEntries() { + + // ask for an unknown key and expect a generated value + assertMapEntryEquals(1, factory(1)); + assertMapSize(1); + assertMapContainsKey(1); + assertLostCount(0); + + // an empty ref has the same outcome + referenceMap.put(2, null); + assertMapEntryEquals(2, factory(2)); + // but the lost coun goes up + assertLostCount(1); + + } + + /** + * Assert that the value of a map entry is as expected. + * Will trigger entry creation if the key is absent. + * @param key key + * @param val expected valued + */ + private void assertMapEntryEquals(int key, String val) { + Assertions.assertThat(referenceMap.get(key)) + .describedAs("map enty of key %d", key) + .isEqualTo(val); + } + + /** + * Assert that a map entry is present. + * @param key key + */ + private void assertMapContainsKey(int key) { + Assertions.assertThat(referenceMap.containsKey(key)) + .describedAs("map enty of key %d should be present", key) + .isTrue(); + } + + /** + * Assert that a map entry is not present. + * @param key key + */ + private void assertMapDoesNotContainKey(int key) { + Assertions.assertThat(referenceMap.containsKey(key)) + .describedAs("map enty of key %d should be absent", key) + .isFalse(); + } + + /** + * Assert map size. + * @param size expected size. + */ + private void assertMapSize(int size) { + Assertions.assertThat(referenceMap.size()) + .describedAs("size of map %s", referenceMap) + .isEqualTo(size); + } + + /** + * Assert prune returned the given count. + * @param count expected count. + */ + private void assertPruned(int count) { + Assertions.assertThat(referenceMap.prune()) + .describedAs("number of entries pruned from map %s", referenceMap) + .isEqualTo(count); + } + + /** + * Assert number of entries lost matches expected count. + * @param count expected count. + */ + private void assertLostCount(int count) { + Assertions.assertThat(lostReferences) + .describedAs("number of entries lost from map %s", referenceMap) + .hasSize(count); + } + + /** + * Factory operation. + * @param key map key + * @return a string + */ + private String factory(Integer key) { + return String.format(FACTORY_STRING, key); + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java index a5a6dbc847..f2963d7319 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java @@ -210,7 +210,7 @@ private RequestInfo writing(final String verb, * @param request request * @return true if the transfer manager creates them. */ - public static final boolean + public static boolean isRequestNotAlwaysInSpan(final Object request) { return request instanceof CopyPartRequest || request instanceof CompleteMultipartUploadRequest diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java index 672bcdf7f9..10c8f871ca 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/OperationAuditor.java @@ -70,4 +70,15 @@ default boolean checkAccess(Path path, S3AFileStatus status, FsAction mode) * @return ID */ String getAuditorId(); + + /** + * Span reference lost from GC operations. + * This is only called when an attempt is made to retrieve on + * the active thread or when a prune operation is cleaning up. + * + * @param threadId thread ID. + */ + default void noteSpanReferenceLost(long threadId) { + + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java index 09ff0cbfd7..1d76833f8c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/S3AAuditConstants.java @@ -44,7 +44,7 @@ private S3AAuditConstants() { * Default auditing flag. * Value: {@value}. */ - public static final boolean AUDIT_ENABLED_DEFAULT = false; + public static final boolean AUDIT_ENABLED_DEFAULT = true; /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java index 25f97c07a2..3d2102d305 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java @@ -23,6 +23,7 @@ import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.HandlerContextAware; @@ -33,16 +34,17 @@ import com.amazonaws.handlers.HandlerBeforeAttemptContext; import com.amazonaws.handlers.RequestHandler2; import com.amazonaws.http.HttpResponse; -import com.amazonaws.services.s3.transfer.Transfer; import com.amazonaws.services.s3.transfer.internal.TransferStateChangeListener; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.audit.AWSAuditEventCallbacks; @@ -88,6 +90,11 @@ * then the IOStatistics counter for {@code AUDIT_FAILURE} * is incremented. * + * Uses the WeakReferenceThreadMap to store spans for threads. + * Provided a calling class retains a reference to the span, + * the active span will be retained. + * + * */ @InterfaceAudience.Private public final class ActiveAuditManagerS3A @@ -111,6 +118,14 @@ public final class ActiveAuditManagerS3A public static final String NOT_A_WRAPPED_SPAN = "Span attached to request is not a wrapped span"; + /** + * Arbitrary threshold for triggering pruning on deactivation. + * High enough it doesn't happen very often, low enough + * that it will happen regularly on a busy system. + * Value: {@value}. + */ + static final int PRUNE_THRESHOLD = 10_000; + /** * Audit service. */ @@ -127,12 +142,27 @@ public final class ActiveAuditManagerS3A */ private WrappingAuditSpan unbondedSpan; + /** + * How many spans have to be deactivated before a prune is triggered? + * Fixed as a constant for now unless/until some pressing need + * for it to be made configurable ever surfaces. + */ + private final int pruneThreshold = PRUNE_THRESHOLD; + + /** + * Count down to next pruning. + */ + private final AtomicInteger deactivationsBeforePrune = new AtomicInteger(); + /** * Thread local span. This defaults to being * the unbonded span. */ - private final ThreadLocal activeSpan = - ThreadLocal.withInitial(() -> getUnbondedSpan()); + + private final WeakReferenceThreadMap activeSpanMap = + new WeakReferenceThreadMap<>( + (k) -> getUnbondedSpan(), + this::noteSpanReferenceLost); /** * Destination for recording statistics, especially duration/count of @@ -147,6 +177,7 @@ public final class ActiveAuditManagerS3A public ActiveAuditManagerS3A(final IOStatisticsStore iostatistics) { super("ActiveAuditManagerS3A"); this.ioStatisticsStore = iostatistics; + this.deactivationsBeforePrune.set(pruneThreshold); } @Override @@ -178,6 +209,13 @@ protected void serviceStart() throws Exception { LOG.debug("Started audit service {}", auditor); } + @Override + protected void serviceStop() throws Exception { + // clear all references. + activeSpanMap.clear(); + super.serviceStop(); + } + @Override public String toString() { final StringBuilder sb = new StringBuilder(super.toString()); @@ -225,7 +263,7 @@ public AuditSpanS3A getActiveAuditSpan() { * @return the active WrappingAuditSpan */ private WrappingAuditSpan activeSpan() { - return activeSpan.get(); + return activeSpanMap.getForCurrentThread(); } /** @@ -247,13 +285,66 @@ private AuditSpanS3A setActiveThreadSpan(AuditSpanS3A span) { */ private WrappingAuditSpan switchToActiveSpan(WrappingAuditSpan span) { if (span != null && span.isValidSpan()) { - activeSpan.set(span); + activeSpanMap.setForCurrentThread(span); } else { - activeSpan.set(unbondedSpan); + activeSpanMap.removeForCurrentThread(); } return activeSpan(); } + /** + * Span reference lost from GC operations. + * This is only called when an attempt is made to retrieve on + * the active thread or when a prune operation is cleaning up. + * + * @param threadId thread ID. + */ + private void noteSpanReferenceLost(long threadId) { + auditor.noteSpanReferenceLost(threadId); + } + + /** + * Prune all null weak references, calling the referenceLost + * callback for each one. + * + * non-atomic and non-blocking. + * @return the number of entries pruned. + */ + @VisibleForTesting + int prune() { + return activeSpanMap.prune(); + } + + /** + * remove the span from the reference map, shrinking the map in the process. + * if/when a new span is activated in the thread, a new entry will be created. + * and if queried for a span, the unbounded span will be automatically + * added to the map for this thread ID. + * + */ + @VisibleForTesting + boolean removeActiveSpanFromMap() { + // remove from the map + activeSpanMap.removeForCurrentThread(); + if (deactivationsBeforePrune.decrementAndGet() == 0) { + // trigger a prune + activeSpanMap.prune(); + deactivationsBeforePrune.set(pruneThreshold); + return true; + } + return false; + } + + /** + * Get the map of threads to active spans; allows + * for testing of weak reference resolution after GC. + * @return the span map + */ + @VisibleForTesting + WeakReferenceThreadMap getActiveSpanMap() { + return activeSpanMap; + } + /** * The Span ID in the audit manager is the ID of the auditor, * which can be used in the filesystem toString() method @@ -331,13 +422,7 @@ public List createRequestHandlers() @Override public TransferStateChangeListener createStateChangeListener() { final WrappingAuditSpan span = activeSpan(); - return new TransferStateChangeListener() { - @Override - public void transferStateChanged(final Transfer transfer, - final Transfer.TransferState state) { - switchToActiveSpan(span); - } - }; + return (transfer, state) -> switchToActiveSpan(span); } @Override @@ -641,16 +726,21 @@ public AuditSpanS3A activate() { */ @Override public void deactivate() { - // no-op for invalid spans, - // so as to prevent the unbounded span from being closed - // and everything getting very confused. - if (!isValid || !isActive()) { + + // span is inactive; ignore + if (!isActive()) { return; } - // deactivate the span - span.deactivate(); - // and go to the unbounded one. - switchToActiveSpan(getUnbondedSpan()); + // skipped for invalid spans, + // so as to prevent the unbounded span from being closed + // and everything getting very confused. + if (isValid) { + // deactivate the span + span.deactivate(); + } + // remove the span from the reference map, + // sporadically triggering a prune operation. + removeActiveSpanFromMap(); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index b2c0c5e1b8..f08d6448e4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -904,6 +904,8 @@ public void commitJob(JobContext context) throws IOException { jobCompleted(false); abortJobInternal(context, true); throw e; + } finally { + resetCommonContext(); } } @@ -946,6 +948,7 @@ protected void cleanup(JobContext context, } finally { destroyThreadPool(); cleanupStagingDirs(); + resetCommonContext(); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 34bbfd4ed7..c1ecd7d6b9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -169,6 +169,7 @@ public void commitTask(TaskAttemptContext context) throws IOException { // delete the task attempt so there's no possibility of a second attempt deleteTaskAttemptPathQuietly(context); destroyThreadPool(); + resetCommonContext(); } getCommitOperations().taskCompleted(true); LOG.debug("aggregate statistics\n{}", @@ -252,6 +253,7 @@ public void abortTask(TaskAttemptContext context) throws IOException { attemptPath.getFileSystem(context.getConfiguration()), attemptPath, true); destroyThreadPool(); + resetCommonContext(); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java index 6d75cb2dd3..121ea7f851 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java @@ -600,6 +600,7 @@ public void commitTask(TaskAttemptContext context) throws IOException { throw e; } finally { destroyThreadPool(); + resetCommonContext(); } getCommitOperations().taskCompleted(true); } @@ -739,6 +740,7 @@ public void abortTask(TaskAttemptContext context) throws IOException { throw e; } finally { destroyThreadPool(); + resetCommonContext(); } } diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md index 54fbdd11a8..7c00462735 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing.md @@ -24,15 +24,17 @@ this document covers its use. ## Important: Auditing is disabled by default -Due to a memory leak from the use of `ThreadLocal` fields, this auditing feature leaks memory as S3A filesystem -instances are created and deleted. -This causes problems in long-lived processes which either do not re-use filesystem +Due to a memory leak from the use of `ThreadLocal` fields, this auditing feature +leaked memory as S3A filesystem instances were created and deleted. +This caused problems in long-lived processes which either do not re-use filesystem instances, or attempt to delete all instances belonging to specific users. See [HADOOP-18091](https://issues.apache.org/jira/browse/HADOOP-18091) _S3A auditing leaks memory through ThreadLocal references_. -To avoid these memory leaks, auditing is disabled by default. +To avoid these memory leaks, auditing was disabled by default in the hadoop 3.3.2 release. -To turn auditing on, set `fs.s3a.audit.enabled` to `true`. +As these memory leaks have now been fixed, auditing has been re-enabled. + +To disable it, set `fs.s3a.audit.enabled` to `false`. ## Auditing workflow @@ -84,7 +86,7 @@ Other auditor classes may be used instead. | Option | Meaning | Default Value | |--------|---------|---------------| -| `fs.s3a.audit.enabled` | Is auditing enabled | `false` | +| `fs.s3a.audit.enabled` | Is auditing enabled? | `true` | | `fs.s3a.audit.service.classname` | Auditor classname | `org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor` | | `fs.s3a.audit.request.handlers` | List of extra subclasses of AWS SDK RequestHandler2 to include in handler chain | `""` | | `fs.s3a.audit.referrer.enabled` | Logging auditor to publish the audit information in the HTTP Referrer header | `true` | @@ -138,7 +140,6 @@ The Logging Auditor is enabled by providing its classname in the option ``` - To print auditing events in the local client logs, set the associated Log4J log to log at debug: diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md index a4f4fe445e..aa35fae92c 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/auditing_architecture.md @@ -119,16 +119,78 @@ The auditor then creates and returns a span for the specific operation. The AuditManagerS3A will automatically activate the span returned by the auditor (i.e. assign it the thread local variable tracking the active span in each thread). -### Memory Leakage through `ThreadLocal` use +### Memory Leakage through `ThreadLocal` misuse -This architecture contains a critical defect, +The original implementation of the integration with the S3AFileSystem class +contained a critical defect, [HADOOP-18091](https://issues.apache.org/jira/browse/HADOOP-18091) _S3A auditing leaks memory through ThreadLocal references_. -The code was written assuming that when the `ActiveAuditManagerS3A` service is -stopped, it's `ThreadLocal` fields would be freed. -In fact, they are retained until the threads with references are terminated. +The original code was written with the assumption that when the `ActiveAuditManagerS3A` service was +garbage collected, references in its `ThreadLocal` field would be freed. +In fact, they are retained until all threads with references are terminated. +If any long-lived thread had performed an s3 operation which created a span, +a reference back to the audit manager instance was created +*whose lifetime was that of the thread* + +In short-lived processes, and long-lived processes where a limited set of +`S3AFileSystem` instances were reused, this had no adverse effect. +Indeed, if the filesystem instances were retained in the cache until +the process was shut down, there would be strong references to the owning +`S3AFileSystem` instance anyway. + +Where it did have problems was when the following conditions were met +1. Process was long-lived. +2. Long-lived threads in the process invoked filesystem operations on `s3a://` URLs. +3. Either `S3AFileSystem` instances were created repeatedly, rather than retrieved + from the cache of active instances. +4. Or, after a query for a specific user was completed, + `FileSystem.closeAllForUGI(UserGroupInformation)` was invoked to remove all + cached FS instances of that user. + +Conditions 1, 2 and 4 are exactly those which long-lived Hive services can +encounter. + +_Auditing was disabled by default until a fix was implemented._ + +The memory leak has been fixed using a new class `org.apache.hadoop.util.WeakReferenceMap` +to store a map of thread IDs to active spans. When the S3A filesystem is closed, +its audit manager service is stopped and all references to spans removed from the +map of thread ID to span. + +Weak References are used for the map so that span references do not consume memory even if +threads are terminated without resetting the span reference of that thread. +There is therefore a theoretical risk that if a garbage collection takes place during +execution of a spanned operation, the reference will be lost. + +This is not considered an issue as all bounded entry points into the S3A filesystem +retain a strong reference to their audit span. + +All entry points which return an object which can invoke s3 operations (input and output +streams, list iterators, etc.) also retain a strong reference to their span, a reference +they activate before performing S3 operations. +A factory method is provided to demand-generate a new span if, somehow, these conditions +are not met. The "unbounded span" is used here. +Except in deployments where `fs.s3a.audit.reject.out.of.span.operations` is true, +invoking S3 operations within the unbounded span are permitted. +That option is set to `true` within S3A test suites. +Therefore it is unlikely that any operations are invoked in unbounded spans except +for the special case of copy operations invoked by the transfer manager threads. +Those are already ignored in the logging auditor, whose unbounded span ignores +requests which `AWSRequestAnalyzer.isRequestNotAlwaysInSpan()` indicates +may happen outside of a span. +This is restricted to bucket location probes possibly performed by the SDK +on instantiation, and copy part/complete calls. + + +```java + public static boolean + isRequestNotAlwaysInSpan(final Object request) { + return request instanceof CopyPartRequest + || request instanceof CompleteMultipartUploadRequest + || request instanceof GetBucketLocationRequest; + } +``` -This is why auditing is now disabled by default until a fix is implemented. ### Class `org.apache.hadoop.fs.audit.CommonAuditContext` @@ -149,6 +211,39 @@ Spans may be used on different thread from that which they were created. Spans MUST always use the values from the `currentAuditContext()` in the creation thread. +#### Memory Usage of `CommonAuditContext` + +The `CommonAuditContext` map has a `ThreadLocal` field to store global +information which is intended to span multiple operations across multiple +filesystems, for example the MapReduce or Spark job ID, which is set +in the S3A committers. + +Applications and Hadoop code MUST NOT attach context entries +which directly or indirectly consumes lots of memory, as the life +of that memory use will become that of the thread. + +Applications and Hadoop code SHOULD remove context entries when +no-longer needed. + +If memory leakage is suspected here, set the log +`org.apache.hadoop.fs.audit.CommonAuditContext` to `TRACE` +to log the origin of operations which add log entries. + +This will produce a log entry whose stack trace will show the caller chain. +``` +2022-01-26 16:10:28,384 TRACE audit.CommonAuditContext (CommonAuditContext.java:put(149)) - Adding context entry t1 +java.lang.Exception: t1 + at org.apache.hadoop.fs.audit.CommonAuditContext.put(CommonAuditContext.java:149) + at org.apache.hadoop.fs.audit.CommonAuditContext.init(CommonAuditContext.java:192) + at org.apache.hadoop.fs.audit.CommonAuditContext.createInstance(CommonAuditContext.java:210) + at org.apache.hadoop.fs.audit.CommonAuditContext.lambda$static$0(CommonAuditContext.java:119) + at java.lang.ThreadLocal$SuppliedThreadLocal.initialValue(ThreadLocal.java:284) + at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180) + at java.lang.ThreadLocal.get(ThreadLocal.java:170) + at org.apache.hadoop.fs.audit.CommonAuditContext.currentAuditContext(CommonAuditContext.java:219) + at org.apache.hadoop.fs.audit.TestCommonAuditContext.(TestCommonAuditContext.java:54) +``` + ### class `NoopAuditor` diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java new file mode 100644 index 0000000000..63e7922001 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/MemoryHungryAuditor.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit; + +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.s3a.audit.impl.AbstractAuditSpanImpl; +import org.apache.hadoop.fs.s3a.audit.impl.AbstractOperationAuditor; + + +/** + * An audit service which consumes lots of memory. + */ +public class MemoryHungryAuditor extends AbstractOperationAuditor { + + public static final String NAME = "org.apache.hadoop.fs.s3a.audit.MemoryHungryAuditor"; + + private static final Logger LOG = + LoggerFactory.getLogger(MemoryHungryAuditor.class); + /** + * How big is each manager? + */ + public static final int MANAGER_SIZE = 10 * 1024 * 1024; + + /** + * How big is each span? + */ + public static final int SPAN_SIZE = 512 * 1024; + + private static final AtomicLong INSTANCE_COUNT = new AtomicLong(); + + private final AtomicLong spanCount = new AtomicLong(); + + private final byte[] data = new byte[MANAGER_SIZE]; + + /** + * unbonded span created on demand. + */ + private AuditSpanS3A unbondedSpan; + + + /** + * Constructor. + */ + public MemoryHungryAuditor() { + super("MemoryHungryAuditor"); + INSTANCE_COUNT.incrementAndGet(); + } + + public long getSpanCount() { + return spanCount.get(); + } + + @Override + public AuditSpanS3A createSpan( + final String operation, + @Nullable final String path1, + @Nullable final String path2) { + spanCount.incrementAndGet(); + return new MemorySpan(createSpanID(), operation); + } + + @Override + public AuditSpanS3A getUnbondedSpan() { + if (unbondedSpan == null) { + unbondedSpan = new MemorySpan(createSpanID(), "unbonded"); + } + return unbondedSpan; + } + + @Override + public String toString() { + return String.format("%s instance %d span count %d", + super.toString(), + getInstanceCount(), + getSpanCount()); + } + + @Override + public void noteSpanReferenceLost(final long threadId) { + LOG.info("Span lost for thread {}", threadId); + } + + public static long getInstanceCount() { + return INSTANCE_COUNT.get(); + } + + /** + * A span which consumes a lot of memory. + */ + private static final class MemorySpan extends AbstractAuditSpanImpl { + + private final byte[] data = new byte[SPAN_SIZE]; + + private MemorySpan(final String spanId, final String operationName) { + super(spanId, operationName); + } + + @Override + public AuditSpanS3A activate() { + return this; + } + + @Override + public void deactivate() { + } + + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java new file mode 100644 index 0000000000..901347d29d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/impl/TestActiveAuditManagerThreadLeakage.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.audit.impl; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.impl.WeakReferenceThreadMap; +import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; +import org.apache.hadoop.fs.s3a.audit.MemoryHungryAuditor; +import org.apache.hadoop.test.AbstractHadoopTestBase; + +import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.AUDIT_SERVICE_CLASSNAME; +import static org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A.PRUNE_THRESHOLD; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore; + +/** + * This test attempts to recreate the OOM problems of + * HADOOP-18091. S3A auditing leaks memory through ThreadLocal references + * it does this by creating a thread pool, then + * creates and destroys FS instances, with threads in + * the pool (but not the main JUnit test thread) creating + * audit spans. + * + * With a custom audit span with a large memory footprint, + * memory demands will be high, and if the closed instances + * don't get cleaned up, memory runs out. + * GCs are forced. + * It is critical no spans are created in the junit thread because they will + * last for the duration of the test JVM. + */ +@SuppressWarnings("ResultOfMethodCallIgnored") +public class TestActiveAuditManagerThreadLeakage extends AbstractHadoopTestBase { + /** + * Logging. + */ + private static final Logger LOG = + LoggerFactory.getLogger(TestActiveAuditManagerThreadLeakage.class); + + /** how many managers to sequentially create. */ + private static final int MANAGER_COUNT = 500; + + /** size of long lived thread pool. */ + private static final int THREAD_COUNT = 20; + private ExecutorService workers; + + /** + * count of prunings which have taken place in the manager lifecycle + * operations. + */ + private int pruneCount; + + /** + * As audit managers are created they are added to the list, + * so we can verify they get GC'd. + */ + private final List> auditManagers = + new ArrayList<>(); + + @After + public void teardown() { + if (workers != null) { + workers.shutdown(); + } + } + + + /** + * When the service is stopped, the span map is + * cleared immediately. + */ + @Test + public void testSpanMapClearedInServiceStop() throws IOException { + try (ActiveAuditManagerS3A auditManager = + new ActiveAuditManagerS3A(emptyStatisticsStore())) { + auditManager.init(createMemoryHungryConfiguration()); + auditManager.start(); + auditManager.getActiveAuditSpan(); + // get the span map + final WeakReferenceThreadMap spanMap + = auditManager.getActiveSpanMap(); + Assertions.assertThat(spanMap.size()) + .describedAs("map size") + .isEqualTo(1); + auditManager.stop(); + Assertions.assertThat(spanMap.size()) + .describedAs("map size") + .isEqualTo(0); + } + } + + @Test + public void testMemoryLeak() throws Throwable { + workers = Executors.newFixedThreadPool(THREAD_COUNT); + for (int i = 0; i < MANAGER_COUNT; i++) { + final long oneAuditConsumption = createAndTestOneAuditor(); + LOG.info("manager {} memory retained {}", i, oneAuditConsumption); + } + + // pruning must have taken place. + // that's somewhat implicit in the test not going OOM. + // but if memory allocation in test runs increase, it + // may cease to hold. in which case: create more + // audit managers + LOG.info("Total prune count {}", pruneCount); + + Assertions.assertThat(pruneCount) + .describedAs("Total prune count") + .isNotZero(); + + // now count number of audit managers GC'd + // some must have been GC'd, showing that no other + // references are being retained internally. + Assertions.assertThat(auditManagers.stream() + .filter((r) -> r.get() == null) + .count()) + .describedAs("number of audit managers garbage collected") + .isNotZero(); + } + + /** + * Create, use and then shutdown one auditor in a unique thread. + * @return memory consumed/released + */ + private long createAndTestOneAuditor() throws Exception { + long original = Runtime.getRuntime().freeMemory(); + ExecutorService factory = Executors.newSingleThreadExecutor(); + + try { + pruneCount += factory.submit(this::createAuditorAndWorkers).get(); + } finally { + factory.shutdown(); + factory.awaitTermination(60, TimeUnit.SECONDS); + } + + + final long current = Runtime.getRuntime().freeMemory(); + return current - original; + + } + + /** + * This is the core of the leakage test. + * Create an audit manager and spans across multiple threads. + * The spans are created in the long-lived pool, so if there is + * any bonding of the life of managers/spans to that of threads, + * it will surface as OOM events. + * @return count of weak references whose reference values were + * nullified. + */ + private int createAuditorAndWorkers() + throws IOException, InterruptedException, ExecutionException { + try (ActiveAuditManagerS3A auditManager = + new ActiveAuditManagerS3A(emptyStatisticsStore())) { + auditManager.init(createMemoryHungryConfiguration()); + auditManager.start(); + LOG.info("Using {}", auditManager); + auditManagers.add(new WeakReference<>(auditManager)); + + // no guarantee every thread gets used, so track + // in a set. This will give us the thread ID of every + // entry in the map. + + Set threadIds = new HashSet<>(); + + List> futures = new ArrayList<>(THREAD_COUNT); + + // perform the spanning operation in a long lived thread. + for (int i = 0; i < THREAD_COUNT; i++) { + futures.add(workers.submit(() -> spanningOperation(auditManager))); + } + + // get the results and so determine the thread IDs + for (Future future : futures) { + final Result r = future.get(); + threadIds.add(r.getThreadId()); + } + + final int threadsUsed = threadIds.size(); + final Long[] threadIdArray = threadIds.toArray(new Long[0]); + + // gc + System.gc(); + // get the span map + final WeakReferenceThreadMap spanMap + = auditManager.getActiveSpanMap(); + + // count number of spans removed + final long derefenced = threadIds.stream() + .filter((id) -> !spanMap.containsKey(id)) + .count(); + if (derefenced > 0) { + LOG.info("{} executed across {} threads and dereferenced {} entries", + auditManager, threadsUsed, derefenced); + } + + // resolve not quite all of the threads. + // why not all? leaves at least one for pruning + // but it does complicate some of the assertions... + int spansRecreated = 0; + int subset = threadIdArray.length - 1; + LOG.info("Resolving {} thread references", subset); + for (int i = 0; i < subset; i++) { + final long id = threadIdArray[i]; + + // note whether or not the span is present + final boolean present = spanMap.containsKey(id); + + // get the the span for that ID. which must never be + // null + Assertions.assertThat(spanMap.get(id)) + .describedAs("Span map entry for thread %d", id) + .isNotNull(); + + // if it wasn't present, the unbounded span must therefore have been + // bounded to this map entry. + if (!present) { + spansRecreated++; + } + } + LOG.info("Recreated {} spans", subset); + + // if the number of spans lost is more than the number + // of entries not probed, then at least one span was + // recreated + if (derefenced > threadIdArray.length - subset) { + Assertions.assertThat(spansRecreated) + .describedAs("number of recreated spans") + .isGreaterThan(0); + } + + // now prune. + int pruned = auditManager.prune(); + if (pruned > 0) { + LOG.info("{} executed across {} threads and pruned {} entries", + auditManager, threadsUsed, pruned); + } + Assertions.assertThat(pruned) + .describedAs("Count of references pruned") + .isEqualTo(derefenced - spansRecreated); + return pruned + (int) derefenced; + } + + } + + private Configuration createMemoryHungryConfiguration() { + final Configuration conf = new Configuration(false); + conf.set(AUDIT_SERVICE_CLASSNAME, MemoryHungryAuditor.NAME); + return conf; + } + + /** + * The operation in each worker thread. + * @param auditManager audit manager + * @return result of the call + * @throws IOException troluble + */ + private Result spanningOperation(final ActiveAuditManagerS3A auditManager) + throws IOException { + auditManager.getActiveAuditSpan(); + final AuditSpanS3A auditSpan = + auditManager.createSpan("span", null, null); + Assertions.assertThat(auditSpan) + .describedAs("audit span for current thread") + .isNotNull(); + // this is needed to ensure that more of the thread pool is used up + Thread.yield(); + return new Result(Thread.currentThread().getId()); + } + + /** + * Result of the spanning operation. + */ + private static final class Result { + /** thread operation took place in. */ + private final long threadId; + + + private Result(final long threadId) { + this.threadId = threadId; + } + + private long getThreadId() { + return threadId; + } + + + } + + /** + * Verify that pruning takes place intermittently. + */ + @Test + public void testRegularPruning() throws Throwable { + try (ActiveAuditManagerS3A auditManager = + new ActiveAuditManagerS3A(emptyStatisticsStore())) { + auditManager.init(createMemoryHungryConfiguration()); + auditManager.start(); + // get the span map + final WeakReferenceThreadMap spanMap + = auditManager.getActiveSpanMap(); + // add a null entry at a thread ID other than this one + spanMap.put(Thread.currentThread().getId() + 1, null); + + // remove this span enough times that pruning shall take + // place twice + // this verifies that pruning takes place and that the + // counter is reset + int pruningCount = 0; + for (int i = 0; i < PRUNE_THRESHOLD * 2 + 1; i++) { + boolean pruned = auditManager.removeActiveSpanFromMap(); + if (pruned) { + pruningCount++; + } + } + // pruning must have taken place + Assertions.assertThat(pruningCount) + .describedAs("Intermittent pruning count") + .isEqualTo(2); + } + } + + /** + * Verify span deactivation removes the entry from the map. + */ + @Test + public void testSpanDeactivationRemovesEntryFromMap() throws Throwable { + try (ActiveAuditManagerS3A auditManager = + new ActiveAuditManagerS3A(emptyStatisticsStore())) { + auditManager.init(createMemoryHungryConfiguration()); + auditManager.start(); + // get the span map + final WeakReferenceThreadMap spanMap + = auditManager.getActiveSpanMap(); + final AuditSpanS3A auditSpan = + auditManager.createSpan("span", null, null); + Assertions.assertThat(auditManager.getActiveAuditSpan()) + .describedAs("active span") + .isSameAs(auditSpan); + // this assert gets used repeatedly, so define a lambda-exp + // which can be envoked with different arguments + Consumer assertMapHasKey = expected -> + Assertions.assertThat(spanMap.containsKey(spanMap.currentThreadId())) + .describedAs("map entry for current thread") + .isEqualTo(expected); + + // sets the span to null + auditSpan.deactivate(); + + // there's no entry + assertMapHasKey.accept(false); + + // asking for the current span will return the unbonded one + final AuditSpanS3A newSpan = auditManager.getActiveAuditSpan(); + Assertions.assertThat(newSpan) + .describedAs("active span") + .isNotNull() + .matches(s -> !s.isValidSpan()); + // which is in the map + // there's an entry + assertMapHasKey.accept(true); + + // deactivating the old span does nothing + auditSpan.deactivate(); + assertMapHasKey.accept(true); + + // deactivating the current unbounded span does + // remove the entry + newSpan.deactivate(); + assertMapHasKey.accept(false); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 637d015a09..fc287e9845 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -82,3 +82,6 @@ log4j.logger.org.apache.hadoop.fs.s3a.S3AStorageStatistics=INFO #log4j.logger.org.apache.hadoop.fs.s3a.audit=DEBUG # log request creation, span lifecycle and other low-level details #log4j.logger.org.apache.hadoop.fs.s3a.audit=TRACE + +# uncomment this to trace where context entries are set +# log4j.logger.org.apache.hadoop.fs.audit.CommonAuditContext=TRACE