From 3fade865ce84dcf68bcd7de5a5ed1c7d904796e9 Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Wed, 14 Nov 2018 12:58:57 +0800 Subject: [PATCH] HADOOP-15917. AliyunOSS: fix incorrect ReadOps and WriteOps in statistics. Contributed by Jinhu Wu. --- .../fs/aliyun/oss/AliyunOSSFileSystem.java | 4 -- .../aliyun/oss/AliyunOSSFileSystemStore.java | 24 +++++-- .../markdown/tools/hadoop-aliyun/index.md | 5 ++ .../oss/TestAliyunOSSBlockOutputStream.java | 70 ++++++++++++++++--- 4 files changed, 84 insertions(+), 19 deletions(-) diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index 4fbb6fb8b1..9c4435c11f 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -405,7 +405,6 @@ public FileStatus[] listStatus(Path path) throws IOException { ObjectListing objects = store.listObjects(key, maxKeys, null, false); while (true) { - statistics.incrementReadOps(1); for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) { String objKey = objectSummary.getKey(); if (objKey.equals(key + "/")) { @@ -446,7 +445,6 @@ public FileStatus[] listStatus(Path path) throws IOException { } String nextMarker = objects.getNextMarker(); objects = store.listObjects(key, maxKeys, nextMarker, false); - statistics.incrementReadOps(1); } else { break; } @@ -694,7 +692,6 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { new SemaphoredDelegatingExecutor(boundedCopyThreadPool, maxConcurrentCopyTasksPerDir, true)); ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true); - statistics.incrementReadOps(1); // Copy files from src folder to dst int copiesToFinish = 0; while (true) { @@ -717,7 +714,6 @@ private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException { if (objects.isTruncated()) { String nextMarker = objects.getNextMarker(); objects = store.listObjects(srcKey, maxKeys, nextMarker, true); - statistics.incrementReadOps(1); } else { break; } diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java index 7639eb398c..4fc1325278 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -175,6 +175,7 @@ public void initialize(URI uri, Configuration conf, CannedAccessControlList cannedACL = CannedAccessControlList.valueOf(cannedACLName); ossClient.setBucketAcl(bucketName, cannedACL); + statistics.incrementWriteOps(1); } maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT); @@ -216,6 +217,7 @@ public void deleteObjects(List keysToDelete) throws IOException { // Here, we choose the simple mode to do batch delete. deleteRequest.setQuiet(true); DeleteObjectsResult result = ossClient.deleteObjects(deleteRequest); + statistics.incrementWriteOps(1); deleteFailed = result.getDeletedObjects(); tries++; if (tries == retry) { @@ -268,11 +270,13 @@ public void deleteDirs(String key) throws IOException { */ public ObjectMetadata getObjectMetadata(String key) { try { - return ossClient.getObjectMetadata(bucketName, key); - } catch (OSSException osse) { - return null; - } finally { + ObjectMetadata objectMeta = ossClient.getObjectMetadata(bucketName, key); statistics.incrementReadOps(1); + return objectMeta; + } catch (OSSException osse) { + LOG.error("Exception thrown when get object meta: " + + key + ", exception: " + osse); + return null; } } @@ -289,6 +293,7 @@ public void storeEmptyFile(String key) throws IOException { dirMeta.setContentLength(0); try { ossClient.putObject(bucketName, key, in, dirMeta); + statistics.incrementWriteOps(1); } finally { in.close(); } @@ -304,6 +309,7 @@ public void storeEmptyFile(String key) throws IOException { public boolean copyFile(String srcKey, String dstKey) { ObjectMetadata objectMeta = ossClient.getObjectMetadata(bucketName, srcKey); + statistics.incrementReadOps(1); long contentLength = objectMeta.getContentLength(); if (contentLength <= multipartThreshold) { return singleCopy(srcKey, dstKey); @@ -323,6 +329,7 @@ public boolean copyFile(String srcKey, String dstKey) { private boolean singleCopy(String srcKey, String dstKey) { CopyObjectResult copyResult = ossClient.copyObject(bucketName, srcKey, bucketName, dstKey); + statistics.incrementWriteOps(1); LOG.debug(copyResult.getETag()); return true; } @@ -372,6 +379,7 @@ private boolean multipartCopy(String srcKey, long contentLength, UploadPartCopyResult partCopyResult = ossClient.uploadPartCopy(partCopyRequest); statistics.incrementWriteOps(1); + statistics.incrementBytesWritten(size); partETags.add(partCopyResult.getPartETag()); } CompleteMultipartUploadRequest completeMultipartUploadRequest = @@ -408,6 +416,7 @@ public void uploadObject(String key, File file) throws IOException { PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta); LOG.debug(result.getETag()); statistics.incrementWriteOps(1); + statistics.incrementBytesWritten(file.length()); } finally { fis.close(); } @@ -449,7 +458,9 @@ public InputStream retrieve(String key, long byteStart, long byteEnd) { try { GetObjectRequest request = new GetObjectRequest(bucketName, key); request.setRange(byteStart, byteEnd); - return ossClient.getObject(request).getObjectContent(); + InputStream in = ossClient.getObject(request).getObjectContent(); + statistics.incrementReadOps(1); + return in; } catch (OSSException | ClientException e) { LOG.error("Exception thrown when store retrieves key: " + key + ", exception: " + e); @@ -480,6 +491,7 @@ public void purge(String prefix) throws IOException { for (OSSObjectSummary object : objects.getObjectSummaries()) { key = object.getKey(); ossClient.deleteObject(bucketName, key); + statistics.incrementWriteOps(1); } for (String dir: objects.getCommonPrefixes()) { @@ -604,6 +616,8 @@ public PartETag uploadPart(File file, String key, String uploadId, int idx) uploadRequest.setPartSize(file.length()); uploadRequest.setPartNumber(idx); UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest); + statistics.incrementWriteOps(1); + statistics.incrementBytesWritten(file.length()); return uploadResult.getPartETag(); } catch (Exception e) { LOG.debug("Failed to upload "+ file.getPath() +", " + diff --git a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md index 0c3131d49c..87aa90bf89 100644 --- a/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md +++ b/hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md @@ -117,6 +117,11 @@ please raise your issues with them. + + fs.oss.impl + org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem + + fs.oss.assumed.role.arn diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java index 6fe6f03107..c3387a3d84 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java @@ -32,6 +32,7 @@ import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT; import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; +import static org.junit.Assert.assertEquals; /** * Tests regular and multi-part upload functionality for @@ -74,24 +75,73 @@ public void testZeroByteUpload() throws IOException { @Test public void testRegularUpload() throws IOException { - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 - 1); - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024); - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024 + 1); + long size = 1024 * 1024; + FileSystem.Statistics statistics = + FileSystem.getStatistics("oss", AliyunOSSFileSystem.class); + // This test is a little complicated for statistics, lifecycle is + // generateTestFile + // fs.create(getFileStatus) read 1 + // output stream write write 1 + // path exists(fs.exists) read 1 + // verifyReceivedData + // fs.open(getFileStatus) read 1 + // input stream read read 2(part size is 512K) + // fs.delete + // getFileStatus & delete & exists & create fake dir read 2, write 2 + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1); + assertEquals(7, statistics.getReadOps()); + assertEquals(size - 1, statistics.getBytesRead()); + assertEquals(3, statistics.getWriteOps()); + assertEquals(size - 1, statistics.getBytesWritten()); + + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size); + assertEquals(14, statistics.getReadOps()); + assertEquals(2 * size - 1, statistics.getBytesRead()); + assertEquals(6, statistics.getWriteOps()); + assertEquals(2 * size - 1, statistics.getBytesWritten()); + + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1); + + assertEquals(22, statistics.getReadOps()); + assertEquals(3 * size, statistics.getBytesRead()); + assertEquals(10, statistics.getWriteOps()); + assertEquals(3 * size, statistics.getBytesWritten()); } @Test public void testMultiPartUpload() throws IOException { - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), - 6 * 1024 * 1024 - 1); - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 * 1024); - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), - 6 * 1024 * 1024 + 1); + long size = 6 * 1024 * 1024; + FileSystem.Statistics statistics = + FileSystem.getStatistics("oss", AliyunOSSFileSystem.class); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1); + assertEquals(17, statistics.getReadOps()); + assertEquals(size - 1, statistics.getBytesRead()); + assertEquals(8, statistics.getWriteOps()); + assertEquals(size - 1, statistics.getBytesWritten()); + + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size); + assertEquals(34, statistics.getReadOps()); + assertEquals(2 * size - 1, statistics.getBytesRead()); + assertEquals(16, statistics.getWriteOps()); + assertEquals(2 * size - 1, statistics.getBytesWritten()); + + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1); + assertEquals(52, statistics.getReadOps()); + assertEquals(3 * size, statistics.getBytesRead()); + assertEquals(25, statistics.getWriteOps()); + assertEquals(3 * size, statistics.getBytesWritten()); } @Test public void testMultiPartUploadConcurrent() throws IOException { - ContractTestUtils.createAndVerifyFile(fs, getTestPath(), - 50 * 1024 * 1024 - 1); + long size = 50 * 1024 * 1024 - 1; + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size); + FileSystem.Statistics statistics = + FileSystem.getStatistics("oss", AliyunOSSFileSystem.class); + assertEquals(105, statistics.getReadOps()); + assertEquals(size, statistics.getBytesRead()); + assertEquals(52, statistics.getWriteOps()); + assertEquals(size, statistics.getBytesWritten()); } @Test