HADOOP-14566. Add seek support for SFTP FileSystem. (#1999)

Contributed by Mikhail Pryakhin
This commit is contained in:
Mike 2020-06-03 13:37:40 +03:00 committed by GitHub
parent 9c290c08db
commit 97c98ce531
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 314 additions and 46 deletions

View File

@ -19,7 +19,6 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.net.URLDecoder; import java.net.URLDecoder;
@ -516,20 +515,21 @@ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
disconnect(channel); disconnect(channel);
throw new IOException(String.format(E_PATH_DIR, f)); throw new IOException(String.format(E_PATH_DIR, f));
} }
InputStream is;
try { try {
// the path could be a symbolic link, so get the real path // the path could be a symbolic link, so get the real path
absolute = new Path("/", channel.realpath(absolute.toUri().getPath())); absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));
is = channel.get(absolute.toUri().getPath());
} catch (SftpException e) { } catch (SftpException e) {
throw new IOException(e); throw new IOException(e);
} }
return new FSDataInputStream(new SFTPInputStream(is, statistics)){ return new FSDataInputStream(
new SFTPInputStream(channel, absolute, statistics)){
@Override @Override
public void close() throws IOException { public void close() throws IOException {
super.close(); try {
disconnect(channel); super.close();
} finally {
disconnect(channel);
}
} }
}; };
} }

View File

@ -15,62 +15,107 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.fs.sftp; package org.apache.hadoop.fs.sftp;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.SftpATTRS;
import com.jcraft.jsch.SftpException;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/** SFTP FileSystem input stream. */ /** SFTP FileSystem input stream. */
class SFTPInputStream extends FSInputStream { class SFTPInputStream extends FSInputStream {
public static final String E_SEEK_NOTSUPPORTED = "Seek not supported"; private final ChannelSftp channel;
public static final String E_NULL_INPUTSTREAM = "Null InputStream"; private final Path path;
public static final String E_STREAM_CLOSED = "Stream closed";
private InputStream wrappedStream; private InputStream wrappedStream;
private FileSystem.Statistics stats; private FileSystem.Statistics stats;
private boolean closed; private boolean closed;
private long pos; private long pos;
private long nextPos;
private long contentLength;
SFTPInputStream(InputStream stream, FileSystem.Statistics stats) { SFTPInputStream(ChannelSftp channel, Path path, FileSystem.Statistics stats)
throws IOException {
if (stream == null) { try {
throw new IllegalArgumentException(E_NULL_INPUTSTREAM); this.channel = channel;
this.path = path;
this.stats = stats;
this.wrappedStream = channel.get(path.toUri().getPath());
SftpATTRS stat = channel.lstat(path.toString());
this.contentLength = stat.getSize();
} catch (SftpException e) {
throw new IOException(e);
} }
this.wrappedStream = stream;
this.stats = stats;
this.pos = 0;
this.closed = false;
} }
@Override @Override
public void seek(long position) throws IOException { public synchronized void seek(long position) throws IOException {
throw new IOException(E_SEEK_NOTSUPPORTED); checkNotClosed();
if (position < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
nextPos = position;
}
@Override
public synchronized int available() throws IOException {
checkNotClosed();
long remaining = contentLength - nextPos;
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int) remaining;
}
private void seekInternal() throws IOException {
if (pos == nextPos) {
return;
}
if (nextPos > pos) {
long skipped = wrappedStream.skip(nextPos - pos);
pos = pos + skipped;
}
if (nextPos < pos) {
wrappedStream.close();
try {
wrappedStream = channel.get(path.toUri().getPath());
pos = wrappedStream.skip(nextPos);
} catch (SftpException e) {
throw new IOException(e);
}
}
} }
@Override @Override
public boolean seekToNewSource(long targetPos) throws IOException { public boolean seekToNewSource(long targetPos) throws IOException {
throw new IOException(E_SEEK_NOTSUPPORTED); return false;
} }
@Override @Override
public long getPos() throws IOException { public synchronized long getPos() throws IOException {
return pos; return nextPos;
} }
@Override @Override
public synchronized int read() throws IOException { public synchronized int read() throws IOException {
if (closed) { checkNotClosed();
throw new IOException(E_STREAM_CLOSED); if (this.contentLength == 0 || (nextPos >= contentLength)) {
return -1;
} }
seekInternal();
int byteRead = wrappedStream.read(); int byteRead = wrappedStream.read();
if (byteRead >= 0) { if (byteRead >= 0) {
pos++; pos++;
nextPos++;
} }
if (stats != null & byteRead >= 0) { if (stats != null & byteRead >= 0) {
stats.incrementBytesRead(1); stats.incrementBytesRead(1);
@ -78,23 +123,6 @@ public synchronized int read() throws IOException {
return byteRead; return byteRead;
} }
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
if (closed) {
throw new IOException(E_STREAM_CLOSED);
}
int result = wrappedStream.read(buf, off, len);
if (result > 0) {
pos += result;
}
if (stats != null & result > 0) {
stats.incrementBytesRead(result);
}
return result;
}
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
if (closed) { if (closed) {
return; return;
@ -103,4 +131,12 @@ public synchronized void close() throws IOException {
wrappedStream.close(); wrappedStream.close();
closed = true; closed = true;
} }
private void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(
path.toUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED
);
}
}
} }

View File

@ -69,6 +69,14 @@ public void init() throws IOException {
} }
/**
* Any teardown logic can go here.
* @throws IOException IO problems
*/
public void teardown() throws IOException {
}
/** /**
* Add a configuration resource to this instance's configuration * Add a configuration resource to this instance's configuration
* @param resource resource reference * @param resource resource reference
@ -113,7 +121,7 @@ public FileSystem getFileSystem(URI uri) throws IOException {
public abstract FileSystem getTestFileSystem() throws IOException; public abstract FileSystem getTestFileSystem() throws IOException;
/** /**
* Get the scheme of this FS * Get the scheme of this FS.
* @return the scheme this FS supports * @return the scheme this FS supports
*/ */
public abstract String getScheme(); public abstract String getScheme();

