HADOOP-13065. Add a new interface for retrieving FS and FC Statistics (Mingliang Liu via cmccabe)
This commit is contained in:
parent
acb509b2fa
commit
687233f20d
@ -0,0 +1,43 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* EmptyStorageStatistics is a StorageStatistics implementation which has no
|
||||
* data.
|
||||
*/
|
||||
class EmptyStorageStatistics extends StorageStatistics {
|
||||
EmptyStorageStatistics(String name) {
|
||||
super(name);
|
||||
}
|
||||
|
||||
public Iterator<LongStatistic> getLongStatistics() {
|
||||
return Collections.emptyIterator();
|
||||
}
|
||||
|
||||
public Long getLong(String key) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public boolean isTracked(String key) {
|
||||
return false;
|
||||
}
|
||||
}
|
@ -49,6 +49,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
|
||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
@ -3560,7 +3561,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||
/**
|
||||
* Get the Map of Statistics object indexed by URI Scheme.
|
||||
* @return a Map having a key as URI scheme and value as Statistics object
|
||||
* @deprecated use {@link #getAllStatistics} instead
|
||||
* @deprecated use {@link #getGlobalStorageStatistics()}
|
||||
*/
|
||||
@Deprecated
|
||||
public static synchronized Map<String, Statistics> getStatistics() {
|
||||
@ -3572,8 +3573,10 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the FileSystem classes that have Statistics
|
||||
* Return the FileSystem classes that have Statistics.
|
||||
* @deprecated use {@link #getGlobalStorageStatistics()}
|
||||
*/
|
||||
@Deprecated
|
||||
public static synchronized List<Statistics> getAllStatistics() {
|
||||
return new ArrayList<Statistics>(statisticsTable.values());
|
||||
}
|
||||
@ -3582,13 +3585,23 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||
* Get the statistics for a particular file system
|
||||
* @param cls the class to lookup
|
||||
* @return a statistics object
|
||||
* @deprecated use {@link #getGlobalStorageStatistics()}
|
||||
*/
|
||||
public static synchronized
|
||||
Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
|
||||
@Deprecated
|
||||
public static synchronized Statistics getStatistics(final String scheme,
|
||||
Class<? extends FileSystem> cls) {
|
||||
Statistics result = statisticsTable.get(cls);
|
||||
if (result == null) {
|
||||
result = new Statistics(scheme);
|
||||
statisticsTable.put(cls, result);
|
||||
final Statistics newStats = new Statistics(scheme);
|
||||
statisticsTable.put(cls, newStats);
|
||||
result = newStats;
|
||||
GlobalStorageStatistics.INSTANCE.put(scheme,
|
||||
new StorageStatisticsProvider() {
|
||||
@Override
|
||||
public StorageStatistics provide() {
|
||||
return new FileSystemStorageStatistics(scheme, newStats);
|
||||
}
|
||||
});
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@ -3628,4 +3641,26 @@ public abstract class FileSystem extends Configured implements Closeable {
|
||||
public static void enableSymlinks() {
|
||||
symlinksEnabled = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the StorageStatistics for this FileSystem object. These statistics are
|
||||
* per-instance. They are not shared with any other FileSystem object.
|
||||
*
|
||||
* <p>This is a default method which is intended to be overridden by
|
||||
* subclasses. The default implementation returns an empty storage statistics
|
||||
* object.</p>
|
||||
*
|
||||
* @return The StorageStatistics for this FileSystem instance.
|
||||
* Will never be null.
|
||||
*/
|
||||
public StorageStatistics getStorageStatistics() {
|
||||
return new EmptyStorageStatistics(getUri().toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the global storage statistics.
|
||||
*/
|
||||
public static GlobalStorageStatistics getGlobalStorageStatistics() {
|
||||
return GlobalStorageStatistics.INSTANCE;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,136 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
|
||||
|
||||
/**
|
||||
* A basic StorageStatistics instance which simply returns data from
|
||||
* FileSystem#Statistics.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class FileSystemStorageStatistics extends StorageStatistics {
|
||||
/**
|
||||
* The per-class FileSystem statistics.
|
||||
*/
|
||||
private final FileSystem.Statistics stats;
|
||||
|
||||
private static final String[] KEYS = {
|
||||
"bytesRead",
|
||||
"bytesWritten",
|
||||
"readOps",
|
||||
"largeReadOps",
|
||||
"writeOps",
|
||||
"bytesReadLocalHost",
|
||||
"bytesReadDistanceOfOneOrTwo",
|
||||
"bytesReadDistanceOfThreeOrFour",
|
||||
"bytesReadDistanceOfFiveOrLarger"
|
||||
};
|
||||
|
||||
private static class LongStatisticIterator
|
||||
implements Iterator<LongStatistic> {
|
||||
private final StatisticsData data;
|
||||
|
||||
private int keyIdx;
|
||||
|
||||
LongStatisticIterator(StatisticsData data) {
|
||||
this.data = data;
|
||||
this.keyIdx = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return (this.keyIdx < KEYS.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStatistic next() {
|
||||
if (this.keyIdx >= KEYS.length) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
String key = KEYS[this.keyIdx++];
|
||||
Long val = fetch(data, key);
|
||||
return new LongStatistic(key, val.longValue());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
private static Long fetch(StatisticsData data, String key) {
|
||||
switch (key) {
|
||||
case "bytesRead":
|
||||
return data.getBytesRead();
|
||||
case "bytesWritten":
|
||||
return data.getBytesWritten();
|
||||
case "readOps":
|
||||
return Long.valueOf(data.getReadOps());
|
||||
case "largeReadOps":
|
||||
return Long.valueOf(data.getLargeReadOps());
|
||||
case "writeOps":
|
||||
return Long.valueOf(data.getWriteOps());
|
||||
case "bytesReadLocalHost":
|
||||
return data.getBytesReadLocalHost();
|
||||
case "bytesReadDistanceOfOneOrTwo":
|
||||
return data.getBytesReadDistanceOfOneOrTwo();
|
||||
case "bytesReadDistanceOfThreeOrFour":
|
||||
return data.getBytesReadDistanceOfThreeOrFour();
|
||||
case "bytesReadDistanceOfFiveOrLarger":
|
||||
return data.getBytesReadDistanceOfFiveOrLarger();
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
FileSystemStorageStatistics(String name, FileSystem.Statistics stats) {
|
||||
super(name);
|
||||
this.stats = stats;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<LongStatistic> getLongStatistics() {
|
||||
return new LongStatisticIterator(stats.getData());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getLong(String key) {
|
||||
return fetch(stats.getData(), key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if a statistic is being tracked.
|
||||
*
|
||||
* @return True only if the statistic is being tracked.
|
||||
*/
|
||||
public boolean isTracked(String key) {
|
||||
for (String k: KEYS) {
|
||||
if (k.equals(key)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
@ -0,0 +1,127 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Stores global storage statistics objects.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public enum GlobalStorageStatistics {
|
||||
/**
|
||||
* The GlobalStorageStatistics singleton.
|
||||
*/
|
||||
INSTANCE;
|
||||
|
||||
/**
|
||||
* A map of all global StorageStatistics objects, indexed by name.
|
||||
*/
|
||||
private final NavigableMap<String, StorageStatistics> map = new TreeMap<>();
|
||||
|
||||
/**
|
||||
* A callback API for creating new StorageStatistics instances.
|
||||
*/
|
||||
public interface StorageStatisticsProvider {
|
||||
StorageStatistics provide();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the StorageStatistics object with the given name.
|
||||
*
|
||||
* @param name The storage statistics object name.
|
||||
* @return The StorageStatistics object with the given name, or
|
||||
* null if there is none.
|
||||
*/
|
||||
public synchronized StorageStatistics get(String name) {
|
||||
return map.get(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create or return the StorageStatistics object with the given name.
|
||||
*
|
||||
* @param name The storage statistics object name.
|
||||
* @param provider An object which can create a new StorageStatistics
|
||||
* object if needed.
|
||||
* @return The StorageStatistics object with the given name.
|
||||
* @throws RuntimeException If the StorageStatisticsProvider provides a new
|
||||
* StorageStatistics object with the wrong name.
|
||||
*/
|
||||
public synchronized StorageStatistics put(String name,
|
||||
StorageStatisticsProvider provider) {
|
||||
StorageStatistics stats = map.get(name);
|
||||
if (stats != null) {
|
||||
return stats;
|
||||
}
|
||||
stats = provider.provide();
|
||||
if (!stats.getName().equals(name)) {
|
||||
throw new RuntimeException("StorageStatisticsProvider for " + name +
|
||||
" provided a StorageStatistics object for " + stats.getName() +
|
||||
" instead.");
|
||||
}
|
||||
map.put(name, stats);
|
||||
return stats;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an iterator that we can use to iterate throw all the global storage
|
||||
* statistics objects.
|
||||
*/
|
||||
synchronized public Iterator<StorageStatistics> iterator() {
|
||||
Entry<String, StorageStatistics> first = map.firstEntry();
|
||||
return new StorageIterator((first == null) ? null : first.getValue());
|
||||
}
|
||||
|
||||
private class StorageIterator implements Iterator<StorageStatistics> {
|
||||
private StorageStatistics next = null;
|
||||
|
||||
StorageIterator(StorageStatistics first) {
|
||||
this.next = first;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return (next != null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StorageStatistics next() {
|
||||
if (next == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
synchronized (GlobalStorageStatistics.this) {
|
||||
StorageStatistics cur = next;
|
||||
Entry<String, StorageStatistics> nextEntry =
|
||||
map.higherEntry(cur.getName());
|
||||
next = (nextEntry == null) ? null : nextEntry.getValue();
|
||||
return cur;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,93 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* StorageStatistics contains statistics data for a FileSystem or FileContext
|
||||
* instance.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
public abstract class StorageStatistics {
|
||||
/**
|
||||
* A 64-bit storage statistic.
|
||||
*/
|
||||
public static class LongStatistic {
|
||||
private final String name;
|
||||
private final long value;
|
||||
|
||||
public LongStatistic(String name, long value) {
|
||||
this.name = name;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The name of this statistic.
|
||||
*/
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The value of this statistic.
|
||||
*/
|
||||
public long getValue() {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
private final String name;
|
||||
|
||||
public StorageStatistics(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the name of this StorageStatistics object.
|
||||
*/
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an iterator over all the currently tracked long statistics.
|
||||
*
|
||||
* The values returned will depend on the type of FileSystem or FileContext
|
||||
* object. The values do not necessarily reflect a snapshot in time.
|
||||
*/
|
||||
public abstract Iterator<LongStatistic> getLongStatistics();
|
||||
|
||||
/**
|
||||
* Get the value of a statistic.
|
||||
*
|
||||
* @return null if the statistic is not being tracked or is not a
|
||||
* long statistic.
|
||||
* The value of the statistic, otherwise.
|
||||
*/
|
||||
public abstract Long getLong(String key);
|
||||
|
||||
/**
|
||||
* Return true if a statistic is being tracked.
|
||||
*
|
||||
* @return True only if the statistic is being tracked.
|
||||
*/
|
||||
public abstract boolean isTracked(String key);
|
||||
}
|
@ -0,0 +1,113 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* A StorageStatistics instance which combines the outputs of several other
|
||||
* StorageStatistics instances.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class UnionStorageStatistics extends StorageStatistics {
|
||||
/**
|
||||
* The underlying StorageStatistics.
|
||||
*/
|
||||
private final StorageStatistics[] stats;
|
||||
|
||||
private class LongStatisticIterator implements Iterator<LongStatistic> {
|
||||
private int statIdx;
|
||||
|
||||
private Iterator<LongStatistic> cur;
|
||||
|
||||
LongStatisticIterator() {
|
||||
this.statIdx = 0;
|
||||
this.cur = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return (getIter() != null);
|
||||
}
|
||||
|
||||
private Iterator<LongStatistic> getIter() {
|
||||
while ((cur == null) || (!cur.hasNext())) {
|
||||
if (stats.length >= statIdx) {
|
||||
return null;
|
||||
}
|
||||
cur = stats[statIdx++].getLongStatistics();
|
||||
}
|
||||
return cur;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStatistic next() {
|
||||
Iterator<LongStatistic> iter = getIter();
|
||||
if (iter == null) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
return iter.next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
public UnionStorageStatistics(String name, StorageStatistics[] stats) {
|
||||
super(name);
|
||||
this.stats = stats;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<LongStatistic> getLongStatistics() {
|
||||
return new LongStatisticIterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getLong(String key) {
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
Long val = stats[i].getLong(key);
|
||||
if (val != null) {
|
||||
return val;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if a statistic is being tracked.
|
||||
*
|
||||
* @return True only if the statistic is being tracked.
|
||||
*/
|
||||
@Override
|
||||
public boolean isTracked(String key) {
|
||||
for (int i = 0; i < stats.length; i++) {
|
||||
if (stats[i].isTracked(key)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
@ -132,6 +132,7 @@ public class TestFilterFileSystem {
|
||||
public Path fixRelativePart(Path p);
|
||||
public ContentSummary getContentSummary(Path f);
|
||||
public QuotaUsage getQuotaUsage(Path f);
|
||||
StorageStatistics getStorageStatistics();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -221,6 +221,7 @@ public class TestHarFileSystem {
|
||||
public Path getTrashRoot(Path path) throws IOException;
|
||||
|
||||
public Collection<FileStatus> getTrashRoots(boolean allUsers) throws IOException;
|
||||
StorageStatistics getStorageStatistics();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -0,0 +1,167 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.fs.StorageStatistics;
|
||||
|
||||
import java.util.EnumMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* This storage statistics tracks how many times each DFS operation was issued.
|
||||
*
|
||||
* For each tracked DFS operation, there is a respective entry in the enum
|
||||
* {@link OpType}. To use, increment the value the {@link DistributedFileSystem}
|
||||
* and {@link org.apache.hadoop.hdfs.web.WebHdfsFileSystem}.
|
||||
*
|
||||
* This class is thread safe, and is generally shared by multiple threads.
|
||||
*/
|
||||
public class DFSOpsCountStatistics extends StorageStatistics {
|
||||
|
||||
/** This is for counting file system operations. */
|
||||
public enum OpType {
|
||||
ALLOW_SNAPSHOT("allowSnapshot"),
|
||||
APPEND("append"),
|
||||
CONCAT("concat"),
|
||||
COPY_FROM_LOCAL_FILE("copyFromLocalFile"),
|
||||
CREATE("create"),
|
||||
CREATE_NON_RECURSIVE("createNonRecursive"),
|
||||
CREATE_SNAPSHOT("createSnapshot"),
|
||||
CREATE_SYM_LINK("createSymlink"),
|
||||
DELETE("delete"),
|
||||
DELETE_SNAPSHOT("deleteSnapshot"),
|
||||
DISALLOW_SNAPSHOT("disallowSnapshot"),
|
||||
EXISTS("exists"),
|
||||
GET_BYTES_WITH_FUTURE_GS("getBytesWithFutureGenerationStamps"),
|
||||
GET_CONTENT_SUMMARY("getContentSummary"),
|
||||
GET_FILE_BLOCK_LOCATIONS("getFileBlockLocations"),
|
||||
GET_FILE_CHECKSUM("getFileChecksum"),
|
||||
GET_FILE_LINK_STATUS("getFileLinkStatus"),
|
||||
GET_FILE_STATUS("getFileStatus"),
|
||||
GET_LINK_TARGET("getLinkTarget"),
|
||||
GET_QUOTA_USAGE("getQuotaUsage"),
|
||||
GET_STATUS("getStatus"),
|
||||
GET_STORAGE_POLICIES("getStoragePolicies"),
|
||||
GET_STORAGE_POLICY("getStoragePolicy"),
|
||||
GET_XATTR("getXAttr"),
|
||||
LIST_LOCATED_STATUS("listLocatedStatus"),
|
||||
LIST_STATUS("listStatus"),
|
||||
MKDIRS("mkdirs"),
|
||||
MODIFY_ACL_ENTRIES("modifyAclEntries"),
|
||||
OPEN("open"),
|
||||
PRIMITIVE_CREATE("primitiveCreate"),
|
||||
PRIMITIVE_MKDIR("primitiveMkdir"),
|
||||
REMOVE_ACL("removeAcl"),
|
||||
REMOVE_ACL_ENTRIES("removeAclEntries"),
|
||||
REMOVE_DEFAULT_ACL("removeDefaultAcl"),
|
||||
REMOVE_XATTR("removeXAttr"),
|
||||
RENAME("rename"),
|
||||
RENAME_SNAPSHOT("renameSnapshot"),
|
||||
RESOLVE_LINK("resolveLink"),
|
||||
SET_ACL("setAcl"),
|
||||
SET_OWNER("setOwner"),
|
||||
SET_PERMISSION("setPermission"),
|
||||
SET_REPLICATION("setReplication"),
|
||||
SET_STORAGE_POLICY("setStoragePolicy"),
|
||||
SET_TIMES("setTimes"),
|
||||
SET_XATTR("setXAttr"),
|
||||
TRUNCATE("truncate"),
|
||||
UNSET_STORAGE_POLICY("unsetStoragePolicy");
|
||||
|
||||
private final String symbol;
|
||||
|
||||
OpType(String symbol) {
|
||||
this.symbol = symbol;
|
||||
}
|
||||
|
||||
public String getSymbol() {
|
||||
return symbol;
|
||||
}
|
||||
|
||||
public static OpType fromSymbol(String symbol) {
|
||||
if (symbol != null) {
|
||||
for (OpType opType : values()) {
|
||||
if (opType.getSymbol().equals(symbol)) {
|
||||
return opType;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static final String NAME = "DFSOpsCountStatistics";
|
||||
|
||||
private final Map<OpType, AtomicLong> opsCount = new EnumMap<>(OpType.class);
|
||||
|
||||
public DFSOpsCountStatistics() {
|
||||
super(NAME);
|
||||
for (OpType opType : OpType.values()) {
|
||||
opsCount.put(opType, new AtomicLong(0));
|
||||
}
|
||||
}
|
||||
|
||||
public void incrementOpCounter(OpType op) {
|
||||
opsCount.get(op).addAndGet(1);
|
||||
}
|
||||
|
||||
private class LongIterator implements Iterator<LongStatistic> {
|
||||
private Iterator<Entry<OpType, AtomicLong>> iterator =
|
||||
opsCount.entrySet().iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iterator.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongStatistic next() {
|
||||
if (!iterator.hasNext()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
final Entry<OpType, AtomicLong> entry = iterator.next();
|
||||
return new LongStatistic(entry.getKey().name(), entry.getValue().get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<LongStatistic> getLongStatistics() {
|
||||
return new LongIterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getLong(String key) {
|
||||
final OpType type = OpType.fromSymbol(key);
|
||||
return type == null ? null : opsCount.get(type).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTracked(String key) {
|
||||
return OpType.fromSymbol(key) == null;
|
||||
}
|
||||
|
||||
}
|
@ -47,8 +47,11 @@ import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemLinkResolver;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.FsStatus;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.StorageStatistics;
|
||||
import org.apache.hadoop.fs.XAttrSetFlag;
|
||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -65,6 +68,7 @@ import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
|
||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
@ -95,7 +99,6 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
|
||||
/****************************************************************
|
||||
* Implementation of the abstract FileSystem for the DFS system.
|
||||
* This object is the way end-user code interacts with a Hadoop
|
||||
@ -113,6 +116,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
DFSClient dfs;
|
||||
private boolean verifyChecksum = true;
|
||||
|
||||
private DFSOpsCountStatistics storageStatistics;
|
||||
|
||||
static{
|
||||
HdfsConfiguration.init();
|
||||
}
|
||||
@ -150,6 +155,15 @@ public class DistributedFileSystem extends FileSystem {
|
||||
this.dfs = new DFSClient(uri, conf, statistics);
|
||||
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
|
||||
this.workingDir = getHomeDirectory();
|
||||
|
||||
storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
|
||||
.put(DFSOpsCountStatistics.NAME,
|
||||
new StorageStatisticsProvider() {
|
||||
@Override
|
||||
public StorageStatistics provide() {
|
||||
return new DFSOpsCountStatistics();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -214,6 +228,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public BlockLocation[] getFileBlockLocations(Path p,
|
||||
final long start, final long len) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
|
||||
final Path absF = fixRelativePart(p);
|
||||
return new FileSystemLinkResolver<BlockLocation[]>() {
|
||||
@Override
|
||||
@ -264,6 +279,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public FSDataInputStream open(Path f, final int bufferSize)
|
||||
throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.OPEN);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<FSDataInputStream>() {
|
||||
@Override
|
||||
@ -300,6 +316,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
|
||||
final int bufferSize, final Progressable progress) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.APPEND);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<FSDataOutputStream>() {
|
||||
@Override
|
||||
@ -332,6 +349,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
final int bufferSize, final Progressable progress,
|
||||
final InetSocketAddress[] favoredNodes) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.APPEND);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<FSDataOutputStream>() {
|
||||
@Override
|
||||
@ -375,6 +393,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
final Progressable progress, final InetSocketAddress[] favoredNodes)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<HdfsDataOutputStream>() {
|
||||
@Override
|
||||
@ -408,6 +427,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
final Progressable progress, final ChecksumOpt checksumOpt)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<FSDataOutputStream>() {
|
||||
@Override
|
||||
@ -432,6 +452,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
short replication, long blockSize, Progressable progress,
|
||||
ChecksumOpt checksumOpt) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.PRIMITIVE_CREATE);
|
||||
final DFSOutputStream dfsos = dfs.primitiveCreate(
|
||||
getPathName(fixRelativePart(f)),
|
||||
absolutePermission, flag, true, replication, blockSize,
|
||||
@ -448,6 +469,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
final int bufferSize, final short replication, final long blockSize,
|
||||
final Progressable progress) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE);
|
||||
if (flag.contains(CreateFlag.OVERWRITE)) {
|
||||
flag.add(CreateFlag.CREATE);
|
||||
}
|
||||
@ -473,6 +495,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public boolean setReplication(Path src, final short replication)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_REPLICATION);
|
||||
Path absF = fixRelativePart(src);
|
||||
return new FileSystemLinkResolver<Boolean>() {
|
||||
@Override
|
||||
@ -497,6 +520,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public void setStoragePolicy(final Path src, final String policyName)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY);
|
||||
Path absF = fixRelativePart(src);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -517,6 +541,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public void unsetStoragePolicy(final Path src)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY);
|
||||
Path absF = fixRelativePart(src);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -541,6 +566,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICY);
|
||||
Path absF = fixRelativePart(path);
|
||||
|
||||
return new FileSystemLinkResolver<BlockStoragePolicySpi>() {
|
||||
@ -571,6 +597,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
*/
|
||||
public long getBytesWithFutureGenerationStamps() throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_BYTES_WITH_FUTURE_GS);
|
||||
return dfs.getBytesInFutureBlocks();
|
||||
}
|
||||
|
||||
@ -581,6 +608,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Deprecated
|
||||
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICIES);
|
||||
return dfs.getStoragePolicies();
|
||||
}
|
||||
|
||||
@ -595,6 +623,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public void concat(Path trg, Path [] psrcs) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CONCAT);
|
||||
// Make target absolute
|
||||
Path absF = fixRelativePart(trg);
|
||||
// Make all srcs absolute
|
||||
@ -639,6 +668,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public boolean rename(Path src, Path dst) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.RENAME);
|
||||
|
||||
final Path absSrc = fixRelativePart(src);
|
||||
final Path absDst = fixRelativePart(dst);
|
||||
@ -673,6 +703,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public void rename(Path src, Path dst, final Options.Rename... options)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.RENAME);
|
||||
final Path absSrc = fixRelativePart(src);
|
||||
final Path absDst = fixRelativePart(dst);
|
||||
// Try the rename without resolving first
|
||||
@ -701,6 +732,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public boolean truncate(Path f, final long newLength) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.TRUNCATE);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<Boolean>() {
|
||||
@Override
|
||||
@ -718,6 +750,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public boolean delete(Path f, final boolean recursive) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.DELETE);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<Boolean>() {
|
||||
@Override
|
||||
@ -735,6 +768,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public ContentSummary getContentSummary(Path f) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<ContentSummary>() {
|
||||
@Override
|
||||
@ -752,6 +786,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public QuotaUsage getQuotaUsage(Path f) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_QUOTA_USAGE);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<QuotaUsage>() {
|
||||
@Override
|
||||
@ -836,6 +871,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
stats[i] = partialListing[i].makeQualified(getUri(), p);
|
||||
}
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
|
||||
return stats;
|
||||
}
|
||||
|
||||
@ -850,6 +886,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
listing.add(fileStatus.makeQualified(getUri(), p));
|
||||
}
|
||||
statistics.incrementLargeReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
|
||||
|
||||
// now fetch more entries
|
||||
do {
|
||||
@ -864,6 +901,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
listing.add(fileStatus.makeQualified(getUri(), p));
|
||||
}
|
||||
statistics.incrementLargeReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
|
||||
} while (thisListing.hasMore());
|
||||
|
||||
return listing.toArray(new FileStatus[listing.size()]);
|
||||
@ -977,6 +1015,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
|
||||
needLocation);
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.LIST_LOCATED_STATUS);
|
||||
if (thisListing == null) { // the directory does not exist
|
||||
throw new FileNotFoundException("File " + p + " does not exist.");
|
||||
}
|
||||
@ -1072,6 +1111,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
private boolean mkdirsInternal(Path f, final FsPermission permission,
|
||||
final boolean createParent) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.MKDIRS);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<Boolean>() {
|
||||
@Override
|
||||
@ -1098,6 +1138,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.PRIMITIVE_MKDIR);
|
||||
return dfs.primitiveMkdir(getPathName(f), absolutePermission);
|
||||
}
|
||||
|
||||
@ -1126,6 +1167,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public FsStatus getStatus(Path p) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_STATUS);
|
||||
return dfs.getDiskStatus();
|
||||
}
|
||||
|
||||
@ -1317,6 +1359,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path f) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<FileStatus>() {
|
||||
@Override
|
||||
@ -1344,6 +1387,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
throw new UnsupportedOperationException("Symlinks not supported");
|
||||
}
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK);
|
||||
final Path absF = fixRelativePart(link);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1367,6 +1411,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public FileStatus getFileLinkStatus(final Path f) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_FILE_LINK_STATUS);
|
||||
final Path absF = fixRelativePart(f);
|
||||
FileStatus status = new FileSystemLinkResolver<FileStatus>() {
|
||||
@Override
|
||||
@ -1396,6 +1441,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public Path getLinkTarget(final Path f) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_LINK_TARGET);
|
||||
final Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<Path>() {
|
||||
@Override
|
||||
@ -1417,6 +1463,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
protected Path resolveLink(Path f) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.RESOLVE_LINK);
|
||||
String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
|
||||
if (target == null) {
|
||||
throw new FileNotFoundException("File does not exist: " + f.toString());
|
||||
@ -1427,6 +1474,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public FileChecksum getFileChecksum(Path f) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<FileChecksum>() {
|
||||
@Override
|
||||
@ -1446,6 +1494,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public FileChecksum getFileChecksum(Path f, final long length)
|
||||
throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<FileChecksum>() {
|
||||
@Override
|
||||
@ -1471,6 +1520,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public void setPermission(Path p, final FsPermission permission
|
||||
) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_PERMISSION);
|
||||
Path absF = fixRelativePart(p);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1495,6 +1545,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
throw new IOException("username == null && groupname == null");
|
||||
}
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_OWNER);
|
||||
Path absF = fixRelativePart(p);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1516,6 +1567,7 @@ public class DistributedFileSystem extends FileSystem {
|
||||
public void setTimes(Path p, final long mtime, final long atime)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_TIMES);
|
||||
Path absF = fixRelativePart(p);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1593,6 +1645,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
|
||||
/** @see HdfsAdmin#allowSnapshot(Path) */
|
||||
public void allowSnapshot(final Path path) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT);
|
||||
Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1619,6 +1673,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
|
||||
/** @see HdfsAdmin#disallowSnapshot(Path) */
|
||||
public void disallowSnapshot(final Path path) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT);
|
||||
Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1646,6 +1702,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public Path createSnapshot(final Path path, final String snapshotName)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT);
|
||||
Path absF = fixRelativePart(path);
|
||||
return new FileSystemLinkResolver<Path>() {
|
||||
@Override
|
||||
@ -1671,6 +1729,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public void renameSnapshot(final Path path, final String snapshotOldName,
|
||||
final String snapshotNewName) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT);
|
||||
Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1707,6 +1767,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT);
|
||||
Path absF = fixRelativePart(snapshotDir);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1954,6 +2016,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES);
|
||||
Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1976,6 +2040,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES);
|
||||
Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -1997,6 +2063,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
*/
|
||||
@Override
|
||||
public void removeDefaultAcl(Path path) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL);
|
||||
final Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -2017,6 +2085,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
*/
|
||||
@Override
|
||||
public void removeAcl(Path path) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL);
|
||||
final Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -2038,6 +2108,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public void setAcl(Path path, final List<AclEntry> aclSpec)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_ACL);
|
||||
Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -2136,6 +2208,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
@Override
|
||||
public void setXAttr(Path path, final String name, final byte[] value,
|
||||
final EnumSet<XAttrSetFlag> flag) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_XATTR);
|
||||
Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
|
||||
@ -2155,6 +2229,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
|
||||
@Override
|
||||
public byte[] getXAttr(Path path, final String name) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_XATTR);
|
||||
final Path absF = fixRelativePart(path);
|
||||
return new FileSystemLinkResolver<byte[]>() {
|
||||
@Override
|
||||
@ -2220,6 +2296,8 @@ public class DistributedFileSystem extends FileSystem {
|
||||
|
||||
@Override
|
||||
public void removeXAttr(Path path, final String name) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR);
|
||||
Path absF = fixRelativePart(path);
|
||||
new FileSystemLinkResolver<Void>() {
|
||||
@Override
|
||||
@ -2450,4 +2528,5 @@ public class DistributedFileSystem extends FileSystem {
|
||||
Statistics getFsStatistics() {
|
||||
return statistics;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -62,6 +62,11 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
|
||||
import org.apache.hadoop.fs.StorageStatistics;
|
||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
|
||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||
import org.apache.hadoop.fs.Options;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -143,6 +148,8 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
private static final ObjectReader READER =
|
||||
new ObjectMapper().reader(Map.class);
|
||||
|
||||
private DFSOpsCountStatistics storageStatistics;
|
||||
|
||||
/**
|
||||
* Return the protocol scheme for the FileSystem.
|
||||
* <p/>
|
||||
@ -240,6 +247,15 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
||||
this.initializeRestCsrf(conf);
|
||||
this.delegationToken = null;
|
||||
|
||||
storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
|
||||
.put(DFSOpsCountStatistics.NAME,
|
||||
new StorageStatisticsProvider() {
|
||||
@Override
|
||||
public StorageStatistics provide() {
|
||||
return new DFSOpsCountStatistics();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
@ -974,6 +990,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path f) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS);
|
||||
return makeQualified(getHdfsFileStatus(f), f);
|
||||
}
|
||||
|
||||
@ -1003,6 +1020,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.MKDIRS);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
|
||||
return new FsPathBooleanRunner(op, f,
|
||||
new PermissionParam(applyUMask(permission))
|
||||
@ -1015,6 +1033,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void createSymlink(Path destination, Path f, boolean createParent
|
||||
) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
|
||||
new FsPathRunner(op, f,
|
||||
new DestinationParam(makeQualified(destination).toUri().getPath()),
|
||||
@ -1025,6 +1044,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public boolean rename(final Path src, final Path dst) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.RENAME);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
|
||||
return new FsPathBooleanRunner(op, src,
|
||||
new DestinationParam(makeQualified(dst).toUri().getPath())
|
||||
@ -1036,6 +1056,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void rename(final Path src, final Path dst,
|
||||
final Options.Rename... options) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.RENAME);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
|
||||
new FsPathRunner(op, src,
|
||||
new DestinationParam(makeQualified(dst).toUri().getPath()),
|
||||
@ -1047,6 +1068,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void setXAttr(Path p, String name, byte[] value,
|
||||
EnumSet<XAttrSetFlag> flag) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_XATTR);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
|
||||
if (value != null) {
|
||||
new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
|
||||
@ -1060,6 +1082,8 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
|
||||
@Override
|
||||
public byte[] getXAttr(Path p, final String name) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_XATTR);
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
|
||||
return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
|
||||
new XAttrEncodingParam(XAttrCodec.HEX)) {
|
||||
@ -1116,6 +1140,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public void removeXAttr(Path p, String name) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
|
||||
new FsPathRunner(op, p, new XAttrNameParam(name)).run();
|
||||
}
|
||||
@ -1128,6 +1153,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
}
|
||||
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_OWNER);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
|
||||
new FsPathRunner(op, p,
|
||||
new OwnerParam(owner), new GroupParam(group)
|
||||
@ -1138,6 +1164,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void setPermission(final Path p, final FsPermission permission
|
||||
) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_PERMISSION);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
|
||||
new FsPathRunner(op, p,new PermissionParam(permission)).run();
|
||||
}
|
||||
@ -1146,6 +1173,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
|
||||
new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
|
||||
}
|
||||
@ -1154,6 +1182,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
|
||||
new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
|
||||
}
|
||||
@ -1161,6 +1190,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public void removeDefaultAcl(Path path) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
|
||||
new FsPathRunner(op, path).run();
|
||||
}
|
||||
@ -1168,6 +1198,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public void removeAcl(Path path) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
|
||||
new FsPathRunner(op, path).run();
|
||||
}
|
||||
@ -1176,12 +1207,14 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void setAcl(final Path p, final List<AclEntry> aclSpec)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_ACL);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.SETACL;
|
||||
new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
|
||||
}
|
||||
|
||||
public void allowSnapshot(final Path p) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.ALLOWSNAPSHOT;
|
||||
new FsPathRunner(op, p).run();
|
||||
}
|
||||
@ -1190,6 +1223,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public Path createSnapshot(final Path path, final String snapshotName)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
|
||||
return new FsPathResponseRunner<Path>(op, path,
|
||||
new SnapshotNameParam(snapshotName)) {
|
||||
@ -1202,6 +1236,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
|
||||
public void disallowSnapshot(final Path p) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.DISALLOWSNAPSHOT;
|
||||
new FsPathRunner(op, p).run();
|
||||
}
|
||||
@ -1210,6 +1245,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void deleteSnapshot(final Path path, final String snapshotName)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT);
|
||||
final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
|
||||
new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();
|
||||
}
|
||||
@ -1218,6 +1254,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void renameSnapshot(final Path path, final String snapshotOldName,
|
||||
final String snapshotNewName) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
|
||||
new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
|
||||
new SnapshotNameParam(snapshotNewName)).run();
|
||||
@ -1227,6 +1264,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public boolean setReplication(final Path p, final short replication
|
||||
) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_REPLICATION);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
|
||||
return new FsPathBooleanRunner(op, p,
|
||||
new ReplicationParam(replication)
|
||||
@ -1237,6 +1275,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public void setTimes(final Path p, final long mtime, final long atime
|
||||
) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.SET_TIMES);
|
||||
final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
|
||||
new FsPathRunner(op, p,
|
||||
new ModificationTimeParam(mtime),
|
||||
@ -1259,6 +1298,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public void concat(final Path trg, final Path [] srcs) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CONCAT);
|
||||
final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
|
||||
new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
|
||||
}
|
||||
@ -1268,6 +1308,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
final boolean overwrite, final int bufferSize, final short replication,
|
||||
final long blockSize, final Progressable progress) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE);
|
||||
|
||||
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
|
||||
return new FsPathOutputStreamRunner(op, f, bufferSize,
|
||||
@ -1285,6 +1326,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
final int bufferSize, final short replication, final long blockSize,
|
||||
final Progressable progress) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE);
|
||||
|
||||
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
|
||||
return new FsPathOutputStreamRunner(op, f, bufferSize,
|
||||
@ -1301,6 +1343,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public FSDataOutputStream append(final Path f, final int bufferSize,
|
||||
final Progressable progress) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.APPEND);
|
||||
|
||||
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
|
||||
return new FsPathOutputStreamRunner(op, f, bufferSize,
|
||||
@ -1311,6 +1354,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public boolean truncate(Path f, long newLength) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.TRUNCATE);
|
||||
|
||||
final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE;
|
||||
return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run();
|
||||
@ -1318,6 +1362,8 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
|
||||
@Override
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.DELETE);
|
||||
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
|
||||
return new FsPathBooleanRunner(op, f,
|
||||
new RecursiveParam(recursive)
|
||||
@ -1328,6 +1374,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public FSDataInputStream open(final Path f, final int bufferSize
|
||||
) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.OPEN);
|
||||
return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
|
||||
}
|
||||
|
||||
@ -1427,6 +1474,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public FileStatus[] listStatus(final Path f) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
|
||||
|
||||
final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
|
||||
return new FsPathResponseRunner<FileStatus[]>(op, f) {
|
||||
@ -1522,6 +1570,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public BlockLocation[] getFileBlockLocations(final Path p,
|
||||
final long offset, final long length) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
|
||||
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
|
||||
return new FsPathResponseRunner<BlockLocation[]>(op, p,
|
||||
@ -1543,6 +1592,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
@Override
|
||||
public ContentSummary getContentSummary(final Path p) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY);
|
||||
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
|
||||
return new FsPathResponseRunner<ContentSummary>(op, p) {
|
||||
@ -1557,6 +1607,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
|
||||
) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
|
||||
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
|
||||
return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
|
||||
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.eq;
|
||||
@ -39,9 +40,13 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
@ -49,18 +54,22 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.GlobalStorageStatistics;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
|
||||
@ -69,19 +78,27 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class TestDistributedFileSystem {
|
||||
private static final Random RAN = new Random();
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
TestDistributedFileSystem.class);
|
||||
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
|
||||
@ -554,54 +571,84 @@ public class TestDistributedFileSystem {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStatistics() throws Exception {
|
||||
public void testStatistics() throws IOException {
|
||||
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
|
||||
DistributedFileSystem.class).reset();
|
||||
@SuppressWarnings("unchecked")
|
||||
ThreadLocal<StatisticsData> data = (ThreadLocal<StatisticsData>)
|
||||
Whitebox.getInternalState(
|
||||
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
|
||||
DistributedFileSystem.class), "threadData");
|
||||
data.set(null);
|
||||
|
||||
int lsLimit = 2;
|
||||
final Configuration conf = getTestConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, lsLimit);
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
final FileSystem fs = cluster.getFileSystem();
|
||||
Path dir = new Path("/test");
|
||||
Path file = new Path(dir, "file");
|
||||
|
||||
int readOps = DFSTestUtil.getStatistics(fs).getReadOps();
|
||||
int writeOps = DFSTestUtil.getStatistics(fs).getWriteOps();
|
||||
int largeReadOps = DFSTestUtil.getStatistics(fs).getLargeReadOps();
|
||||
|
||||
int readOps = 0;
|
||||
int writeOps = 0;
|
||||
int largeReadOps = 0;
|
||||
|
||||
long opCount = getOpStatistics(OpType.MKDIRS);
|
||||
fs.mkdirs(dir);
|
||||
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.MKDIRS, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.CREATE);
|
||||
FSDataOutputStream out = fs.create(file, (short)1);
|
||||
out.close();
|
||||
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
|
||||
|
||||
checkOpStatistics(OpType.CREATE, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.GET_FILE_STATUS);
|
||||
FileStatus status = fs.getFileStatus(file);
|
||||
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.GET_FILE_STATUS, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS);
|
||||
fs.getFileBlockLocations(file, 0, 0);
|
||||
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
|
||||
|
||||
checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 1);
|
||||
fs.getFileBlockLocations(status, 0, 0);
|
||||
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 2);
|
||||
|
||||
opCount = getOpStatistics(OpType.OPEN);
|
||||
FSDataInputStream in = fs.open(file);
|
||||
in.close();
|
||||
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.OPEN, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.SET_REPLICATION);
|
||||
fs.setReplication(file, (short)2);
|
||||
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.SET_REPLICATION, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.RENAME);
|
||||
Path file1 = new Path(dir, "file1");
|
||||
fs.rename(file, file1);
|
||||
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.RENAME, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.GET_CONTENT_SUMMARY);
|
||||
fs.getContentSummary(file1);
|
||||
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.GET_CONTENT_SUMMARY, opCount + 1);
|
||||
|
||||
|
||||
// Iterative ls test
|
||||
long mkdirOp = getOpStatistics(OpType.MKDIRS);
|
||||
long listStatusOp = getOpStatistics(OpType.LIST_STATUS);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Path p = new Path(dir, Integer.toString(i));
|
||||
fs.mkdirs(p);
|
||||
mkdirOp++;
|
||||
FileStatus[] list = fs.listStatus(dir);
|
||||
if (list.length > lsLimit) {
|
||||
// if large directory, then count readOps and largeReadOps by
|
||||
@ -609,41 +656,131 @@ public class TestDistributedFileSystem {
|
||||
int iterations = (int)Math.ceil((double)list.length/lsLimit);
|
||||
largeReadOps += iterations;
|
||||
readOps += iterations;
|
||||
listStatusOp += iterations;
|
||||
} else {
|
||||
// Single iteration in listStatus - no large read operation done
|
||||
readOps++;
|
||||
listStatusOp++;
|
||||
}
|
||||
|
||||
// writeOps incremented by 1 for mkdirs
|
||||
// readOps and largeReadOps incremented by 1 or more
|
||||
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.MKDIRS, mkdirOp);
|
||||
checkOpStatistics(OpType.LIST_STATUS, listStatusOp);
|
||||
}
|
||||
|
||||
opCount = getOpStatistics(OpType.GET_STATUS);
|
||||
fs.getStatus(file1);
|
||||
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
|
||||
|
||||
checkOpStatistics(OpType.GET_STATUS, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.GET_FILE_CHECKSUM);
|
||||
fs.getFileChecksum(file1);
|
||||
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.GET_FILE_CHECKSUM, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.SET_PERMISSION);
|
||||
fs.setPermission(file1, new FsPermission((short)0777));
|
||||
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.SET_PERMISSION, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.SET_TIMES);
|
||||
fs.setTimes(file1, 0L, 0L);
|
||||
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
|
||||
|
||||
checkOpStatistics(OpType.SET_TIMES, opCount + 1);
|
||||
|
||||
opCount = getOpStatistics(OpType.SET_OWNER);
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
fs.setOwner(file1, ugi.getUserName(), ugi.getGroupNames()[0]);
|
||||
checkOpStatistics(OpType.SET_OWNER, opCount + 1);
|
||||
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
|
||||
|
||||
|
||||
opCount = getOpStatistics(OpType.DELETE);
|
||||
fs.delete(dir, true);
|
||||
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
|
||||
checkOpStatistics(OpType.DELETE, opCount + 1);
|
||||
|
||||
} finally {
|
||||
if (cluster != null) cluster.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
|
||||
@Test (timeout = 180000)
|
||||
public void testConcurrentStatistics()
|
||||
throws IOException, InterruptedException {
|
||||
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
|
||||
DistributedFileSystem.class).reset();
|
||||
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(
|
||||
new Configuration()).build();
|
||||
cluster.waitActive();
|
||||
final FileSystem fs = cluster.getFileSystem();
|
||||
final int numThreads = 5;
|
||||
final ExecutorService threadPool =
|
||||
HadoopExecutors.newFixedThreadPool(numThreads);
|
||||
|
||||
try {
|
||||
final CountDownLatch allExecutorThreadsReady =
|
||||
new CountDownLatch(numThreads);
|
||||
final CountDownLatch startBlocker = new CountDownLatch(1);
|
||||
final CountDownLatch allDone = new CountDownLatch(numThreads);
|
||||
final AtomicReference<Throwable> childError = new AtomicReference<>();
|
||||
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
threadPool.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
allExecutorThreadsReady.countDown();
|
||||
try {
|
||||
startBlocker.await();
|
||||
final FileSystem fs = cluster.getFileSystem();
|
||||
fs.mkdirs(new Path("/testStatisticsParallelChild"));
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Child failed when calling mkdir", t);
|
||||
childError.compareAndSet(null, t);
|
||||
} finally {
|
||||
allDone.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
final long oldMkdirOpCount = getOpStatistics(OpType.MKDIRS);
|
||||
|
||||
// wait until all threads are ready
|
||||
allExecutorThreadsReady.await();
|
||||
// all threads start making directories
|
||||
startBlocker.countDown();
|
||||
// wait until all threads are done
|
||||
allDone.await();
|
||||
|
||||
assertNull("Child failed with exception " + childError.get(),
|
||||
childError.get());
|
||||
|
||||
checkStatistics(fs, 0, numThreads, 0);
|
||||
// check the single operation count stat
|
||||
checkOpStatistics(OpType.MKDIRS, numThreads + oldMkdirOpCount);
|
||||
// iterate all the operation counts
|
||||
for (Iterator<LongStatistic> opCountIter =
|
||||
FileSystem.getGlobalStorageStatistics()
|
||||
.get(DFSOpsCountStatistics.NAME).getLongStatistics();
|
||||
opCountIter.hasNext();) {
|
||||
final LongStatistic opCount = opCountIter.next();
|
||||
if (OpType.MKDIRS.getSymbol().equals(opCount.getName())) {
|
||||
assertEquals("Unexpected op count from iterator!",
|
||||
numThreads + oldMkdirOpCount, opCount.getValue());
|
||||
}
|
||||
LOG.info(opCount.getName() + "\t" + opCount.getValue());
|
||||
}
|
||||
} finally {
|
||||
threadPool.shutdownNow();
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/** Checks statistics. -1 indicates do not check for the operations */
|
||||
private void checkStatistics(FileSystem fs, int readOps, int writeOps,
|
||||
int largeReadOps) {
|
||||
@ -713,6 +850,17 @@ public class TestDistributedFileSystem {
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkOpStatistics(OpType op, long count) {
|
||||
assertEquals("Op " + op.getSymbol() + " has unexpected count!",
|
||||
count, getOpStatistics(op));
|
||||
}
|
||||
|
||||
private static long getOpStatistics(OpType op) {
|
||||
return GlobalStorageStatistics.INSTANCE.get(
|
||||
DFSOpsCountStatistics.NAME)
|
||||
.getLong(op.getSymbol());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFileChecksum() throws Exception {
|
||||
final long seed = RAN.nextLong();
|
||||
|
Loading…
x
Reference in New Issue
Block a user