diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java index d75b9a4fef..5bc164ae8f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.LinkedList; import java.util.List; +import java.util.StringJoiner; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -37,6 +38,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -52,6 +56,9 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; + /** * Utility class to fetch block locations for specified Input paths using a * configured number of threads. @@ -60,7 +67,7 @@ * configuration. */ @Private -public class LocatedFileStatusFetcher { +public class LocatedFileStatusFetcher implements IOStatisticsSource { public static final Logger LOG = LoggerFactory.getLogger(LocatedFileStatusFetcher.class.getName()); @@ -87,6 +94,12 @@ public class LocatedFileStatusFetcher { private volatile Throwable unknownError; + /** + * Demand created IO Statistics: only if the filesystem + * returns statistics does this fetch collect them. + */ + private IOStatisticsSnapshot iostats; + /** * Instantiate. * The newApi switch is only used to configure what exception is raised @@ -226,7 +239,46 @@ private void decrementRunningAndCheckCompletion() { lock.unlock(); } } - + + /** + * Return any IOStatistics collected during listing. + * @return IO stats accrued. + */ + @Override + public synchronized IOStatistics getIOStatistics() { + return iostats; + } + + /** + * Add the statistics of an individual thread's scan. + * @param stats possibly null statistics. + */ + private void addResultStatistics(IOStatistics stats) { + if (stats != null) { + // demand creation of IO statistics. + synchronized (this) { + LOG.debug("Adding IOStatistics: {}", stats); + if (iostats == null) { + // demand create the statistics + iostats = snapshotIOStatistics(stats); + } else { + iostats.aggregate(stats); + } + } + } + } + + @Override + public String toString() { + final IOStatistics ioStatistics = getIOStatistics(); + StringJoiner stringJoiner = new StringJoiner(", ", + LocatedFileStatusFetcher.class.getSimpleName() + "[", "]"); + if (ioStatistics != null) { + stringJoiner.add("IOStatistics=" + ioStatistics); + } + return stringJoiner.toString(); + } + /** * Retrieves block locations for the given @link {@link FileStatus}, and adds * additional paths to the process queue if required. @@ -266,6 +318,8 @@ public Result call() throws Exception { } } } + // aggregate any stats + result.stats = retrieveIOStatistics(iter); } else { result.locatedFileStatuses.add(fileStatus); } @@ -276,6 +330,7 @@ private static class Result { private List locatedFileStatuses = new LinkedList<>(); private List dirsNeedingRecursiveCalls = new LinkedList<>(); private FileSystem fs; + private IOStatistics stats; } } @@ -290,6 +345,7 @@ private class ProcessInputDirCallback implements @Override public void onSuccess(ProcessInputDirCallable.Result result) { try { + addResultStatistics(result.stats); if (!result.locatedFileStatuses.isEmpty()) { resultQueue.add(result.locatedFileStatuses); }