diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index e067b078b3..090696483b 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -275,6 +275,43 @@ class, which can react to a checksum error in a read by attempting to source the data elsewhere. If a new source can be found it attempts to reread and recheck that portion of the file. +### `CanUnbuffer.unbuffer()` + +This operation instructs the source to release any system resources it is +currently holding on to, such as buffers, sockets, file descriptors, etc. Any +subsequent IO operation will likely have to reacquire these resources. +Unbuffering is useful in situations where streams need to remain open, but no IO +operation is expected from the stream in the immediate future (examples include +file handle caching). + +#### Preconditions + +Not all subclasses implement this operation. In addition to implementing +`CanUnbuffer`, subclasses must implement the `StreamCapabilities` interface and +`StreamCapabilities.hasCapability(UNBUFFER)` must return true. If a subclass +implements `CanUnbuffer` but does not report the functionality via +`StreamCapabilities` then the call to `unbuffer` does nothing. If a subclass +reports that it does implement `UNBUFFER`, but does not implement the +`CanUnbuffer` interface, an `UnsupportedOperationException` is thrown. + + supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer) + +This method is not thread-safe. If `unbuffer` is called while a `read` is in +progress, the outcome is undefined. + +`unbuffer` can be called on a closed file, in which case `unbuffer` will do +nothing.
+ +#### Postconditions + +The majority of subclasses that do not implement this operation simply +do nothing. + +If the operation is supported, `unbuffer` releases any and all system resources +associated with the stream. The exact list of what these resources are is +implementation dependent; in general, it may include +buffers, sockets, file descriptors, etc. + ## interface `PositionedReadable` The `PositionedReadable` operations supply "positioned reads" ("pread"). diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java new file mode 100644 index 0000000000..7ba32bafa5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+
+/**
+ * Contract tests for {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer}.
+ * {@link #setup()} calls {@code skipIfUnsupported(SUPPORTS_UNBUFFER)}, so
+ * the suite only runs against filesystems whose contract declares unbuffer
+ * support.
+ */
+public abstract class AbstractContractUnbufferTest extends AbstractFSContractTestBase {
+
+  /** Number of bytes consumed by each read issued in these tests. */
+  private static final int READ_LEN = 128;
+
+  /** The test file; (re)created in {@link #setup()}. */
+  private Path file;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfUnsupported(SUPPORTS_UNBUFFER);
+    file = path("unbufferFile");
+    createFile(getFileSystem(), file, true,
+            dataset(TEST_FILE_LEN, 0, 255));
+  }
+
+  @Test
+  public void testUnbufferAfterRead() throws IOException {
+    describe("unbuffer a file after a single read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      readBlock(stream);
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferBeforeRead() throws IOException {
+    describe("unbuffer a file before a read");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      readBlock(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferEmptyFile() throws IOException {
+    Path emptyFile = path("emptyUnbufferFile");
+    // zero-length payload: the file under test must really be empty
+    createFile(getFileSystem(), emptyFile, true, new byte[0]);
+    describe("unbuffer an empty file");
+    try (FSDataInputStream stream = getFileSystem().open(emptyFile)) {
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferOnClosedFile() throws IOException {
+    describe("unbuffer a file after it has been closed");
+    FSDataInputStream stream = getFileSystem().open(file);
+    try {
+      readBlock(stream);
+    } finally {
+      stream.close();
+    }
+    // Call unbuffer() directly rather than through the unbuffer(stream)
+    // helper: the helper queries getPos(), and this test is only about
+    // unbuffer() being harmless on a closed stream.
+    stream.unbuffer();
+  }
+
+  @Test
+  public void testMultipleUnbuffers() throws IOException {
+    describe("unbuffer a file multiple times");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      unbuffer(stream);
+      readBlock(stream);
+      unbuffer(stream);
+      unbuffer(stream);
+    }
+  }
+
+  @Test
+  public void testUnbufferMultipleReads() throws IOException {
+    describe("unbuffer a file between multiple reads");
+    try (FSDataInputStream stream = getFileSystem().open(file)) {
+      unbuffer(stream);
+      readBlock(stream);
+      unbuffer(stream);
+      readBlock(stream);
+      readBlock(stream);
+      unbuffer(stream);
+      readBlock(stream);
+      readBlock(stream);
+      readBlock(stream);
+      unbuffer(stream);
+    }
+  }
+
+  /**
+   * Consume exactly {@link #READ_LEN} bytes from the stream.
+   * {@code readFully} is used because a single {@code read(byte[])} call
+   * may legitimately return fewer bytes than were asked for.
+   * @param stream stream to read from
+   * @throws IOException on a read failure, including a premature EOF
+   */
+  private static void readBlock(FSDataInputStream stream) throws IOException {
+    stream.readFully(new byte[READ_LEN]);
+  }
+
+  /**
+   * Unbuffer the stream and assert that the reported position is the same
+   * before and after the call.
+   * @param stream stream to unbuffer
+   * @throws IOException if {@code getPos()} fails
+   */
+  private void unbuffer(FSDataInputStream stream) throws IOException {
+    long pos = stream.getPos();
+    stream.unbuffer();
+    assertEquals(pos, stream.getPos());
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
index cca3d4ca36..91a112141e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java
@@ -201,6 +201,11 @@ public interface ContractOptions {
    */
   String SUPPORTS_CONTENT_CHECK = "supports-content-check";
 
+  /**
+   * Indicates that FS supports unbuffer.
+ */ + String SUPPORTS_UNBUFFER = "supports-unbuffer"; + /** * Maximum path length * {@value} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java new file mode 100644 index 0000000000..54b8bf1c70 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractUnbuffer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.fs.contract.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+/**
+ * Runs the unbuffer contract tests against the cluster managed by
+ * {@link HDFSContract}.
+ */
+public class TestHDFSContractUnbuffer extends AbstractContractUnbufferTest {
+
+  /** Bind the inherited tests to an {@link HDFSContract}. */
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HDFSContract(conf);
+  }
+
+  /** Create the shared cluster once, before any test in the class. */
+  @BeforeClass
+  public static void createCluster() throws IOException {
+    HDFSContract.createCluster();
+  }
+
+  /** Destroy the shared cluster once every test has finished. */
+  @AfterClass
+  public static void teardownCluster() throws IOException {
+    HDFSContract.destroyCluster();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
index 261d4ba136..3c9fcccc73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/contract/hdfs.xml
@@ -111,4 +111,9 @@
     <value>true</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-unbuffer</name>
+    <value>true</value>
+  </property>
+
 </configuration>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index d096601307..cbe796bdcd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -25,14 +25,17 @@
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import com.amazonaws.services.s3.model.SSECustomerKey;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetReadahead; +import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.slf4j.Logger; @@ -43,6 +46,7 @@ import java.net.SocketTimeoutException; import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.util.StringUtils.toLowerCase; /** * The input stream for an S3A object. @@ -63,7 +67,8 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class S3AInputStream extends FSInputStream implements CanSetReadahead { +public class S3AInputStream extends FSInputStream implements CanSetReadahead, + CanUnbuffer, StreamCapabilities { public static final String E_NEGATIVE_READAHEAD_VALUE = "Negative readahead value"; @@ -175,7 +180,7 @@ private void setInputPolicy(S3AInputPolicy inputPolicy) { private synchronized void reopen(String reason, long targetPos, long length, boolean forceAbort) throws IOException { - if (wrappedStream != null) { + if (isObjectStreamOpen()) { closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort); } @@ -542,7 +547,7 @@ public synchronized void close() throws IOException { */ @Retries.OnceRaw private void closeStream(String reason, long length, boolean forceAbort) { - if (wrappedStream != null) { + if (isObjectStreamOpen()) { // if the amount of data remaining in the current request is greater // than the readahead value: abort. 
@@ -605,12 +610,11 @@ private void closeStream(String reason, long length, boolean forceAbort) { @InterfaceStability.Unstable public synchronized boolean resetConnection() throws IOException { checkNotClosed(); - boolean connectionOpen = wrappedStream != null; - if (connectionOpen) { + if (isObjectStreamOpen()) { LOG.info("Forced reset of connection to {}", uri); closeStream("reset()", contentRangeFinish, true); } - return connectionOpen; + return isObjectStreamOpen(); } @Override @@ -677,7 +681,7 @@ public String toString() { "S3AInputStream{"); sb.append(uri); sb.append(" wrappedStream=") - .append(wrappedStream != null ? "open" : "closed"); + .append(isObjectStreamOpen() ? "open" : "closed"); sb.append(" read policy=").append(inputPolicy); sb.append(" pos=").append(pos); sb.append(" nextReadPos=").append(nextReadPos); @@ -814,4 +818,25 @@ public static long validateReadahead(@Nullable Long readahead) { return readahead; } } + + @Override + public synchronized void unbuffer() { + closeStream("unbuffer()", contentRangeFinish, false); + } + + @Override + public boolean hasCapability(String capability) { + switch (toLowerCase(capability)) { + case StreamCapabilities.READAHEAD: + case StreamCapabilities.UNBUFFER: + return true; + default: + return false; + } + } + + @VisibleForTesting + boolean isObjectStreamOpen() { + return wrappedStream != null; + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java new file mode 100644 index 0000000000..d6dbce9233 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractUnbuffer.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
+
+/**
+ * Runs the unbuffer contract tests against an {@link S3AContract}.
+ */
+public class ITestS3AContractUnbuffer extends AbstractContractUnbufferTest {
+
+  /** Bind the inherited tests to an {@link S3AContract}. */
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  /**
+   * Build the test configuration: the superclass configuration, passed
+   * through {@code maybeEnableS3Guard} to patch in the S3Guard options.
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration configuration = super.createConfiguration();
+    maybeEnableS3Guard(configuration);
+    return configuration;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
new file mode 100644
index 0000000000..b04b9da486
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +import org.junit.Test; + +import java.io.IOException; + +/** + * Integration test for calling + * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} on {@link S3AInputStream}. + * Validates that the object has been closed using the + * {@link S3AInputStream#isObjectStreamOpen()} method. Unlike the + * {@link org.apache.hadoop.fs.contract.s3a.ITestS3AContractUnbuffer} tests, + * these tests leverage the fact that isObjectStreamOpen exposes if the + * underlying stream has been closed or not. 
+ */
+public class ITestS3AUnbuffer extends AbstractS3ATestBase {
+
+  @Test
+  public void testUnbuffer() throws IOException {
+    // Write a 16 byte object for the test to read back
+    Path testFile = path("testUnbuffer");
+    describe("testUnbuffer");
+    byte[] payload = ContractTestUtils.dataset(16, 'a', 26);
+    try (FSDataOutputStream out = getFileSystem().create(testFile, true)) {
+      out.write(payload);
+    }
+
+    try (FSDataInputStream in = getFileSystem().open(testFile)) {
+      assertTrue(in.getWrappedStream() instanceof S3AInputStream);
+
+      // Reading half of the data must leave the object stream open
+      assertEquals(8, in.read(new byte[8]));
+      assertTrue(isObjectStreamOpen(in));
+
+      // After unbuffer the wrapped object stream must be closed
+      in.unbuffer();
+      assertFalse(isObjectStreamOpen(in));
+    }
+  }
+
+  /**
+   * Ask the wrapped {@link S3AInputStream} whether its object stream is open.
+   * @param in stream whose wrapped stream is an {@link S3AInputStream}
+   * @return the value of {@link S3AInputStream#isObjectStreamOpen()}
+   */
+  private boolean isObjectStreamOpen(FSDataInputStream in) {
+    S3AInputStream s3aStream = (S3AInputStream) in.getWrappedStream();
+    return s3aStream.isObjectStreamOpen();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
new file mode 100644
index 0000000000..c858c9933f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AUnbuffer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; + +import org.junit.Test; + +import java.io.IOException; +import java.util.Date; + +import static org.junit.Assert.assertEquals; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Uses mocks to check that the {@link S3ObjectInputStream} is closed when + * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} is called. Unlike the + * other unbuffer tests, this specifically tests that the underlying S3 object + * stream is closed. 
+ */
+public class TestS3AUnbuffer extends AbstractS3AMockTest {
+
+  /**
+   * Open a stream over a mocked S3 object, read from it, call
+   * {@code unbuffer()} and verify that the mocked object stream was closed
+   * exactly once.
+   */
+  @Test
+  public void testUnbuffer() throws IOException {
+    // Create mock ObjectMetadata for getFileStatus()
+    Path path = new Path("/file");
+    ObjectMetadata meta = mock(ObjectMetadata.class);
+    when(meta.getContentLength()).thenReturn(1L);
+    when(meta.getLastModified()).thenReturn(new Date(2L));
+    when(meta.getETag()).thenReturn("mock-etag");
+    when(s3.getObjectMetadata(any())).thenReturn(meta);
+
+    // Create mock S3ObjectInputStream and S3Object for open()
+    S3ObjectInputStream objectStream = mock(S3ObjectInputStream.class);
+    when(objectStream.read()).thenReturn(-1);
+
+    S3Object s3Object = mock(S3Object.class);
+    when(s3Object.getObjectContent()).thenReturn(objectStream);
+    when(s3Object.getObjectMetadata()).thenReturn(meta);
+    when(s3.getObject(any())).thenReturn(s3Object);
+
+    // try-with-resources so the stream is released even when an assertion
+    // fails part way through the test.
+    try (FSDataInputStream stream = fs.open(path)) {
+      // Call read and then unbuffer
+      assertEquals(0, stream.read(new byte[8])); // mocks read 0 bytes
+      stream.unbuffer();
+
+      // Verify that unbuffer closed the object stream. The check runs
+      // before the FSDataInputStream itself is closed, so only the effect
+      // of unbuffer() is observed.
+      verify(objectStream, times(1)).close();
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
index ec4c54ae39..0e12897266 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
+++ b/hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml
@@ -122,4 +122,9 @@
     <value>false</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-unbuffer</name>
+    <value>true</value>
+  </property>
+
 </configuration>