diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java index 1bcfe23ee9..1ef30dd7db 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java @@ -29,15 +29,22 @@ class EmptyStorageStatistics extends StorageStatistics { super(name); } + @Override public Iterator getLongStatistics() { return Collections.emptyIterator(); } + @Override public Long getLong(String key) { return null; } + @Override public boolean isTracked(String key) { return false; } + + @Override + public void reset() { + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 9e13a7a808..146bce5b43 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -3619,8 +3619,11 @@ public StorageStatistics provide() { * Reset all statistics for all file systems */ public static synchronized void clearStatistics() { - for(Statistics stat: statisticsTable.values()) { - stat.reset(); + final Iterator iterator = + GlobalStorageStatistics.INSTANCE.iterator(); + while (iterator.hasNext()) { + final StorageStatistics statistics = iterator.next(); + statistics.reset(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java index 8a1eb54c05..8c633f6f35 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java @@ -138,6 +138,7 @@ public Long getLong(String key) { * * @return True only if the statistic is being tracked. */ + @Override public boolean isTracked(String key) { for (String k: KEYS) { if (k.equals(key)) { @@ -146,4 +147,9 @@ public boolean isTracked(String key) { } return false; } + + @Override + public void reset() { + stats.reset(); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java index 750296577c..2dba525e5d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java @@ -66,8 +66,9 @@ public synchronized StorageStatistics get(String name) { * @param provider An object which can create a new StorageStatistics * object if needed. * @return The StorageStatistics object with the given name. - * @throws RuntimeException If the StorageStatisticsProvider provides a new - * StorageStatistics object with the wrong name. + * @throws RuntimeException If the StorageStatisticsProvider provides a null + * object or a new StorageStatistics object with the + * wrong name. */ public synchronized StorageStatistics put(String name, StorageStatisticsProvider provider) { @@ -78,6 +79,10 @@ public synchronized StorageStatistics put(String name, return stats; } stats = provider.provide(); + if (stats == null) { + throw new RuntimeException("StorageStatisticsProvider for " + name + + " should not provide a null StorageStatistics object."); + } if (!stats.getName().equals(name)) { throw new RuntimeException("StorageStatisticsProvider for " + name + " provided a StorageStatistics object for " + stats.getName() + @@ -87,6 +92,15 @@ public synchronized StorageStatistics put(String name, return stats; } + /** + * Reset all global storage statistics. + */ + public synchronized void reset() { + for (StorageStatistics statistics : map.values()) { + statistics.reset(); + } + } + /** * Get an iterator that we can use to iterate throw all the global storage * statistics objects. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java index 0971f10b4b..d987ad084d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java @@ -132,8 +132,7 @@ public String getScheme() { * Get the value of a statistic. * * @return null if the statistic is not being tracked or is not a - * long statistic. - * The value of the statistic, otherwise. + * long statistic. The value of the statistic, otherwise. */ public abstract Long getLong(String key); @@ -143,4 +142,9 @@ public String getScheme() { * @return True only if the statistic is being tracked. */ public abstract boolean isTracked(String key); + + /** + * Reset all the statistic data. + */ + public abstract void reset(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java index d9783e6cde..3d5b6af794 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -77,6 +78,16 @@ public void remove() { public UnionStorageStatistics(String name, StorageStatistics[] stats) { super(name); + + Preconditions.checkArgument(name != null, + "The name of union storage statistics can not be null!"); + Preconditions.checkArgument(stats != null, + "The stats of union storage statistics can not be null!"); + for (StorageStatistics stat : stats) { + Preconditions.checkArgument(stat != null, + "The stats of union storage statistics can not have null element!"); + } + this.stats = stats; } @@ -87,8 +98,8 @@ public Iterator getLongStatistics() { @Override public Long getLong(String key) { - for (int i = 0; i < stats.length; i++) { - Long val = stats[i].getLong(key); + for (StorageStatistics stat : stats) { + Long val = stat.getLong(key); if (val != null) { return val; } @@ -103,11 +114,18 @@ public Long getLong(String key) { */ @Override public boolean isTracked(String key) { - for (int i = 0; i < stats.length; i++) { - if (stats[i].isTracked(key)) { + for (StorageStatistics stat : stats) { + if (stat.isTracked(key)) { return true; } } return false; } + + @Override + public void reset() { + for (StorageStatistics stat : stats) { + stat.reset(); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java index 59c3b8d7fd..8debb69717 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemStorageStatistics.java @@ -77,7 +77,7 @@ public void setup() { } @Test - public void testgetLongStatistics() { + public void testGetLongStatistics() { Iterator iter = storageStatistics.getLongStatistics(); while (iter.hasNext()) { final LongStatistic longStat = iter.next(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java index 84fc925e5b..83d880a248 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java @@ -171,4 +171,11 @@ public boolean isTracked(String key) { return OpType.fromSymbol(key) != null; } + @Override + public void reset() { + for (AtomicLong count : opsCount.values()) { + count.set(0); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDFSOpsCountStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDFSOpsCountStatistics.java index 7578930a4b..d63ef1079d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDFSOpsCountStatistics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestDFSOpsCountStatistics.java @@ -23,19 +23,27 @@ import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import static org.apache.hadoop.util.concurrent.HadoopExecutors.newFixedThreadPool; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -47,25 +55,25 @@ */ public class TestDFSOpsCountStatistics { - private static final DFSOpsCountStatistics STORAGE_STATISTICS = - new DFSOpsCountStatistics(); - private static final Map OP_COUNTER_MAP = new HashMap<>(); + private static final Logger LOG = LoggerFactory.getLogger( + TestDFSOpsCountStatistics.class); private static final String NO_SUCH_OP = "no-such-dfs-operation-dude"; + private final DFSOpsCountStatistics statistics = + new DFSOpsCountStatistics(); + private final Map expectedOpsCountMap = new HashMap<>(); + @Rule public final Timeout globalTimeout = new Timeout(10 * 1000); @Rule public final ExpectedException exception = ExpectedException.none(); - @BeforeClass - public static void setup() { + @Before + public void setup() { for (OpType opType : OpType.values()) { - final Long opCount = RandomUtils.nextLong(0, 100); - OP_COUNTER_MAP.put(opType.getSymbol(), opCount); - for (long i = 0; i < opCount; i++) { - STORAGE_STATISTICS.incrementOpCounter(opType); - } + expectedOpsCountMap.put(opType, new AtomicLong()); } + incrementOpsCountByRandomNumbers(); } /** @@ -84,13 +92,15 @@ public void testOpTypeSymbolsAreUnique() { @Test public void testGetLongStatistics() { short iterations = 0; // number of the iter.hasNext() - final Iterator iter = STORAGE_STATISTICS.getLongStatistics(); + final Iterator iter = statistics.getLongStatistics(); while (iter.hasNext()) { final LongStatistic longStat = iter.next(); assertNotNull(longStat); - assertTrue(OP_COUNTER_MAP.containsKey(longStat.getName())); - assertEquals(OP_COUNTER_MAP.get(longStat.getName()).longValue(), + final OpType opType = OpType.fromSymbol(longStat.getName()); + assertNotNull(opType); + assertTrue(expectedOpsCountMap.containsKey(opType)); + assertEquals(expectedOpsCountMap.get(opType).longValue(), longStat.getValue()); iterations++; } @@ -101,22 +111,103 @@ public void testGetLongStatistics() { @Test public void testGetLong() { - assertNull(STORAGE_STATISTICS.getLong(NO_SUCH_OP)); - - for (OpType opType : OpType.values()) { - final String key = opType.getSymbol(); - assertEquals(OP_COUNTER_MAP.get(key), STORAGE_STATISTICS.getLong(key)); - } + assertNull(statistics.getLong(NO_SUCH_OP)); + verifyStatistics(); } @Test public void testIsTracked() { - assertFalse(STORAGE_STATISTICS.isTracked(NO_SUCH_OP)); + assertFalse(statistics.isTracked(NO_SUCH_OP)); - final Iterator iter = STORAGE_STATISTICS.getLongStatistics(); + final Iterator iter = statistics.getLongStatistics(); while (iter.hasNext()) { final LongStatistic longStatistic = iter.next(); - assertTrue(STORAGE_STATISTICS.isTracked(longStatistic.getName())); + assertTrue(statistics.isTracked(longStatistic.getName())); + } + } + + @Test + public void testReset() { + statistics.reset(); + for (OpType opType : OpType.values()) { + expectedOpsCountMap.get(opType).set(0); + } + + final Iterator iter = statistics.getLongStatistics(); + while (iter.hasNext()) { + final LongStatistic longStat = iter.next(); + assertEquals(0, longStat.getValue()); + } + + incrementOpsCountByRandomNumbers(); + verifyStatistics(); + } + + @Test + public void testCurrentAccess() throws InterruptedException { + final int numThreads = 10; + final ExecutorService threadPool = newFixedThreadPool(numThreads); + + try { + final CountDownLatch allReady = new CountDownLatch(numThreads); + final CountDownLatch startBlocker = new CountDownLatch(1); + final CountDownLatch allDone = new CountDownLatch(numThreads); + final AtomicReference childError = new AtomicReference<>(); + + for (int i = 0; i < numThreads; i++) { + threadPool.submit(new Runnable() { + @Override + public void run() { + allReady.countDown(); + try { + startBlocker.await(); + incrementOpsCountByRandomNumbers(); + } catch (Throwable t) { + LOG.error("Child failed when calling mkdir", t); + childError.compareAndSet(null, t); + } finally { + allDone.countDown(); + } + } + }); + } + + allReady.await(); // wait until all threads are ready + startBlocker.countDown(); // all threads start making directories + allDone.await(); // wait until all threads are done + + assertNull("Child failed with exception.", childError.get()); + verifyStatistics(); + } finally { + threadPool.shutdownNow(); + } + } + + /** + * This is helper method to increment the statistics by random data. + */ + private void incrementOpsCountByRandomNumbers() { + for (OpType opType : OpType.values()) { + final Long randomCount = RandomUtils.nextLong(0, 100); + expectedOpsCountMap.get(opType).addAndGet(randomCount); + for (long i = 0; i < randomCount; i++) { + statistics.incrementOpCounter(opType); + } + } + } + + /** + * We have the expected ops count in {@link #expectedOpsCountMap}, and this + * method is to verify that its ops count is the same as the one in + * {@link #statistics}. + */ + private void verifyStatistics() { + for (OpType opType : OpType.values()) { + assertNotNull(expectedOpsCountMap.get(opType)); + assertNotNull(statistics.getLong(opType.getSymbol())); + assertEquals("Not expected count for operation " + opType.getSymbol(), + expectedOpsCountMap.get(opType).longValue(), + statistics.getLong(opType.getSymbol()).longValue()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index a82e9dceba..50f9f36b23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -573,7 +573,44 @@ public void testDFSClient() throws Exception { if (cluster != null) {cluster.shutdown();} } } - + + /** + * This is to test that the {@link FileSystem#clearStatistics()} resets all + * the global storage statistics. + */ + @Test + public void testClearStatistics() throws Exception { + final Configuration conf = getTestConfiguration(); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + cluster.waitActive(); + FileSystem dfs = cluster.getFileSystem(); + + final Path dir = new Path("/testClearStatistics"); + final long mkdirCount = getOpStatistics(OpType.MKDIRS); + long writeCount = DFSTestUtil.getStatistics(dfs).getWriteOps(); + dfs.mkdirs(dir); + checkOpStatistics(OpType.MKDIRS, mkdirCount + 1); + assertEquals(++writeCount, + DFSTestUtil.getStatistics(dfs).getWriteOps()); + + final long createCount = getOpStatistics(OpType.CREATE); + FSDataOutputStream out = dfs.create(new Path(dir, "tmpFile"), (short)1); + out.write(40); + out.close(); + checkOpStatistics(OpType.CREATE, createCount + 1); + assertEquals(++writeCount, + DFSTestUtil.getStatistics(dfs).getWriteOps()); + + FileSystem.clearStatistics(); + checkOpStatistics(OpType.MKDIRS, 0); + checkOpStatistics(OpType.CREATE, 0); + checkStatistics(dfs, 0, 0, 0); + } finally { + cluster.shutdown(); + } + } + @Test public void testStatistics() throws IOException { FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java index 3a90c6bf83..c1cf7cfcef 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java @@ -107,4 +107,11 @@ public boolean isTracked(String key) { return Statistic.fromSymbol(key) != null; } + @Override + public void reset() { + for (AtomicLong value : opsCount.values()) { + value.set(0); + } + } + }