From 0a212a40fcbd12a11294bff7a31e7433111733c9 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Mon, 9 Jan 2017 15:18:26 -0800 Subject: [PATCH] HADOOP-13953. Make FTPFileSystem's data connection mode and transfer mode configurable. Contributed by Xiao Chen. --- .../apache/hadoop/fs/ftp/FTPFileSystem.java | 70 ++++++++++++++++++- .../src/main/resources/core-default.xml | 18 +++++ .../conf/TestCommonConfigurationFields.java | 2 + .../hadoop/fs/ftp/TestFTPFileSystem.java | 56 ++++++++++++++- 4 files changed, 143 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java index 28b07e8571..25fec31ca0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java @@ -23,6 +23,7 @@ import java.net.ConnectException; import java.net.URI; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,9 @@ public class FTPFileSystem extends FileSystem { public static final String FS_FTP_HOST = "fs.ftp.host"; public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port"; public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password."; + public static final String FS_FTP_DATA_CONNECTION_MODE = + "fs.ftp.data.connection.mode"; + public static final String FS_FTP_TRANSFER_MODE = "fs.ftp.transfer.mode"; public static final String E_SAME_DIRECTORY_ONLY = "only same directory renames are supported"; @@ -143,9 +147,10 @@ private FTPClient connect() throws IOException { NetUtils.UNKNOWN_HOST, 0, new ConnectException("Server response " + reply)); } else if (client.login(user, password)) { - client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE); + client.setFileTransferMode(getTransferMode(conf)); client.setFileType(FTP.BINARY_FILE_TYPE); client.setBufferSize(DEFAULT_BUFFER_SIZE); + setDataConnectionMode(client, conf); } else { throw new IOException("Login failed on server - " + host + ", port - " + port + " as user '" + user + "'"); @@ -154,6 +159,69 @@ private FTPClient connect() throws IOException { return client; } + /** + * Set FTP's transfer mode based on configuration. Valid values are + * STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE. + *

+ * Defaults to BLOCK_TRANSFER_MODE. + * + * @param conf + * @return + */ + @VisibleForTesting + int getTransferMode(Configuration conf) { + final String mode = conf.get(FS_FTP_TRANSFER_MODE); + // FTP default is STREAM_TRANSFER_MODE, but Hadoop FTPFS's default is + // FTP.BLOCK_TRANSFER_MODE historically. + int ret = FTP.BLOCK_TRANSFER_MODE; + if (mode == null) { + return ret; + } + final String upper = mode.toUpperCase(); + if (upper.equals("STREAM_TRANSFER_MODE")) { + ret = FTP.STREAM_TRANSFER_MODE; + } else if (upper.equals("COMPRESSED_TRANSFER_MODE")) { + ret = FTP.COMPRESSED_TRANSFER_MODE; + } else { + if (!upper.equals("BLOCK_TRANSFER_MODE")) { + LOG.warn("Cannot parse the value for " + FS_FTP_TRANSFER_MODE + ": " + + mode + ". Using default."); + } + } + return ret; + } + + /** + * Set the FTPClient's data connection mode based on configuration. Valid + * values are ACTIVE_LOCAL_DATA_CONNECTION_MODE, + * PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE. + *

+ * Defaults to ACTIVE_LOCAL_DATA_CONNECTION_MODE. + * + * @param client + * @param conf + * @throws IOException + */ + @VisibleForTesting + void setDataConnectionMode(FTPClient client, Configuration conf) + throws IOException { + final String mode = conf.get(FS_FTP_DATA_CONNECTION_MODE); + if (mode == null) { + return; + } + final String upper = mode.toUpperCase(); + if (upper.equals("PASSIVE_LOCAL_DATA_CONNECTION_MODE")) { + client.enterLocalPassiveMode(); + } else if (upper.equals("PASSIVE_REMOTE_DATA_CONNECTION_MODE")) { + client.enterRemotePassiveMode(); + } else { + if (!upper.equals("ACTIVE_LOCAL_DATA_CONNECTION_MODE")) { + LOG.warn("Cannot parse the value for " + FS_FTP_DATA_CONNECTION_MODE + + ": " + mode + ". Using default."); + } + } + } + /** * Logout and disconnect the given FTPClient. * * diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index ee2cc2e2d8..9355a3c854 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -782,6 +782,24 @@ + + fs.ftp.data.connection.mode + ACTIVE_LOCAL_DATA_CONNECTION_MODE + Set the FTPClient's data connection mode based on configuration. + Valid values are ACTIVE_LOCAL_DATA_CONNECTION_MODE, + PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE. + + + + + fs.ftp.transfer.mode + BLOCK_TRANSFER_MODE + + Set FTP's transfer mode based on configuration. Valid values are + STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE. + + + fs.df.interval 60000 diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java index a3a4026eee..966a8ac0d0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java @@ -89,6 +89,8 @@ public void initializeMemberVariables() { // Lots of properties not in the above classes xmlPropsToSkipCompare.add("fs.ftp.password.localhost"); xmlPropsToSkipCompare.add("fs.ftp.user.localhost"); + xmlPropsToSkipCompare.add("fs.ftp.data.connection.mode"); + xmlPropsToSkipCompare.add("fs.ftp.transfer.mode"); xmlPropsToSkipCompare.add("hadoop.tmp.dir"); xmlPropsToSkipCompare.add("nfs3.mountd.port"); xmlPropsToSkipCompare.add("nfs3.server.port"); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java index 0ce2a9b057..06046046e3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java @@ -19,15 +19,67 @@ import org.apache.commons.net.ftp.FTP; -import org.junit.Assert; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.hadoop.conf.Configuration; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; +import static org.junit.Assert.assertEquals; + +/** + * Test basic @{link FTPFileSystem} class methods. Contract tests are in + * TestFTPContractXXXX. + */ public class TestFTPFileSystem { + @Rule + public Timeout testTimeout = new Timeout(180000); + @Test public void testFTPDefaultPort() throws Exception { FTPFileSystem ftp = new FTPFileSystem(); - Assert.assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort()); + assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort()); } + @Test + public void testFTPTransferMode() throws Exception { + Configuration conf = new Configuration(); + FTPFileSystem ftp = new FTPFileSystem(); + assertEquals(FTP.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf)); + + conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "STREAM_TRANSFER_MODE"); + assertEquals(FTP.STREAM_TRANSFER_MODE, ftp.getTransferMode(conf)); + + conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "COMPRESSED_TRANSFER_MODE"); + assertEquals(FTP.COMPRESSED_TRANSFER_MODE, ftp.getTransferMode(conf)); + + conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "invalid"); + assertEquals(FTPClient.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf)); + } + + @Test + public void testFTPDataConnectionMode() throws Exception { + Configuration conf = new Configuration(); + FTPClient client = new FTPClient(); + FTPFileSystem ftp = new FTPFileSystem(); + assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE, + client.getDataConnectionMode()); + + ftp.setDataConnectionMode(client, conf); + assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE, + client.getDataConnectionMode()); + + conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE, "invalid"); + ftp.setDataConnectionMode(client, conf); + assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE, + client.getDataConnectionMode()); + + conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE, + "PASSIVE_LOCAL_DATA_CONNECTION_MODE"); + ftp.setDataConnectionMode(client, conf); + assertEquals(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE, + client.getDataConnectionMode()); + + } } \ No newline at end of file