From d5403747b57b1e294e533ce17f197e7be8f5339c Mon Sep 17 00:00:00 2001
From: Lei Xu
Date: Tue, 4 Aug 2015 18:51:52 -0700
Subject: [PATCH] HADOOP-12269. Update aws-sdk dependency to 1.10.6 (Thomas
 Demoor via Lei (Eddy) Xu)

---
 .../src/main/resources/core-default.xml        |  6 +++++
 .../hadoop-hdfs-httpfs/pom.xml                 |  4 +--
 hadoop-project/pom.xml                         |  4 +--
 hadoop-tools/hadoop-aws/pom.xml                |  2 +-
 .../org/apache/hadoop/fs/s3a/Constants.java    |  9 ++++---
 .../hadoop/fs/s3a/S3AFastOutputStream.java     |  2 +-
 .../apache/hadoop/fs/s3a/S3AFileSystem.java    | 26 +++++++++++--------
 .../apache/hadoop/fs/s3a/S3AOutputStream.java  | 15 +++++++----
 .../site/markdown/tools/hadoop-aws/index.md    |  6 +++++
 9 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index bfdd453ade..8efeee2739 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -865,6 +865,12 @@ for ldap providers in the same way as above does.
   <description>Minimum age in seconds of multipart uploads to purge</description>
 </property>
 
+<property>
+  <name>fs.s3a.signing-algorithm</name>
+  <description>Override the default signing algorithm so legacy
+    implementations can still be used</description>
+</property>
+
 <property>
   <name>fs.s3a.buffer.dir</name>
   <value>${hadoop.tmp.dir}/s3a</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
index 4457597e12..9be99ae8a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/pom.xml
@@ -132,7 +132,7 @@
         </exclusion>
         <exclusion>
           <groupId>com.amazonaws</groupId>
-          <artifactId>aws-java-sdk</artifactId>
+          <artifactId>aws-java-sdk-s3</artifactId>
         </exclusion>
         <exclusion>
           <groupId>org.eclipse.jdt</groupId>
@@ -175,7 +175,7 @@
         </exclusion>
         <exclusion>
           <groupId>com.amazonaws</groupId>
-          <artifactId>aws-java-sdk</artifactId>
+          <artifactId>aws-java-sdk-s3</artifactId>
         </exclusion>
         <exclusion>
           <groupId>org.eclipse.jdt</groupId>
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index d56342044c..023b1c4f03 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -657,8 +657,8 @@
       </dependency>
       <dependency>
         <groupId>com.amazonaws</groupId>
-        <artifactId>aws-java-sdk</artifactId>
-        <version>1.7.4</version>
+        <artifactId>aws-java-sdk-s3</artifactId>
+        <version>1.10.6</version>
       </dependency>
       <dependency>
         <groupId>org.apache.mina</groupId>
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 590778dbc2..9ef99fb160 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -119,7 +119,7 @@
     </dependency>
     <dependency>
       <groupId>com.amazonaws</groupId>
-      <artifactId>aws-java-sdk</artifactId>
+      <artifactId>aws-java-sdk-s3</artifactId>
       <scope>compile</scope>
     </dependency>
     <dependency>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 3486dfbedf..fe8dd77b7b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -82,8 +82,8 @@ public class Constants {
 
   // minimum size in bytes before we start a multipart uploads or copy
   public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
-  public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
-  
+  public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
+
   // comma separated list of directories
   public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
 
@@ -111,7 +111,10 @@ public class Constants {
   // s3 server-side encryption
   public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
     "fs.s3a.server-side-encryption-algorithm";
-  
+
+  //override signature algorithm used for signing requests
+  public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";
+
   public static final String S3N_FOLDER_SUFFIX = "_$folder$";
   public static final String FS_S3A_BLOCK_SIZE = "fs.s3a.block.size";
   public static final String FS_S3A = "s3a";
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 68195817ee..2e06fba275 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -267,7 +267,7 @@ public synchronized void close() throws IOException {
   private ObjectMetadata createDefaultMetadata() {
     ObjectMetadata om = new ObjectMetadata();
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
     return om;
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 91a606cf1f..f9e937f8f4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -87,7 +87,7 @@ public class S3AFileSystem extends FileSystem {
   private long partSize;
   private TransferManager transfers;
   private ThreadPoolExecutor threadPoolExecutor;
-  private int multiPartThreshold;
+  private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private CannedAccessControlList cannedACL;
   private String serverSideEncryptionAlgorithm;
@@ -191,8 +191,12 @@ public void initialize(URI name, Configuration conf) throws IOException {
       DEFAULT_ESTABLISH_TIMEOUT));
     awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
       DEFAULT_SOCKET_TIMEOUT));
+    String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+    if(!signerOverride.isEmpty()) {
+      awsConf.setSignerOverride(signerOverride);
+    }
 
-    String proxyHost = conf.getTrimmed(PROXY_HOST,"");
+    String proxyHost = conf.getTrimmed(PROXY_HOST, "");
     int proxyPort = conf.getInt(PROXY_PORT, -1);
     if (!proxyHost.isEmpty()) {
       awsConf.setProxyHost(proxyHost);
@@ -246,7 +250,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
 
     maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
     partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
+    multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
       DEFAULT_MIN_MULTIPART_THRESHOLD);
 
     if (partSize < 5 * 1024 * 1024) {
@@ -403,7 +407,7 @@ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwr
     if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
       return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
         key, progress, statistics, cannedACL,
-        serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold,
+        serverSideEncryptionAlgorithm, partSize, multiPartThreshold,
         threadPoolExecutor), statistics);
     }
     // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
@@ -1027,7 +1031,7 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
 
     final ObjectMetadata om = new ObjectMetadata();
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
     putObjectRequest.setCannedAcl(cannedACL);
@@ -1035,8 +1039,8 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
 
     ProgressListener progressListener = new ProgressListener() {
       public void progressChanged(ProgressEvent progressEvent) {
-        switch (progressEvent.getEventCode()) {
-          case ProgressEvent.PART_COMPLETED_EVENT_CODE:
+        switch (progressEvent.getEventType()) {
+          case TRANSFER_PART_COMPLETED_EVENT:
             statistics.incrementWriteOps(1);
             break;
           default:
@@ -1091,7 +1095,7 @@ private void copyFile(String srcKey, String dstKey) throws IOException {
     ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
     final ObjectMetadata dstom = srcom.clone();
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      dstom.setServerSideEncryption(serverSideEncryptionAlgorithm);
+      dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
     CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
     copyObjectRequest.setCannedAccessControlList(cannedACL);
@@ -1099,8 +1103,8 @@ private void copyFile(String srcKey, String dstKey) throws IOException {
 
     ProgressListener progressListener = new ProgressListener() {
       public void progressChanged(ProgressEvent progressEvent) {
-        switch (progressEvent.getEventCode()) {
-          case ProgressEvent.PART_COMPLETED_EVENT_CODE:
+        switch (progressEvent.getEventType()) {
+          case TRANSFER_PART_COMPLETED_EVENT:
             statistics.incrementWriteOps(1);
             break;
           default:
@@ -1187,7 +1191,7 @@ public int read() throws IOException {
     final ObjectMetadata om = new ObjectMetadata();
     om.setContentLength(0L);
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om);
     putObjectRequest.setCannedAcl(cannedACL);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 2b611b6fdf..3e079f2367 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
@@ -41,6 +42,8 @@
 import java.io.IOException;
 import java.io.OutputStream;
 
+import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
+import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 
 public class S3AOutputStream extends OutputStream {
@@ -52,7 +55,7 @@ public class S3AOutputStream extends OutputStream {
   private TransferManager transfers;
   private Progressable progress;
   private long partSize;
-  private int partSizeThreshold;
+  private long partSizeThreshold;
   private S3AFileSystem fs;
   private CannedAccessControlList cannedACL;
   private FileSystem.Statistics statistics;
@@ -76,7 +79,8 @@ public S3AOutputStream(Configuration conf, TransferManager transfers,
     this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
 
     partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
+    partSizeThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
+        DEFAULT_MIN_MULTIPART_THRESHOLD);
 
     if (conf.get(BUFFER_DIR, null) != null) {
       lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
@@ -116,7 +120,7 @@ public synchronized void close() throws IOException {
     try {
       final ObjectMetadata om = new ObjectMetadata();
       if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-        om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+        om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
       }
       PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile);
       putObjectRequest.setCannedAcl(cannedACL);
@@ -184,8 +188,9 @@ public void progressChanged(ProgressEvent progressEvent) {
       }
 
       // There are 3 http ops here, but this should be close enough for now
-      if (progressEvent.getEventCode() == ProgressEvent.PART_STARTED_EVENT_CODE ||
-          progressEvent.getEventCode() == ProgressEvent.COMPLETED_EVENT_CODE) {
+      ProgressEventType pet = progressEvent.getEventType();
+      if (pet == TRANSFER_PART_STARTED_EVENT ||
+          pet == TRANSFER_COMPLETED_EVENT) {
         statistics.incrementWriteOps(1);
       }
 
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index e0389c05ca..5d45e0ab64 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -288,6 +288,12 @@ If you do any of these: change your credentials immediately!
       <description>Minimum age in seconds of multipart uploads to purge</description>
     </property>
 
+    <property>
+      <name>fs.s3a.signing-algorithm</name>
+      <description>Override the default signing algorithm so legacy
+        implementations can still be used</description>
+    </property>
+
     <property>
      <name>fs.s3a.buffer.dir</name>
      <value>${hadoop.tmp.dir}/s3a</value>
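
The new fs.s3a.signing-algorithm option documented above is read in S3AFileSystem.initialize() and handed straight to the AWS SDK via ClientConfiguration.setSignerOverride(), so its value must name a signer the SDK recognizes. A minimal client-side sketch (not part of the patch), assuming the SDK's registered legacy signer name "S3SignerType" and a hypothetical bucket name:

    // SignerOverrideExample.java -- illustrative sketch only.
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class SignerOverrideExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "S3SignerType" selects the legacy (pre-V4) S3 signer; the value
        // must match a signer name registered with the AWS SDK.
        conf.set("fs.s3a.signing-algorithm", "S3SignerType");
        // "example-bucket" is a placeholder; valid AWS credentials are also
        // required for initialize() to succeed.
        FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
        System.out.println("Connected to " + fs.getUri());
      }
    }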
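The repeated getEventCode() -> getEventType() changes track the SDK's replacement of integer event codes with the ProgressEventType enum in the 1.10.x line. A self-contained sketch of the listener pattern the patch adopts; the AtomicLong counter is a hypothetical stand-in for Hadoop's FileSystem.Statistics:

    // PartCountingListener.java -- illustrative sketch only.
    import java.util.concurrent.atomic.AtomicLong;

    import com.amazonaws.event.ProgressEvent;
    import com.amazonaws.event.ProgressListener;

    public class PartCountingListener implements ProgressListener {
      // Stand-in for FileSystem.Statistics.incrementWriteOps(1).
      private final AtomicLong writeOps = new AtomicLong();

      @Override
      public void progressChanged(ProgressEvent progressEvent) {
        // SDK 1.10.x exposes an enum instead of integer event codes.
        switch (progressEvent.getEventType()) {
          case TRANSFER_PART_COMPLETED_EVENT:
            writeOps.incrementAndGet();
            break;
          default:
            break;
        }
      }

      public long getWriteOps() {
        return writeOps.get();
      }
    }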
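Likewise, ObjectMetadata.setServerSideEncryption(String) became setSSEAlgorithm(String) in the newer SDK; the request header it sets is unchanged. A minimal sketch mirroring the patch's createDefaultMetadata(), exercised here with the SDK's ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION constant ("AES256"):

    // SseMetadata.java -- illustrative sketch only.
    import com.amazonaws.services.s3.model.ObjectMetadata;
    import org.apache.commons.lang.StringUtils;

    public class SseMetadata {
      static ObjectMetadata createDefaultMetadata(String serverSideEncryptionAlgorithm) {
        ObjectMetadata om = new ObjectMetadata();
        if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
          // Renamed in SDK 1.10.x: setServerSideEncryption(...) -> setSSEAlgorithm(...)
          om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
        }
        return om;
      }

      public static void main(String[] args) {
        ObjectMetadata om =
            createDefaultMetadata(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
        System.out.println(om.getSSEAlgorithm()); // prints AES256
      }
    }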