diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java index 42cd17bbd4..7be1646703 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSCopyFileTask.java @@ -32,13 +32,16 @@ public class AliyunOSSCopyFileTask implements Runnable { private AliyunOSSFileSystemStore store; private String srcKey; + private long srcLen; private String dstKey; private AliyunOSSCopyFileContext copyFileContext; public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store, - String srcKey, String dstKey, AliyunOSSCopyFileContext copyFileContext) { + String srcKey, long srcLen, + String dstKey, AliyunOSSCopyFileContext copyFileContext) { this.store = store; this.srcKey = srcKey; + this.srcLen = srcLen; this.dstKey = dstKey; this.copyFileContext = copyFileContext; } @@ -47,7 +50,7 @@ public AliyunOSSCopyFileTask(AliyunOSSFileSystemStore store, public void run() { boolean fail = false; try { - store.copyFile(srcKey, dstKey); + store.copyFile(srcKey, srcLen, dstKey); } catch (Exception e) { LOG.warn("Exception thrown when copy from " + srcKey + " to " + dstKey + ", exception: " + e); diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index 9ed8d93364..cc7efed0c8 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -653,7 +653,7 @@ public boolean rename(Path srcPath, Path dstPath) throws IOException { if (srcStatus.isDirectory()) { copyDirectory(srcPath, dstPath); } else { - copyFile(srcPath, dstPath); + copyFile(srcPath, srcStatus.getLen(), dstPath); } return srcPath.equals(dstPath) || delete(srcPath, true); @@ -664,13 +664,14 @@ public boolean rename(Path srcPath, Path dstPath) throws IOException { * (the caller should make sure srcPath is a file and dstPath is valid) * * @param srcPath source path. + * @param srcLen source path length if it is a file. * @param dstPath destination path. * @return true if file is successfully copied. */ - private boolean copyFile(Path srcPath, Path dstPath) { + private boolean copyFile(Path srcPath, long srcLen, Path dstPath) { String srcKey = pathToKey(srcPath); String dstKey = pathToKey(dstPath); - return store.copyFile(srcKey, dstKey); + return store.copyFile(srcKey, srcLen, dstKey); } /** @@ -709,7 +710,8 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { //copy operation just copies metadata, oss will support shallow copy executorService.execute(new AliyunOSSCopyFileTask( - store, objectSummary.getKey(), newKey, copyFileContext)); + store, objectSummary.getKey(), + objectSummary.getSize(), newKey, copyFileContext)); copiesToFinish++; // No need to call lock() here. // It's ok to copy one more file if the rename operation failed diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java index 0d169da49a..77bd684b92 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -87,7 +87,6 @@ public class AliyunOSSFileSystemStore { private OSSClient ossClient; private String bucketName; private long uploadPartSize; - private long multipartThreshold; private int maxKeys; private String serverSideEncryptionAlgorithm; @@ -155,21 +154,10 @@ public void initialize(URI uri, Configuration conf, String user, ossClient = new OSSClient(endPoint, provider, clientConf); uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf, MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT); - multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, - MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT); + serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, ""); - if (multipartThreshold < 5 * 1024 * 1024) { - LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB"); - multipartThreshold = 5 * 1024 * 1024; - } - - if (multipartThreshold > 1024 * 1024 * 1024) { - LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB"); - multipartThreshold = 1024 * 1024 * 1024; - } - bucketName = uri.getHost(); String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT); @@ -305,18 +293,19 @@ public void storeEmptyFile(String key) throws IOException { * Copy an object from source key to destination key. * * @param srcKey source key. + * @param srcLen source file length. * @param dstKey destination key. * @return true if file is successfully copied. */ - public boolean copyFile(String srcKey, String dstKey) { - ObjectMetadata objectMeta = - ossClient.getObjectMetadata(bucketName, srcKey); - statistics.incrementReadOps(1); - long contentLength = objectMeta.getContentLength(); - if (contentLength <= multipartThreshold) { + public boolean copyFile(String srcKey, long srcLen, String dstKey) { + try { + //1, try single copy first return singleCopy(srcKey, dstKey); - } else { - return multipartCopy(srcKey, contentLength, dstKey); + } catch (Exception e) { + //2, if failed(shallow copy not supported), then multi part copy + LOG.debug("Exception thrown when copy file: " + srcKey + + ", exception: " + e + ", use multipartCopy instead"); + return multipartCopy(srcKey, srcLen, dstKey); } } diff --git a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md index 425fee5183..30f7e9d93a 100644 --- a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md +++ b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md @@ -282,7 +282,9 @@ please raise your issues with them. fs.oss.multipart.upload.threshold 20971520 - Minimum size in bytes before we start a multipart uploads or copy. + Minimum size in bytes before we start a multipart uploads or copy. + Notice: This property is deprecated and will be removed in further version. + diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java index 47a2494f44..14bd8e6328 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java @@ -49,7 +49,6 @@ public class TestAliyunOSSBlockOutputStream { @Before public void setUp() throws Exception { Configuration conf = new Configuration(); - conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024); conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024); conf.setInt(IO_CHUNK_BUFFER_SIZE, conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0)); diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java index 81759a554f..d428e7cd4a 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemContract.java @@ -178,13 +178,13 @@ public void testRenameDirectoryCopyTaskAllSucceed() throws Exception { AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore(); store.storeEmptyFile("test/new/file/"); AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask( - store, srcOne.toUri().getPath().substring(1), + store, srcOne.toUri().getPath().substring(1), data.length, dstOne.toUri().getPath().substring(1), copyFileContext); oneCopyFileTask.run(); assumeFalse(copyFileContext.isCopyFailure()); AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask( - store, srcOne.toUri().getPath().substring(1), + store, srcOne.toUri().getPath().substring(1), data.length, dstTwo.toUri().getPath().substring(1), copyFileContext); twoCopyFileTask.run(); assumeFalse(copyFileContext.isCopyFailure()); @@ -212,13 +212,13 @@ public void testRenameDirectoryCopyTaskAllFailed() throws Exception { AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore(); //store.storeEmptyFile("test/new/file/"); AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask( - store, srcOne.toUri().getPath().substring(1), + store, srcOne.toUri().getPath().substring(1), data.length, dstOne.toUri().getPath().substring(1), copyFileContext); oneCopyFileTask.run(); assumeTrue(copyFileContext.isCopyFailure()); AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask( - store, srcOne.toUri().getPath().substring(1), + store, srcOne.toUri().getPath().substring(1), data.length, dstTwo.toUri().getPath().substring(1), copyFileContext); twoCopyFileTask.run(); assumeTrue(copyFileContext.isCopyFailure()); @@ -247,19 +247,19 @@ public void testRenameDirectoryCopyTaskPartialFailed() throws Exception { AliyunOSSFileSystemStore store = ((AliyunOSSFileSystem)this.fs).getStore(); //store.storeEmptyFile("test/new/file/"); AliyunOSSCopyFileTask oneCopyFileTask = new AliyunOSSCopyFileTask( - store, srcOne.toUri().getPath().substring(1), + store, srcOne.toUri().getPath().substring(1), data.length, dstOne.toUri().getPath().substring(1), copyFileContext); oneCopyFileTask.run(); assumeTrue(copyFileContext.isCopyFailure()); AliyunOSSCopyFileTask twoCopyFileTask = new AliyunOSSCopyFileTask( - store, srcOne.toUri().getPath().substring(1), + store, srcOne.toUri().getPath().substring(1), data.length, dstTwo.toUri().getPath().substring(1), copyFileContext); twoCopyFileTask.run(); assumeTrue(copyFileContext.isCopyFailure()); AliyunOSSCopyFileTask threeCopyFileTask = new AliyunOSSCopyFileTask( - store, srcOne.toUri().getPath().substring(1), + store, srcOne.toUri().getPath().substring(1), data.length, dstThree.toUri().getPath().substring(1), copyFileContext); threeCopyFileTask.run(); assumeTrue(copyFileContext.isCopyFailure()); diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java index 7f4bac2564..ebd555d643 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSFileSystemStore.java @@ -78,8 +78,6 @@ public static void checkSettings() throws Exception { protected void writeRenameReadCompare(Path path, long len) throws IOException, NoSuchAlgorithmException { - // If len > fs.oss.multipart.upload.threshold, - // we'll use a multipart upload copy MessageDigest digest = MessageDigest.getInstance("MD5"); OutputStream out = new BufferedOutputStream( new DigestOutputStream(fs.create(path, false), digest)); @@ -92,10 +90,12 @@ protected void writeRenameReadCompare(Path path, long len) assertTrue("Exists", fs.exists(path)); Path copyPath = path.suffix(".copy"); + long start = System.currentTimeMillis(); fs.rename(path, copyPath); assertTrue("Copy exists", fs.exists(copyPath)); - + // should less than 1 second + assertTrue(System.currentTimeMillis() - start < 1000); // Download file from Aliyun OSS and compare the digest against the original MessageDigest digest2 = MessageDigest.getInstance("MD5"); InputStream in = new BufferedInputStream( @@ -119,7 +119,7 @@ public void testSmallUpload() throws IOException, NoSuchAlgorithmException { @Test public void testLargeUpload() throws IOException, NoSuchAlgorithmException { - // Multipart upload, multipart copy - writeRenameReadCompare(new Path("/test/xlarge"), 52428800L); // 50MB byte + // Multipart upload, shallow copy + writeRenameReadCompare(new Path("/test/xlarge"), 2147483648L); // 2GB } } diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java index e9a98b3307..1e3daae385 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/contract/TestAliyunOSSContractDistCp.java @@ -32,7 +32,6 @@ public class TestAliyunOSSContractDistCp extends AbstractContractDistCpTest { @Override protected Configuration createConfiguration() { Configuration newConf = super.createConfiguration(); - newConf.setLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, MULTIPART_SETTING); newConf.setLong(MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_SETTING); return newConf; }