diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StorageStatisticsFromIOStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StorageStatisticsFromIOStatistics.java index a55f04cae8..f586cd8d9b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StorageStatisticsFromIOStatistics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StorageStatisticsFromIOStatistics.java @@ -67,23 +67,46 @@ public Iterator iterator() { public Iterator getLongStatistics() { final Set> counters = counters() .entrySet(); - return counters.stream().map(e -> - new StorageStatistics.LongStatistic(e.getKey(), e.getValue())) - .collect(Collectors.toSet()).iterator(); + final Set statisticSet = counters.stream().map( + this::toLongStatistic) + .collect(Collectors.toSet()); + + // add the gauges + gauges().entrySet().forEach(entry -> + statisticSet.add(toLongStatistic(entry))); + return statisticSet.iterator(); + } + + /** + * Convert a counter/gauge entry to a long statistics. + * @param e entry + * @return statistic + */ + private LongStatistic toLongStatistic(final Map.Entry e) { + return new LongStatistic(e.getKey(), e.getValue()); } private Map counters() { return ioStatistics.counters(); } + private Map gauges() { + return ioStatistics.gauges(); + } + @Override public Long getLong(final String key) { - return counters().get(key); + Long l = counters().get(key); + if (l == null) { + l = gauges().get(key); + } + return l; } @Override public boolean isTracked(final String key) { - return counters().containsKey(key); + return counters().containsKey(key) + || gauges().containsKey(key); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 982611a098..c25e3b3c0e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -64,10 +64,8 @@ import java.io.Closeable; import java.net.URI; import java.time.Duration; -import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -182,20 +180,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource, */ private final IOStatisticsStore instanceIOStatistics; - /** - * Gauges to create. - *

- * All statistics which are not gauges or quantiles - * are registered as counters. - */ - private static final Statistic[] GAUGES_TO_CREATE = { - OBJECT_PUT_REQUESTS_ACTIVE, - OBJECT_PUT_BYTES_PENDING, - STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, - STREAM_WRITE_BLOCK_UPLOADS_PENDING, - STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, - }; - /** * Construct the instrumentation for a filesystem. * @param name URI of filesystem. @@ -211,10 +195,6 @@ public S3AInstrumentation(URI name) { // create the builder IOStatisticsStoreBuilder storeBuilder = iostatisticsStore(); - // add the gauges - List gauges = Arrays.asList(GAUGES_TO_CREATE); - gauges.forEach(this::gauge); - // declare all counter statistics EnumSet.allOf(Statistic.class).stream() .filter(statistic -> @@ -223,6 +203,14 @@ public S3AInstrumentation(URI name) { counter(stat); storeBuilder.withCounters(stat.getSymbol()); }); + // declare all gauge statistics + EnumSet.allOf(Statistic.class).stream() + .filter(statistic -> + statistic.getType() == StatisticTypeEnum.TYPE_GAUGE) + .forEach(stat -> { + gauge(stat); + storeBuilder.withGauges(stat.getSymbol()); + }); // and durations EnumSet.allOf(Statistic.class).stream() @@ -1352,15 +1340,13 @@ private OutputStreamStatistics( this.filesystemStatistics = filesystemStatistics; IOStatisticsStore st = iostatisticsStore() .withCounters( - StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS, + STREAM_WRITE_BLOCK_UPLOADS.getSymbol(), STREAM_WRITE_BYTES.getSymbol(), STREAM_WRITE_EXCEPTIONS.getSymbol(), - StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, - STREAM_WRITE_TOTAL_TIME.getSymbol(), + STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(), STREAM_WRITE_QUEUE_DURATION.getSymbol(), STREAM_WRITE_TOTAL_DATA.getSymbol(), - STREAM_WRITE_EXCEPTIONS.getSymbol(), - STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol()) + STREAM_WRITE_TOTAL_TIME.getSymbol()) .withGauges( STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(), STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol()) @@ -1470,7 +1456,7 @@ public void blockUploadFailed( @Override public void bytesTransferred(long byteCount) { bytesUploaded.addAndGet(byteCount); - incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, -byteCount); + incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, -byteCount); } @Override diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java index d31536925a..e20e936454 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java @@ -76,6 +76,7 @@ import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList; import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount; import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; import static org.apache.hadoop.test.LambdaTestUtils.eval; @@ -683,7 +684,8 @@ public void testPartialDirDelete() throws Throwable { readOnlyFiles.size()); rejectionCount.assertDiffEquals("Wrong rejection count", readOnlyFiles.size()); - reset(rejectionCount, deleteVerbCount, deleteObjectCount); + reset(rejectionCount, deleteVerbCount, deleteObjectCount, + bulkDeleteVerbCount); } // all the files are still there? (avoid in scale test due to cost) if (!scaleTest) { @@ -692,9 +694,13 @@ public void testPartialDirDelete() throws Throwable { describe("Trying to delete upper-level directory"); ex = expectDeleteForbidden(basePath); + String iostats = ioStatisticsSourceToString(roleFS); + if (multiDelete) { // multi-delete status checks - deleteVerbCount.assertDiffEquals("Wrong delete count", 1); + deleteVerbCount.assertDiffEquals("Wrong delete request count", 0); + bulkDeleteVerbCount.assertDiffEquals( + "Wrong count of delete operations in " + iostats, 1); MultiObjectDeleteException mde = extractCause( MultiObjectDeleteException.class, ex); List undeletedKeyPaths = diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java index 4a2d695e6a..c4f8db7193 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java @@ -475,7 +475,7 @@ protected OperationCostValidator.ExpectedProbe whenDeleting( /** * Execute a closure expecting a specific number of HEAD/LIST calls - * on raw S3 stores only. + * on raw S3 stores only. The operation is always evaluated. * @param cost expected cost * @param eval closure to evaluate * @param return type of closure @@ -484,7 +484,8 @@ protected OperationCostValidator.ExpectedProbe whenDeleting( protected T verifyRaw( OperationCost cost, Callable eval) throws Exception { - return verifyMetrics(eval, whenRaw(cost)); + return verifyMetrics(eval, + whenRaw(cost), OperationCostValidator.always()); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java index f5d223932d..2901767128 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java @@ -121,17 +121,21 @@ public void testDeleteSingleFileInDir() throws Throwable { with(DIRECTORIES_DELETED, 0), with(FILES_DELETED, 1), + // a single DELETE call is made to delete the object + with(OBJECT_DELETE_REQUEST, DELETE_OBJECT_REQUEST), + // keeping: create no parent dirs or delete parents withWhenKeeping(DIRECTORIES_CREATED, 0), - withWhenKeeping(OBJECT_DELETE_OBJECTS, DELETE_OBJECT_REQUEST), + withWhenKeeping(OBJECT_BULK_DELETE_REQUEST, 0), // deleting: create a parent and delete any of its parents withWhenDeleting(DIRECTORIES_CREATED, 1), - // two objects will be deleted - withWhenDeleting(OBJECT_DELETE_OBJECTS, - DELETE_OBJECT_REQUEST - + DELETE_MARKER_REQUEST) + // a bulk delete for all parents is issued. + // the number of objects in it depends on the depth of the tree; + // don't worry about that + withWhenDeleting(OBJECT_BULK_DELETE_REQUEST, DELETE_MARKER_REQUEST) ); + // there is an empty dir for a parent S3AFileStatus status = verifyRawInnerGetFileStatus(dir, true, StatusProbeEnum.ALL, GET_FILE_STATUS_ON_DIR); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 851b1b16ee..2b3043f39a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -37,7 +37,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; @@ -49,10 +48,11 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; -import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics; -import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; /** * Scale test which creates a huge file. @@ -169,7 +169,8 @@ public void test_010_CreateHugeFile() throws IOException { // there's lots of logging here, so that a tail -f on the output log // can give a view of what is happening. S3AFileSystem fs = getFileSystem(); - StorageStatistics storageStatistics = fs.getStorageStatistics(); + IOStatistics iostats = fs.getIOStatistics(); + String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol(); String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol(); Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE; @@ -205,9 +206,9 @@ public void test_010_CreateHugeFile() throws IOException { percentage, writtenMB, filesizeMB, - storageStatistics.getLong(putBytes), + iostats.counters().get(putBytes), gaugeValue(putBytesPending), - storageStatistics.getLong(putRequests), + iostats.counters().get(putRequests), gaugeValue(putRequestsActive), elapsedTime, writtenMB / elapsedTime)); @@ -227,27 +228,27 @@ public void test_010_CreateHugeFile() throws IOException { logFSState(); bandwidth(timer, filesize); LOG.info("Statistics after stream closed: {}", streamStatistics); - IOStatistics iostats = snapshotIOStatistics( - retrieveIOStatistics(getFileSystem())); + LOG.info("IOStatistics after upload: {}", demandStringifyIOStatistics(iostats)); - long putRequestCount = storageStatistics.getLong(putRequests); - Long putByteCount = storageStatistics.getLong(putBytes); + long putRequestCount = lookupCounterStatistic(iostats, putRequests); + long putByteCount = lookupCounterStatistic(iostats, putBytes); Assertions.assertThat(putRequestCount) .describedAs("Put request count from filesystem stats %s", iostats) .isGreaterThan(0); Assertions.assertThat(putByteCount) - .describedAs("putByteCount count from filesystem stats %s", - iostats) + .describedAs("%s count from filesystem stats %s", + putBytes, iostats) .isGreaterThan(0); LOG.info("PUT {} bytes in {} operations; {} MB/operation", putByteCount, putRequestCount, putByteCount / (putRequestCount * _1MB)); LOG.info("Time per PUT {} nS", toHuman(timer.nanosPerOperation(putRequestCount))); - assertEquals("active put requests in \n" + fs, - 0, gaugeValue(putRequestsActive)); + verifyStatisticGaugeValue(iostats, putRequestsActive.getSymbol(), 0); + verifyStatisticGaugeValue(iostats, + STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0); progress.verifyNoFailures( "Put file " + fileToCreate + " of size " + filesize); if (streamStatistics != null) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java index 33f69aff6c..d95b46b10d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java @@ -23,11 +23,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.S3AInputStream; -import org.apache.hadoop.fs.s3a.S3AInstrumentation; import org.apache.hadoop.fs.s3a.S3ATestConstants; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; -import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +33,7 @@ import java.io.InputStream; import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupGaugeStatistic; /** * Base class for scale tests; here is where the common scale configuration @@ -184,17 +183,15 @@ protected S3AInputStream getS3AInputStream( } /** - * Get the gauge value of a statistic. Raises an assertion if + * Get the gauge value of a statistic from the + * IOStatistics of the filesystem. Raises an assertion if * there is no such gauge. * @param statistic statistic to look up * @return the value. */ public long gaugeValue(Statistic statistic) { - S3AInstrumentation instrumentation = getFileSystem().getInstrumentation(); - MutableGaugeLong gauge = instrumentation.lookupGauge(statistic.getSymbol()); - assertNotNull("No gauge " + statistic - + " in " + instrumentation.dump("", " = ", "\n", true), gauge); - return gauge.value(); + return lookupGaugeStatistic(getFileSystem().getIOStatistics(), + statistic.getSymbol()); } /**