HADOOP-13283. Support reset operation for new global storage statistics and per FS storage stats. Contributed by Mingliang Liu.
This commit is contained in:
parent
a0035661c1
commit
6e597600f7
@ -29,15 +29,22 @@ class EmptyStorageStatistics extends StorageStatistics {
|
|||||||
super(name);
|
super(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Iterator<LongStatistic> getLongStatistics() {
|
public Iterator<LongStatistic> getLongStatistics() {
|
||||||
return Collections.emptyIterator();
|
return Collections.emptyIterator();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public Long getLong(String key) {
|
public Long getLong(String key) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isTracked(String key) {
|
public boolean isTracked(String key) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -3619,8 +3619,11 @@ public StorageStatistics provide() {
|
|||||||
* Reset all statistics for all file systems
|
* Reset all statistics for all file systems
|
||||||
*/
|
*/
|
||||||
public static synchronized void clearStatistics() {
|
public static synchronized void clearStatistics() {
|
||||||
for(Statistics stat: statisticsTable.values()) {
|
final Iterator<StorageStatistics> iterator =
|
||||||
stat.reset();
|
GlobalStorageStatistics.INSTANCE.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
final StorageStatistics statistics = iterator.next();
|
||||||
|
statistics.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,6 +138,7 @@ public Long getLong(String key) {
|
|||||||
*
|
*
|
||||||
* @return True only if the statistic is being tracked.
|
* @return True only if the statistic is being tracked.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public boolean isTracked(String key) {
|
public boolean isTracked(String key) {
|
||||||
for (String k: KEYS) {
|
for (String k: KEYS) {
|
||||||
if (k.equals(key)) {
|
if (k.equals(key)) {
|
||||||
@ -146,4 +147,9 @@ public boolean isTracked(String key) {
|
|||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
stats.reset();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -66,8 +66,9 @@ public synchronized StorageStatistics get(String name) {
|
|||||||
* @param provider An object which can create a new StorageStatistics
|
* @param provider An object which can create a new StorageStatistics
|
||||||
* object if needed.
|
* object if needed.
|
||||||
* @return The StorageStatistics object with the given name.
|
* @return The StorageStatistics object with the given name.
|
||||||
* @throws RuntimeException If the StorageStatisticsProvider provides a new
|
* @throws RuntimeException If the StorageStatisticsProvider provides a null
|
||||||
* StorageStatistics object with the wrong name.
|
* object or a new StorageStatistics object with the
|
||||||
|
* wrong name.
|
||||||
*/
|
*/
|
||||||
public synchronized StorageStatistics put(String name,
|
public synchronized StorageStatistics put(String name,
|
||||||
StorageStatisticsProvider provider) {
|
StorageStatisticsProvider provider) {
|
||||||
@ -78,6 +79,10 @@ public synchronized StorageStatistics put(String name,
|
|||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
stats = provider.provide();
|
stats = provider.provide();
|
||||||
|
if (stats == null) {
|
||||||
|
throw new RuntimeException("StorageStatisticsProvider for " + name +
|
||||||
|
" should not provide a null StorageStatistics object.");
|
||||||
|
}
|
||||||
if (!stats.getName().equals(name)) {
|
if (!stats.getName().equals(name)) {
|
||||||
throw new RuntimeException("StorageStatisticsProvider for " + name +
|
throw new RuntimeException("StorageStatisticsProvider for " + name +
|
||||||
" provided a StorageStatistics object for " + stats.getName() +
|
" provided a StorageStatistics object for " + stats.getName() +
|
||||||
@ -87,6 +92,15 @@ public synchronized StorageStatistics put(String name,
|
|||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset all global storage statistics.
|
||||||
|
*/
|
||||||
|
public synchronized void reset() {
|
||||||
|
for (StorageStatistics statistics : map.values()) {
|
||||||
|
statistics.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get an iterator that we can use to iterate throw all the global storage
|
* Get an iterator that we can use to iterate throw all the global storage
|
||||||
* statistics objects.
|
* statistics objects.
|
||||||
|
@ -132,8 +132,7 @@ public String getScheme() {
|
|||||||
* Get the value of a statistic.
|
* Get the value of a statistic.
|
||||||
*
|
*
|
||||||
* @return null if the statistic is not being tracked or is not a
|
* @return null if the statistic is not being tracked or is not a
|
||||||
* long statistic.
|
* long statistic. The value of the statistic, otherwise.
|
||||||
* The value of the statistic, otherwise.
|
|
||||||
*/
|
*/
|
||||||
public abstract Long getLong(String key);
|
public abstract Long getLong(String key);
|
||||||
|
|
||||||
@ -143,4 +142,9 @@ public String getScheme() {
|
|||||||
* @return True only if the statistic is being tracked.
|
* @return True only if the statistic is being tracked.
|
||||||
*/
|
*/
|
||||||
public abstract boolean isTracked(String key);
|
public abstract boolean isTracked(String key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset all the statistic data.
|
||||||
|
*/
|
||||||
|
public abstract void reset();
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.NoSuchElementException;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
@ -77,6 +78,16 @@ public void remove() {
|
|||||||
|
|
||||||
public UnionStorageStatistics(String name, StorageStatistics[] stats) {
|
public UnionStorageStatistics(String name, StorageStatistics[] stats) {
|
||||||
super(name);
|
super(name);
|
||||||
|
|
||||||
|
Preconditions.checkArgument(name != null,
|
||||||
|
"The name of union storage statistics can not be null!");
|
||||||
|
Preconditions.checkArgument(stats != null,
|
||||||
|
"The stats of union storage statistics can not be null!");
|
||||||
|
for (StorageStatistics stat : stats) {
|
||||||
|
Preconditions.checkArgument(stat != null,
|
||||||
|
"The stats of union storage statistics can not have null element!");
|
||||||
|
}
|
||||||
|
|
||||||
this.stats = stats;
|
this.stats = stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,8 +98,8 @@ public Iterator<LongStatistic> getLongStatistics() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long getLong(String key) {
|
public Long getLong(String key) {
|
||||||
for (int i = 0; i < stats.length; i++) {
|
for (StorageStatistics stat : stats) {
|
||||||
Long val = stats[i].getLong(key);
|
Long val = stat.getLong(key);
|
||||||
if (val != null) {
|
if (val != null) {
|
||||||
return val;
|
return val;
|
||||||
}
|
}
|
||||||
@ -103,11 +114,18 @@ public Long getLong(String key) {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean isTracked(String key) {
|
public boolean isTracked(String key) {
|
||||||
for (int i = 0; i < stats.length; i++) {
|
for (StorageStatistics stat : stats) {
|
||||||
if (stats[i].isTracked(key)) {
|
if (stat.isTracked(key)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
for (StorageStatistics stat : stats) {
|
||||||
|
stat.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,7 @@ public void setup() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testgetLongStatistics() {
|
public void testGetLongStatistics() {
|
||||||
Iterator<LongStatistic> iter = storageStatistics.getLongStatistics();
|
Iterator<LongStatistic> iter = storageStatistics.getLongStatistics();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
final LongStatistic longStat = iter.next();
|
final LongStatistic longStat = iter.next();
|
||||||
|
@ -171,4 +171,11 @@ public boolean isTracked(String key) {
|
|||||||
return OpType.fromSymbol(key) != null;
|
return OpType.fromSymbol(key) != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
for (AtomicLong count : opsCount.values()) {
|
||||||
|
count.set(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -23,19 +23,27 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||||
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.util.concurrent.HadoopExecutors.newFixedThreadPool;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
@ -47,25 +55,25 @@
|
|||||||
*/
|
*/
|
||||||
public class TestDFSOpsCountStatistics {
|
public class TestDFSOpsCountStatistics {
|
||||||
|
|
||||||
private static final DFSOpsCountStatistics STORAGE_STATISTICS =
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
new DFSOpsCountStatistics();
|
TestDFSOpsCountStatistics.class);
|
||||||
private static final Map<String, Long> OP_COUNTER_MAP = new HashMap<>();
|
|
||||||
private static final String NO_SUCH_OP = "no-such-dfs-operation-dude";
|
private static final String NO_SUCH_OP = "no-such-dfs-operation-dude";
|
||||||
|
|
||||||
|
private final DFSOpsCountStatistics statistics =
|
||||||
|
new DFSOpsCountStatistics();
|
||||||
|
private final Map<OpType, AtomicLong> expectedOpsCountMap = new HashMap<>();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final Timeout globalTimeout = new Timeout(10 * 1000);
|
public final Timeout globalTimeout = new Timeout(10 * 1000);
|
||||||
@Rule
|
@Rule
|
||||||
public final ExpectedException exception = ExpectedException.none();
|
public final ExpectedException exception = ExpectedException.none();
|
||||||
|
|
||||||
@BeforeClass
|
@Before
|
||||||
public static void setup() {
|
public void setup() {
|
||||||
for (OpType opType : OpType.values()) {
|
for (OpType opType : OpType.values()) {
|
||||||
final Long opCount = RandomUtils.nextLong(0, 100);
|
expectedOpsCountMap.put(opType, new AtomicLong());
|
||||||
OP_COUNTER_MAP.put(opType.getSymbol(), opCount);
|
|
||||||
for (long i = 0; i < opCount; i++) {
|
|
||||||
STORAGE_STATISTICS.incrementOpCounter(opType);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
incrementOpsCountByRandomNumbers();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -84,13 +92,15 @@ public void testOpTypeSymbolsAreUnique() {
|
|||||||
@Test
|
@Test
|
||||||
public void testGetLongStatistics() {
|
public void testGetLongStatistics() {
|
||||||
short iterations = 0; // number of the iter.hasNext()
|
short iterations = 0; // number of the iter.hasNext()
|
||||||
final Iterator<LongStatistic> iter = STORAGE_STATISTICS.getLongStatistics();
|
final Iterator<LongStatistic> iter = statistics.getLongStatistics();
|
||||||
|
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
final LongStatistic longStat = iter.next();
|
final LongStatistic longStat = iter.next();
|
||||||
assertNotNull(longStat);
|
assertNotNull(longStat);
|
||||||
assertTrue(OP_COUNTER_MAP.containsKey(longStat.getName()));
|
final OpType opType = OpType.fromSymbol(longStat.getName());
|
||||||
assertEquals(OP_COUNTER_MAP.get(longStat.getName()).longValue(),
|
assertNotNull(opType);
|
||||||
|
assertTrue(expectedOpsCountMap.containsKey(opType));
|
||||||
|
assertEquals(expectedOpsCountMap.get(opType).longValue(),
|
||||||
longStat.getValue());
|
longStat.getValue());
|
||||||
iterations++;
|
iterations++;
|
||||||
}
|
}
|
||||||
@ -101,22 +111,103 @@ public void testGetLongStatistics() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetLong() {
|
public void testGetLong() {
|
||||||
assertNull(STORAGE_STATISTICS.getLong(NO_SUCH_OP));
|
assertNull(statistics.getLong(NO_SUCH_OP));
|
||||||
|
verifyStatistics();
|
||||||
for (OpType opType : OpType.values()) {
|
|
||||||
final String key = opType.getSymbol();
|
|
||||||
assertEquals(OP_COUNTER_MAP.get(key), STORAGE_STATISTICS.getLong(key));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIsTracked() {
|
public void testIsTracked() {
|
||||||
assertFalse(STORAGE_STATISTICS.isTracked(NO_SUCH_OP));
|
assertFalse(statistics.isTracked(NO_SUCH_OP));
|
||||||
|
|
||||||
final Iterator<LongStatistic> iter = STORAGE_STATISTICS.getLongStatistics();
|
final Iterator<LongStatistic> iter = statistics.getLongStatistics();
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
final LongStatistic longStatistic = iter.next();
|
final LongStatistic longStatistic = iter.next();
|
||||||
assertTrue(STORAGE_STATISTICS.isTracked(longStatistic.getName()));
|
assertTrue(statistics.isTracked(longStatistic.getName()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReset() {
|
||||||
|
statistics.reset();
|
||||||
|
for (OpType opType : OpType.values()) {
|
||||||
|
expectedOpsCountMap.get(opType).set(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Iterator<LongStatistic> iter = statistics.getLongStatistics();
|
||||||
|
while (iter.hasNext()) {
|
||||||
|
final LongStatistic longStat = iter.next();
|
||||||
|
assertEquals(0, longStat.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
incrementOpsCountByRandomNumbers();
|
||||||
|
verifyStatistics();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCurrentAccess() throws InterruptedException {
|
||||||
|
final int numThreads = 10;
|
||||||
|
final ExecutorService threadPool = newFixedThreadPool(numThreads);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final CountDownLatch allReady = new CountDownLatch(numThreads);
|
||||||
|
final CountDownLatch startBlocker = new CountDownLatch(1);
|
||||||
|
final CountDownLatch allDone = new CountDownLatch(numThreads);
|
||||||
|
final AtomicReference<Throwable> childError = new AtomicReference<>();
|
||||||
|
|
||||||
|
for (int i = 0; i < numThreads; i++) {
|
||||||
|
threadPool.submit(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
allReady.countDown();
|
||||||
|
try {
|
||||||
|
startBlocker.await();
|
||||||
|
incrementOpsCountByRandomNumbers();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error("Child failed when calling mkdir", t);
|
||||||
|
childError.compareAndSet(null, t);
|
||||||
|
} finally {
|
||||||
|
allDone.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
allReady.await(); // wait until all threads are ready
|
||||||
|
startBlocker.countDown(); // all threads start making directories
|
||||||
|
allDone.await(); // wait until all threads are done
|
||||||
|
|
||||||
|
assertNull("Child failed with exception.", childError.get());
|
||||||
|
verifyStatistics();
|
||||||
|
} finally {
|
||||||
|
threadPool.shutdownNow();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is helper method to increment the statistics by random data.
|
||||||
|
*/
|
||||||
|
private void incrementOpsCountByRandomNumbers() {
|
||||||
|
for (OpType opType : OpType.values()) {
|
||||||
|
final Long randomCount = RandomUtils.nextLong(0, 100);
|
||||||
|
expectedOpsCountMap.get(opType).addAndGet(randomCount);
|
||||||
|
for (long i = 0; i < randomCount; i++) {
|
||||||
|
statistics.incrementOpCounter(opType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We have the expected ops count in {@link #expectedOpsCountMap}, and this
|
||||||
|
* method is to verify that its ops count is the same as the one in
|
||||||
|
* {@link #statistics}.
|
||||||
|
*/
|
||||||
|
private void verifyStatistics() {
|
||||||
|
for (OpType opType : OpType.values()) {
|
||||||
|
assertNotNull(expectedOpsCountMap.get(opType));
|
||||||
|
assertNotNull(statistics.getLong(opType.getSymbol()));
|
||||||
|
assertEquals("Not expected count for operation " + opType.getSymbol(),
|
||||||
|
expectedOpsCountMap.get(opType).longValue(),
|
||||||
|
statistics.getLong(opType.getSymbol()).longValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -574,6 +574,43 @@ public void testDFSClient() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is to test that the {@link FileSystem#clearStatistics()} resets all
|
||||||
|
* the global storage statistics.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testClearStatistics() throws Exception {
|
||||||
|
final Configuration conf = getTestConfiguration();
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
FileSystem dfs = cluster.getFileSystem();
|
||||||
|
|
||||||
|
final Path dir = new Path("/testClearStatistics");
|
||||||
|
final long mkdirCount = getOpStatistics(OpType.MKDIRS);
|
||||||
|
long writeCount = DFSTestUtil.getStatistics(dfs).getWriteOps();
|
||||||
|
dfs.mkdirs(dir);
|
||||||
|
checkOpStatistics(OpType.MKDIRS, mkdirCount + 1);
|
||||||
|
assertEquals(++writeCount,
|
||||||
|
DFSTestUtil.getStatistics(dfs).getWriteOps());
|
||||||
|
|
||||||
|
final long createCount = getOpStatistics(OpType.CREATE);
|
||||||
|
FSDataOutputStream out = dfs.create(new Path(dir, "tmpFile"), (short)1);
|
||||||
|
out.write(40);
|
||||||
|
out.close();
|
||||||
|
checkOpStatistics(OpType.CREATE, createCount + 1);
|
||||||
|
assertEquals(++writeCount,
|
||||||
|
DFSTestUtil.getStatistics(dfs).getWriteOps());
|
||||||
|
|
||||||
|
FileSystem.clearStatistics();
|
||||||
|
checkOpStatistics(OpType.MKDIRS, 0);
|
||||||
|
checkOpStatistics(OpType.CREATE, 0);
|
||||||
|
checkStatistics(dfs, 0, 0, 0);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStatistics() throws IOException {
|
public void testStatistics() throws IOException {
|
||||||
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
|
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
|
||||||
|
@ -107,4 +107,11 @@ public boolean isTracked(String key) {
|
|||||||
return Statistic.fromSymbol(key) != null;
|
return Statistic.fromSymbol(key) != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
for (AtomicLong value : opsCount.values()) {
|
||||||
|
value.set(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user