From c77fc6971b5194c9dae184703caa87da271a85eb Mon Sep 17 00:00:00 2001
From: lqjacklee
Date: Mon, 17 Feb 2020 22:06:34 +0000
Subject: [PATCH] HADOOP-15961. S3A committers: make sure there's regular
 progress() calls.

Contributed by lqjacklee.

Change-Id: I13ca153e1e32b21dbe64d6fb25e260e0ff66154d
---
 .../fs/s3a/commit/CommitOperations.java       |  6 ++-
 .../s3a/commit/staging/StagingCommitter.java  |  3 +-
 .../hadoop/fs/s3a/auth/ITestAssumeRole.java   |  9 +++-
 .../hadoop/fs/s3a/auth/ProgressCounter.java   | 43 +++++++++++++++++++
 .../fs/s3a/commit/ITestCommitOperations.java  | 31 +++++++++----
 5 files changed, 79 insertions(+), 13 deletions(-)
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ProgressCounter.java

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index 7d7be95c14..8592ad4901 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.Progressable;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -437,13 +438,15 @@ public void revertCommit(SinglePendingCommit commit,
    * @param destPath destination path
    * @param partition partition/subdir. Not used
    * @param uploadPartSize size of upload
+   * @param progress progress callback
    * @return a pending upload entry
    * @throws IOException failure
    */
   public SinglePendingCommit uploadFileToPendingCommit(File localFile,
       Path destPath,
       String partition,
-      long uploadPartSize)
+      long uploadPartSize,
+      Progressable progress)
       throws IOException {
 
     LOG.debug("Initiating multipart upload from {} to {}",
@@ -502,6 +505,7 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
 
       commitData.bindCommitData(parts);
       statistics.commitUploaded(length);
+      progress.progress();
       threw = false;
       return commitData;
     } finally {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 6cc9e48852..7eca1b4265 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -712,7 +712,8 @@ protected int commitTaskInternal(final TaskAttemptContext context,
                     localFile,
                     destPath,
                     partition,
-                    uploadPartSize);
+                    uploadPartSize,
+                    context);
             LOG.debug("{}: adding pending commit {}", getRole(), commit);
             commits.add(commit);
           });
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
index 82589fa6aa..cf935d2859 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
@@ -550,6 +550,8 @@ public void testRestrictedCommitActions() throws Throwable {
 
     conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
     final int uploadPartSize = 5 * 1024 * 1024;
+    ProgressCounter progress = new ProgressCounter();
+    progress.assertCount("Progress counter should be zero", 0);
     Path basePath = methodPath();
     Path readOnlyDir = new Path(basePath, "readOnlyDir");
     Path writeableDir = new Path(basePath, "writeableDir");
@@ -577,8 +579,9 @@ public void testRestrictedCommitActions() throws Throwable {
 
     forbidden("initiate MultiPartUpload", () -> {
       return operations.uploadFileToPendingCommit(localSrc,
-          uploadDest, "", uploadPartSize);
+          uploadDest, "", uploadPartSize, progress);
     });
+    progress.assertCount("progress counter not expected", 0);
     // delete the file
     localSrc.delete();
     // create a directory there
@@ -596,11 +599,13 @@ public void testRestrictedCommitActions() throws Throwable {
           writeCSVData(src);
           SinglePendingCommit pending =
               fullOperations.uploadFileToPendingCommit(src, dest, "",
-                  uploadPartSize);
+                  uploadPartSize, progress);
           pending.save(fs, new Path(readOnlyDir,
               name + CommitConstants.PENDING_SUFFIX), true);
           assertTrue(src.delete());
         }));
+    progress.assertCount("Progress counter is not expected",
+        range);
 
     try {
       // we expect to be able to list all the files here
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ProgressCounter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ProgressCounter.java
new file mode 100644
index 0000000000..15a5715209
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ProgressCounter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import org.apache.hadoop.util.Progressable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A progress callback for testing.
+ */
+public class ProgressCounter implements Progressable {
+
+  private long count;
+
+  public void progress() {
+    count++;
+  }
+
+  public long getCount() {
+    return count;
+  }
+
+  public void assertCount(String message, int expected) {
+    assertEquals(message, expected, getCount());
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
index 74fe45d72d..d199337df1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
@@ -26,7 +26,6 @@
 
 import com.amazonaws.services.s3.model.PartETag;
 import com.google.common.collect.Lists;
-import org.junit.Assume;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +38,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
 import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
@@ -69,6 +69,7 @@ public class ITestCommitOperations extends AbstractCommitITest {
   private static final byte[] DATASET = dataset(1000, 'a', 32);
   private static final String S3A_FACTORY_KEY = String.format(
       COMMITTER_FACTORY_SCHEME_PATTERN, "s3a");
+  private ProgressCounter progress;
 
   /**
    * A compile time flag which allows you to disable failure reset before
@@ -105,6 +106,8 @@ public void setup() throws Exception {
     verifyIsMagicCommitFS(getFileSystem());
     // abort,; rethrow on failure
     setThrottling(HIGH_THROTTLE, STANDARD_FAILURE_LIMIT);
+    progress = new ProgressCounter();
+    progress.assertCount("progress", 0);
   }
 
   @Test
@@ -366,7 +369,7 @@ private void commitOrFail(final Path destFile,
   private void validateIntermediateAndFinalPaths(Path magicFilePath,
       Path destFile)
       throws IOException {
-    assertPathDoesNotExist("dest file was found", destFile);
+    assertPathDoesNotExist("dest file was created", destFile);
   }
 
   /**
@@ -452,8 +455,10 @@ public void testUploadEmptyFile() throws Throwable {
 
     SinglePendingCommit pendingCommit =
         actions.uploadFileToPendingCommit(tempFile,
-            dest, null,
-            DEFAULT_MULTIPART_SIZE);
+            dest,
+            null,
+            DEFAULT_MULTIPART_SIZE,
+            progress);
     resetFailures();
     assertPathDoesNotExist("pending commit", dest);
     fullThrottle();
@@ -461,6 +466,8 @@ public void testUploadEmptyFile() throws Throwable {
     resetFailures();
     FileStatus status = verifyPathExists(fs,
         "uploaded file commit", dest);
+    progress.assertCount("Progress counter should be 1.",
+        1);
     assertEquals("File length in " + status, 0, status.getLen());
   }
 
@@ -477,10 +484,11 @@ public void testUploadSmallFile() throws Throwable {
     assertPathDoesNotExist("test setup", dest);
     SinglePendingCommit pendingCommit =
         actions.uploadFileToPendingCommit(tempFile,
-            dest, null,
-            DEFAULT_MULTIPART_SIZE);
+            dest,
+            null,
+            DEFAULT_MULTIPART_SIZE,
+            progress);
     resetFailures();
-    LOG.debug("Precommit validation");
     assertPathDoesNotExist("pending commit", dest);
     fullThrottle();
     LOG.debug("Postcommit validation");
@@ -488,6 +496,8 @@ public void testUploadSmallFile() throws Throwable {
     resetFailures();
     String s = readUTF8(fs, dest, -1);
     assertEquals(text, s);
+    progress.assertCount("Progress counter should be 1.",
+        1);
   }
 
   @Test(expected = FileNotFoundException.class)
@@ -498,7 +508,9 @@ public void testUploadMissingFile() throws Throwable {
     Path dest = methodPath("testUploadMissingile");
     fullThrottle();
     actions.uploadFileToPendingCommit(tempFile, dest, null,
-        DEFAULT_MULTIPART_SIZE);
+        DEFAULT_MULTIPART_SIZE, progress);
+    progress.assertCount("Progress counter should be 1.",
+        1);
   }
 
   @Test
@@ -598,7 +610,8 @@ public void testBulkCommitFiles() throws Throwable {
       SinglePendingCommit commit1 =
           actions.uploadFileToPendingCommit(localFile,
               destination, null,
-              DEFAULT_MULTIPART_SIZE);
+              DEFAULT_MULTIPART_SIZE,
+              progress);
       commits.add(commit1);
     }
     resetFailures();
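
Not part of the patch: the sketch below shows how the new Progressable parameter of uploadFileToPendingCommit() and the ProgressCounter test helper are expected to fit together. The ProgressCounterSketch class, its uploadWithProgress() method and the caller-supplied CommitOperations instance, local file and destination path are hypothetical placeholders; only the uploadFileToPendingCommit() signature and the ProgressCounter methods come from the change above, and ProgressCounter lives in the test source tree.

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
import org.apache.hadoop.fs.s3a.commit.CommitOperations;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;

public class ProgressCounterSketch {

  /**
   * Upload a staged file through the commit operations and check that the
   * progress callback was invoked. Illustrative only.
   */
  public static SinglePendingCommit uploadWithProgress(
      CommitOperations operations,
      File localFile,
      Path destPath) throws IOException {
    // ProgressCounter implements Progressable, so it can be passed as the
    // new fifth argument while also recording how many callbacks were made.
    ProgressCounter progress = new ProgressCounter();
    SinglePendingCommit commit = operations.uploadFileToPendingCommit(
        localFile,
        destPath,
        null,                // partition: not used by the operation
        5 * 1024 * 1024,     // upload part size, as in the tests above
        progress);
    // The patch adds a progress() call inside uploadFileToPendingCommit,
    // which is what the new assertions in the tests above check for.
    progress.assertCount("expected one progress callback from the upload", 1);
    return commit;
  }
}

In the production path no extra plumbing is needed: StagingCommitter simply passes the TaskAttemptContext as the progress argument, since it already implements Progressable.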