diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a9b44e321c..50192ae63b 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -490,6 +490,9 @@ Trunk (Unreleased) HADOOP-11347. RawLocalFileSystem#mkdir and create should honor umask (Varun Saxena via Colin P. McCabe) + HADOOP-12107. long running apps may have a huge number of StatisticsData + instances under FileSystem (Sangjin Lee via Ming Ma) + OPTIMIZATIONS HADOOP-7761. Improve the performance of raw comparisons. (todd) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 3f9e3bd381..1d7bc875cc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -20,7 +20,8 @@ import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; -import java.lang.ref.WeakReference; +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; import java.net.URI; import java.net.URISyntaxException; import java.security.PrivilegedExceptionAction; @@ -32,7 +33,6 @@ import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -2920,16 +2920,6 @@ public static class StatisticsData { volatile int readOps; volatile int largeReadOps; volatile int writeOps; - /** - * Stores a weak reference to the thread owning this StatisticsData. - * This allows us to remove StatisticsData objects that pertain to - * threads that no longer exist. - */ - final WeakReference owner; - - StatisticsData(WeakReference owner) { - this.owner = owner; - } /** * Add another StatisticsData object to this one. @@ -3000,17 +2990,37 @@ private interface StatisticsAggregator { * Thread-local data. */ private final ThreadLocal threadData; - + /** - * List of all thread-local data areas. Protected by the Statistics lock. + * Set of all thread-local data areas. Protected by the Statistics lock. + * The references to the statistics data are kept using phantom references + * to the associated threads. Proper clean-up is performed by the cleaner + * thread when the threads are garbage collected. */ - private LinkedList allData; + private final Set allData; + + /** + * Global reference queue and a cleaner thread that manage statistics data + * references from all filesystem instances. + */ + private static final ReferenceQueue STATS_DATA_REF_QUEUE; + private static final Thread STATS_DATA_CLEANER; + + static { + STATS_DATA_REF_QUEUE = new ReferenceQueue(); + // start a single daemon cleaner thread + STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner()); + STATS_DATA_CLEANER. + setName(StatisticsDataReferenceCleaner.class.getName()); + STATS_DATA_CLEANER.setDaemon(true); + STATS_DATA_CLEANER.start(); + } public Statistics(String scheme) { this.scheme = scheme; - this.rootData = new StatisticsData(null); + this.rootData = new StatisticsData(); this.threadData = new ThreadLocal(); - this.allData = null; + this.allData = new HashSet(); } /** @@ -3020,7 +3030,7 @@ public Statistics(String scheme) { */ public Statistics(Statistics other) { this.scheme = other.scheme; - this.rootData = new StatisticsData(null); + this.rootData = new StatisticsData(); other.visitAll(new StatisticsAggregator() { @Override public void accept(StatisticsData data) { @@ -3032,6 +3042,63 @@ public Void aggregate() { } }); this.threadData = new ThreadLocal(); + this.allData = new HashSet(); + } + + /** + * A phantom reference to a thread that also includes the data associated + * with that thread. On the thread being garbage collected, it is enqueued + * to the reference queue for clean-up. + */ + private class StatisticsDataReference extends PhantomReference { + private final StatisticsData data; + + public StatisticsDataReference(StatisticsData data, Thread thread) { + super(thread, STATS_DATA_REF_QUEUE); + this.data = data; + } + + public StatisticsData getData() { + return data; + } + + /** + * Performs clean-up action when the associated thread is garbage + * collected. + */ + public void cleanUp() { + // use the statistics lock for safety + synchronized (Statistics.this) { + /* + * If the thread that created this thread-local data no longer exists, + * remove the StatisticsData from our list and fold the values into + * rootData. + */ + rootData.add(data); + allData.remove(this); + } + } + } + + /** + * Background action to act on references being removed. + */ + private static class StatisticsDataReferenceCleaner implements Runnable { + @Override + public void run() { + while (true) { + try { + StatisticsDataReference ref = + (StatisticsDataReference)STATS_DATA_REF_QUEUE.remove(); + ref.cleanUp(); + } catch (Throwable th) { + // the cleaner thread should continue to run even if there are + // exceptions, including InterruptedException + LOG.warn("exception in the cleaner thread but it will continue to " + + "run", th); + } + } + } } /** @@ -3040,14 +3107,12 @@ public Void aggregate() { public StatisticsData getThreadStatistics() { StatisticsData data = threadData.get(); if (data == null) { - data = new StatisticsData( - new WeakReference(Thread.currentThread())); + data = new StatisticsData(); threadData.set(data); + StatisticsDataReference ref = + new StatisticsDataReference(data, Thread.currentThread()); synchronized(this) { - if (allData == null) { - allData = new LinkedList(); - } - allData.add(data); + allData.add(ref); } } return data; @@ -3105,21 +3170,9 @@ public void incrementWriteOps(int count) { */ private synchronized T visitAll(StatisticsAggregator visitor) { visitor.accept(rootData); - if (allData != null) { - for (Iterator iter = allData.iterator(); - iter.hasNext(); ) { - StatisticsData data = iter.next(); - visitor.accept(data); - if (data.owner.get() == null) { - /* - * If the thread that created this thread-local data no - * longer exists, remove the StatisticsData from our list - * and fold the values into rootData. - */ - rootData.add(data); - iter.remove(); - } - } + for (StatisticsDataReference ref: allData) { + StatisticsData data = ref.getData(); + visitor.accept(data); } return visitor.aggregate(); } @@ -3226,7 +3279,7 @@ public Integer aggregate() { @Override public String toString() { return visitAll(new StatisticsAggregator() { - private StatisticsData total = new StatisticsData(null); + private StatisticsData total = new StatisticsData(); @Override public void accept(StatisticsData data) { @@ -3259,7 +3312,7 @@ public String aggregate() { */ public void reset() { visitAll(new StatisticsAggregator() { - private StatisticsData total = new StatisticsData(null); + private StatisticsData total = new StatisticsData(); @Override public void accept(StatisticsData data) { @@ -3281,6 +3334,11 @@ public Void aggregate() { public String getScheme() { return scheme; } + + @VisibleForTesting + synchronized int getAllThreadLocalDataSize() { + return allData.size(); + } } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java index 90337a6433..3e33362be2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FCStatisticsBaseTest.java @@ -18,26 +18,34 @@ package org.apache.hadoop.fs; +import static org.apache.hadoop.fs.FileContextTestHelper.createFile; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Test; +import com.google.common.base.Supplier; import com.google.common.util.concurrent.Uninterruptibles; -import static org.apache.hadoop.fs.FileContextTestHelper.*; - /** *

