HADOOP-19043. S3A: Regression: ITestS3AOpenCost fails on prefetch test runs (#6465)

Disables the new tests added in:

HADOOP-19027. S3A: S3AInputStream doesn't recover from HTTP/channel exceptions #6425

The underlying issue here is that the block prefetch code can identify
when there's a mismatch between declared and actual length, and doesn't
store any of the incomplete buffer.

This should be addressed in HADOOP-18184.

Contributed by Steve Loughran
This commit is contained in:
Steve Loughran 2024-03-08 12:48:38 +00:00 committed by GitHub
parent fc166d3aec
commit bb32aec88e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 82 additions and 26 deletions

View File

@ -370,6 +370,14 @@ protected OperationCostValidator.ExpectedProbe always(
return expect(true, cost); return expect(true, cost);
} }
/**
* Always run a metrics operation.
* @return a probe.
*/
protected OperationCostValidator.ExpectedProbe always() {
return OperationCostValidator.always();
}
/** /**
* A metric diff which must hold when the fs is keeping markers. * A metric diff which must hold when the fs is keeping markers.
* @param cost expected cost * @param cost expected cost

View File

@ -52,6 +52,8 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
@ -60,10 +62,12 @@
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST; import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED; import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@ -84,6 +88,11 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
private int fileLength; private int fileLength;
/**
* Is prefetching enabled?
*/
private boolean prefetching;
public ITestS3AOpenCost() { public ITestS3AOpenCost() {
super(true); super(true);
} }
@ -111,6 +120,7 @@ public void setup() throws Exception {
writeTextFile(fs, testFile, TEXT, true); writeTextFile(fs, testFile, TEXT, true);
testFileStatus = fs.getFileStatus(testFile); testFileStatus = fs.getFileStatus(testFile);
fileLength = (int)testFileStatus.getLen(); fileLength = (int)testFileStatus.getLen();
prefetching = prefetching();
} }
/** /**
@ -161,7 +171,11 @@ public void testOpenFileWithStatusOfOtherFS() throws Throwable {
@Test @Test
public void testStreamIsNotChecksummed() throws Throwable { public void testStreamIsNotChecksummed() throws Throwable {
describe("Verify that an opened stream is not checksummed"); describe("Verify that an opened stream is not checksummed");
// if prefetching is enabled, skip this test
assumeNoPrefetching();
S3AFileSystem fs = getFileSystem(); S3AFileSystem fs = getFileSystem();
// open the file // open the file
try (FSDataInputStream in = verifyMetrics(() -> try (FSDataInputStream in = verifyMetrics(() ->
fs.openFile(testFile) fs.openFile(testFile)
@ -173,12 +187,6 @@ public void testStreamIsNotChecksummed() throws Throwable {
always(NO_HEAD_OR_LIST), always(NO_HEAD_OR_LIST),
with(STREAM_READ_OPENED, 0))) { with(STREAM_READ_OPENED, 0))) {
// if prefetching is enabled, skip this test
final InputStream wrapped = in.getWrappedStream();
if (!(wrapped instanceof S3AInputStream)) {
skip("Not an S3AInputStream: " + wrapped);
}
// open the stream. // open the stream.
in.read(); in.read();
// now examine the innermost stream and make sure it doesn't have a checksum // now examine the innermost stream and make sure it doesn't have a checksum
@ -239,16 +247,20 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
try (FSDataInputStream in = openFile(longLen, try (FSDataInputStream in = openFile(longLen,
FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) { FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
byte[] out = new byte[(int) (longLen)]; byte[] out = new byte[(int) (longLen)];
intercept(EOFException.class, () -> in.readFully(0, out)); intercept(EOFException.class, () -> {
in.readFully(0, out);
return in;
});
in.seek(longLen - 1); in.seek(longLen - 1);
assertEquals("read past real EOF on " + in, -1, in.read()); assertEquals("read past real EOF on " + in, -1, in.read());
return in.toString(); return in.toString();
} }
}, },
always(),
// two GET calls were made, one for readFully, // two GET calls were made, one for readFully,
// the second on the read() past the EOF // the second on the read() past the EOF
// the operation has got as far as S3 // the operation has got as far as S3
with(STREAM_READ_OPENED, 1 + 1)); probe(!prefetching(), STREAM_READ_OPENED, 1 + 1));
// now on a new stream, try a full read from after the EOF // now on a new stream, try a full read from after the EOF
verifyMetrics(() -> { verifyMetrics(() -> {
@ -293,15 +305,19 @@ private FSDataInputStream openFile(final long longLen, String policy)
public void testReadPastEOF() throws Throwable { public void testReadPastEOF() throws Throwable {
// set a length past the actual file length // set a length past the actual file length
describe("read() up to the end of the real file");
assumeNoPrefetching();
final int extra = 10; final int extra = 10;
int longLen = fileLength + extra; int longLen = fileLength + extra;
try (FSDataInputStream in = openFile(longLen, try (FSDataInputStream in = openFile(longLen,
FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
for (int i = 0; i < fileLength; i++) { for (int i = 0; i < fileLength; i++) {
Assertions.assertThat(in.read()) Assertions.assertThat(in.read())
.describedAs("read() at %d", i) .describedAs("read() at %d from stream %s", i, in)
.isEqualTo(TEXT.charAt(i)); .isEqualTo(TEXT.charAt(i));
} }
LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
} }
// now open and read after the EOF; this is // now open and read after the EOF; this is
@ -323,10 +339,12 @@ public void testReadPastEOF() throws Throwable {
.describedAs("read() at %d", p) .describedAs("read() at %d", p)
.isEqualTo(-1); .isEqualTo(-1);
} }
LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
return in.toString(); return in.toString();
} }
}, },
with(Statistic.ACTION_HTTP_GET_REQUEST, extra)); always(),
probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
} }
/** /**
@ -353,10 +371,12 @@ public void testPositionedReadableReadFullyPastEOF() throws Throwable {
return in; return in;
}); });
assertS3StreamClosed(in); assertS3StreamClosed(in);
return "readFully past EOF"; return "readFully past EOF with statistics"
+ ioStatisticsToPrettyString(in.getIOStatistics());
} }
}, },
with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open always(),
probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
} }
/** /**
@ -370,6 +390,7 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
int longLen = fileLength + extra; int longLen = fileLength + extra;
describe("PositionedReadable.read() past the end of the file"); describe("PositionedReadable.read() past the end of the file");
assumeNoPrefetching();
verifyMetrics(() -> { verifyMetrics(() -> {
try (FSDataInputStream in = try (FSDataInputStream in =
@ -388,10 +409,11 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
// stream is closed as part of this failure // stream is closed as part of this failure
assertS3StreamClosed(in); assertS3StreamClosed(in);
return "PositionedReadable.read()) past EOF"; return "PositionedReadable.read()) past EOF with " + in;
} }
}, },
with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open always(),
probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
} }
/** /**
@ -405,7 +427,8 @@ public void testVectorReadPastEOF() throws Throwable {
final int extra = 10; final int extra = 10;
int longLen = fileLength + extra; int longLen = fileLength + extra;
describe("Vector read past the end of the file"); describe("Vector read past the end of the file, expecting an EOFException");
verifyMetrics(() -> { verifyMetrics(() -> {
try (FSDataInputStream in = try (FSDataInputStream in =
openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
@ -420,10 +443,29 @@ public void testVectorReadPastEOF() throws Throwable {
TimeUnit.SECONDS, TimeUnit.SECONDS,
range.getData()); range.getData());
assertS3StreamClosed(in); assertS3StreamClosed(in);
return "vector read past EOF"; return "vector read past EOF with " + in;
} }
}, },
with(Statistic.ACTION_HTTP_GET_REQUEST, 1)); always(),
probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
}
/**
* Probe the FS for supporting prefetching.
* @return true if the fs has prefetching enabled.
*/
private boolean prefetching() {
return getFileSystem().getConf().getBoolean(
PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
}
/**
* Skip the test if prefetching is enabled.
*/
private void assumeNoPrefetching(){
if (prefetching) {
skip("Prefetching is enabled");
}
} }
/** /**
@ -431,20 +473,26 @@ public void testVectorReadPastEOF() throws Throwable {
* @param in input stream * @param in input stream
*/ */
private static void assertS3StreamClosed(final FSDataInputStream in) { private static void assertS3StreamClosed(final FSDataInputStream in) {
S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream(); final InputStream wrapped = in.getWrappedStream();
if (wrapped instanceof S3AInputStream) {
S3AInputStream s3ain = (S3AInputStream) wrapped;
Assertions.assertThat(s3ain.isObjectStreamOpen()) Assertions.assertThat(s3ain.isObjectStreamOpen())
.describedAs("stream is open") .describedAs("stream is open: %s", s3ain)
.isFalse(); .isFalse();
} }
}
/** /**
* Assert that the inner S3 Stream is open. * Assert that the inner S3 Stream is closed.
* @param in input stream * @param in input stream
*/ */
private static void assertS3StreamOpen(final FSDataInputStream in) { private static void assertS3StreamOpen(final FSDataInputStream in) {
S3AInputStream s3ain = (S3AInputStream) in.getWrappedStream(); final InputStream wrapped = in.getWrappedStream();
if (wrapped instanceof S3AInputStream) {
S3AInputStream s3ain = (S3AInputStream) wrapped;
Assertions.assertThat(s3ain.isObjectStreamOpen()) Assertions.assertThat(s3ain.isObjectStreamOpen())
.describedAs("stream is closed") .describedAs("stream is closed: %s", s3ain)
.isTrue(); .isTrue();
} }
}
} }