From d3014e01f3538c6b161b48fa297ba8afeb002b30 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Tue, 12 Jan 2021 17:25:14 +0000
Subject: [PATCH] HADOOP-17451. IOStatistics test failures in S3A code. (#2594)

Caused by HADOOP-16380 and HADOOP-17271.

Fixes tests which fail intermittently based on configs and
in the case of the HugeFile tests, bulk runs with existing
FS instances meant statistic probes sometimes ended up probing those
of a previous FS.

Contributed by Steve Loughran.
---
 .../StorageStatisticsFromIOStatistics.java    | 33 +++++++++++++---
 .../hadoop/fs/s3a/S3AInstrumentation.java     | 38 ++++++-------------
 .../s3a/impl/ITestPartialRenamesDeletes.java  | 10 ++++-
 .../s3a/performance/AbstractS3ACostTest.java  |  5 ++-
 .../s3a/performance/ITestS3ADeleteCost.java   | 14 ++++---
 .../s3a/scale/AbstractSTestS3AHugeFiles.java  | 29 +++++++-------
 .../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 13 +++----
 7 files changed, 80 insertions(+), 62 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StorageStatisticsFromIOStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StorageStatisticsFromIOStatistics.java
index a55f04cae8..f586cd8d9b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StorageStatisticsFromIOStatistics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/StorageStatisticsFromIOStatistics.java
@@ -67,23 +67,46 @@ public Iterator<LongStatistic> iterator() {
   public Iterator<LongStatistic> getLongStatistics() {
     final Set<Map.Entry<String, Long>> counters = counters()
         .entrySet();
-    return counters.stream().map(e ->
-        new StorageStatistics.LongStatistic(e.getKey(), e.getValue()))
-        .collect(Collectors.toSet()).iterator();
+    final Set<LongStatistic> statisticSet = counters.stream().map(
+        this::toLongStatistic)
+        .collect(Collectors.toSet());
+
+    // add the gauges
+    gauges().entrySet().forEach(entry ->
+        statisticSet.add(toLongStatistic(entry)));
+    return statisticSet.iterator();
+  }
+
+  /**
+   * Convert a counter/gauge entry to a long statistics.
+   * @param e entry
+   * @return statistic
+   */
+  private LongStatistic toLongStatistic(final Map.Entry<String, Long> e) {
+    return new LongStatistic(e.getKey(), e.getValue());
   }
 
   private Map<String, Long> counters() {
     return ioStatistics.counters();
   }
 
+  private Map<String, Long> gauges() {
+    return ioStatistics.gauges();
+  }
+
   @Override
   public Long getLong(final String key) {
-    return counters().get(key);
+    Long l = counters().get(key);
+    if (l == null) {
+      l = gauges().get(key);
+    }
+    return l;
   }
 
   @Override
   public boolean isTracked(final String key) {
-    return counters().containsKey(key);
+    return counters().containsKey(key)
+        || gauges().containsKey(key);
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 982611a098..c25e3b3c0e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -64,10 +64,8 @@
 import java.io.Closeable;
 import java.net.URI;
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -182,20 +180,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
    */
  private final IOStatisticsStore instanceIOStatistics;
 
-  /**
-   * Gauges to create.
-   * <p>
-   * All statistics which are not gauges or quantiles
-   * are registered as counters.
-   */
-  private static final Statistic[] GAUGES_TO_CREATE = {
-      OBJECT_PUT_REQUESTS_ACTIVE,
-      OBJECT_PUT_BYTES_PENDING,
-      STREAM_WRITE_BLOCK_UPLOADS_ACTIVE,
-      STREAM_WRITE_BLOCK_UPLOADS_PENDING,
-      STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING,
-  };
-
   /**
    * Construct the instrumentation for a filesystem.
    * @param name URI of filesystem.
@@ -211,10 +195,6 @@ public S3AInstrumentation(URI name) {
     // create the builder
     IOStatisticsStoreBuilder storeBuilder = iostatisticsStore();
 
-    // add the gauges
-    List<Statistic> gauges = Arrays.asList(GAUGES_TO_CREATE);
-    gauges.forEach(this::gauge);
-
     // declare all counter statistics
     EnumSet.allOf(Statistic.class).stream()
         .filter(statistic ->
@@ -223,6 +203,14 @@ public S3AInstrumentation(URI name) {
           counter(stat);
           storeBuilder.withCounters(stat.getSymbol());
         });
+    // declare all gauge statistics
+    EnumSet.allOf(Statistic.class).stream()
+        .filter(statistic ->
+            statistic.getType() == StatisticTypeEnum.TYPE_GAUGE)
+        .forEach(stat -> {
+          gauge(stat);
+          storeBuilder.withGauges(stat.getSymbol());
+        });
 
     // and durations
     EnumSet.allOf(Statistic.class).stream()
@@ -1352,15 +1340,13 @@ private OutputStreamStatistics(
       this.filesystemStatistics = filesystemStatistics;
       IOStatisticsStore st = iostatisticsStore()
           .withCounters(
-              StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS,
+              STREAM_WRITE_BLOCK_UPLOADS.getSymbol(),
               STREAM_WRITE_BYTES.getSymbol(),
               STREAM_WRITE_EXCEPTIONS.getSymbol(),
-              StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING,
-              STREAM_WRITE_TOTAL_TIME.getSymbol(),
+              STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
               STREAM_WRITE_QUEUE_DURATION.getSymbol(),
               STREAM_WRITE_TOTAL_DATA.getSymbol(),
-              STREAM_WRITE_EXCEPTIONS.getSymbol(),
-              STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol())
+              STREAM_WRITE_TOTAL_TIME.getSymbol())
           .withGauges(
               STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
               STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol())
@@ -1470,7 +1456,7 @@ public void blockUploadFailed(
     @Override
     public void bytesTransferred(long byteCount) {
       bytesUploaded.addAndGet(byteCount);
-      incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, -byteCount);
+      incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, -byteCount);
     }
 
     @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
index d31536925a..e20e936454 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
@@ -76,6 +76,7 @@
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.test.LambdaTestUtils.eval;
 
@@ -683,7 +684,8 @@ public void testPartialDirDelete() throws Throwable {
           readOnlyFiles.size());
       rejectionCount.assertDiffEquals("Wrong rejection count",
           readOnlyFiles.size());
-      reset(rejectionCount, deleteVerbCount, deleteObjectCount);
+      reset(rejectionCount, deleteVerbCount, deleteObjectCount,
+          bulkDeleteVerbCount);
     }
     // all the files are still there? (avoid in scale test due to cost)
     if (!scaleTest) {
@@ -692,9 +694,13 @@ public void testPartialDirDelete() throws Throwable {
     describe("Trying to delete upper-level directory");
     ex = expectDeleteForbidden(basePath);
 
+    String iostats = ioStatisticsSourceToString(roleFS);
+
     if (multiDelete) {
       // multi-delete status checks
-      deleteVerbCount.assertDiffEquals("Wrong delete count", 1);
+      deleteVerbCount.assertDiffEquals("Wrong delete request count", 0);
+      bulkDeleteVerbCount.assertDiffEquals(
+          "Wrong count of delete operations in " + iostats, 1);
       MultiObjectDeleteException mde = extractCause(
           MultiObjectDeleteException.class, ex);
       List<Path> undeletedKeyPaths =
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
index 4a2d695e6a..c4f8db7193 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/AbstractS3ACostTest.java
@@ -475,7 +475,7 @@ protected OperationCostValidator.ExpectedProbe whenDeleting(
 
   /**
    * Execute a closure expecting a specific number of HEAD/LIST calls
-   * on raw S3 stores only.
+   * on raw S3 stores only. The operation is always evaluated.
    * @param cost expected cost
    * @param eval closure to evaluate
    * @param <T> return type of closure
@@ -484,7 +484,8 @@ protected OperationCostValidator.ExpectedProbe whenDeleting(
   protected <T> T verifyRaw(
       OperationCost cost,
       Callable<T> eval) throws Exception {
-    return verifyMetrics(eval, whenRaw(cost));
+    return verifyMetrics(eval,
+        whenRaw(cost), OperationCostValidator.always());
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
index f5d223932d..2901767128 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3ADeleteCost.java
@@ -121,17 +121,21 @@ public void testDeleteSingleFileInDir() throws Throwable {
         with(DIRECTORIES_DELETED, 0),
         with(FILES_DELETED, 1),
 
+        // a single DELETE call is made to delete the object
+        with(OBJECT_DELETE_REQUEST, DELETE_OBJECT_REQUEST),
+
         // keeping: create no parent dirs or delete parents
         withWhenKeeping(DIRECTORIES_CREATED, 0),
-        withWhenKeeping(OBJECT_DELETE_OBJECTS, DELETE_OBJECT_REQUEST),
+        withWhenKeeping(OBJECT_BULK_DELETE_REQUEST, 0),
 
        // deleting: create a parent and delete any of its parents
        withWhenDeleting(DIRECTORIES_CREATED, 1),
-        // two objects will be deleted
-        withWhenDeleting(OBJECT_DELETE_OBJECTS,
-            DELETE_OBJECT_REQUEST
-                + DELETE_MARKER_REQUEST)
+        // a bulk delete for all parents is issued.
+        // the number of objects in it depends on the depth of the tree;
+        // don't worry about that
+        withWhenDeleting(OBJECT_BULK_DELETE_REQUEST, DELETE_MARKER_REQUEST)
     );
+
     // there is an empty dir for a parent
     S3AFileStatus status = verifyRawInnerGetFileStatus(dir, true,
         StatusProbeEnum.ALL, GET_FILE_STATUS_ON_DIR);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index 851b1b16ee..2b3043f39a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
@@ -49,10 +48,11 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
-import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
-import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
 
 /**
  * Scale test which creates a huge file.
@@ -169,7 +169,8 @@ public void test_010_CreateHugeFile() throws IOException {
     // there's lots of logging here, so that a tail -f on the output log
     // can give a view of what is happening.
     S3AFileSystem fs = getFileSystem();
-    StorageStatistics storageStatistics = fs.getStorageStatistics();
+    IOStatistics iostats = fs.getIOStatistics();
+
     String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
     String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
     Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
@@ -205,9 +206,9 @@ public void test_010_CreateHugeFile() throws IOException {
             percentage,
             writtenMB,
             filesizeMB,
-            storageStatistics.getLong(putBytes),
+            iostats.counters().get(putBytes),
             gaugeValue(putBytesPending),
-            storageStatistics.getLong(putRequests),
+            iostats.counters().get(putRequests),
             gaugeValue(putRequestsActive),
             elapsedTime,
             writtenMB / elapsedTime));
@@ -227,27 +228,27 @@ public void test_010_CreateHugeFile() throws IOException {
     logFSState();
     bandwidth(timer, filesize);
     LOG.info("Statistics after stream closed: {}", streamStatistics);
-    IOStatistics iostats = snapshotIOStatistics(
-        retrieveIOStatistics(getFileSystem()));
+
     LOG.info("IOStatistics after upload: {}",
         demandStringifyIOStatistics(iostats));
-    long putRequestCount = storageStatistics.getLong(putRequests);
-    Long putByteCount = storageStatistics.getLong(putBytes);
+    long putRequestCount = lookupCounterStatistic(iostats, putRequests);
+    long putByteCount = lookupCounterStatistic(iostats, putBytes);
     Assertions.assertThat(putRequestCount)
         .describedAs("Put request count from filesystem stats %s",
             iostats)
         .isGreaterThan(0);
     Assertions.assertThat(putByteCount)
-        .describedAs("putByteCount count from filesystem stats %s",
-            iostats)
+        .describedAs("%s count from filesystem stats %s",
+            putBytes, iostats)
         .isGreaterThan(0);
     LOG.info("PUT {} bytes in {} operations; {} MB/operation",
         putByteCount, putRequestCount,
         putByteCount / (putRequestCount * _1MB));
     LOG.info("Time per PUT {} nS",
         toHuman(timer.nanosPerOperation(putRequestCount)));
-    assertEquals("active put requests in \n" + fs,
-        0, gaugeValue(putRequestsActive));
+    verifyStatisticGaugeValue(iostats, putRequestsActive.getSymbol(), 0);
+    verifyStatisticGaugeValue(iostats,
+        STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0);
     progress.verifyNoFailures(
         "Put file " + fileToCreate + " of size " + filesize);
     if (streamStatistics != null) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
index 33f69aff6c..d95b46b10d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
@@ -23,11 +23,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
-import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.S3ATestConstants;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
-import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +33,7 @@
 import java.io.InputStream;
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupGaugeStatistic;
 
 /**
  * Base class for scale tests; here is where the common scale configuration
@@ -184,17 +183,15 @@ protected S3AInputStream getS3AInputStream(
   }
 
   /**
-   * Get the gauge value of a statistic. Raises an assertion if
+   * Get the gauge value of a statistic from the
+   * IOStatistics of the filesystem. Raises an assertion if
    * there is no such gauge.
    * @param statistic statistic to look up
    * @return the value.
    */
   public long gaugeValue(Statistic statistic) {
-    S3AInstrumentation instrumentation = getFileSystem().getInstrumentation();
-    MutableGaugeLong gauge = instrumentation.lookupGauge(statistic.getSymbol());
-    assertNotNull("No gauge " + statistic
-        + " in " + instrumentation.dump("", " = ", "\n", true), gauge);
-    return gauge.value();
+    return lookupGaugeStatistic(getFileSystem().getIOStatistics(),
+        statistic.getSymbol());
   }
 
   /**