View File

@ -213,6 +213,9 @@ public void teardown() throws Exception {
Thread.currentThread().setName("teardown"); Thread.currentThread().setName("teardown");
LOG.debug("== Teardown =="); LOG.debug("== Teardown ==");
deleteTestDirInTeardown(); deleteTestDirInTeardown();
if (contract != null) {
contract.teardown();
}
LOG.debug("== Teardown complete =="); LOG.debug("== Teardown complete ==");
} }

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.sftp;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.sftp.SFTPFileSystem;
import org.apache.sshd.common.NamedFactory;
import org.apache.sshd.server.SshServer;
import org.apache.sshd.server.auth.UserAuth;
import org.apache.sshd.server.auth.password.UserAuthPasswordFactory;
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
import org.apache.sshd.server.subsystem.sftp.SftpSubsystemFactory;
/**
 * Filesystem contract for the SFTP connector, backed by an in-process
 * Apache MINA SSHD server so the contract tests need no external host.
 * The server endpoint (user, password, host) is defined once in
 * {@link #TEST_URI}; the port is assigned by the OS at {@link #init()}.
 */
public class SFTPContract extends AbstractFSContract {

  private static final String CONTRACT_XML = "contract/sftp.xml";
  private static final URI TEST_URI =
      URI.create("sftp://user:password@localhost");
  private final String testDataDir =
      new FileSystemTestHelper().getTestRootDir();
  private final Configuration conf;
  private SshServer sshd;

  public SFTPContract(Configuration conf) {
    super(conf);
    addConfResource(CONTRACT_XML);
    this.conf = conf;
  }

  /**
   * Start an embedded SSH/SFTP server on an OS-assigned port and point
   * the configuration's SFTP filesystem at it.
   * @throws IOException if the server cannot be started
   */
  @Override
  public void init() throws IOException {
    sshd = SshServer.setUpDefaultServer();
    // ask OS to assign a port
    sshd.setPort(0);
    sshd.setKeyPairProvider(new SimpleGeneratorHostKeyProvider());

    List<NamedFactory<UserAuth>> userAuthFactories = new ArrayList<>();
    userAuthFactories.add(new UserAuthPasswordFactory());

    sshd.setUserAuthFactories(userAuthFactories);
    // accept only the fixed user/password pair embedded in TEST_URI
    sshd.setPasswordAuthenticator((username, password, session) ->
        username.equals("user") && password.equals("password")
    );

    sshd.setSubsystemFactories(
        Collections.singletonList(new SftpSubsystemFactory()));

    sshd.start();
    int port = sshd.getPort();

    conf.setClass("fs.sftp.impl", SFTPFileSystem.class, FileSystem.class);
    conf.setInt("fs.sftp.host.port", port);
    // a cached FileSystem could be bound to a previous server instance/port
    conf.setBoolean("fs.sftp.impl.disable.cache", true);
  }

  /**
   * Stop the embedded server if it was started.
   * @throws IOException if shutdown fails
   */
  @Override
  public void teardown() throws IOException {
    if (sshd != null) {
      sshd.stop();
    }
  }

  @Override
  public FileSystem getTestFileSystem() throws IOException {
    return FileSystem.get(TEST_URI, conf);
  }

  @Override
  public String getScheme() {
    return "sftp";
  }

  /**
   * Qualify the shared test root directory against the test filesystem.
   * @return the fully qualified test path
   */
  @Override
  public Path getTestPath() {
    try {
      // Reuse getTestFileSystem() instead of rebuilding the endpoint URI
      // inline, so the endpoint is defined in exactly one place (TEST_URI)
      // and cannot silently diverge between the two methods.
      FileSystem fs = getTestFileSystem();
      return fs.makeQualified(new Path(testDataDir));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract.sftp;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractSeekTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
/**
 * Runs the standard filesystem seek contract tests against the SFTP
 * connector, using the embedded-server {@link SFTPContract}.
 */
public class TestSFTPContractSeek extends AbstractContractSeekTest {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    final AbstractFSContract sftpContract = new SFTPContract(conf);
    return sftpContract;
  }
}

View File

@ -0,0 +1,79 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<!--
  SFTP contract options. These describe the behavior expected of the SFTP
  filesystem under test (the test suite starts an embedded SSH/SFTP server).
-->
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>false</value>
</property>
<property>
<name>fs.contract.is-case-sensitive</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-append</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-atomic-directory-delete</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-atomic-rename</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-block-locality</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-concat</name>
<value>false</value>
</property>
<property>
<name>fs.contract.supports-seek</name>
<value>true</value>
</property>
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-strict-exceptions</name>
<value>true</value>
</property>
<property>
<name>fs.contract.supports-unix-permissions</name>
<value>false</value>
</property>
</configuration>