diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index b2c1af3e31..d9e6180bc1 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -488,6 +488,9 @@ Release 2.7.0 - UNRELEASED
 
     HADOOP-11261 Set custom endpoint for S3A. (Thomas Demoor via stevel)
 
+    HADOOP-11171 Enable using a proxy server to connect to S3a.
+    (Thomas Demoor via stevel)
+
   OPTIMIZATIONS
 
     HADOOP-11323. WritableComparator#compare keeps reference to byte array.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 0232961469..b6863bb162 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -31,6 +31,13 @@ public class Constants {
 
   //use a custom endpoint?
   public static final String ENDPOINT = "fs.s3a.endpoint";
+  //connect to s3 through a proxy server?
+  public static final String PROXY_HOST = "fs.s3a.proxy.host";
+  public static final String PROXY_PORT = "fs.s3a.proxy.port";
+  public static final String PROXY_USERNAME = "fs.s3a.proxy.username";
+  public static final String PROXY_PASSWORD = "fs.s3a.proxy.password";
+  public static final String PROXY_DOMAIN = "fs.s3a.proxy.domain";
+  public static final String PROXY_WORKSTATION = "fs.s3a.proxy.workstation";
 
   // number of times we should retry errors
   public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index d8cf73f8f8..0e4d54f132 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -169,13 +169,54 @@ public void initialize(URI name, Configuration conf) throws IOException {
     ClientConfiguration awsConf = new ClientConfiguration();
     awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
       DEFAULT_MAXIMUM_CONNECTIONS));
-    awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS,
-      DEFAULT_SECURE_CONNECTIONS) ? Protocol.HTTPS : Protocol.HTTP);
+    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+        DEFAULT_SECURE_CONNECTIONS);
+    awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
     awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
       DEFAULT_MAX_ERROR_RETRIES));
     awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
       DEFAULT_SOCKET_TIMEOUT));
 
+    String proxyHost = conf.getTrimmed(PROXY_HOST,"");
+    int proxyPort = conf.getInt(PROXY_PORT, -1);
+    if (!proxyHost.isEmpty()) {
+      awsConf.setProxyHost(proxyHost);
+      if (proxyPort >= 0) {
+        awsConf.setProxyPort(proxyPort);
+      } else {
+        if (secureConnections) {
+          LOG.warn("Proxy host set without port. Using HTTPS default 443");
+          awsConf.setProxyPort(443);
+        } else {
+          LOG.warn("Proxy host set without port. Using HTTP default 80");
+          awsConf.setProxyPort(80);
+        }
+      }
+      String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
+      String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
+      if ((proxyUsername == null) != (proxyPassword == null)) {
+        String msg = "Proxy error: " + PROXY_USERNAME + " or " +
+            PROXY_PASSWORD + " set without the other.";
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      awsConf.setProxyUsername(proxyUsername);
+      awsConf.setProxyPassword(proxyPassword);
+      awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+      awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+      if (LOG.isDebugEnabled()) {
+        // Deliberately omit the proxy password: secrets must not reach logs.
+        LOG.debug("Using proxy server {}:{} as user {} on " +
+                "domain {} as workstation {}", awsConf.getProxyHost(),
+            awsConf.getProxyPort(), String.valueOf(awsConf.getProxyUsername()),
+            awsConf.getProxyDomain(), awsConf.getProxyWorkstation());
+      }
+    } else if (proxyPort >= 0) {
+      String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+
     s3 = new AmazonS3Client(credentials, awsConf);
     String endPoint = conf.getTrimmed(ENDPOINT,"");
     if (!endPoint.isEmpty()) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java
index e4a14d0f0b..25068f8d9a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java
@@ -20,6 +20,7 @@
 
 import com.amazonaws.services.s3.AmazonS3Client;
 import org.apache.commons.lang.StringUtils;
+import com.amazonaws.AmazonClientException;
 import org.apache.hadoop.conf.Configuration;
 
 import org.junit.Rule;
@@ -82,4 +83,98 @@ public void TestEndpoint() throws Exception {
           endPointRegion, s3.getBucketLocation(fs.getUri().getHost()));
     }
   }
+
+  @Test
+  public void TestProxyConnection() throws Exception {
+    conf = new Configuration();
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.set(Constants.PROXY_HOST, "127.0.0.1");
+    conf.setInt(Constants.PROXY_PORT, 1);
+    String proxy =
+        conf.get(Constants.PROXY_HOST) + ":" + conf.get(Constants.PROXY_PORT);
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a connection error for proxy server at " + proxy);
+    } catch (AmazonClientException e) {
+      if (!e.getMessage().contains(proxy + " refused")) {
+        throw e;
+      }
+    }
+  }
+
+  @Test
+  public void TestProxyPortWithoutHost() throws Exception {
+    conf = new Configuration();
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.setInt(Constants.PROXY_PORT, 1);
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a proxy configuration error");
+    } catch (IllegalArgumentException e) {
+      String msg = e.toString();
+      if (!msg.contains(Constants.PROXY_HOST) &&
+          !msg.contains(Constants.PROXY_PORT)) {
+        throw e;
+      }
+    }
+  }
+
+  @Test
+  public void TestAutomaticProxyPortSelection() throws Exception {
+    conf = new Configuration();
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.set(Constants.PROXY_HOST, "127.0.0.1");
+    conf.set(Constants.SECURE_CONNECTIONS, "true");
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a connection error for proxy server");
+    } catch (AmazonClientException e) {
+      if (!e.getMessage().contains("443")) {
+        throw e;
+      }
+    }
+    conf.set(Constants.SECURE_CONNECTIONS, "false");
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a connection error for proxy server");
+    } catch (AmazonClientException e) {
+      if (!e.getMessage().contains("80")) {
+        throw e;
+      }
+    }
+  }
+
+  @Test
+  public void TestUsernameInconsistentWithPassword() throws Exception {
+    conf = new Configuration();
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.set(Constants.PROXY_HOST, "127.0.0.1");
+    conf.setInt(Constants.PROXY_PORT, 1);
+    conf.set(Constants.PROXY_USERNAME, "user");
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a proxy configuration error");
+    } catch (IllegalArgumentException e) {
+      String msg = e.toString();
+      if (!msg.contains(Constants.PROXY_USERNAME) &&
+          !msg.contains(Constants.PROXY_PASSWORD)) {
+        throw e;
+      }
+    }
+    conf = new Configuration();
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.set(Constants.PROXY_HOST, "127.0.0.1");
+    conf.setInt(Constants.PROXY_PORT, 1);
+    conf.set(Constants.PROXY_PASSWORD, "password");
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a proxy configuration error");
+    } catch (IllegalArgumentException e) {
+      String msg = e.toString();
+      if (!msg.contains(Constants.PROXY_USERNAME) &&
+          !msg.contains(Constants.PROXY_PASSWORD)) {
+        throw e;
+      }
+    }
+  }
 }