HADOOP-17451. IOStatistics test failures in S3A code. (#2594)
Caused by HADOOP-16830 and HADOOP-17271.

Fixes tests which fail intermittently based on configs; in the case of
the HugeFile tests, bulk runs with existing FS instances meant that
statistic probes sometimes ended up probing those of a previous FS.

Contributed by Steve Loughran.

Change-Id: I65ba3f44444e59d298df25ac5c8dc5a8781dfb7d

parent 240b25310e
commit 56576f080b
@@ -67,23 +67,46 @@ public Iterator<LongStatistic> iterator() {
   public Iterator<LongStatistic> getLongStatistics() {
     final Set<Map.Entry<String, Long>> counters = counters()
         .entrySet();
-    return counters.stream().map(e ->
-        new StorageStatistics.LongStatistic(e.getKey(), e.getValue()))
-        .collect(Collectors.toSet()).iterator();
+    final Set<LongStatistic> statisticSet = counters.stream().map(
+        this::toLongStatistic)
+        .collect(Collectors.toSet());
+
+    // add the gauges
+    gauges().entrySet().forEach(entry ->
+        statisticSet.add(toLongStatistic(entry)));
+    return statisticSet.iterator();
+  }
+
+  /**
+   * Convert a counter/gauge entry to a long statistics.
+   * @param e entry
+   * @return statistic
+   */
+  private LongStatistic toLongStatistic(final Map.Entry<String, Long> e) {
+    return new LongStatistic(e.getKey(), e.getValue());
   }
 
   private Map<String, Long> counters() {
     return ioStatistics.counters();
   }
 
+  private Map<String, Long> gauges() {
+    return ioStatistics.gauges();
+  }
+
   @Override
   public Long getLong(final String key) {
-    return counters().get(key);
+    Long l = counters().get(key);
+    if (l == null) {
+      l = gauges().get(key);
+    }
+    return l;
   }
 
   @Override
   public boolean isTracked(final String key) {
-    return counters().containsKey(key);
+    return counters().containsKey(key)
+        || gauges().containsKey(key);
   }
 
   @Override
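This first hunk, apparently the IOStatistics-backed StorageStatistics adapter, is the core of one fix: gauges are now folded into the same view as counters, so getLong() and getLongStatistics() stop returning null for gauge-backed statistics. A self-contained sketch of the same merge pattern; the class and field names below are illustrative stand-ins, not the Hadoop types:

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the adapter in the patch: two maps, one view.
public class MergedStatsDemo {

  /** Simplified analogue of StorageStatistics.LongStatistic. */
  static final class LongStatistic {
    private final String name;
    private final long value;
    LongStatistic(String name, long value) {
      this.name = name;
      this.value = value;
    }
    @Override public String toString() {
      return name + "=" + value;
    }
  }

  private final Map<String, Long> counters = new HashMap<>();
  private final Map<String, Long> gauges = new HashMap<>();

  /** Counters first, then gauges: same shape as the patched getLongStatistics(). */
  Iterator<LongStatistic> getLongStatistics() {
    Set<LongStatistic> statistics = counters.entrySet().stream()
        .map(e -> new LongStatistic(e.getKey(), e.getValue()))
        .collect(Collectors.toCollection(LinkedHashSet::new));
    gauges.forEach((k, v) -> statistics.add(new LongStatistic(k, v)));
    return statistics.iterator();
  }

  /** Counter lookup falling back to gauges, as in the patched getLong(). */
  Long getLong(String key) {
    Long l = counters.get(key);
    return l != null ? l : gauges.get(key);
  }

  public static void main(String[] args) {
    MergedStatsDemo demo = new MergedStatsDemo();
    demo.counters.put("object_put_requests", 3L);
    demo.gauges.put("object_put_requests_active", 1L);
    demo.getLongStatistics().forEachRemaining(s -> System.out.println(s));
    System.out.println(demo.getLong("object_put_requests_active")); // 1
  }
}

In getLong() the counter map is consulted first, so a key present in both maps surfaces the counter value; that matches the fallback order in the patched method.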
@@ -64,10 +64,8 @@
 import java.io.Closeable;
 import java.net.URI;
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -182,20 +180,6 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
    */
   private final IOStatisticsStore instanceIOStatistics;
 
-  /**
-   * Gauges to create.
-   * <p></p>
-   * All statistics which are not gauges or quantiles
-   * are registered as counters.
-   */
-  private static final Statistic[] GAUGES_TO_CREATE = {
-      OBJECT_PUT_REQUESTS_ACTIVE,
-      OBJECT_PUT_BYTES_PENDING,
-      STREAM_WRITE_BLOCK_UPLOADS_ACTIVE,
-      STREAM_WRITE_BLOCK_UPLOADS_PENDING,
-      STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING,
-  };
-
   /**
    * Construct the instrumentation for a filesystem.
    * @param name URI of filesystem.
@@ -211,10 +195,6 @@ public S3AInstrumentation(URI name) {
     // create the builder
     IOStatisticsStoreBuilder storeBuilder = iostatisticsStore();
-
-    // add the gauges
-    List<Statistic> gauges = Arrays.asList(GAUGES_TO_CREATE);
-    gauges.forEach(this::gauge);
 
     // declare all counter statistics
     EnumSet.allOf(Statistic.class).stream()
         .filter(statistic ->
@@ -223,6 +203,14 @@ public S3AInstrumentation(URI name) {
           counter(stat);
           storeBuilder.withCounters(stat.getSymbol());
         });
+    // declare all gauge statistics
+    EnumSet.allOf(Statistic.class).stream()
+        .filter(statistic ->
+            statistic.getType() == StatisticTypeEnum.TYPE_GAUGE)
+        .forEach(stat -> {
+          gauge(stat);
+          storeBuilder.withGauges(stat.getSymbol());
+        });
 
     // and durations
     EnumSet.allOf(Statistic.class).stream()
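The S3AInstrumentation hunks above replace the hand-maintained GAUGES_TO_CREATE array with registration driven off each Statistic's declared type, so the metrics registry and the IOStatisticsStore can no longer drift apart when a statistic is added. A minimal sketch of the pattern, using a hypothetical enum in place of Statistic and StatisticTypeEnum:

import java.util.EnumSet;

public class TypeDrivenRegistration {

  enum StatisticType { COUNTER, GAUGE }

  // Hypothetical stand-in for the real Statistic enum, which carries its type.
  enum Stat {
    OBJECT_PUT_REQUESTS("object_put_requests", StatisticType.COUNTER),
    OBJECT_PUT_REQUESTS_ACTIVE("object_put_requests_active", StatisticType.GAUGE);

    private final String symbol;
    private final StatisticType type;
    Stat(String symbol, StatisticType type) {
      this.symbol = symbol;
      this.type = type;
    }
    String getSymbol() { return symbol; }
    StatisticType getType() { return type; }
  }

  public static void main(String[] args) {
    // One pass per type: a newly added statistic is picked up automatically,
    // with no separate array to keep in sync.
    EnumSet.allOf(Stat.class).stream()
        .filter(s -> s.getType() == StatisticType.COUNTER)
        .forEach(s -> System.out.println("register counter: " + s.getSymbol()));
    EnumSet.allOf(Stat.class).stream()
        .filter(s -> s.getType() == StatisticType.GAUGE)
        .forEach(s -> System.out.println("register gauge: " + s.getSymbol()));
  }
}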
@@ -1352,15 +1340,13 @@ private OutputStreamStatistics(
       this.filesystemStatistics = filesystemStatistics;
       IOStatisticsStore st = iostatisticsStore()
           .withCounters(
-              StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS,
+              STREAM_WRITE_BLOCK_UPLOADS.getSymbol(),
               STREAM_WRITE_BYTES.getSymbol(),
               STREAM_WRITE_EXCEPTIONS.getSymbol(),
-              StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING,
-              STREAM_WRITE_TOTAL_TIME.getSymbol(),
+              STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol(),
               STREAM_WRITE_QUEUE_DURATION.getSymbol(),
               STREAM_WRITE_TOTAL_DATA.getSymbol(),
-              STREAM_WRITE_EXCEPTIONS.getSymbol(),
-              STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS.getSymbol())
+              STREAM_WRITE_TOTAL_TIME.getSymbol())
           .withGauges(
               STREAM_WRITE_BLOCK_UPLOADS_PENDING.getSymbol(),
               STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol())
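In the OutputStreamStatistics hunk above, the old withCounters() list named STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, which is also registered below as a gauge, and listed STREAM_WRITE_EXCEPTIONS twice; the rebuilt list keys everything off Statistic.getSymbol() and drops both. Assuming the StreamStatisticNames constant carries the same string as the gauge's symbol, the hazard looks like this sketch (plain maps, not the Hadoop store):

import java.util.HashMap;
import java.util.Map;

// Illustration of the dual-registration hazard: the same symbol held as
// both a counter and a gauge means a reader may consult the copy that
// nothing is updating. (Simplified; not the Hadoop IOStatisticsStore.)
public class DualRegistration {
  public static void main(String[] args) {
    Map<String, Long> counters = new HashMap<>();
    Map<String, Long> gauges = new HashMap<>();
    String symbol = "stream_write_block_uploads_bytes_pending";

    counters.put(symbol, 0L);   // registered as a counter...
    gauges.put(symbol, 0L);     // ...and again as a gauge

    gauges.merge(symbol, 1024L, Long::sum);  // writer updates the gauge only

    // A probe reading the counter sees 0, so a test asserting on the
    // statistic can pass or fail depending on which entry it consults.
    System.out.println("counter=" + counters.get(symbol)
        + " gauge=" + gauges.get(symbol));
  }
}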
@@ -1470,7 +1456,7 @@ public void blockUploadFailed(
     @Override
     public void bytesTransferred(long byteCount) {
       bytesUploaded.addAndGet(byteCount);
-      incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, -byteCount);
+      incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, -byteCount);
     }
 
     @Override
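The switch from incrementGauge to incAllGauges reads, from the name alone, as fanning one delta out to every holder of the gauge (the metrics gauge and the IOStatistics store) rather than just one of them; the callee is not shown in this hunk, so treat that as an inference. A sketch of the fan-out pattern under that assumption:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class GaugeFanOut {

  // Hypothetical gauge sinks: in the real code these would be the
  // metrics2 gauge and the IOStatisticsStore entry for the same symbol.
  private final List<AtomicLong> sinks = new ArrayList<>();

  void register(AtomicLong sink) {
    sinks.add(sink);
  }

  /** Apply one delta to every sink so the two views cannot diverge. */
  void incAllGauges(long delta) {
    sinks.forEach(g -> g.addAndGet(delta));
  }

  public static void main(String[] args) {
    GaugeFanOut fanOut = new GaugeFanOut();
    AtomicLong metricsGauge = new AtomicLong();
    AtomicLong ioStatsGauge = new AtomicLong();
    fanOut.register(metricsGauge);
    fanOut.register(ioStatsGauge);
    fanOut.incAllGauges(1024);
    fanOut.incAllGauges(-1024);  // e.g. bytes no longer pending
    System.out.println(metricsGauge.get() + " " + ioStatsGauge.get()); // 0 0
  }
}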
@@ -77,6 +77,7 @@
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
 import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.test.LambdaTestUtils.eval;
 
@@ -685,7 +686,8 @@ public void testPartialDirDelete() throws Throwable {
           readOnlyFiles.size());
       rejectionCount.assertDiffEquals("Wrong rejection count",
           readOnlyFiles.size());
-      reset(rejectionCount, deleteVerbCount, deleteObjectCount);
+      reset(rejectionCount, deleteVerbCount, deleteObjectCount,
+          bulkDeleteVerbCount);
     }
     // all the files are still there? (avoid in scale test due to cost)
     if (!scaleTest) {
@@ -694,9 +696,13 @@ public void testPartialDirDelete() throws Throwable {
 
     describe("Trying to delete upper-level directory");
     ex = expectDeleteForbidden(basePath);
+    String iostats = ioStatisticsSourceToString(roleFS);
+
     if (multiDelete) {
       // multi-delete status checks
-      deleteVerbCount.assertDiffEquals("Wrong delete count", 1);
+      deleteVerbCount.assertDiffEquals("Wrong delete request count", 0);
+      bulkDeleteVerbCount.assertDiffEquals(
+          "Wrong count of delete operations in " + iostats, 1);
       MultiObjectDeleteException mde = extractCause(
           MultiObjectDeleteException.class, ex);
       List<MultiObjectDeleteSupport.KeyPath> undeletedKeyPaths =
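The test changes above add a bulkDeleteVerbCount probe, assert on it separately from the plain-DELETE count, and reset it together with the other probes between phases. These probes are diff-based, so a stale baseline from a previous phase (or, per the commit message, a previous FS instance) shows up as a wrong diff. A minimal sketch of such a probe; the class is illustrative, not the Hadoop MetricDiff API:

import java.util.function.LongSupplier;

public class MetricDiff {

  private final String name;
  private final LongSupplier source;
  private long baseline;

  MetricDiff(String name, LongSupplier source) {
    this.name = name;
    this.source = source;
    reset();
  }

  /** Capture the current value; later diffs are relative to this point. */
  void reset() {
    baseline = source.getAsLong();
  }

  /** Fail if the metric moved by anything other than expected. */
  void assertDiffEquals(String message, long expected) {
    long diff = source.getAsLong() - baseline;
    if (diff != expected) {
      throw new AssertionError(message + ": " + name
          + " changed by " + diff + ", expected " + expected);
    }
  }

  public static void main(String[] args) {
    final long[] requests = {0};
    MetricDiff deleteRequests = new MetricDiff("object_delete_request",
        () -> requests[0]);
    requests[0] += 1;                       // one DELETE issued
    deleteRequests.assertDiffEquals("after single delete", 1);
    deleteRequests.reset();                 // start the next phase clean
    deleteRequests.assertDiffEquals("after reset", 0);
  }
}

Resetting every probe between the single-delete and bulk-delete phases is what keeps counts from the first phase leaking into the second.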
@@ -475,7 +475,7 @@ protected OperationCostValidator.ExpectedProbe whenDeleting(
 
   /**
    * Execute a closure expecting a specific number of HEAD/LIST calls
-   * on <i>raw</i> S3 stores only.
+   * on <i>raw</i> S3 stores only. The operation is always evaluated.
    * @param cost expected cost
    * @param eval closure to evaluate
    * @param <T> return type of closure
@@ -484,7 +484,8 @@ protected OperationCostValidator.ExpectedProbe whenDeleting(
   protected <T> T verifyRaw(
       OperationCost cost,
       Callable<T> eval) throws Exception {
-    return verifyMetrics(eval, whenRaw(cost));
+    return verifyMetrics(eval,
+        whenRaw(cost), OperationCostValidator.always());
   }
 
   /**
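verifyRaw() now passes OperationCostValidator.always() alongside whenRaw(cost); taken with the new javadoc line, the intent appears to be that the closure is evaluated even when the raw-store cost probe does not apply to the store under test. A sketch of that guarantee with illustrative types (not the Hadoop validator):

import java.util.List;
import java.util.concurrent.Callable;

public class ProbeValidator {

  interface ExpectedProbe {
    boolean enabled();
    void verify();
  }

  /** A probe which is always enabled and asserts nothing. */
  static ExpectedProbe always() {
    return new ExpectedProbe() {
      @Override public boolean enabled() { return true; }
      @Override public void verify() { /* no expectations */ }
    };
  }

  static <T> T verifyMetrics(Callable<T> eval, List<ExpectedProbe> probes)
      throws Exception {
    // In this sketch the operation is skipped when no probe applies;
    // including always() makes that branch unreachable.
    if (probes.stream().noneMatch(ExpectedProbe::enabled)) {
      return null;   // silently skipped: the failure mode being avoided
    }
    T result = eval.call();
    probes.stream().filter(ExpectedProbe::enabled)
        .forEach(ExpectedProbe::verify);
    return result;
  }

  public static void main(String[] args) throws Exception {
    String out = verifyMetrics(() -> "evaluated", List.of(always()));
    System.out.println(out); // evaluated
  }
}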
@@ -121,17 +121,21 @@ public void testDeleteSingleFileInDir() throws Throwable {
         with(DIRECTORIES_DELETED, 0),
         with(FILES_DELETED, 1),
 
+        // a single DELETE call is made to delete the object
+        with(OBJECT_DELETE_REQUEST, DELETE_OBJECT_REQUEST),
+
         // keeping: create no parent dirs or delete parents
         withWhenKeeping(DIRECTORIES_CREATED, 0),
-        withWhenKeeping(OBJECT_DELETE_OBJECTS, DELETE_OBJECT_REQUEST),
+        withWhenKeeping(OBJECT_BULK_DELETE_REQUEST, 0),
+
         // deleting: create a parent and delete any of its parents
         withWhenDeleting(DIRECTORIES_CREATED, 1),
-        // two objects will be deleted
-        withWhenDeleting(OBJECT_DELETE_OBJECTS,
-            DELETE_OBJECT_REQUEST
-            + DELETE_MARKER_REQUEST)
+        // a bulk delete for all parents is issued.
+        // the number of objects in it depends on the depth of the tree;
+        // don't worry about that
+        withWhenDeleting(OBJECT_BULK_DELETE_REQUEST, DELETE_MARKER_REQUEST)
     );
 
     // there is an empty dir for a parent
     S3AFileStatus status = verifyRawInnerGetFileStatus(dir, true,
         StatusProbeEnum.ALL, GET_FILE_STATUS_ON_DIR);
@@ -37,7 +37,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
@@ -49,10 +48,11 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
-import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
-import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
 
 /**
  * Scale test which creates a huge file.
|
|||||||
// there's lots of logging here, so that a tail -f on the output log
|
// there's lots of logging here, so that a tail -f on the output log
|
||||||
// can give a view of what is happening.
|
// can give a view of what is happening.
|
||||||
S3AFileSystem fs = getFileSystem();
|
S3AFileSystem fs = getFileSystem();
|
||||||
StorageStatistics storageStatistics = fs.getStorageStatistics();
|
IOStatistics iostats = fs.getIOStatistics();
|
||||||
|
|
||||||
String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
|
String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
|
||||||
String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
|
String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
|
||||||
Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
|
Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
|
||||||
@@ -205,9 +206,9 @@ public void test_010_CreateHugeFile() throws IOException {
             percentage,
             writtenMB,
             filesizeMB,
-            storageStatistics.getLong(putBytes),
+            iostats.counters().get(putBytes),
             gaugeValue(putBytesPending),
-            storageStatistics.getLong(putRequests),
+            iostats.counters().get(putRequests),
             gaugeValue(putRequestsActive),
             elapsedTime,
             writtenMB / elapsedTime));
@@ -227,27 +228,27 @@ public void test_010_CreateHugeFile() throws IOException {
     logFSState();
     bandwidth(timer, filesize);
     LOG.info("Statistics after stream closed: {}", streamStatistics);
-    IOStatistics iostats = snapshotIOStatistics(
-        retrieveIOStatistics(getFileSystem()));
+
     LOG.info("IOStatistics after upload: {}",
         demandStringifyIOStatistics(iostats));
-    long putRequestCount = storageStatistics.getLong(putRequests);
-    Long putByteCount = storageStatistics.getLong(putBytes);
+    long putRequestCount = lookupCounterStatistic(iostats, putRequests);
+    long putByteCount = lookupCounterStatistic(iostats, putBytes);
     Assertions.assertThat(putRequestCount)
         .describedAs("Put request count from filesystem stats %s",
             iostats)
         .isGreaterThan(0);
     Assertions.assertThat(putByteCount)
-        .describedAs("putByteCount count from filesystem stats %s",
-            iostats)
+        .describedAs("%s count from filesystem stats %s",
+            putBytes, iostats)
         .isGreaterThan(0);
     LOG.info("PUT {} bytes in {} operations; {} MB/operation",
         putByteCount, putRequestCount,
         putByteCount / (putRequestCount * _1MB));
     LOG.info("Time per PUT {} nS",
         toHuman(timer.nanosPerOperation(putRequestCount)));
-    assertEquals("active put requests in \n" + fs,
-        0, gaugeValue(putRequestsActive));
+    verifyStatisticGaugeValue(iostats, putRequestsActive.getSymbol(), 0);
+    verifyStatisticGaugeValue(iostats,
+        STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING.getSymbol(), 0);
     progress.verifyNoFailures(
         "Put file " + fileToCreate + " of size " + filesize);
     if (streamStatistics != null) {
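The HugeFile hunks swap the StorageStatistics snapshot for the live fs.getIOStatistics(), and the assertions go through lookupCounterStatistic()/verifyStatisticGaugeValue(); per the commit message, the snapshot approach let bulk test runs probe the statistics of a previous FS instance. A sketch of the probe style with illustrative helpers (the real ones live in IOStatisticAssertions):

import java.util.Map;

// Query the live statistics of the filesystem under test rather than a
// snapshot captured earlier; helper names here are illustrative.
public class LiveStatsProbe {

  static long lookupStatistic(Map<String, Long> stats, String key) {
    Long value = stats.get(key);
    if (value == null) {
      // Failing loudly beats returning 0: a missing statistic usually means
      // the probe is aimed at the wrong (e.g. cached, older) FS instance.
      throw new AssertionError("No statistic " + key + " in " + stats);
    }
    return value;
  }

  static void verifyGaugeValue(Map<String, Long> gauges, String key,
      long expected) {
    long actual = lookupStatistic(gauges, key);
    if (actual != expected) {
      throw new AssertionError("Gauge " + key + " = " + actual
          + ", expected " + expected);
    }
  }

  public static void main(String[] args) {
    Map<String, Long> gauges = Map.of("object_put_requests_active", 0L);
    verifyGaugeValue(gauges, "object_put_requests_active", 0);
    System.out.println("gauge settled at 0");
  }
}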
@@ -23,11 +23,9 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
-import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.S3ATestConstants;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
-import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +33,7 @@
 import java.io.InputStream;
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupGaugeStatistic;
 
 /**
  * Base class for scale tests; here is where the common scale configuration
@@ -184,17 +183,15 @@ protected S3AInputStream getS3AInputStream(
   }
 
   /**
-   * Get the gauge value of a statistic. Raises an assertion if
+   * Get the gauge value of a statistic from the
+   * IOStatistics of the filesystem. Raises an assertion if
    * there is no such gauge.
    * @param statistic statistic to look up
    * @return the value.
    */
   public long gaugeValue(Statistic statistic) {
-    S3AInstrumentation instrumentation = getFileSystem().getInstrumentation();
-    MutableGaugeLong gauge = instrumentation.lookupGauge(statistic.getSymbol());
-    assertNotNull("No gauge " + statistic
-        + " in " + instrumentation.dump("", " = ", "\n", true), gauge);
-    return gauge.value();
+    return lookupGaugeStatistic(getFileSystem().getIOStatistics(),
+        statistic.getSymbol());
   }
 
   /**
|
Loading…
Reference in New Issue
Block a user