HADOOP-17528. SFTP File System: close the connection pool when closing a FileSystem (#2701)

Contributed by Mike Pryakhin.

Change-Id: I59ef67c38c313f30c5e000b2fe41fcf715cf3a4b
This commit is contained in:
Mike 2021-02-23 20:03:27 +03:00 committed by Steve Loughran
parent f12293fba2
commit 5ffcee8979
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
2 changed files with 41 additions and 1 deletions

View File

@ -24,6 +24,7 @@
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@ -50,6 +51,7 @@ public class SFTPFileSystem extends FileSystem {
private SFTPConnectionPool connectionPool;
private URI uri;
private final AtomicBoolean closed = new AtomicBoolean(false);
private static final int DEFAULT_SFTP_PORT = 22;
private static final int DEFAULT_MAX_CONNECTION = 5;
@ -83,6 +85,7 @@ public class SFTPFileSystem extends FileSystem {
"Destination path %s already exist, cannot rename!";
public static final String E_FAILED_GETHOME = "Failed to get home directory";
public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
public static final String E_FS_CLOSED = "FileSystem is closed!";
/**
* Set configuration from UI.
@ -138,8 +141,9 @@ private void setConfigurationFromURI(URI uriInfo, Configuration conf)
* @throws IOException
*/
private ChannelSftp connect() throws IOException {
Configuration conf = getConf();
checkNotClosed();
Configuration conf = getConf();
String host = conf.get(FS_SFTP_HOST, null);
int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
@ -703,6 +707,31 @@ public FileStatus getFileStatus(Path f) throws IOException {
}
}
@Override
public void close() throws IOException {
if (closed.getAndSet(true)) {
return;
}
try {
super.close();
} finally {
if (connectionPool != null) {
connectionPool.shutdown();
}
}
}
/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (closed.get()) {
throw new IOException(uri + ": " + E_FS_CLOSED);
}
}
@VisibleForTesting
SFTPConnectionPool getConnectionPool() {
return connectionPool;

View File

@ -374,4 +374,15 @@ public void testMkDirs() throws IOException {
assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(),
is(1));
}
@Test
public void testCloseFileSystemClosesConnectionPool() throws Exception {
SFTPFileSystem fs = (SFTPFileSystem) sftpFs;
fs.getHomeDirectory();
assertThat(fs.getConnectionPool().getLiveConnCount(), is(1));
fs.close();
assertThat(fs.getConnectionPool().getLiveConnCount(), is(0));
///making sure that re-entrant close calls are safe
fs.close();
}
}