diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 9431f1790d..f461c9e1ee 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -187,6 +187,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { private long readAhead; private S3AInputPolicy inputPolicy; private final AtomicBoolean closed = new AtomicBoolean(false); + private volatile boolean isClosed = false; private MetadataStore metadataStore; private boolean allowAuthoritative; @@ -678,7 +679,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { */ public FSDataInputStream open(Path f, int bufferSize) throws IOException { - + checkNotClosed(); LOG.debug("Opening '{}' for reading; input policy = {}", f, inputPolicy); final FileStatus fileStatus = getFileStatus(f); if (fileStatus.isDirectory()) { @@ -722,6 +723,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + checkNotClosed(); final Path path = qualify(f); String key = pathToKey(path); FileStatus status = null; @@ -871,7 +873,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { Path dst = qualify(dest); LOG.debug("Rename path {} to {}", src, dst); - incrementStatistic(INVOCATION_RENAME); + entryPoint(INVOCATION_RENAME); String srcKey = pathToKey(src); String dstKey = pathToKey(dst); @@ -1097,6 +1099,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { metadataStore = ms; } + /** + * Entry point to an operation. + * Increments the statistic; verifies the FS is active. + * @param operation The operation to increment + * @throws IOException if the + */ + protected void entryPoint(Statistic operation) throws IOException { + checkNotClosed(); + incrementStatistic(operation); + } + /** * Increment a statistic by 1. * @param statistic The operation to increment @@ -1660,6 +1673,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { @Retries.RetryTranslated public boolean delete(Path f, boolean recursive) throws IOException { try { + checkNotClosed(); return innerDelete(innerGetFileStatus(f, true), recursive); } catch (FileNotFoundException e) { LOG.debug("Couldn't delete {} - does not exist", f); @@ -1838,7 +1852,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { Path path = qualify(f); String key = pathToKey(path); LOG.debug("List status for path: {}", path); - incrementStatistic(INVOCATION_LIST_STATUS); + entryPoint(INVOCATION_LIST_STATUS); List result; final FileStatus fileStatus = getFileStatus(path); @@ -1981,7 +1995,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { throws IOException, FileAlreadyExistsException, AmazonClientException { Path f = qualify(p); LOG.debug("Making directory: {}", f); - incrementStatistic(INVOCATION_MKDIRS); + entryPoint(INVOCATION_MKDIRS); FileStatus fileStatus; List metadataStoreDirs = null; if (hasMetadataStore()) { @@ -2058,7 +2072,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { @Retries.RetryTranslated S3AFileStatus innerGetFileStatus(final Path f, boolean needEmptyDirectoryFlag) throws IOException { - incrementStatistic(INVOCATION_GET_FILE_STATUS); + entryPoint(INVOCATION_GET_FILE_STATUS); final Path path = qualify(f); String key = pathToKey(path); LOG.debug("Getting path status for {} ({})", path, key); @@ -2319,7 +2333,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst) throws IOException, FileAlreadyExistsException, AmazonClientException { - incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE); + entryPoint(INVOCATION_COPY_FROM_LOCAL_FILE); LOG.debug("Copying local file from {} to {}", src, dst); // Since we have a local file, we don't need to stream into a temporary file @@ -2418,6 +2432,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { // already closed return; } + isClosed = true; + LOG.debug("Filesystem {} is closed", uri); try { super.close(); } finally { @@ -2434,6 +2450,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { } } + /** + * Verify that the input stream is open. Non blocking; this gives + * the last state of the volatile {@link #closed} field. + * @throws IOException if the connection is closed. + */ + private void checkNotClosed() throws IOException { + if (isClosed) { + throw new IOException(uri + ": " + E_FS_CLOSED); + } + } + /** * Override getCanonicalServiceName because we don't support token in S3A. */ @@ -2860,7 +2887,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { */ @Override public FileStatus[] globStatus(Path pathPattern) throws IOException { - incrementStatistic(INVOCATION_GLOB_STATUS); + entryPoint(INVOCATION_GLOB_STATUS); return super.globStatus(pathPattern); } @@ -2871,7 +2898,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { @Override public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException { - incrementStatistic(INVOCATION_GLOB_STATUS); + entryPoint(INVOCATION_GLOB_STATUS); return super.globStatus(pathPattern, filter); } @@ -2881,7 +2908,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { */ @Override public boolean exists(Path f) throws IOException { - incrementStatistic(INVOCATION_EXISTS); + entryPoint(INVOCATION_EXISTS); return super.exists(f); } @@ -2892,7 +2919,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { @Override @SuppressWarnings("deprecation") public boolean isDirectory(Path f) throws IOException { - incrementStatistic(INVOCATION_IS_DIRECTORY); + entryPoint(INVOCATION_IS_DIRECTORY); return super.isDirectory(f); } @@ -2903,7 +2930,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { @Override @SuppressWarnings("deprecation") public boolean isFile(Path f) throws IOException { - incrementStatistic(INVOCATION_IS_FILE); + entryPoint(INVOCATION_IS_FILE); return super.isFile(f); } @@ -2948,7 +2975,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { private RemoteIterator innerListFiles(Path f, boolean recursive, Listing.FileStatusAcceptor acceptor) throws IOException { - incrementStatistic(INVOCATION_LIST_FILES); + entryPoint(INVOCATION_LIST_FILES); Path path = qualify(f); LOG.debug("listFiles({}, {})", path, recursive); try { @@ -3033,7 +3060,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { public RemoteIterator listLocatedStatus(final Path f, final PathFilter filter) throws FileNotFoundException, IOException { - incrementStatistic(INVOCATION_LIST_LOCATED_STATUS); + entryPoint(INVOCATION_LIST_LOCATED_STATUS); Path path = qualify(f); LOG.debug("listLocatedStatus({}, {}", path, filter); try { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 245721796c..6d6673984b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -89,6 +89,9 @@ public final class S3AUtils { "is abstract and therefore cannot be created"; static final String ENDPOINT_KEY = "Endpoint"; + /** Filesystem is closed; kept here to keep the errors close. */ + static final String E_FS_CLOSED = "FileSystem is closed!"; + /** * Core property for provider path. Duplicated here for consistent * code across Hadoop version: {@value}. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java new file mode 100644 index 0000000000..6e81452e26 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.test.LambdaTestUtils.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.E_FS_CLOSED; + +/** + * Tests of the S3A FileSystem which is closed; just make sure + * that that basic file Ops fail meaningfully. + */ +public class ITestS3AClosedFS extends AbstractS3ATestBase { + + private Path root = new Path("/"); + + @Override + public void setup() throws Exception { + super.setup(); + root = getFileSystem().makeQualified(new Path("/")); + getFileSystem().close(); + } + + @Override + public void teardown() { + // no op, as the FS is closed + } + + @Test + public void testClosedGetFileStatus() throws Exception { + intercept(IOException.class, E_FS_CLOSED, + () -> getFileSystem().getFileStatus(root)); + } + + @Test + public void testClosedListStatus() throws Exception { + intercept(IOException.class, E_FS_CLOSED, + () -> getFileSystem().listStatus(root)); + } + + @Test + public void testClosedListFile() throws Exception { + intercept(IOException.class, E_FS_CLOSED, + () -> getFileSystem().listFiles(root, false)); + } + + @Test + public void testClosedListLocatedStatus() throws Exception { + intercept(IOException.class, E_FS_CLOSED, + () -> getFileSystem().listLocatedStatus(root)); + } + + @Test + public void testClosedCreate() throws Exception { + intercept(IOException.class, E_FS_CLOSED, + () -> getFileSystem().create(path("to-create")).close()); + } + + @Test + public void testClosedDelete() throws Exception { + intercept(IOException.class, E_FS_CLOSED, + () -> getFileSystem().delete(path("to-delete"), false)); + } + + @Test + public void testClosedOpen() throws Exception { + intercept(IOException.class, E_FS_CLOSED, + () -> getFileSystem().open(path("to-open"))); + } + +}