From 18d7dfbf35564694e24bf2b7c99fea1bee1c790e Mon Sep 17 00:00:00 2001 From: Mike Date: Mon, 27 Apr 2020 16:43:51 +0300 Subject: [PATCH] HDFS-1820. FTPFileSystem attempts to close the outputstream even when it is not initialised. (#1952) Contributed by Mikhail Pryakhin. --- hadoop-common-project/hadoop-common/pom.xml | 5 + .../apache/hadoop/fs/ftp/FTPFileSystem.java | 27 +++-- .../apache/hadoop/fs/ftp/FtpTestServer.java | 99 +++++++++++++++++++ .../hadoop/fs/ftp/TestFTPFileSystem.java | 84 +++++++++++++++- 4 files changed, 204 insertions(+), 11 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index 777e001c7e..d768907850 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -276,6 +276,11 @@ sshd-core test + + org.apache.ftpserver + ftpserver-core + test + org.apache.htrace diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java index 4b144bfddf..28db2c9a1a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java @@ -20,6 +20,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.ConnectException; import java.net.URI; @@ -41,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; @@ -110,7 +112,9 @@ public void initialize(URI uri, Configuration conf) throws IOException { // get // get port information from uri, (overrides info in conf) int port = uri.getPort(); - port = (port == -1) ? FTP.DEFAULT_PORT : port; + if(port == -1){ + port = conf.getInt(FS_FTP_HOST_PORT, FTP.DEFAULT_PORT); + } conf.setInt(FS_FTP_HOST_PORT, port); // get user/password information from URI (overrides info in conf) @@ -340,8 +344,19 @@ public FSDataOutputStream create(Path file, FsPermission permission, // file. The FTP client connection is closed when close() is called on the // FSDataOutputStream. client.changeWorkingDirectory(parent.toUri().getPath()); - FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file - .getName()), statistics) { + OutputStream outputStream = client.storeFileStream(file.getName()); + + if (!FTPReply.isPositivePreliminary(client.getReplyCode())) { + // The ftpClient is an inconsistent state. Must close the stream + // which in turn will logout and disconnect from FTP server + if (outputStream != null) { + IOUtils.closeStream(outputStream); + } + disconnect(client); + throw new IOException("Unable to create file: " + file + ", Aborting"); + } + + FSDataOutputStream fos = new FSDataOutputStream(outputStream, statistics) { @Override public void close() throws IOException { super.close(); @@ -356,12 +371,6 @@ public void close() throws IOException { } } }; - if (!FTPReply.isPositivePreliminary(client.getReplyCode())) { - // The ftpClient is an inconsistent state. Must close the stream - // which in turn will logout and disconnect from FTP server - fos.close(); - throw new IOException("Unable to create file: " + file + ", Aborting"); - } return fos; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java new file mode 100644 index 0000000000..eca26dea5b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/FtpTestServer.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.ftp; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; + +import org.apache.ftpserver.FtpServer; +import org.apache.ftpserver.FtpServerFactory; +import org.apache.ftpserver.ftplet.Authority; +import org.apache.ftpserver.ftplet.FtpException; +import org.apache.ftpserver.ftplet.UserManager; +import org.apache.ftpserver.impl.DefaultFtpServer; +import org.apache.ftpserver.listener.Listener; +import org.apache.ftpserver.listener.ListenerFactory; +import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory; +import org.apache.ftpserver.usermanager.impl.BaseUser; + +/** + * Helper class facilitating to manage a local ftp + * server for unit tests purposes only. + */ +public class FtpTestServer { + + private int port; + private Path ftpRoot; + private UserManager userManager; + private FtpServer server; + + public FtpTestServer(Path ftpRoot) { + this.ftpRoot = ftpRoot; + this.userManager = new PropertiesUserManagerFactory().createUserManager(); + FtpServerFactory serverFactory = createServerFactory(); + serverFactory.setUserManager(userManager); + this.server = serverFactory.createServer(); + } + + public FtpTestServer start() throws Exception { + server.start(); + Listener listener = ((DefaultFtpServer) server) + .getListeners() + .get("default"); + port = listener.getPort(); + return this; + } + + public Path getFtpRoot() { + return ftpRoot; + } + + public int getPort() { + return port; + } + + public void stop() { + if (!server.isStopped()) { + server.stop(); + } + } + + public BaseUser addUser(String name, String password, + Authority... authorities) throws IOException, FtpException { + + BaseUser user = new BaseUser(); + user.setName(name); + user.setPassword(password); + Path userHome = Files.createDirectory(ftpRoot.resolve(name)); + user.setHomeDirectory(userHome.toString()); + user.setAuthorities(Arrays.asList(authorities)); + userManager.save(user); + return user; + } + + private FtpServerFactory createServerFactory() { + FtpServerFactory serverFactory = new FtpServerFactory(); + ListenerFactory defaultListener = new ListenerFactory(); + defaultListener.setPort(0); + serverFactory.addListener("default", defaultListener.createListener()); + return serverFactory; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java index 3d41ccb91d..02d5a4852b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java @@ -17,18 +17,35 @@ */ package org.apache.hadoop.fs.ftp; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Comparator; + import com.google.common.base.Preconditions; import org.apache.commons.net.ftp.FTP; - import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPFile; +import org.apache.ftpserver.usermanager.impl.BaseUser; +import org.apache.ftpserver.usermanager.impl.WritePermission; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; - +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; /** @@ -37,9 +54,72 @@ */ public class TestFTPFileSystem { + private FtpTestServer server; + @Rule public Timeout testTimeout = new Timeout(180000); + @Before + public void setUp() throws Exception { + server = new FtpTestServer(GenericTestUtils.getTestDir().toPath()).start(); + } + + @After + @SuppressWarnings("ResultOfMethodCallIgnored") + public void tearDown() throws Exception { + if (server != null) { + server.stop(); + Files.walk(server.getFtpRoot()) + .sorted(Comparator.reverseOrder()) + .map(java.nio.file.Path::toFile) + .forEach(File::delete); + } + } + + @Test + public void testCreateWithWritePermissions() throws Exception { + BaseUser user = server.addUser("test", "password", new WritePermission()); + Configuration configuration = new Configuration(); + configuration.set("fs.defaultFS", "ftp:///"); + configuration.set("fs.ftp.host", "localhost"); + configuration.setInt("fs.ftp.host.port", server.getPort()); + configuration.set("fs.ftp.user.localhost", user.getName()); + configuration.set("fs.ftp.password.localhost", user.getPassword()); + configuration.setBoolean("fs.ftp.impl.disable.cache", true); + + FileSystem fs = FileSystem.get(configuration); + byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8); + try (FSDataOutputStream outputStream = fs.create(new Path("test1.txt"))) { + outputStream.write(bytesExpected); + } + try (FSDataInputStream input = fs.open(new Path("test1.txt"))) { + assertThat(bytesExpected, equalTo(IOUtils.readFullyToByteArray(input))); + } + } + + @Test + public void testCreateWithoutWritePermissions() throws Exception { + BaseUser user = server.addUser("test", "password"); + Configuration configuration = new Configuration(); + configuration.set("fs.defaultFS", "ftp:///"); + configuration.set("fs.ftp.host", "localhost"); + configuration.setInt("fs.ftp.host.port", server.getPort()); + configuration.set("fs.ftp.user.localhost", user.getName()); + configuration.set("fs.ftp.password.localhost", user.getPassword()); + configuration.setBoolean("fs.ftp.impl.disable.cache", true); + + FileSystem fs = FileSystem.get(configuration); + byte[] bytesExpected = "hello world".getBytes(StandardCharsets.UTF_8); + LambdaTestUtils.intercept( + IOException.class, "Unable to create file: test1.txt, Aborting", + () -> { + try (FSDataOutputStream out = fs.create(new Path("test1.txt"))) { + out.write(bytesExpected); + } + } + ); + } + @Test public void testFTPDefaultPort() throws Exception { FTPFileSystem ftp = new FTPFileSystem();