From dee9e97075e67f53d033df522372064ca19d6b51 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 14 Oct 2019 16:56:50 +0100 Subject: [PATCH] Revert "HADOOP-15870. S3AInputStream.remainingInFile should use nextReadPos." This reverts commit 7a4b3d42c4e36e468c2a46fd48036a6fed547853. The patch broke TestRouterWebHDFSContractSeek as it turns out that WebHDFSInputStream.available() is always 0. --- .../markdown/filesystem/fsdatainputstream.md | 53 ------------- .../fs/contract/AbstractContractSeekTest.java | 75 ++----------------- .../apache/hadoop/fs/s3a/S3AInputStream.java | 29 +++---- .../fs/contract/s3a/ITestS3AContractSeek.java | 2 +- 4 files changed, 16 insertions(+), 143 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index b8f9e87e66..090696483b 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -119,59 +119,6 @@ Return the data at the current position. else result = -1 -### `InputStream.available()` - -Returns the number of bytes "estimated" to be readable on a stream before `read()` -blocks on any IO (i.e. the thread is potentially suspended for some time). - -That is: for all values `v` returned by `available()`, `read(buffer, 0, v)` -is should not block. - -#### Postconditions - -```python -if len(data) == 0: - result = 0 - -elif pos >= len(data): - result = 0 - -else: - d = "the amount of data known to be already buffered/cached locally" - result = min(1, d) # optional but recommended: see below. -``` - -As `0` is a number which is always meets this condition, it is nominally -possible for an implementation to simply return `0`. However, this is not -considered useful, and some applications/libraries expect a positive number. - -#### The GZip problem. - -[JDK-7036144](http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144), -"GZIPInputStream readTrailer uses faulty available() test for end-of-stream" -discusses how the JDK's GZip code it uses `available()` to detect an EOF, -in a loop similar to the the following - -```java -while(instream.available()) { - process(instream.read()); -} -``` - -The correct loop would have been: - -```java -int r; -while((r=instream.read()) >= 0) { - process(r); -} -``` - -If `available()` ever returns 0, then the gzip loop halts prematurely. - -For this reason, implementations *should* return a value >=1, even -if it breaks that requirement of `available()` returning the amount guaranteed -not to block on reads. ### `InputStream.read(buffer[], offset, length)` diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java index db3691611b..ca8e4a053b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.util.Random; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; @@ -100,18 +99,14 @@ public void testSeekZeroByteFile() throws Throwable { describe("seek and read a 0 byte file"); instream = getFileSystem().open(zeroByteFile); assertEquals(0, instream.getPos()); - assertAvailableIsZero(instream); //expect initial read to fai; int result = instream.read(); assertMinusOne("initial byte read", result); - assertAvailableIsZero(instream); byte[] buffer = new byte[1]; //expect that seek to 0 works instream.seek(0); - assertAvailableIsZero(instream); //reread, expect same exception result = instream.read(); - assertAvailableIsZero(instream); assertMinusOne("post-seek byte read", result); result = instream.read(buffer, 0, 1); assertMinusOne("post-seek buffer read", result); @@ -137,8 +132,8 @@ public void testBlockReadZeroByteFile() throws Throwable { @Test public void testSeekReadClosedFile() throws Throwable { instream = getFileSystem().open(smallSeekFile); - getLogger().debug("Stream is of type {}", - instream.getClass().getCanonicalName()); + getLogger().debug( + "Stream is of type " + instream.getClass().getCanonicalName()); instream.close(); try { instream.seek(0); @@ -173,26 +168,10 @@ public void testSeekReadClosedFile() throws Throwable { try { long offset = instream.getPos(); } catch (IOException e) { - // it is valid to raise error here; but the test is applied to make + // its valid to raise error here; but the test is applied to make // sure there's no other exception like an NPE. } - // a closed stream should either fail or return 0 bytes. - try { - int a = instream.available(); - LOG.info("available() returns a value on a closed file: {}", a); - assertAvailableIsZero(instream); - } catch (IOException | IllegalStateException expected) { - // expected - } - // a closed stream should either fail or return 0 bytes. - try { - int a = instream.available(); - LOG.info("available() returns a value on a closed file: {}", a); - assertAvailableIsZero(instream); - } catch (IOException | IllegalStateException expected) { - // expected - } //and close again instream.close(); } @@ -226,7 +205,6 @@ public void testSeekFile() throws Throwable { //expect that seek to 0 works instream.seek(0); int result = instream.read(); - assertAvailableIsPositive(instream); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.getPos()); @@ -248,24 +226,13 @@ public void testSeekAndReadPastEndOfFile() throws Throwable { //go just before the end instream.seek(TEST_FILE_LEN - 2); assertTrue("Premature EOF", instream.read() != -1); - assertAvailableIsPositive(instream); assertTrue("Premature EOF", instream.read() != -1); - checkAvailabilityAtEOF(); assertMinusOne("read past end of file", instream.read()); } - /** - * This can be overridden if a filesystem always returns 01 - * @throws IOException - */ - protected void checkAvailabilityAtEOF() throws IOException { - assertAvailableIsZero(instream); - } - @Test public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable { - describe("do a seek past the EOF, " + - "then verify the stream recovers"); + describe("do a seek past the EOF, then verify the stream recovers"); instream = getFileSystem().open(smallSeekFile); //go just before the end. This may or may not fail; it may be delayed until the //read @@ -294,7 +261,6 @@ public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable { //now go back and try to read from a valid point in the file instream.seek(1); assertTrue("Premature EOF", instream.read() != -1); - assertAvailableIsPositive(instream); } /** @@ -312,7 +278,6 @@ public void testSeekBigFile() throws Throwable { //expect that seek to 0 works instream.seek(0); int result = instream.read(); - assertAvailableIsPositive(instream); assertEquals(0, result); assertEquals(1, instream.read()); assertEquals(2, instream.read()); @@ -331,7 +296,6 @@ public void testSeekBigFile() throws Throwable { instream.seek(0); assertEquals(0, instream.getPos()); instream.read(); - assertAvailableIsPositive(instream); assertEquals(1, instream.getPos()); byte[] buf = new byte[80 * 1024]; instream.readFully(1, buf, 0, buf.length); @@ -350,7 +314,7 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable { instream.seek(39999); assertTrue(-1 != instream.read()); assertEquals(40000, instream.getPos()); - assertAvailableIsPositive(instream); + int v = 256; byte[] readBuffer = new byte[v]; assertEquals(v, instream.read(128, readBuffer, 0, v)); @@ -358,7 +322,6 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable { assertEquals(40000, instream.getPos()); //content is the same too assertEquals("@40000", block[40000], (byte) instream.read()); - assertAvailableIsPositive(instream); //now verify the picked up data for (int i = 0; i < 256; i++) { assertEquals("@" + i, block[i + 128], readBuffer[i]); @@ -413,7 +376,6 @@ public void testReadFullyZeroByteFile() throws Throwable { assertEquals(0, instream.getPos()); byte[] buffer = new byte[1]; instream.readFully(0, buffer, 0, 0); - assertAvailableIsZero(instream); assertEquals(0, instream.getPos()); // seek to 0 read 0 bytes from it instream.seek(0); @@ -589,9 +551,7 @@ public void testReadSmallFile() throws Throwable { fail("Expected an exception, got " + r); } catch (EOFException e) { handleExpectedException(e); - } catch (IOException - | IllegalArgumentException - | IndexOutOfBoundsException e) { + } catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) { handleRelaxedException("read() with a negative position ", "EOFException", e); @@ -627,29 +587,6 @@ public void testReadAtExactEOF() throws Throwable { instream = getFileSystem().open(smallSeekFile); instream.seek(TEST_FILE_LEN -1); assertTrue("read at last byte", instream.read() > 0); - assertAvailableIsZero(instream); assertEquals("read just past EOF", -1, instream.read()); } - - /** - * Assert that the number of bytes available is zero. - * @param in input stream - */ - protected static void assertAvailableIsZero(FSDataInputStream in) - throws IOException { - assertEquals("stream.available() should be zero", - 0, in.available()); - } - - /** - * Assert that the number of bytes available is greater than zero. - * @param in input stream - */ - protected static void assertAvailableIsPositive(FSDataInputStream in) - throws IOException { - int available = in.available(); - assertTrue("stream.available() should be positive but is " - + available, - available > 0); - } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 8aac868853..c92a85ea57 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -218,7 +218,7 @@ private synchronized void reopen(String reason, long targetPos, long length, } @Override - public synchronized long getPos() { + public synchronized long getPos() throws IOException { return (nextReadPos < 0) ? 0 : nextReadPos; } @@ -620,26 +620,15 @@ public synchronized boolean resetConnection() throws IOException { return isObjectStreamOpen(); } - /** - * Return the number of bytes available. - * If the inner stream is closed, the value is 1 for consistency - * with S3ObjectStream -and so address the GZip bug - * http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144 . - * If the stream is open, then it is the amount returned by the - * wrapped stream. - * @return a value greater than or equal to zero. - * @throws IOException IO failure. - */ @Override public synchronized int available() throws IOException { checkNotClosed(); - if (contentLength == 0 || (nextReadPos >= contentLength)) { - return 0; - } - return wrappedStream == null - ? 1 - : wrappedStream.available(); + long remaining = remainingInFile(); + if (remaining > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int)remaining; } /** @@ -648,8 +637,8 @@ public synchronized int available() throws IOException { */ @InterfaceAudience.Private @InterfaceStability.Unstable - public synchronized long remainingInFile() throws IOException { - return contentLength - getPos(); + public synchronized long remainingInFile() { + return this.contentLength - this.pos; } /** @@ -660,7 +649,7 @@ public synchronized long remainingInFile() throws IOException { @InterfaceAudience.Private @InterfaceStability.Unstable public synchronized long remainingInCurrentRequest() { - return contentRangeFinish - getPos(); + return this.contentRangeFinish - this.pos; } @InterfaceAudience.Private diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java index 3513d0179c..9332621d11 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractSeek.java @@ -80,7 +80,7 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest { * which S3A Supports. * @return a list of seek policies to test. */ - @Parameterized.Parameters(name = "{0}-{1}") + @Parameterized.Parameters public static Collection params() { return Arrays.asList(new Object[][]{ {INPUT_FADV_SEQUENTIAL, Default_JSSE},