diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
index ed33357b51..a91b50f2e9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPFileSystem.java
@@ -19,7 +19,6 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URLDecoder;
@@ -516,20 +515,21 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
       disconnect(channel);
       throw new IOException(String.format(E_PATH_DIR, f));
     }
-    InputStream is;
     try {
       // the path could be a symbolic link, so get the real path
       absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));
-
-      is = channel.get(absolute.toUri().getPath());
     } catch (SftpException e) {
       throw new IOException(e);
     }
-    return new FSDataInputStream(new SFTPInputStream(is, statistics)){
+    return new FSDataInputStream(
+        new SFTPInputStream(channel, absolute, statistics)){
       @Override
       public void close() throws IOException {
-        super.close();
-        disconnect(channel);
+        try {
+          super.close();
+        } finally {
+          disconnect(channel);
+        }
       }
     };
   }
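For callers nothing changes in the FileSystem API, but the stream returned by open() is now genuinely seekable, and the channel is disconnected even when closing the wrapped stream throws. A hedged usage sketch — host, credentials, and path are hypothetical, and fs.sftp.impl is wired up the same way the contract below does it:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.sftp.SFTPFileSystem;

public class SFTPSeekExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
    // Host, credentials and path are hypothetical.
    try (FileSystem fs = FileSystem.get(
             URI.create("sftp://user:secret@example.com"), conf);
         FSDataInputStream in = fs.open(new Path("/data/sample.bin"))) {
      in.seek(4096);                  // previously threw "Seek not supported"
      byte[] buf = new byte[1024];
      int n =, 0, buf.length);
      System.out.println("read " + n + " bytes, now at " + in.getPos());
    }                                 // close() now disconnects in a finally
  }
}
```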
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
index 7af299bd11..d0f9a8d088 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/sftp/SFTPInputStream.java
@@ -15,62 +15,107 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.hadoop.fs.sftp;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.SftpATTRS;
+import com.jcraft.jsch.SftpException;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /** SFTP FileSystem input stream. */
 class SFTPInputStream extends FSInputStream {
 
-  public static final String E_SEEK_NOTSUPPORTED = "Seek not supported";
-  public static final String E_NULL_INPUTSTREAM = "Null InputStream";
-  public static final String E_STREAM_CLOSED = "Stream closed";
-
+  private final ChannelSftp channel;
+  private final Path path;
   private InputStream wrappedStream;
   private FileSystem.Statistics stats;
   private boolean closed;
   private long pos;
+  private long nextPos;
+  private long contentLength;
 
-  SFTPInputStream(InputStream stream, FileSystem.Statistics stats) {
-
-    if (stream == null) {
-      throw new IllegalArgumentException(E_NULL_INPUTSTREAM);
+  SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats)
+      throws IOException {
+    try {
+      this.channel = channel;
+      this.path = path;
+      this.stats = stats;
+      this.wrappedStream = channel.get(path.toUri().getPath());
+      SftpATTRS stat = channel.lstat(path.toString());
+      this.contentLength = stat.getSize();
+    } catch (SftpException e) {
+      throw new IOException(e);
     }
-    this.wrappedStream = stream;
-    this.stats = stats;
-
-    this.pos = 0;
-    this.closed = false;
   }
 
   @Override
-  public void seek(long position) throws IOException {
-    throw new IOException(E_SEEK_NOTSUPPORTED);
+  public synchronized void seek(long position) throws IOException {
+    checkNotClosed();
+    if (position < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+    }
+    nextPos = position;
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    checkNotClosed();
+    long remaining = contentLength - nextPos;
+    if (remaining > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    return (int) remaining;
+  }
+
+  private void seekInternal() throws IOException {
+    if (pos == nextPos) {
+      return;
+    }
+    if (nextPos > pos) {
+      long skipped = wrappedStream.skip(nextPos - pos);
+      pos = pos + skipped;
+    }
+    if (nextPos < pos) {
+      wrappedStream.close();
+      try {
+        wrappedStream = channel.get(path.toUri().getPath());
+        pos = wrappedStream.skip(nextPos);
+      } catch (SftpException e) {
+        throw new IOException(e);
+      }
+    }
   }
 
   @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
-    throw new IOException(E_SEEK_NOTSUPPORTED);
+    return false;
   }
 
   @Override
-  public long getPos() throws IOException {
-    return pos;
+  public synchronized long getPos() throws IOException {
+    return nextPos;
   }
 
   @Override
   public synchronized int read() throws IOException {
-    if (closed) {
-      throw new IOException(E_STREAM_CLOSED);
+    checkNotClosed();
+    if (this.contentLength == 0 || (nextPos >= contentLength)) {
+      return -1;
     }
-
+    seekInternal();
    int byteRead =;
     if (byteRead >= 0) {
       pos++;
+      nextPos++;
     }
     if (stats != null & byteRead >= 0) {
       stats.incrementBytesRead(1);
@@ -78,23 +123,6 @@ public synchronized int read() throws IOException {
     return byteRead;
   }
 
-  public synchronized int read(byte[] buf, int off, int len)
-      throws IOException {
-    if (closed) {
-      throw new IOException(E_STREAM_CLOSED);
-    }
-
-    int result =, off, len);
-    if (result > 0) {
-      pos += result;
-    }
-    if (stats != null & result > 0) {
-      stats.incrementBytesRead(result);
-    }
-
-    return result;
-  }
-
   public synchronized void close() throws IOException {
     if (closed) {
       return;
@@ -103,4 +131,12 @@ public synchronized void close() throws IOException {
     wrappedStream.close();
     closed = true;
   }
+
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(
+          path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED
+      );
+    }
+  }
 }
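The rewritten stream makes seek() lazy: it only validates the offset and records nextPos, and the next read() reconciles through seekInternal(), skipping forward on the live stream or reopening the remote file for a backward seek. A hedged trace of the observable behaviour, reusing the fs handle from the sketch above (path hypothetical; the comments describe what SFTPInputStream does internally):

```java
FSDataInputStream in = fs.open(new Path("/data/sample.bin"));
in.seek(100);        // lazy: records nextPos only; getPos() now reports 100
int a =;  // seekInternal() skips 100 bytes on the wrapped stream
in.seek(10);         // backward target recorded, still no SFTP traffic
int b =;  // stream reopened via channel.get(), then skip(10)
in.close();          // a second close() would be a no-op: closed is checked
```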
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java
index f09496a608..76d3116c3a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContract.java
@@ -69,6 +69,14 @@ public void init() throws IOException {
 
   }
 
+  /**
+   * Any teardown logic can go here.
+   * @throws IOException IO problems
+   */
+  public void teardown() throws IOException {
+
+  }
+
   /**
    * Add a configuration resource to this instance's configuration
    * @param resource resource reference
@@ -113,7 +121,7 @@ public FileSystem getFileSystem(URI uri) throws IOException {
   public abstract FileSystem getTestFileSystem() throws IOException;
 
   /**
-   * Get the scheme of this FS
+   * Get the scheme of this FS.
    * @return the scheme this FS supports
    */
   public abstract String getScheme();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
index 60373f6799..ac9de6d7bf 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java
@@ -213,6 +213,9 @@ public void teardown() throws Exception {
     Thread.currentThread().setName("teardown");
     LOG.debug("== Teardown ==");
     deleteTestDirInTeardown();
+    if (contract != null) {
+      contract.teardown();
+    }
     LOG.debug("== Teardown complete ==");
   }
 
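The new hook lets a contract that owns an external resource release it after the suite finishes; the base test calls it once the test directory has been deleted, with a null guard in case setup() failed before the contract was created. A hedged sketch of a contract using the hook — MiniServerContract and MiniFtpServer are hypothetical stand-ins, and the SFTPContract below is the real first user:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.AbstractFSContract;

public class MiniServerContract extends AbstractFSContract {
  private MiniFtpServer server;               // hypothetical server handle

  public MiniServerContract(Configuration conf) {
    super(conf);
  }

  @Override
  public void init() throws IOException {
    server = new MiniFtpServer();             // hypothetical
    server.start();
  }

  @Override
  public void teardown() throws IOException { // invoked by the base teardown
    if (server != null) {                     // init() may have failed early
      server.stop();
    }
  }

  @Override
  public FileSystem getTestFileSystem() throws IOException {
    return FileSystem.get(getConf());
  }

  @Override
  public String getScheme() {
    return "ftp";
  }
}
```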
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java
new file mode 100644
index 0000000000..f72a2aec86
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/SFTPContract.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.sftp;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.sftp.SFTPFileSystem;
+import org.apache.sshd.common.NamedFactory;
+import org.apache.sshd.server.SshServer;
+import org.apache.sshd.server.auth.UserAuth;
+import org.apache.sshd.server.auth.password.UserAuthPasswordFactory;
+import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
+import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
+
+public class SFTPContract extends AbstractFSContract {
+
+  private static final String CONTRACT_XML = "contract/sftp.xml";
+  private static final URI TEST_URI =
+      URI.create("sftp://user:password@localhost");
+  private final String testDataDir =
+      new FileSystemTestHelper().getTestRootDir();
+  private final Configuration conf;
+  private SshServer sshd;
+
+  public SFTPContract(Configuration conf) {
+    super(conf);
+    addConfResource(CONTRACT_XML);
+    this.conf = conf;
+  }
+
+  @Override
+  public void init() throws IOException {
+    sshd = SshServer.setUpDefaultServer();
+    // ask OS to assign a port
+    sshd.setPort(0);
+    sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());
+
+    List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<>();
+    userAuthFactories.add(new UserAuthPasswordFactory());
+
+    sshd.setUserAuthFactories(userAuthFactories);
+    sshd.setPasswordAuthenticator((username, password, session) ->
+        username.equals("user") && password.equals("password")
+    );
+
+    sshd.setSubsystemFactories(
+        Collections.singletonList(new SftpSubsystemFactory()));
+
+    sshd.start();
+    int port = sshd.getPort();
+
+    conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
+    conf.setInt("fs.sftp.host.port", port);
+    conf.setBoolean("fs.sftp.impl.disable.cache", true);
+  }
+
+  @Override
+  public void teardown() throws IOException {
+    if (sshd != null) {
+      sshd.stop();
+    }
+  }
+
+  @Override
+  public FileSystem getTestFileSystem() throws IOException {
+    return FileSystem.get(TEST_URI, conf);
+  }
+
+  @Override
+  public String getScheme() {
+    return "sftp";
+  }
+
+  @Override
+  public Path getTestPath() {
+    try {
+      FileSystem fs = FileSystem.get(
+          URI.create("sftp://user:password@localhost"), conf
+      );
+      return fs.makeQualified(new Path(testDataDir));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}
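The contract boots an in-process Apache MINA SSHD server on an OS-assigned port, accepts the single user/password pair, and points fs.sftp.host.port at the result; disabling the FS cache stops a later test from reusing a connection bound to a stopped server. For orientation, a hedged sketch of talking to such an embedded server directly over JSch, the client library SFTPFileSystem uses internally (standard JSch calls; the host-key handling is test-only):

```java
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpATTRS;

import org.apache.sshd.server.SshServer;

class EmbeddedServerProbe {
  // Connect to the embedded server the same way SFTPFileSystem's
  // JSch layer would; sshd is the started SshServer instance.
  static long rootSize(SshServer sshd) throws Exception {
    Session session =
        new JSch().getSession("user", "localhost", sshd.getPort());
    session.setPassword("password");
    session.setConfig("StrictHostKeyChecking", "no"); // test-only
    session.connect();
    try {
      ChannelSftp channel = (ChannelSftp) session.openChannel("sftp");
      channel.connect();
      SftpATTRS attrs = channel.lstat("/"); // same call SFTPInputStream uses
      channel.disconnect();
      return attrs.getSize();
    } finally {
      session.disconnect();
    }
  }
}
```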
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java
new file mode 100644
index 0000000000..20f4116b98
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/sftp/TestSFTPContractSeek.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract.sftp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+public class TestSFTPContractSeek extends AbstractContractSeekTest {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new SFTPContract(conf);
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml
new file mode 100644
index 0000000000..20a24b7e54
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/sftp.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>false</value>
+  </property>
+
+</configuration>
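Each fs.contract.* key above maps to a constant in org.apache.hadoop.fs.contract.ContractOptions and is read through AbstractFSContract.isSupported(); supports-seek and rejects-seek-past-eof are what drive the assertions in the inherited seek suite. A hedged sketch of that lookup:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.ContractOptions;

class ContractOptionsProbe {
  // How the contract test base consumes the XML above; the constructor
  // pulls in contract/sftp.xml, so options resolve without starting
  // the embedded server.
  static void printSeekOptions() {
    SFTPContract contract = new SFTPContract(new Configuration());
    System.out.println("seek: "
        + contract.isSupported(ContractOptions.SUPPORTS_SEEK, false));
    System.out.println("rejects seek past EOF: "
        + contract.isSupported(ContractOptions.REJECTS_SEEK_PAST_EOF, false));
  }
}
```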