diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java new file mode 100644 index 0000000000..1bcfe23ee9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Collections; +import java.util.Iterator; + +/** + * EmptyStorageStatistics is a StorageStatistics implementation which has no + * data. + */ +class EmptyStorageStatistics extends StorageStatistics { + EmptyStorageStatistics(String name) { + super(name); + } + + public Iterator getLongStatistics() { + return Collections.emptyIterator(); + } + + public Long getLong(String key) { + return null; + } + + public boolean isTracked(String key) { + return false; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index b37676368f..574027e786 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -49,6 +49,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.AclEntry; @@ -3560,7 +3561,7 @@ public abstract class FileSystem extends Configured implements Closeable { /** * Get the Map of Statistics object indexed by URI Scheme. * @return a Map having a key as URI scheme and value as Statistics object - * @deprecated use {@link #getAllStatistics} instead + * @deprecated use {@link #getGlobalStorageStatistics()} */ @Deprecated public static synchronized Map getStatistics() { @@ -3572,8 +3573,10 @@ public abstract class FileSystem extends Configured implements Closeable { } /** - * Return the FileSystem classes that have Statistics + * Return the FileSystem classes that have Statistics. + * @deprecated use {@link #getGlobalStorageStatistics()} */ + @Deprecated public static synchronized List getAllStatistics() { return new ArrayList(statisticsTable.values()); } @@ -3582,13 +3585,23 @@ public abstract class FileSystem extends Configured implements Closeable { * Get the statistics for a particular file system * @param cls the class to lookup * @return a statistics object + * @deprecated use {@link #getGlobalStorageStatistics()} */ - public static synchronized - Statistics getStatistics(String scheme, Class cls) { + @Deprecated + public static synchronized Statistics getStatistics(final String scheme, + Class cls) { Statistics result = statisticsTable.get(cls); if (result == null) { - result = new Statistics(scheme); - statisticsTable.put(cls, result); + final Statistics newStats = new Statistics(scheme); + statisticsTable.put(cls, newStats); + result = newStats; + GlobalStorageStatistics.INSTANCE.put(scheme, + new StorageStatisticsProvider() { + @Override + public StorageStatistics provide() { + return new FileSystemStorageStatistics(scheme, newStats); + } + }); } return result; } @@ -3628,4 +3641,26 @@ public abstract class FileSystem extends Configured implements Closeable { public static void enableSymlinks() { symlinksEnabled = true; } + + /** + * Get the StorageStatistics for this FileSystem object. These statistics are + * per-instance. They are not shared with any other FileSystem object. + * + *

This is a default method which is intended to be overridden by + * subclasses. The default implementation returns an empty storage statistics + * object.

+ * + * @return The StorageStatistics for this FileSystem instance. + * Will never be null. + */ + public StorageStatistics getStorageStatistics() { + return new EmptyStorageStatistics(getUri().toString()); + } + + /** + * Get the global storage statistics. + */ + public static GlobalStorageStatistics getGlobalStorageStatistics() { + return GlobalStorageStatistics.INSTANCE; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java new file mode 100644 index 0000000000..14f7cdd02e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData; + +/** + * A basic StorageStatistics instance which simply returns data from + * FileSystem#Statistics. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class FileSystemStorageStatistics extends StorageStatistics { + /** + * The per-class FileSystem statistics. + */ + private final FileSystem.Statistics stats; + + private static final String[] KEYS = { + "bytesRead", + "bytesWritten", + "readOps", + "largeReadOps", + "writeOps", + "bytesReadLocalHost", + "bytesReadDistanceOfOneOrTwo", + "bytesReadDistanceOfThreeOrFour", + "bytesReadDistanceOfFiveOrLarger" + }; + + private static class LongStatisticIterator + implements Iterator { + private final StatisticsData data; + + private int keyIdx; + + LongStatisticIterator(StatisticsData data) { + this.data = data; + this.keyIdx = 0; + } + + @Override + public boolean hasNext() { + return (this.keyIdx < KEYS.length); + } + + @Override + public LongStatistic next() { + if (this.keyIdx >= KEYS.length) { + throw new NoSuchElementException(); + } + String key = KEYS[this.keyIdx++]; + Long val = fetch(data, key); + return new LongStatistic(key, val.longValue()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private static Long fetch(StatisticsData data, String key) { + switch (key) { + case "bytesRead": + return data.getBytesRead(); + case "bytesWritten": + return data.getBytesWritten(); + case "readOps": + return Long.valueOf(data.getReadOps()); + case "largeReadOps": + return Long.valueOf(data.getLargeReadOps()); + case "writeOps": + return Long.valueOf(data.getWriteOps()); + case "bytesReadLocalHost": + return data.getBytesReadLocalHost(); + case "bytesReadDistanceOfOneOrTwo": + return data.getBytesReadDistanceOfOneOrTwo(); + case "bytesReadDistanceOfThreeOrFour": + return data.getBytesReadDistanceOfThreeOrFour(); + case "bytesReadDistanceOfFiveOrLarger": + return data.getBytesReadDistanceOfFiveOrLarger(); + default: + return null; + } + } + + FileSystemStorageStatistics(String name, FileSystem.Statistics stats) { + super(name); + this.stats = stats; + } + + @Override + public Iterator getLongStatistics() { + return new LongStatisticIterator(stats.getData()); + } + + @Override + public Long getLong(String key) { + return fetch(stats.getData(), key); + } + + /** + * Return true if a statistic is being tracked. + * + * @return True only if the statistic is being tracked. + */ + public boolean isTracked(String key) { + for (String k: KEYS) { + if (k.equals(key)) { + return true; + } + } + return false; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java new file mode 100644 index 0000000000..f22e78c763 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.NoSuchElementException; +import java.util.TreeMap; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Stores global storage statistics objects. + */ +@InterfaceAudience.Public +public enum GlobalStorageStatistics { + /** + * The GlobalStorageStatistics singleton. + */ + INSTANCE; + + /** + * A map of all global StorageStatistics objects, indexed by name. + */ + private final NavigableMap map = new TreeMap<>(); + + /** + * A callback API for creating new StorageStatistics instances. + */ + public interface StorageStatisticsProvider { + StorageStatistics provide(); + } + + /** + * Get the StorageStatistics object with the given name. + * + * @param name The storage statistics object name. + * @return The StorageStatistics object with the given name, or + * null if there is none. + */ + public synchronized StorageStatistics get(String name) { + return map.get(name); + } + + /** + * Create or return the StorageStatistics object with the given name. + * + * @param name The storage statistics object name. + * @param provider An object which can create a new StorageStatistics + * object if needed. + * @return The StorageStatistics object with the given name. + * @throws RuntimeException If the StorageStatisticsProvider provides a new + * StorageStatistics object with the wrong name. + */ + public synchronized StorageStatistics put(String name, + StorageStatisticsProvider provider) { + StorageStatistics stats = map.get(name); + if (stats != null) { + return stats; + } + stats = provider.provide(); + if (!stats.getName().equals(name)) { + throw new RuntimeException("StorageStatisticsProvider for " + name + + " provided a StorageStatistics object for " + stats.getName() + + " instead."); + } + map.put(name, stats); + return stats; + } + + /** + * Get an iterator that we can use to iterate throw all the global storage + * statistics objects. + */ + synchronized public Iterator iterator() { + Entry first = map.firstEntry(); + return new StorageIterator((first == null) ? null : first.getValue()); + } + + private class StorageIterator implements Iterator { + private StorageStatistics next = null; + + StorageIterator(StorageStatistics first) { + this.next = first; + } + + @Override + public boolean hasNext() { + return (next != null); + } + + @Override + public StorageStatistics next() { + if (next == null) { + throw new NoSuchElementException(); + } + synchronized (GlobalStorageStatistics.this) { + StorageStatistics cur = next; + Entry nextEntry = + map.higherEntry(cur.getName()); + next = (nextEntry == null) ? null : nextEntry.getValue(); + return cur; + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java new file mode 100644 index 0000000000..4bdef80d6b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.Iterator; + +/** + * StorageStatistics contains statistics data for a FileSystem or FileContext + * instance. + */ +@InterfaceAudience.Public +public abstract class StorageStatistics { + /** + * A 64-bit storage statistic. + */ + public static class LongStatistic { + private final String name; + private final long value; + + public LongStatistic(String name, long value) { + this.name = name; + this.value = value; + } + + /** + * @return The name of this statistic. + */ + public String getName() { + return name; + } + + /** + * @return The value of this statistic. + */ + public long getValue() { + return value; + } + } + + private final String name; + + public StorageStatistics(String name) { + this.name = name; + } + + /** + * Get the name of this StorageStatistics object. + */ + public String getName() { + return name; + } + + /** + * Get an iterator over all the currently tracked long statistics. + * + * The values returned will depend on the type of FileSystem or FileContext + * object. The values do not necessarily reflect a snapshot in time. + */ + public abstract Iterator getLongStatistics(); + + /** + * Get the value of a statistic. + * + * @return null if the statistic is not being tracked or is not a + * long statistic. + * The value of the statistic, otherwise. + */ + public abstract Long getLong(String key); + + /** + * Return true if a statistic is being tracked. + * + * @return True only if the statistic is being tracked. + */ + public abstract boolean isTracked(String key); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java new file mode 100644 index 0000000000..d9783e6cde --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A StorageStatistics instance which combines the outputs of several other + * StorageStatistics instances. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class UnionStorageStatistics extends StorageStatistics { + /** + * The underlying StorageStatistics. + */ + private final StorageStatistics[] stats; + + private class LongStatisticIterator implements Iterator { + private int statIdx; + + private Iterator cur; + + LongStatisticIterator() { + this.statIdx = 0; + this.cur = null; + } + + @Override + public boolean hasNext() { + return (getIter() != null); + } + + private Iterator getIter() { + while ((cur == null) || (!cur.hasNext())) { + if (stats.length >= statIdx) { + return null; + } + cur = stats[statIdx++].getLongStatistics(); + } + return cur; + } + + @Override + public LongStatistic next() { + Iterator iter = getIter(); + if (iter == null) { + throw new NoSuchElementException(); + } + return iter.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + public UnionStorageStatistics(String name, StorageStatistics[] stats) { + super(name); + this.stats = stats; + } + + @Override + public Iterator getLongStatistics() { + return new LongStatisticIterator(); + } + + @Override + public Long getLong(String key) { + for (int i = 0; i < stats.length; i++) { + Long val = stats[i].getLong(key); + if (val != null) { + return val; + } + } + return null; + } + + /** + * Return true if a statistic is being tracked. + * + * @return True only if the statistic is being tracked. + */ + @Override + public boolean isTracked(String key) { + for (int i = 0; i < stats.length; i++) { + if (stats[i].isTracked(key)) { + return true; + } + } + return false; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java index 3ecce8f05d..e1312bc00b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java @@ -132,6 +132,7 @@ public class TestFilterFileSystem { public Path fixRelativePart(Path p); public ContentSummary getContentSummary(Path f); public QuotaUsage getQuotaUsage(Path f); + StorageStatistics getStorageStatistics(); } @Test diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java index a8795cc9c5..d2020b9b72 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java @@ -221,6 +221,7 @@ public class TestHarFileSystem { public Path getTrashRoot(Path path) throws IOException; public Collection getTrashRoots(boolean allUsers) throws IOException; + StorageStatistics getStorageStatistics(); } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java new file mode 100644 index 0000000000..d58a59f0c0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.fs.StorageStatistics; + +import java.util.EnumMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This storage statistics tracks how many times each DFS operation was issued. + * + * For each tracked DFS operation, there is a respective entry in the enum + * {@link OpType}. To use, increment the value the {@link DistributedFileSystem} + * and {@link org.apache.hadoop.hdfs.web.WebHdfsFileSystem}. + * + * This class is thread safe, and is generally shared by multiple threads. + */ +public class DFSOpsCountStatistics extends StorageStatistics { + + /** This is for counting file system operations. */ + public enum OpType { + ALLOW_SNAPSHOT("allowSnapshot"), + APPEND("append"), + CONCAT("concat"), + COPY_FROM_LOCAL_FILE("copyFromLocalFile"), + CREATE("create"), + CREATE_NON_RECURSIVE("createNonRecursive"), + CREATE_SNAPSHOT("createSnapshot"), + CREATE_SYM_LINK("createSymlink"), + DELETE("delete"), + DELETE_SNAPSHOT("deleteSnapshot"), + DISALLOW_SNAPSHOT("disallowSnapshot"), + EXISTS("exists"), + GET_BYTES_WITH_FUTURE_GS("getBytesWithFutureGenerationStamps"), + GET_CONTENT_SUMMARY("getContentSummary"), + GET_FILE_BLOCK_LOCATIONS("getFileBlockLocations"), + GET_FILE_CHECKSUM("getFileChecksum"), + GET_FILE_LINK_STATUS("getFileLinkStatus"), + GET_FILE_STATUS("getFileStatus"), + GET_LINK_TARGET("getLinkTarget"), + GET_QUOTA_USAGE("getQuotaUsage"), + GET_STATUS("getStatus"), + GET_STORAGE_POLICIES("getStoragePolicies"), + GET_STORAGE_POLICY("getStoragePolicy"), + GET_XATTR("getXAttr"), + LIST_LOCATED_STATUS("listLocatedStatus"), + LIST_STATUS("listStatus"), + MKDIRS("mkdirs"), + MODIFY_ACL_ENTRIES("modifyAclEntries"), + OPEN("open"), + PRIMITIVE_CREATE("primitiveCreate"), + PRIMITIVE_MKDIR("primitiveMkdir"), + REMOVE_ACL("removeAcl"), + REMOVE_ACL_ENTRIES("removeAclEntries"), + REMOVE_DEFAULT_ACL("removeDefaultAcl"), + REMOVE_XATTR("removeXAttr"), + RENAME("rename"), + RENAME_SNAPSHOT("renameSnapshot"), + RESOLVE_LINK("resolveLink"), + SET_ACL("setAcl"), + SET_OWNER("setOwner"), + SET_PERMISSION("setPermission"), + SET_REPLICATION("setReplication"), + SET_STORAGE_POLICY("setStoragePolicy"), + SET_TIMES("setTimes"), + SET_XATTR("setXAttr"), + TRUNCATE("truncate"), + UNSET_STORAGE_POLICY("unsetStoragePolicy"); + + private final String symbol; + + OpType(String symbol) { + this.symbol = symbol; + } + + public String getSymbol() { + return symbol; + } + + public static OpType fromSymbol(String symbol) { + if (symbol != null) { + for (OpType opType : values()) { + if (opType.getSymbol().equals(symbol)) { + return opType; + } + } + } + return null; + } + } + + public static final String NAME = "DFSOpsCountStatistics"; + + private final Map opsCount = new EnumMap<>(OpType.class); + + public DFSOpsCountStatistics() { + super(NAME); + for (OpType opType : OpType.values()) { + opsCount.put(opType, new AtomicLong(0)); + } + } + + public void incrementOpCounter(OpType op) { + opsCount.get(op).addAndGet(1); + } + + private class LongIterator implements Iterator { + private Iterator> iterator = + opsCount.entrySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public LongStatistic next() { + if (!iterator.hasNext()) { + throw new NoSuchElementException(); + } + final Entry entry = iterator.next(); + return new LongStatistic(entry.getKey().name(), entry.getValue().get()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + @Override + public Iterator getLongStatistics() { + return new LongIterator(); + } + + @Override + public Long getLong(String key) { + final OpType type = OpType.fromSymbol(key); + return type == null ? null : opsCount.get(type).get(); + } + + @Override + public boolean isTracked(String key) { + return OpType.fromSymbol(key) == null; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 7a562654d4..0ae4d70d28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -47,8 +47,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystemLinkResolver; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.GlobalStorageStatistics; +import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; @@ -65,6 +68,7 @@ import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator; +import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -95,7 +99,6 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - /**************************************************************** * Implementation of the abstract FileSystem for the DFS system. * This object is the way end-user code interacts with a Hadoop @@ -113,6 +116,8 @@ public class DistributedFileSystem extends FileSystem { DFSClient dfs; private boolean verifyChecksum = true; + private DFSOpsCountStatistics storageStatistics; + static{ HdfsConfiguration.init(); } @@ -150,6 +155,15 @@ public class DistributedFileSystem extends FileSystem { this.dfs = new DFSClient(uri, conf, statistics); this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority()); this.workingDir = getHomeDirectory(); + + storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE + .put(DFSOpsCountStatistics.NAME, + new StorageStatisticsProvider() { + @Override + public StorageStatistics provide() { + return new DFSOpsCountStatistics(); + } + }); } @Override @@ -214,6 +228,7 @@ public class DistributedFileSystem extends FileSystem { public BlockLocation[] getFileBlockLocations(Path p, final long start, final long len) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS); final Path absF = fixRelativePart(p); return new FileSystemLinkResolver() { @Override @@ -264,6 +279,7 @@ public class DistributedFileSystem extends FileSystem { public FSDataInputStream open(Path f, final int bufferSize) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.OPEN); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -300,6 +316,7 @@ public class DistributedFileSystem extends FileSystem { public FSDataOutputStream append(Path f, final EnumSet flag, final int bufferSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.APPEND); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -332,6 +349,7 @@ public class DistributedFileSystem extends FileSystem { final int bufferSize, final Progressable progress, final InetSocketAddress[] favoredNodes) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.APPEND); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -375,6 +393,7 @@ public class DistributedFileSystem extends FileSystem { final Progressable progress, final InetSocketAddress[] favoredNodes) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -408,6 +427,7 @@ public class DistributedFileSystem extends FileSystem { final Progressable progress, final ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -432,6 +452,7 @@ public class DistributedFileSystem extends FileSystem { short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.PRIMITIVE_CREATE); final DFSOutputStream dfsos = dfs.primitiveCreate( getPathName(fixRelativePart(f)), absolutePermission, flag, true, replication, blockSize, @@ -448,6 +469,7 @@ public class DistributedFileSystem extends FileSystem { final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE); if (flag.contains(CreateFlag.OVERWRITE)) { flag.add(CreateFlag.CREATE); } @@ -473,6 +495,7 @@ public class DistributedFileSystem extends FileSystem { public boolean setReplication(Path src, final short replication) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_REPLICATION); Path absF = fixRelativePart(src); return new FileSystemLinkResolver() { @Override @@ -497,6 +520,7 @@ public class DistributedFileSystem extends FileSystem { public void setStoragePolicy(final Path src, final String policyName) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY); Path absF = fixRelativePart(src); new FileSystemLinkResolver() { @Override @@ -517,6 +541,7 @@ public class DistributedFileSystem extends FileSystem { public void unsetStoragePolicy(final Path src) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY); Path absF = fixRelativePart(src); new FileSystemLinkResolver() { @Override @@ -541,6 +566,7 @@ public class DistributedFileSystem extends FileSystem { @Override public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICY); Path absF = fixRelativePart(path); return new FileSystemLinkResolver() { @@ -571,6 +597,7 @@ public class DistributedFileSystem extends FileSystem { */ public long getBytesWithFutureGenerationStamps() throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_BYTES_WITH_FUTURE_GS); return dfs.getBytesInFutureBlocks(); } @@ -581,6 +608,7 @@ public class DistributedFileSystem extends FileSystem { @Deprecated public BlockStoragePolicy[] getStoragePolicies() throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICIES); return dfs.getStoragePolicies(); } @@ -595,6 +623,7 @@ public class DistributedFileSystem extends FileSystem { @Override public void concat(Path trg, Path [] psrcs) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CONCAT); // Make target absolute Path absF = fixRelativePart(trg); // Make all srcs absolute @@ -639,6 +668,7 @@ public class DistributedFileSystem extends FileSystem { @Override public boolean rename(Path src, Path dst) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME); final Path absSrc = fixRelativePart(src); final Path absDst = fixRelativePart(dst); @@ -673,6 +703,7 @@ public class DistributedFileSystem extends FileSystem { public void rename(Path src, Path dst, final Options.Rename... options) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME); final Path absSrc = fixRelativePart(src); final Path absDst = fixRelativePart(dst); // Try the rename without resolving first @@ -701,6 +732,7 @@ public class DistributedFileSystem extends FileSystem { @Override public boolean truncate(Path f, final long newLength) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.TRUNCATE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -718,6 +750,7 @@ public class DistributedFileSystem extends FileSystem { @Override public boolean delete(Path f, final boolean recursive) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DELETE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -735,6 +768,7 @@ public class DistributedFileSystem extends FileSystem { @Override public ContentSummary getContentSummary(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -752,6 +786,7 @@ public class DistributedFileSystem extends FileSystem { @Override public QuotaUsage getQuotaUsage(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_QUOTA_USAGE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -836,6 +871,7 @@ public class DistributedFileSystem extends FileSystem { stats[i] = partialListing[i].makeQualified(getUri(), p); } statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_STATUS); return stats; } @@ -850,6 +886,7 @@ public class DistributedFileSystem extends FileSystem { listing.add(fileStatus.makeQualified(getUri(), p)); } statistics.incrementLargeReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_STATUS); // now fetch more entries do { @@ -864,6 +901,7 @@ public class DistributedFileSystem extends FileSystem { listing.add(fileStatus.makeQualified(getUri(), p)); } statistics.incrementLargeReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_STATUS); } while (thisListing.hasMore()); return listing.toArray(new FileStatus[listing.size()]); @@ -977,6 +1015,7 @@ public class DistributedFileSystem extends FileSystem { thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, needLocation); statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_LOCATED_STATUS); if (thisListing == null) { // the directory does not exist throw new FileNotFoundException("File " + p + " does not exist."); } @@ -1072,6 +1111,7 @@ public class DistributedFileSystem extends FileSystem { private boolean mkdirsInternal(Path f, final FsPermission permission, final boolean createParent) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MKDIRS); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1098,6 +1138,7 @@ public class DistributedFileSystem extends FileSystem { protected boolean primitiveMkdir(Path f, FsPermission absolutePermission) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.PRIMITIVE_MKDIR); return dfs.primitiveMkdir(getPathName(f), absolutePermission); } @@ -1126,6 +1167,7 @@ public class DistributedFileSystem extends FileSystem { @Override public FsStatus getStatus(Path p) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_STATUS); return dfs.getDiskStatus(); } @@ -1317,6 +1359,7 @@ public class DistributedFileSystem extends FileSystem { @Override public FileStatus getFileStatus(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1344,6 +1387,7 @@ public class DistributedFileSystem extends FileSystem { throw new UnsupportedOperationException("Symlinks not supported"); } statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK); final Path absF = fixRelativePart(link); new FileSystemLinkResolver() { @Override @@ -1367,6 +1411,7 @@ public class DistributedFileSystem extends FileSystem { @Override public FileStatus getFileLinkStatus(final Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_LINK_STATUS); final Path absF = fixRelativePart(f); FileStatus status = new FileSystemLinkResolver() { @Override @@ -1396,6 +1441,7 @@ public class DistributedFileSystem extends FileSystem { @Override public Path getLinkTarget(final Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_LINK_TARGET); final Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1417,6 +1463,7 @@ public class DistributedFileSystem extends FileSystem { @Override protected Path resolveLink(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.RESOLVE_LINK); String target = dfs.getLinkTarget(getPathName(fixRelativePart(f))); if (target == null) { throw new FileNotFoundException("File does not exist: " + f.toString()); @@ -1427,6 +1474,7 @@ public class DistributedFileSystem extends FileSystem { @Override public FileChecksum getFileChecksum(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1446,6 +1494,7 @@ public class DistributedFileSystem extends FileSystem { public FileChecksum getFileChecksum(Path f, final long length) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1471,6 +1520,7 @@ public class DistributedFileSystem extends FileSystem { public void setPermission(Path p, final FsPermission permission ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_PERMISSION); Path absF = fixRelativePart(p); new FileSystemLinkResolver() { @Override @@ -1495,6 +1545,7 @@ public class DistributedFileSystem extends FileSystem { throw new IOException("username == null && groupname == null"); } statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_OWNER); Path absF = fixRelativePart(p); new FileSystemLinkResolver() { @Override @@ -1516,6 +1567,7 @@ public class DistributedFileSystem extends FileSystem { public void setTimes(Path p, final long mtime, final long atime) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_TIMES); Path absF = fixRelativePart(p); new FileSystemLinkResolver() { @Override @@ -1593,6 +1645,8 @@ public class DistributedFileSystem extends FileSystem { /** @see HdfsAdmin#allowSnapshot(Path) */ public void allowSnapshot(final Path path) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -1619,6 +1673,8 @@ public class DistributedFileSystem extends FileSystem { /** @see HdfsAdmin#disallowSnapshot(Path) */ public void disallowSnapshot(final Path path) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -1646,6 +1702,8 @@ public class DistributedFileSystem extends FileSystem { @Override public Path createSnapshot(final Path path, final String snapshotName) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT); Path absF = fixRelativePart(path); return new FileSystemLinkResolver() { @Override @@ -1671,6 +1729,8 @@ public class DistributedFileSystem extends FileSystem { @Override public void renameSnapshot(final Path path, final String snapshotOldName, final String snapshotNewName) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -1707,6 +1767,8 @@ public class DistributedFileSystem extends FileSystem { @Override public void deleteSnapshot(final Path snapshotDir, final String snapshotName) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT); Path absF = fixRelativePart(snapshotDir); new FileSystemLinkResolver() { @Override @@ -1954,6 +2016,8 @@ public class DistributedFileSystem extends FileSystem { @Override public void modifyAclEntries(Path path, final List aclSpec) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -1976,6 +2040,8 @@ public class DistributedFileSystem extends FileSystem { @Override public void removeAclEntries(Path path, final List aclSpec) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -1997,6 +2063,8 @@ public class DistributedFileSystem extends FileSystem { */ @Override public void removeDefaultAcl(Path path) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL); final Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2017,6 +2085,8 @@ public class DistributedFileSystem extends FileSystem { */ @Override public void removeAcl(Path path) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_ACL); final Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2038,6 +2108,8 @@ public class DistributedFileSystem extends FileSystem { @Override public void setAcl(Path path, final List aclSpec) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_ACL); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2136,6 +2208,8 @@ public class DistributedFileSystem extends FileSystem { @Override public void setXAttr(Path path, final String name, final byte[] value, final EnumSet flag) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_XATTR); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @@ -2155,6 +2229,8 @@ public class DistributedFileSystem extends FileSystem { @Override public byte[] getXAttr(Path path, final String name) throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_XATTR); final Path absF = fixRelativePart(path); return new FileSystemLinkResolver() { @Override @@ -2220,6 +2296,8 @@ public class DistributedFileSystem extends FileSystem { @Override public void removeXAttr(Path path, final String name) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2450,4 +2528,5 @@ public class DistributedFileSystem extends FileSystem { Statistics getFsStatistics() { return statistics; } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index ab4e0d06bf..9e42b242ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -62,6 +62,11 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.GlobalStorageStatistics; +import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; +import org.apache.hadoop.fs.StorageStatistics; +import org.apache.hadoop.hdfs.DFSOpsCountStatistics; +import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; @@ -143,6 +148,8 @@ public class WebHdfsFileSystem extends FileSystem private static final ObjectReader READER = new ObjectMapper().reader(Map.class); + private DFSOpsCountStatistics storageStatistics; + /** * Return the protocol scheme for the FileSystem. *

@@ -240,6 +247,15 @@ public class WebHdfsFileSystem extends FileSystem CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.initializeRestCsrf(conf); this.delegationToken = null; + + storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE + .put(DFSOpsCountStatistics.NAME, + new StorageStatisticsProvider() { + @Override + public StorageStatistics provide() { + return new DFSOpsCountStatistics(); + } + }); } /** @@ -974,6 +990,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public FileStatus getFileStatus(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS); return makeQualified(getHdfsFileStatus(f), f); } @@ -1003,6 +1020,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MKDIRS); final HttpOpParam.Op op = PutOpParam.Op.MKDIRS; return new FsPathBooleanRunner(op, f, new PermissionParam(applyUMask(permission)) @@ -1015,6 +1033,7 @@ public class WebHdfsFileSystem extends FileSystem public void createSymlink(Path destination, Path f, boolean createParent ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK); final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK; new FsPathRunner(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()), @@ -1025,6 +1044,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public boolean rename(final Path src, final Path dst) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME); final HttpOpParam.Op op = PutOpParam.Op.RENAME; return new FsPathBooleanRunner(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()) @@ -1036,6 +1056,7 @@ public class WebHdfsFileSystem extends FileSystem public void rename(final Path src, final Path dst, final Options.Rename... options) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME); final HttpOpParam.Op op = PutOpParam.Op.RENAME; new FsPathRunner(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()), @@ -1047,6 +1068,7 @@ public class WebHdfsFileSystem extends FileSystem public void setXAttr(Path p, String name, byte[] value, EnumSet flag) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_XATTR); final HttpOpParam.Op op = PutOpParam.Op.SETXATTR; if (value != null) { new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam( @@ -1060,6 +1082,8 @@ public class WebHdfsFileSystem extends FileSystem @Override public byte[] getXAttr(Path p, final String name) throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_XATTR); final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS; return new FsPathResponseRunner(op, p, new XAttrNameParam(name), new XAttrEncodingParam(XAttrCodec.HEX)) { @@ -1116,6 +1140,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public void removeXAttr(Path p, String name) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR); final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR; new FsPathRunner(op, p, new XAttrNameParam(name)).run(); } @@ -1128,6 +1153,7 @@ public class WebHdfsFileSystem extends FileSystem } statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_OWNER); final HttpOpParam.Op op = PutOpParam.Op.SETOWNER; new FsPathRunner(op, p, new OwnerParam(owner), new GroupParam(group) @@ -1138,6 +1164,7 @@ public class WebHdfsFileSystem extends FileSystem public void setPermission(final Path p, final FsPermission permission ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_PERMISSION); final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION; new FsPathRunner(op, p,new PermissionParam(permission)).run(); } @@ -1146,6 +1173,7 @@ public class WebHdfsFileSystem extends FileSystem public void modifyAclEntries(Path path, List aclSpec) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES); final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES; new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); } @@ -1154,6 +1182,7 @@ public class WebHdfsFileSystem extends FileSystem public void removeAclEntries(Path path, List aclSpec) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES); final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES; new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); } @@ -1161,6 +1190,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public void removeDefaultAcl(Path path) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL); final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL; new FsPathRunner(op, path).run(); } @@ -1168,6 +1198,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public void removeAcl(Path path) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_ACL); final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL; new FsPathRunner(op, path).run(); } @@ -1176,12 +1207,14 @@ public class WebHdfsFileSystem extends FileSystem public void setAcl(final Path p, final List aclSpec) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_ACL); final HttpOpParam.Op op = PutOpParam.Op.SETACL; new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run(); } public void allowSnapshot(final Path p) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT); final HttpOpParam.Op op = PutOpParam.Op.ALLOWSNAPSHOT; new FsPathRunner(op, p).run(); } @@ -1190,6 +1223,7 @@ public class WebHdfsFileSystem extends FileSystem public Path createSnapshot(final Path path, final String snapshotName) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT); final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT; return new FsPathResponseRunner(op, path, new SnapshotNameParam(snapshotName)) { @@ -1202,6 +1236,7 @@ public class WebHdfsFileSystem extends FileSystem public void disallowSnapshot(final Path p) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT); final HttpOpParam.Op op = PutOpParam.Op.DISALLOWSNAPSHOT; new FsPathRunner(op, p).run(); } @@ -1210,6 +1245,7 @@ public class WebHdfsFileSystem extends FileSystem public void deleteSnapshot(final Path path, final String snapshotName) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT); final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT; new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run(); } @@ -1218,6 +1254,7 @@ public class WebHdfsFileSystem extends FileSystem public void renameSnapshot(final Path path, final String snapshotOldName, final String snapshotNewName) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT); final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT; new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName), new SnapshotNameParam(snapshotNewName)).run(); @@ -1227,6 +1264,7 @@ public class WebHdfsFileSystem extends FileSystem public boolean setReplication(final Path p, final short replication ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_REPLICATION); final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION; return new FsPathBooleanRunner(op, p, new ReplicationParam(replication) @@ -1237,6 +1275,7 @@ public class WebHdfsFileSystem extends FileSystem public void setTimes(final Path p, final long mtime, final long atime ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_TIMES); final HttpOpParam.Op op = PutOpParam.Op.SETTIMES; new FsPathRunner(op, p, new ModificationTimeParam(mtime), @@ -1259,6 +1298,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public void concat(final Path trg, final Path [] srcs) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CONCAT); final HttpOpParam.Op op = PostOpParam.Op.CONCAT; new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run(); } @@ -1268,6 +1308,7 @@ public class WebHdfsFileSystem extends FileSystem final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE); final HttpOpParam.Op op = PutOpParam.Op.CREATE; return new FsPathOutputStreamRunner(op, f, bufferSize, @@ -1285,6 +1326,7 @@ public class WebHdfsFileSystem extends FileSystem final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE); final HttpOpParam.Op op = PutOpParam.Op.CREATE; return new FsPathOutputStreamRunner(op, f, bufferSize, @@ -1301,6 +1343,7 @@ public class WebHdfsFileSystem extends FileSystem public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.APPEND); final HttpOpParam.Op op = PostOpParam.Op.APPEND; return new FsPathOutputStreamRunner(op, f, bufferSize, @@ -1311,6 +1354,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public boolean truncate(Path f, long newLength) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.TRUNCATE); final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE; return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run(); @@ -1318,6 +1362,8 @@ public class WebHdfsFileSystem extends FileSystem @Override public boolean delete(Path f, boolean recursive) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DELETE); final HttpOpParam.Op op = DeleteOpParam.Op.DELETE; return new FsPathBooleanRunner(op, f, new RecursiveParam(recursive) @@ -1328,6 +1374,7 @@ public class WebHdfsFileSystem extends FileSystem public FSDataInputStream open(final Path f, final int bufferSize ) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.OPEN); return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize)); } @@ -1427,6 +1474,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public FileStatus[] listStatus(final Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_STATUS); final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS; return new FsPathResponseRunner(op, f) { @@ -1522,6 +1570,7 @@ public class WebHdfsFileSystem extends FileSystem public BlockLocation[] getFileBlockLocations(final Path p, final long offset, final long length) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS); final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS; return new FsPathResponseRunner(op, p, @@ -1543,6 +1592,7 @@ public class WebHdfsFileSystem extends FileSystem @Override public ContentSummary getContentSummary(final Path p) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY); final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY; return new FsPathResponseRunner(op, p) { @@ -1557,6 +1607,7 @@ public class WebHdfsFileSystem extends FileSystem public MD5MD5CRC32FileChecksum getFileChecksum(final Path p ) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM); final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM; return new FsPathResponseRunner(op, p) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index 1db0da8623..f3bf6b53a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; @@ -39,9 +40,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; @@ -49,18 +54,22 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.GlobalStorageStatistics; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.StorageStatistics.LongStatistic; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; @@ -69,19 +78,27 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op; import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Test; import org.mockito.InOrder; +import org.mockito.internal.util.reflection.Whitebox; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestDistributedFileSystem { private static final Random RAN = new Random(); + private static final Logger LOG = LoggerFactory.getLogger( + TestDistributedFileSystem.class); static { GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL); @@ -554,54 +571,84 @@ public class TestDistributedFileSystem { } @Test - public void testStatistics() throws Exception { + public void testStatistics() throws IOException { + FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME, + DistributedFileSystem.class).reset(); + @SuppressWarnings("unchecked") + ThreadLocal data = (ThreadLocal) + Whitebox.getInternalState( + FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME, + DistributedFileSystem.class), "threadData"); + data.set(null); + int lsLimit = 2; final Configuration conf = getTestConfiguration(); conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, lsLimit); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); try { + cluster.waitActive(); final FileSystem fs = cluster.getFileSystem(); Path dir = new Path("/test"); Path file = new Path(dir, "file"); - - int readOps = DFSTestUtil.getStatistics(fs).getReadOps(); - int writeOps = DFSTestUtil.getStatistics(fs).getWriteOps(); - int largeReadOps = DFSTestUtil.getStatistics(fs).getLargeReadOps(); + + int readOps = 0; + int writeOps = 0; + int largeReadOps = 0; + + long opCount = getOpStatistics(OpType.MKDIRS); fs.mkdirs(dir); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.MKDIRS, opCount + 1); + opCount = getOpStatistics(OpType.CREATE); FSDataOutputStream out = fs.create(file, (short)1); out.close(); checkStatistics(fs, readOps, ++writeOps, largeReadOps); - + checkOpStatistics(OpType.CREATE, opCount + 1); + + opCount = getOpStatistics(OpType.GET_FILE_STATUS); FileStatus status = fs.getFileStatus(file); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.GET_FILE_STATUS, opCount + 1); + opCount = getOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS); fs.getFileBlockLocations(file, 0, 0); checkStatistics(fs, ++readOps, writeOps, largeReadOps); - + checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 1); fs.getFileBlockLocations(status, 0, 0); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 2); + opCount = getOpStatistics(OpType.OPEN); FSDataInputStream in = fs.open(file); in.close(); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.OPEN, opCount + 1); + opCount = getOpStatistics(OpType.SET_REPLICATION); fs.setReplication(file, (short)2); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.SET_REPLICATION, opCount + 1); + opCount = getOpStatistics(OpType.RENAME); Path file1 = new Path(dir, "file1"); fs.rename(file, file1); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.RENAME, opCount + 1); + opCount = getOpStatistics(OpType.GET_CONTENT_SUMMARY); fs.getContentSummary(file1); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.GET_CONTENT_SUMMARY, opCount + 1); // Iterative ls test + long mkdirOp = getOpStatistics(OpType.MKDIRS); + long listStatusOp = getOpStatistics(OpType.LIST_STATUS); for (int i = 0; i < 10; i++) { Path p = new Path(dir, Integer.toString(i)); fs.mkdirs(p); + mkdirOp++; FileStatus[] list = fs.listStatus(dir); if (list.length > lsLimit) { // if large directory, then count readOps and largeReadOps by @@ -609,41 +656,131 @@ public class TestDistributedFileSystem { int iterations = (int)Math.ceil((double)list.length/lsLimit); largeReadOps += iterations; readOps += iterations; + listStatusOp += iterations; } else { // Single iteration in listStatus - no large read operation done readOps++; + listStatusOp++; } // writeOps incremented by 1 for mkdirs // readOps and largeReadOps incremented by 1 or more checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.MKDIRS, mkdirOp); + checkOpStatistics(OpType.LIST_STATUS, listStatusOp); } + opCount = getOpStatistics(OpType.GET_STATUS); fs.getStatus(file1); checkStatistics(fs, ++readOps, writeOps, largeReadOps); - + checkOpStatistics(OpType.GET_STATUS, opCount + 1); + + opCount = getOpStatistics(OpType.GET_FILE_CHECKSUM); fs.getFileChecksum(file1); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.GET_FILE_CHECKSUM, opCount + 1); + opCount = getOpStatistics(OpType.SET_PERMISSION); fs.setPermission(file1, new FsPermission((short)0777)); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.SET_PERMISSION, opCount + 1); + opCount = getOpStatistics(OpType.SET_TIMES); fs.setTimes(file1, 0L, 0L); checkStatistics(fs, readOps, ++writeOps, largeReadOps); - + checkOpStatistics(OpType.SET_TIMES, opCount + 1); + + opCount = getOpStatistics(OpType.SET_OWNER); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); fs.setOwner(file1, ugi.getUserName(), ugi.getGroupNames()[0]); + checkOpStatistics(OpType.SET_OWNER, opCount + 1); checkStatistics(fs, readOps, ++writeOps, largeReadOps); - + + opCount = getOpStatistics(OpType.DELETE); fs.delete(dir, true); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.DELETE, opCount + 1); } finally { if (cluster != null) cluster.shutdown(); } } - + + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + @Test (timeout = 180000) + public void testConcurrentStatistics() + throws IOException, InterruptedException { + FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME, + DistributedFileSystem.class).reset(); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder( + new Configuration()).build(); + cluster.waitActive(); + final FileSystem fs = cluster.getFileSystem(); + final int numThreads = 5; + final ExecutorService threadPool = + HadoopExecutors.newFixedThreadPool(numThreads); + + try { + final CountDownLatch allExecutorThreadsReady = + new CountDownLatch(numThreads); + final CountDownLatch startBlocker = new CountDownLatch(1); + final CountDownLatch allDone = new CountDownLatch(numThreads); + final AtomicReference childError = new AtomicReference<>(); + + for (int i = 0; i < numThreads; i++) { + threadPool.submit(new Runnable() { + @Override + public void run() { + allExecutorThreadsReady.countDown(); + try { + startBlocker.await(); + final FileSystem fs = cluster.getFileSystem(); + fs.mkdirs(new Path("/testStatisticsParallelChild")); + } catch (Throwable t) { + LOG.error("Child failed when calling mkdir", t); + childError.compareAndSet(null, t); + } finally { + allDone.countDown(); + } + } + }); + } + + final long oldMkdirOpCount = getOpStatistics(OpType.MKDIRS); + + // wait until all threads are ready + allExecutorThreadsReady.await(); + // all threads start making directories + startBlocker.countDown(); + // wait until all threads are done + allDone.await(); + + assertNull("Child failed with exception " + childError.get(), + childError.get()); + + checkStatistics(fs, 0, numThreads, 0); + // check the single operation count stat + checkOpStatistics(OpType.MKDIRS, numThreads + oldMkdirOpCount); + // iterate all the operation counts + for (Iterator opCountIter = + FileSystem.getGlobalStorageStatistics() + .get(DFSOpsCountStatistics.NAME).getLongStatistics(); + opCountIter.hasNext();) { + final LongStatistic opCount = opCountIter.next(); + if (OpType.MKDIRS.getSymbol().equals(opCount.getName())) { + assertEquals("Unexpected op count from iterator!", + numThreads + oldMkdirOpCount, opCount.getValue()); + } + LOG.info(opCount.getName() + "\t" + opCount.getValue()); + } + } finally { + threadPool.shutdownNow(); + cluster.shutdown(); + } + } + /** Checks statistics. -1 indicates do not check for the operations */ private void checkStatistics(FileSystem fs, int readOps, int writeOps, int largeReadOps) { @@ -713,6 +850,17 @@ public class TestDistributedFileSystem { } } + private static void checkOpStatistics(OpType op, long count) { + assertEquals("Op " + op.getSymbol() + " has unexpected count!", + count, getOpStatistics(op)); + } + + private static long getOpStatistics(OpType op) { + return GlobalStorageStatistics.INSTANCE.get( + DFSOpsCountStatistics.NAME) + .getLong(op.getSymbol()); + } + @Test public void testFileChecksum() throws Exception { final long seed = RAN.nextLong();