diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
index 99a60dbf66..afe7242780 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
@@ -71,7 +71,6 @@
  * Aliyun OSS, used to access OSS blob system in a filesystem style.
  */
 public class AliyunOSSFileSystem extends FileSystem {
-
   private static final Logger LOG =
       LoggerFactory.getLogger(AliyunOSSFileSystem.class);
   private URI uri;
@@ -560,18 +559,18 @@ public FileStatus[] listStatus(Path path) throws IOException {
    * Used to create an empty file that represents an empty directory.
    *
    * @param bucket the bucket this directory belongs to
-   * @param objectName directory path
+   * @param key directory path
    * @return true if directory successfully created
    * @throws IOException
    */
-  private boolean mkdir(final String bucket, final String objectName)
+  private boolean mkdir(final String bucket, final String key)
       throws IOException {
-    String dirName = objectName;
+    String dirName = key;
     ObjectMetadata dirMeta = new ObjectMetadata();
     byte[] buffer = new byte[0];
     ByteArrayInputStream in = new ByteArrayInputStream(buffer);
     dirMeta.setContentLength(0);
-    if (!objectName.endsWith("/")) {
+    if (!key.endsWith("/")) {
       dirName += "/";
     }
     try {
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
index 654b81dab4..1e16df9edb 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
@@ -84,6 +84,9 @@ public AliyunOSSOutputStream(Configuration conf, OSSClient client,
 
     partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
         MULTIPART_UPLOAD_SIZE_DEFAULT);
+    if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
+      partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
+    }
     partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
         MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
 
@@ -151,6 +154,12 @@ private void uploadObject() throws IOException {
   private void multipartUploadObject() throws IOException {
     File object = tmpFile.getAbsoluteFile();
     long dataLen = object.length();
+    long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
+    int partNum = (int)(dataLen / realPartSize);
+    if (dataLen % realPartSize != 0) {
+      partNum += 1;
+    }
+
     InitiateMultipartUploadRequest initiateMultipartUploadRequest =
         new InitiateMultipartUploadRequest(bucketName, key);
     ObjectMetadata meta = new ObjectMetadata();
@@ -161,14 +170,6 @@ private void multipartUploadObject() throws IOException {
     initiateMultipartUploadRequest.setObjectMetadata(meta);
     InitiateMultipartUploadResult initiateMultipartUploadResult =
         ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
-    int partNum = (int)(dataLen / partSize);
-    if (dataLen % partSize != 0) {
-      partNum += 1;
-    }
-    if (partNum > MULTIPART_UPLOAD_PART_NUM_LIMIT) {
-      throw new IOException("Number of parts " + partNum + " should not be " +
-          "bigger than limit " + MULTIPART_UPLOAD_PART_NUM_LIMIT);
-    }
     List<PartETag> partETags = new ArrayList<PartETag>();
     String uploadId = initiateMultipartUploadResult.getUploadId();
 
@@ -177,10 +178,10 @@ private void multipartUploadObject() throws IOException {
       // TODO: Optimize this, avoid opening the object multiple times
       FileInputStream fis = new FileInputStream(object);
       try {
-        long skipBytes = partSize * i;
+        long skipBytes = realPartSize * i;
         AliyunOSSUtils.skipFully(fis, skipBytes);
-        long size = (partSize < dataLen - skipBytes) ?
-            partSize : dataLen - skipBytes;
+        long size = (realPartSize < dataLen - skipBytes) ?
+            realPartSize : dataLen - skipBytes;
         UploadPartRequest uploadPartRequest = new UploadPartRequest();
         uploadPartRequest.setBucketName(bucketName);
         uploadPartRequest.setKey(key);
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
index 9acde00606..d54dd9c44a 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
@@ -28,6 +28,8 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT;
+
 /**
  * Utility methods for Aliyun OSS code.
  */
@@ -172,4 +174,17 @@ public static void skipFully(InputStream is, long n) throws IOException {
           "to EOF.");
     }
   }
+
+  /**
+   * Calculate a proper size of multipart piece. If minPartSize
+   * is too small, the number of multipart pieces may exceed the limit of
+   * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}.
+   * @param contentLength the size of file.
+   * @param minPartSize the minimum size of multipart piece.
+   * @return a revisional size of multipart piece.
+   */
+  public static long calculatePartSize(long contentLength, long minPartSize) {
+    long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1;
+    return Math.max(minPartSize, tmpPartSize);
+  }
 }
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
index 0bc6d578ed..99022754fb 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
@@ -79,7 +79,7 @@ private Constants() {
       "fs.oss.multipart.upload.size";
   public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
 
-  public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 1000;
+  public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000;
 
   // Minimum size in bytes before we start a multipart uploads or copy
   public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY =
@@ -108,4 +108,6 @@ private Constants() {
   public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
   public static final String FS_OSS = "oss";
 
+  public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L;
+
 }
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java
index 3951529dc6..b33ab99cfc 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestOSSOutputStream.java
@@ -68,4 +68,23 @@ public void testRegularUpload() throws IOException {
   public void testMultiPartUpload() throws IOException {
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024);
   }
+
+  @Test
+  public void testMultiPartUploadLimit() throws IOException {
+    long partSize1 = AliyunOSSUtils.calculatePartSize(10 * 1024, 100 * 1024);
+    assert(10 * 1024 / partSize1 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+    long partSize2 = AliyunOSSUtils.calculatePartSize(200 * 1024, 100 * 1024);
+    assert(200 * 1024 / partSize2 < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+    long partSize3 = AliyunOSSUtils.calculatePartSize(10000 * 100 * 1024,
+        100 * 1024);
+    assert(10000 * 100 * 1024 / partSize3
+        < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+
+    long partSize4 = AliyunOSSUtils.calculatePartSize(10001 * 100 * 1024,
+        100 * 1024);
+    assert(10001 * 100 * 1024 / partSize4
+        < Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT);
+  }
 }
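
Editor's note, not part of the patch: the minimal sketch below mirrors the arithmetic this diff introduces, to show why the part-number limit can no longer be exceeded. The class PartSizeDemo and its main method are made up for illustration; the two constants copy MULTIPART_UPLOAD_PART_NUM_LIMIT and MIN_MULTIPART_UPLOAD_PART_SIZE as defined in Constants.java above, and calculatePartSize repeats the logic added to AliyunOSSUtils. The patch trades the old hard failure (IOException when partNum exceeded the limit) for a larger part size, so arbitrarily large files still upload.

// Illustrative sketch only, not part of the patch. PartSizeDemo is a
// hypothetical class; the constants mirror the values in Constants.java above.
public class PartSizeDemo {
  static final int PART_NUM_LIMIT = 10000;        // MULTIPART_UPLOAD_PART_NUM_LIMIT
  static final long MIN_PART_SIZE = 100 * 1024L;  // MIN_MULTIPART_UPLOAD_PART_SIZE

  // Same arithmetic as AliyunOSSUtils.calculatePartSize(contentLength, minPartSize).
  static long calculatePartSize(long contentLength, long minPartSize) {
    return Math.max(minPartSize, contentLength / PART_NUM_LIMIT + 1);
  }

  public static void main(String[] args) {
    // A file just over PART_NUM_LIMIT * MIN_PART_SIZE bytes; with the minimum
    // part size alone it would need 10001 parts, one more than allowed.
    long dataLen = 10001L * 100 * 1024;
    long partSize = calculatePartSize(dataLen, MIN_PART_SIZE);

    // Part count computed the same way as in multipartUploadObject().
    int partNum = (int) (dataLen / partSize);
    if (dataLen % partSize != 0) {
      partNum += 1;
    }

    // Prints: partSize=102411 partNum=10000, so the count stays within the limit.
    System.out.println("partSize=" + partSize + " partNum=" + partNum);
  }
}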