diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java index 28b07e8571..25fec31ca0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java @@ -23,6 +23,7 @@ import java.net.ConnectException; import java.net.URI; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,9 @@ public class FTPFileSystem extends FileSystem { public static final String FS_FTP_HOST = "fs.ftp.host"; public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port"; public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password."; + public static final String FS_FTP_DATA_CONNECTION_MODE = + "fs.ftp.data.connection.mode"; + public static final String FS_FTP_TRANSFER_MODE = "fs.ftp.transfer.mode"; public static final String E_SAME_DIRECTORY_ONLY = "only same directory renames are supported"; @@ -143,9 +147,10 @@ private FTPClient connect() throws IOException { NetUtils.UNKNOWN_HOST, 0, new ConnectException("Server response " + reply)); } else if (client.login(user, password)) { - client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE); + client.setFileTransferMode(getTransferMode(conf)); client.setFileType(FTP.BINARY_FILE_TYPE); client.setBufferSize(DEFAULT_BUFFER_SIZE); + setDataConnectionMode(client, conf); } else { throw new IOException("Login failed on server - " + host + ", port - " + port + " as user '" + user + "'"); @@ -154,6 +159,69 @@ private FTPClient connect() throws IOException { return client; } + /** + * Set FTP's transfer mode based on configuration. Valid values are + * STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE. + *
+ * Defaults to BLOCK_TRANSFER_MODE. + * + * @param conf + * @return + */ + @VisibleForTesting + int getTransferMode(Configuration conf) { + final String mode = conf.get(FS_FTP_TRANSFER_MODE); + // FTP default is STREAM_TRANSFER_MODE, but Hadoop FTPFS's default is + // FTP.BLOCK_TRANSFER_MODE historically. + int ret = FTP.BLOCK_TRANSFER_MODE; + if (mode == null) { + return ret; + } + final String upper = mode.toUpperCase(); + if (upper.equals("STREAM_TRANSFER_MODE")) { + ret = FTP.STREAM_TRANSFER_MODE; + } else if (upper.equals("COMPRESSED_TRANSFER_MODE")) { + ret = FTP.COMPRESSED_TRANSFER_MODE; + } else { + if (!upper.equals("BLOCK_TRANSFER_MODE")) { + LOG.warn("Cannot parse the value for " + FS_FTP_TRANSFER_MODE + ": " + + mode + ". Using default."); + } + } + return ret; + } + + /** + * Set the FTPClient's data connection mode based on configuration. Valid + * values are ACTIVE_LOCAL_DATA_CONNECTION_MODE, + * PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE. + * + * Defaults to ACTIVE_LOCAL_DATA_CONNECTION_MODE. + * + * @param client + * @param conf + * @throws IOException + */ + @VisibleForTesting + void setDataConnectionMode(FTPClient client, Configuration conf) + throws IOException { + final String mode = conf.get(FS_FTP_DATA_CONNECTION_MODE); + if (mode == null) { + return; + } + final String upper = mode.toUpperCase(); + if (upper.equals("PASSIVE_LOCAL_DATA_CONNECTION_MODE")) { + client.enterLocalPassiveMode(); + } else if (upper.equals("PASSIVE_REMOTE_DATA_CONNECTION_MODE")) { + client.enterRemotePassiveMode(); + } else { + if (!upper.equals("ACTIVE_LOCAL_DATA_CONNECTION_MODE")) { + LOG.warn("Cannot parse the value for " + FS_FTP_DATA_CONNECTION_MODE + + ": " + mode + ". Using default."); + } + } + } + /** * Logout and disconnect the given FTPClient. * * diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index ee2cc2e2d8..9355a3c854 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -782,6 +782,24 @@ +