* Base class to test {@link FileContext} Statistics. *

*/ public abstract class FCStatisticsBaseTest { - static protected int blockSize = 512; static protected int numBlocks = 1; @@ -102,6 +110,48 @@ public void testStatistics() throws IOException, URISyntaxException { fc.delete(filePath, true); } + @Test(timeout=60000) + public void testStatisticsThreadLocalDataCleanUp() throws Exception { + final Statistics stats = new Statistics("test"); + // create a small thread pool to test the statistics + final int size = 2; + ExecutorService es = Executors.newFixedThreadPool(size); + List> tasks = new ArrayList>(size); + for (int i = 0; i < size; i++) { + tasks.add(new Callable() { + public Boolean call() { + // this populates the data set in statistics + stats.incrementReadOps(1); + return true; + } + }); + } + // run the threads + es.invokeAll(tasks); + // assert that the data size is exactly the number of threads + final AtomicInteger allDataSize = new AtomicInteger(0); + allDataSize.set(stats.getAllThreadLocalDataSize()); + Assert.assertEquals(size, allDataSize.get()); + Assert.assertEquals(size, stats.getReadOps()); + // force the GC to collect the threads by shutting down the thread pool + es.shutdownNow(); + es.awaitTermination(1, TimeUnit.MINUTES); + es = null; + System.gc(); + + // wait for up to 10 seconds + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + int size = stats.getAllThreadLocalDataSize(); + allDataSize.set(size); + return size == 0; + } + }, 1000, 10*1000); + Assert.assertEquals(0, allDataSize.get()); + Assert.assertEquals(size, stats.getReadOps()); + } + /** * Bytes read may be different for different file systems. This method should * throw assertion error if bytes read are incorrect.