diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java index 43eb783339..ed33357b51 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; +import com.google.common.annotations.VisibleForTesting; import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.ChannelSftp.LsEntry; import com.jcraft.jsch.SftpATTRS; @@ -219,7 +220,7 @@ private FileStatus getFileStatus(ChannelSftp client, Path file) Path root = new Path("/"); return new FileStatus(length, isDir, blockReplication, blockSize, modTime, - root.makeQualified(this.getUri(), this.getWorkingDirectory())); + root.makeQualified(this.getUri(), this.getWorkingDirectory(client))); } String pathName = parentPath.toUri().getPath(); Vector sftpFiles; @@ -289,7 +290,7 @@ private FileStatus getFileStatus(ChannelSftp channel, LsEntry sftpFile, return new FileStatus(length, isDir, blockReplication, blockSize, modTime, accessTime, permission, user, group, filePath.makeQualified( - this.getUri(), this.getWorkingDirectory())); + this.getUri(), this.getWorkingDirectory(channel))); } /** @@ -524,10 +525,13 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException { } catch (SftpException e) { throw new IOException(e); } - - FSDataInputStream fis = - new FSDataInputStream(new SFTPInputStream(is, channel, statistics)); - return fis; + return new FSDataInputStream(new SFTPInputStream(is, statistics)){ + @Override + public void close() throws IOException { + super.close(); + disconnect(channel); + } + }; } /** @@ -636,6 +640,16 @@ public Path getWorkingDirectory() { return getHomeDirectory(); } + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + */ + private Path getWorkingDirectory(ChannelSftp client) { + // Return home directory always since we do not maintain state. + return getHomeDirectory(client); + } + @Override public Path getHomeDirectory() { ChannelSftp channel = null; @@ -654,6 +668,19 @@ public Path getHomeDirectory() { } } + /** + * Convenience method, so that we don't open a new connection when using this + * method from within another method. Otherwise every API invocation incurs + * the overhead of opening/closing a TCP connection. + */ + private Path getHomeDirectory(ChannelSftp channel) { + try { + return new Path(channel.pwd()); + } catch (Exception ioe) { + return null; + } + } + @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { ChannelSftp client = connect(); @@ -675,4 +702,9 @@ public FileStatus getFileStatus(Path f) throws IOException { disconnect(channel); } } + + @VisibleForTesting + SFTPConnectionPool getConnectionPool() { + return connectionPool; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java index ece2c1c980..7af299bd11 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java @@ -22,39 +22,25 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.util.StringUtils; - -import com.jcraft.jsch.ChannelSftp; -import com.jcraft.jsch.JSchException; -import com.jcraft.jsch.Session; /** SFTP FileSystem input stream. */ class SFTPInputStream extends FSInputStream { public static final String E_SEEK_NOTSUPPORTED = "Seek not supported"; - public static final String E_CLIENT_NULL = - "SFTP client null or not connected"; public static final String E_NULL_INPUTSTREAM = "Null InputStream"; public static final String E_STREAM_CLOSED = "Stream closed"; - public static final String E_CLIENT_NOTCONNECTED = "Client not connected"; private InputStream wrappedStream; - private ChannelSftp channel; private FileSystem.Statistics stats; private boolean closed; private long pos; - SFTPInputStream(InputStream stream, ChannelSftp channel, - FileSystem.Statistics stats) { + SFTPInputStream(InputStream stream, FileSystem.Statistics stats) { if (stream == null) { throw new IllegalArgumentException(E_NULL_INPUTSTREAM); } - if (channel == null || !channel.isConnected()) { - throw new IllegalArgumentException(E_CLIENT_NULL); - } this.wrappedStream = stream; - this.channel = channel; this.stats = stats; this.pos = 0; @@ -114,17 +100,7 @@ public synchronized void close() throws IOException { return; } super.close(); + wrappedStream.close(); closed = true; - if (!channel.isConnected()) { - throw new IOException(E_CLIENT_NOTCONNECTED); - } - - try { - Session session = channel.getSession(); - channel.disconnect(); - session.disconnect(); - } catch (JSchException e) { - throw new IOException(StringUtils.stringifyException(e)); - } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java index 96f6fb9f02..693926242c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/sftp/TestSFTPFileSystem.java @@ -34,25 +34,31 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.sshd.server.SshServer; +import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows; import org.apache.sshd.common.NamedFactory; import org.apache.sshd.server.Command; -import org.apache.sshd.server.auth.password.PasswordAuthenticator; +import org.apache.sshd.server.SshServer; import org.apache.sshd.server.auth.UserAuth; +import org.apache.sshd.server.auth.password.PasswordAuthenticator; import org.apache.sshd.server.auth.password.UserAuthPasswordFactory; import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; import org.apache.sshd.server.session.ServerSession; - import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory; +import static org.hamcrest.core.Is.is; +import org.junit.After; import org.junit.AfterClass; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows; -import static org.junit.Assert.*; - public class TestSFTPFileSystem { private static final String TEST_SFTP_DIR = "testsftp"; @@ -64,8 +70,9 @@ public class TestSFTPFileSystem { private static final String connection = "sftp://user:password@localhost"; private static Path localDir = null; private static FileSystem localFs = null; - private static FileSystem sftpFs = null; + private FileSystem sftpFs = null; private static SshServer sshd = null; + private static Configuration conf = null; private static int port; private static void startSshdServer() throws IOException { @@ -98,6 +105,22 @@ public boolean authenticate(String username, String password, port = sshd.getPort(); } + @Before + public void init() throws Exception { + sftpFs = FileSystem.get(URI.create(connection), conf); + } + + @After + public void cleanUp() throws Exception { + if (sftpFs != null) { + try { + sftpFs.close(); + } catch (IOException e) { + // ignore + } + } + } + @BeforeClass public static void setUp() throws Exception { // skip all tests if running on Windows @@ -105,7 +128,7 @@ public static void setUp() throws Exception { startSshdServer(); - Configuration conf = new Configuration(); + conf = new Configuration(); conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class); conf.setInt("fs.sftp.host.port", port); conf.setBoolean("fs.sftp.impl.disable.cache", true); @@ -116,8 +139,6 @@ public static void setUp() throws Exception { localFs.delete(localDir, true); } localFs.mkdirs(localDir); - - sftpFs = FileSystem.get(URI.create(connection), conf); } @AfterClass @@ -130,13 +151,6 @@ public static void tearDown() { // ignore } } - if (sftpFs != null) { - try { - sftpFs.close(); - } catch (IOException e) { - // ignore - } - } if (sshd != null) { try { sshd.stop(true); @@ -179,6 +193,8 @@ public void testCreateFile() throws Exception { assertTrue(localFs.exists(file)); assertTrue(sftpFs.delete(file, false)); assertFalse(localFs.exists(file)); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); } /** @@ -194,6 +210,8 @@ public void testFileExists() throws Exception { assertTrue(sftpFs.delete(file, false)); assertFalse(sftpFs.exists(file)); assertFalse(localFs.exists(file)); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); } /** @@ -217,6 +235,8 @@ public void testReadFile() throws Exception { } } assertTrue(sftpFs.delete(file, false)); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); } /** @@ -238,6 +258,8 @@ public void testStatFile() throws Exception { assertEquals(data.length, sstat.getLen()); assertEquals(lstat.getLen(), sstat.getLen()); assertTrue(sftpFs.delete(file, false)); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); } /** @@ -249,6 +271,8 @@ public void testStatFile() throws Exception { public void testDeleteNonEmptyDir() throws Exception { Path file = touch(localFs, name.getMethodName().toLowerCase()); sftpFs.delete(localDir, false); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); } /** @@ -260,6 +284,8 @@ public void testDeleteNonEmptyDir() throws Exception { public void testDeleteNonExistFile() throws Exception { Path file = new Path(localDir, name.getMethodName().toLowerCase()); assertFalse(sftpFs.delete(file, false)); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); } /** @@ -282,6 +308,8 @@ public void testRenameFile() throws Exception { assertFalse(localFs.exists(file1)); assertTrue(sftpFs.delete(file2, false)); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); } /** @@ -319,6 +347,8 @@ public void testGetAccessTime() throws IOException { accessTime1 = (accessTime1 / 1000) * 1000; long accessTime2 = sftpFs.getFileStatus(file).getAccessTime(); assertEquals(accessTime1, accessTime2); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); } @Test @@ -330,6 +360,18 @@ public void testGetModifyTime() throws IOException { modifyTime1 = (modifyTime1 / 1000) * 1000; long modifyTime2 = sftpFs.getFileStatus(file).getModificationTime(); assertEquals(modifyTime1, modifyTime2); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); } + @Test + public void testMkDirs() throws IOException { + Path path = new Path(localDir.toUri().getPath(), + new Path(name.getMethodName(), "subdirectory")); + sftpFs.mkdirs(path); + assertTrue(localFs.exists(path)); + assertTrue(localFs.getFileStatus(path).isDirectory()); + assertThat(((SFTPFileSystem) sftpFs).getConnectionPool().getLiveConnCount(), + is(1)); + } }