HDFS-1820. FTPFileSystem attempts to close the outputstream even when it is not initialised. (#1952)

Contributed by Mikhail Pryakhin.
This commit is contained in:
Mike 2020-04-27 16:43:51 +03:00 committed by GitHub
parent 9224568b0e
commit 18d7dfbf35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 204 additions and 11 deletions

View File

@ -276,6 +276,11 @@
<artifactId>sshd-core</artifactId> <artifactId>sshd-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.ftpserver</groupId>
<artifactId>ftpserver-core</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.htrace</groupId> <groupId>org.apache.htrace</groupId>

View File

@ -20,6 +20,7 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.URI; import java.net.URI;
@ -41,6 +42,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -110,7 +112,9 @@ public void initialize(URI uri, Configuration conf) throws IOException { // get
// get port information from uri, (overrides info in conf) // get port information from uri, (overrides info in conf)
int port = uri.getPort(); int port = uri.getPort();
port = (port == -1) ? FTP.DEFAULT_PORT : port; if(port == -1){
port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT);
}
conf.setInt(FS_FTP_HOST_PORT, port); conf.setInt(FS_FTP_HOST_PORT, port);
// get user/password information from URI (overrides info in conf) // get user/password information from URI (overrides info in conf)
@ -340,8 +344,19 @@ public FSDataOutputStream create(Path file, FsPermission permission,
// file. The FTP client connection is closed when close() is called on the // file. The FTP client connection is closed when close() is called on the
// FSDataOutputStream. // FSDataOutputStream.
client.changeWorkingDirectory(parent.toUri().getPath()); client.changeWorkingDirectory(parent.toUri().getPath());
FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file OutputStream outputStream = client.storeFileStream(file.getName());
.getName()), statistics) {
if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
// The ftpClient is an inconsistent state. Must close the stream
// which in turn will logout and disconnect from FTP server
if (outputStream != null) {
IOUtils.closeStream(outputStream);
}
disconnect(client);
throw new IOException("Unable to create file: " + file + ", Aborting");
}
FSDataOutputStream fos = new FSDataOutputStream(outputStream, statistics) {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
super.close(); super.close();
@ -356,12 +371,6 @@ public void close() throws IOException {
} }
} }
}; };
if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
// The ftpClient is an inconsistent state. Must close the stream
// which in turn will logout and disconnect from FTP server
fos.close();
throw new IOException("Unable to create file: " + file + ", Aborting");
}
return fos; return fos;
} }

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ftp;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import org.apache.ftpserver.FtpServer;
import org.apache.ftpserver.FtpServerFactory;
import org.apache.ftpserver.ftplet.Authority;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.UserManager;
import org.apache.ftpserver.impl.DefaultFtpServer;
import org.apache.ftpserver.listener.Listener;
import org.apache.ftpserver.listener.ListenerFactory;
import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
import org.apache.ftpserver.usermanager.impl.BaseUser;
/**
* Helper class facilitating to manage a local ftp
* server for unit tests purposes only.
*/
public class FtpTestServer {
private int port;
private Path ftpRoot;
private UserManager userManager;
private FtpServer server;
public FtpTestServer(Path ftpRoot) {
this.ftpRoot = ftpRoot;
this.userManager = new PropertiesUserManagerFactory().createUserManager();
FtpServerFactory serverFactory = createServerFactory();
serverFactory.setUserManager(userManager);
this.server = serverFactory.createServer();
}
public FtpTestServer start() throws Exception {
server.start();
Listener listener = ((DefaultFtpServer) server)
.getListeners()
.get("default");
port = listener.getPort();
return this;
}
public Path getFtpRoot() {
return ftpRoot;
}
public int getPort() {
return port;
}
public void stop() {
if (!server.isStopped()) {
server.stop();
}
}
public BaseUser addUser(String name, String password,
Authority... authorities) throws IOException, FtpException {
BaseUser user = new BaseUser();
user.setName(name);
user.setPassword(password);
Path userHome = Files.createDirectory(ftpRoot.resolve(name));
user.setHomeDirectory(userHome.toString());
user.setAuthorities(Arrays.asList(authorities));
userManager.save(user);
return user;
}
private FtpServerFactory createServerFactory() {
FtpServerFactory serverFactory = new FtpServerFactory();
ListenerFactory defaultListener = new ListenerFactory();
defaultListener.setPort(0);
serverFactory.addListener("default", defaultListener.createListener());
return serverFactory;
}
}

View File

@ -17,18 +17,35 @@
*/ */
package org.apache.hadoop.fs.ftp; package org.apache.hadoop.fs.ftp;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Comparator;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile; import org.apache.commons.net.ftp.FTPFile;
import org.apache.ftpserver.usermanager.impl.BaseUser;
import org.apache.ftpserver.usermanager.impl.WritePermission;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
/** /**
@ -37,9 +54,72 @@
*/ */
public class TestFTPFileSystem { public class TestFTPFileSystem {
private FtpTestServer server;
@Rule @Rule
public Timeout testTimeout = new Timeout(180000); public Timeout testTimeout = new Timeout(180000);
@Before
public void setUp() throws Exception {
server = new FtpTestServer(GenericTestUtils.getTestDir().toPath()).start();
}
@After
@SuppressWarnings("ResultOfMethodCallIgnored")
public void tearDown() throws Exception {
if (server != null) {
server.stop();
Files.walk(server.getFtpRoot())
.sorted(Comparator.reverseOrder())
.map(java.nio.file.Path::toFile)
.forEach(File::delete);
}
}
@Test
public void testCreateWithWritePermissions() throws Exception {
BaseUser user = server.addUser("test", "password", new WritePermission());
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "ftp:///");
configuration.set("fs.ftp.host", "localhost");
configuration.setInt("fs.ftp.host.port", server.getPort());
configuration.set("fs.ftp.user.localhost", user.getName());
configuration.set("fs.ftp.password.localhost", user.getPassword());
configuration.setBoolean("fs.ftp.impl.disable.cache", true);
FileSystem fs = FileSystem.get(configuration);
byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
try (FSDataOutputStream outputStream = fs.create(new Path("test1.txt"))) {
outputStream.write(bytesExpected);
}
try (FSDataInputStream input = fs.open(new Path("test1.txt"))) {
assertThat(bytesExpected, equalTo(IOUtils.readFullyToByteArray(input)));
}
}
@Test
public void testCreateWithoutWritePermissions() throws Exception {
BaseUser user = server.addUser("test", "password");
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "ftp:///");
configuration.set("fs.ftp.host", "localhost");
configuration.setInt("fs.ftp.host.port", server.getPort());
configuration.set("fs.ftp.user.localhost", user.getName());
configuration.set("fs.ftp.password.localhost", user.getPassword());
configuration.setBoolean("fs.ftp.impl.disable.cache", true);
FileSystem fs = FileSystem.get(configuration);
byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8);
LambdaTestUtils.intercept(
IOException.class, "Unable to create file: test1.txt, Aborting",
() -> {
try (FSDataOutputStream out = fs.create(new Path("test1.txt"))) {
out.write(bytesExpected);
}
}
);
}
@Test @Test
public void testFTPDefaultPort() throws Exception { public void testFTPDefaultPort() throws Exception {
FTPFileSystem ftp = new FTPFileSystem(); FTPFileSystem ftp = new FTPFileSystem();