HADOOP-12107. long running apps may have a huge number of StatisticsData instances under FileSystem (Sangjin Lee via Ming Ma)
This commit is contained in:
parent
460e98f7b3
commit
8e1bdc17d9
@ -490,6 +490,9 @@ Trunk (Unreleased)
|
||||
HADOOP-11347. RawLocalFileSystem#mkdir and create should honor umask (Varun
|
||||
Saxena via Colin P. McCabe)
|
||||
|
||||
HADOOP-12107. long running apps may have a huge number of StatisticsData
|
||||
instances under FileSystem (Sangjin Lee via Ming Ma)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-7761. Improve the performance of raw comparisons. (todd)
|
||||
|
@ -20,7 +20,8 @@
|
||||
import java.io.Closeable;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.lang.ref.PhantomReference;
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
@ -32,7 +33,6 @@
|
||||
import java.util.HashSet;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
@ -2920,16 +2920,6 @@ public static class StatisticsData {
|
||||
volatile int readOps;
|
||||
volatile int largeReadOps;
|
||||
volatile int writeOps;
|
||||
/**
|
||||
* Stores a weak reference to the thread owning this StatisticsData.
|
||||
* This allows us to remove StatisticsData objects that pertain to
|
||||
* threads that no longer exist.
|
||||
*/
|
||||
final WeakReference<Thread> owner;
|
||||
|
||||
StatisticsData(WeakReference<Thread> owner) {
|
||||
this.owner = owner;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add another StatisticsData object to this one.
|
||||
@ -3000,17 +2990,37 @@ private interface StatisticsAggregator<T> {
|
||||
* Thread-local data.
|
||||
*/
|
||||
private final ThreadLocal<StatisticsData> threadData;
|
||||
|
||||
|
||||
/**
|
||||
* List of all thread-local data areas. Protected by the Statistics lock.
|
||||
* Set of all thread-local data areas. Protected by the Statistics lock.
|
||||
* The references to the statistics data are kept using phantom references
|
||||
* to the associated threads. Proper clean-up is performed by the cleaner
|
||||
* thread when the threads are garbage collected.
|
||||
*/
|
||||
private LinkedList<StatisticsData> allData;
|
||||
private final Set<StatisticsDataReference> allData;
|
||||
|
||||
/**
|
||||
* Global reference queue and a cleaner thread that manage statistics data
|
||||
* references from all filesystem instances.
|
||||
*/
|
||||
private static final ReferenceQueue<Thread> STATS_DATA_REF_QUEUE;
|
||||
private static final Thread STATS_DATA_CLEANER;
|
||||
|
||||
static {
|
||||
STATS_DATA_REF_QUEUE = new ReferenceQueue<Thread>();
|
||||
// start a single daemon cleaner thread
|
||||
STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
|
||||
STATS_DATA_CLEANER.
|
||||
setName(StatisticsDataReferenceCleaner.class.getName());
|
||||
STATS_DATA_CLEANER.setDaemon(true);
|
||||
STATS_DATA_CLEANER.start();
|
||||
}
|
||||
|
||||
public Statistics(String scheme) {
|
||||
this.scheme = scheme;
|
||||
this.rootData = new StatisticsData(null);
|
||||
this.rootData = new StatisticsData();
|
||||
this.threadData = new ThreadLocal<StatisticsData>();
|
||||
this.allData = null;
|
||||
this.allData = new HashSet<StatisticsDataReference>();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -3020,7 +3030,7 @@ public Statistics(String scheme) {
|
||||
*/
|
||||
public Statistics(Statistics other) {
|
||||
this.scheme = other.scheme;
|
||||
this.rootData = new StatisticsData(null);
|
||||
this.rootData = new StatisticsData();
|
||||
other.visitAll(new StatisticsAggregator<Void>() {
|
||||
@Override
|
||||
public void accept(StatisticsData data) {
|
||||
@ -3032,6 +3042,63 @@ public Void aggregate() {
|
||||
}
|
||||
});
|
||||
this.threadData = new ThreadLocal<StatisticsData>();
|
||||
this.allData = new HashSet<StatisticsDataReference>();
|
||||
}
|
||||
|
||||
/**
|
||||
* A phantom reference to a thread that also includes the data associated
|
||||
* with that thread. On the thread being garbage collected, it is enqueued
|
||||
* to the reference queue for clean-up.
|
||||
*/
|
||||
private class StatisticsDataReference extends PhantomReference<Thread> {
|
||||
private final StatisticsData data;
|
||||
|
||||
public StatisticsDataReference(StatisticsData data, Thread thread) {
|
||||
super(thread, STATS_DATA_REF_QUEUE);
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public StatisticsData getData() {
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs clean-up action when the associated thread is garbage
|
||||
* collected.
|
||||
*/
|
||||
public void cleanUp() {
|
||||
// use the statistics lock for safety
|
||||
synchronized (Statistics.this) {
|
||||
/*
|
||||
* If the thread that created this thread-local data no longer exists,
|
||||
* remove the StatisticsData from our list and fold the values into
|
||||
* rootData.
|
||||
*/
|
||||
rootData.add(data);
|
||||
allData.remove(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Background action to act on references being removed.
|
||||
*/
|
||||
private static class StatisticsDataReferenceCleaner implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
StatisticsDataReference ref =
|
||||
(StatisticsDataReference)STATS_DATA_REF_QUEUE.remove();
|
||||
ref.cleanUp();
|
||||
} catch (Throwable th) {
|
||||
// the cleaner thread should continue to run even if there are
|
||||
// exceptions, including InterruptedException
|
||||
LOG.warn("exception in the cleaner thread but it will continue to "
|
||||
+ "run", th);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -3040,14 +3107,12 @@ public Void aggregate() {
|
||||
public StatisticsData getThreadStatistics() {
|
||||
StatisticsData data = threadData.get();
|
||||
if (data == null) {
|
||||
data = new StatisticsData(
|
||||
new WeakReference<Thread>(Thread.currentThread()));
|
||||
data = new StatisticsData();
|
||||
threadData.set(data);
|
||||
StatisticsDataReference ref =
|
||||
new StatisticsDataReference(data, Thread.currentThread());
|
||||
synchronized(this) {
|
||||
if (allData == null) {
|
||||
allData = new LinkedList<StatisticsData>();
|
||||
}
|
||||
allData.add(data);
|
||||
allData.add(ref);
|
||||
}
|
||||
}
|
||||
return data;
|
||||
@ -3105,21 +3170,9 @@ public void incrementWriteOps(int count) {
|
||||
*/
|
||||
private synchronized <T> T visitAll(StatisticsAggregator<T> visitor) {
|
||||
visitor.accept(rootData);
|
||||
if (allData != null) {
|
||||
for (Iterator<StatisticsData> iter = allData.iterator();
|
||||
iter.hasNext(); ) {
|
||||
StatisticsData data = iter.next();
|
||||
visitor.accept(data);
|
||||
if (data.owner.get() == null) {
|
||||
/*
|
||||
* If the thread that created this thread-local data no
|
||||
* longer exists, remove the StatisticsData from our list
|
||||
* and fold the values into rootData.
|
||||
*/
|
||||
rootData.add(data);
|
||||
iter.remove();
|
||||
}
|
||||
}
|
||||
for (StatisticsDataReference ref: allData) {
|
||||
StatisticsData data = ref.getData();
|
||||
visitor.accept(data);
|
||||
}
|
||||
return visitor.aggregate();
|
||||
}
|
||||
@ -3226,7 +3279,7 @@ public Integer aggregate() {
|
||||
@Override
|
||||
public String toString() {
|
||||
return visitAll(new StatisticsAggregator<String>() {
|
||||
private StatisticsData total = new StatisticsData(null);
|
||||
private StatisticsData total = new StatisticsData();
|
||||
|
||||
@Override
|
||||
public void accept(StatisticsData data) {
|
||||
@ -3259,7 +3312,7 @@ public String aggregate() {
|
||||
*/
|
||||
public void reset() {
|
||||
visitAll(new StatisticsAggregator<Void>() {
|
||||
private StatisticsData total = new StatisticsData(null);
|
||||
private StatisticsData total = new StatisticsData();
|
||||
|
||||
@Override
|
||||
public void accept(StatisticsData data) {
|
||||
@ -3281,6 +3334,11 @@ public Void aggregate() {
|
||||
public String getScheme() {
|
||||
return scheme;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized int getAllThreadLocalDataSize() {
|
||||
return allData.size();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -18,26 +18,34 @@
|
||||
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import static org.apache.hadoop.fs.FileContextTestHelper.createFile;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
|
||||
import static org.apache.hadoop.fs.FileContextTestHelper.*;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Base class to test {@link FileContext} Statistics.
|
||||
* </p>
|
||||
*/
|
||||
public abstract class FCStatisticsBaseTest {
|
||||
|
||||
static protected int blockSize = 512;
|
||||
static protected int numBlocks = 1;
|
||||
|
||||
@ -102,6 +110,48 @@ public void testStatistics() throws IOException, URISyntaxException {
|
||||
fc.delete(filePath, true);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testStatisticsThreadLocalDataCleanUp() throws Exception {
|
||||
final Statistics stats = new Statistics("test");
|
||||
// create a small thread pool to test the statistics
|
||||
final int size = 2;
|
||||
ExecutorService es = Executors.newFixedThreadPool(size);
|
||||
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
tasks.add(new Callable<Boolean>() {
|
||||
public Boolean call() {
|
||||
// this populates the data set in statistics
|
||||
stats.incrementReadOps(1);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
// run the threads
|
||||
es.invokeAll(tasks);
|
||||
// assert that the data size is exactly the number of threads
|
||||
final AtomicInteger allDataSize = new AtomicInteger(0);
|
||||
allDataSize.set(stats.getAllThreadLocalDataSize());
|
||||
Assert.assertEquals(size, allDataSize.get());
|
||||
Assert.assertEquals(size, stats.getReadOps());
|
||||
// force the GC to collect the threads by shutting down the thread pool
|
||||
es.shutdownNow();
|
||||
es.awaitTermination(1, TimeUnit.MINUTES);
|
||||
es = null;
|
||||
System.gc();
|
||||
|
||||
// wait for up to 10 seconds
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
int size = stats.getAllThreadLocalDataSize();
|
||||
allDataSize.set(size);
|
||||
return size == 0;
|
||||
}
|
||||
}, 1000, 10*1000);
|
||||
Assert.assertEquals(0, allDataSize.get());
|
||||
Assert.assertEquals(size, stats.getReadOps());
|
||||
}
|
||||
|
||||
/**
|
||||
* Bytes read may be different for different file systems. This method should
|
||||
* throw assertion error if bytes read are incorrect.
|
||||
|
Loading…
Reference in New Issue
Block a user