HADOOP-15961. S3A committers: make sure there's regular progress() calls.

Contributed by lqjacklee.

Change-Id: I13ca153e1e32b21dbe64d6fb25e260e0ff66154d
This commit is contained in:
lqjacklee 2020-02-17 22:06:34 +00:00 committed by Steve Loughran
parent 84f7638840
commit c77fc6971b
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
5 changed files with 79 additions and 13 deletions

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState; import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@ -437,13 +438,15 @@ public void revertCommit(SinglePendingCommit commit,
* @param destPath destination path * @param destPath destination path
* @param partition partition/subdir. Not used * @param partition partition/subdir. Not used
* @param uploadPartSize size of upload * @param uploadPartSize size of upload
* @param progress progress callback
* @return a pending upload entry * @return a pending upload entry
* @throws IOException failure * @throws IOException failure
*/ */
public SinglePendingCommit uploadFileToPendingCommit(File localFile, public SinglePendingCommit uploadFileToPendingCommit(File localFile,
Path destPath, Path destPath,
String partition, String partition,
long uploadPartSize) long uploadPartSize,
Progressable progress)
throws IOException { throws IOException {
LOG.debug("Initiating multipart upload from {} to {}", LOG.debug("Initiating multipart upload from {} to {}",
@ -502,6 +505,7 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
commitData.bindCommitData(parts); commitData.bindCommitData(parts);
statistics.commitUploaded(length); statistics.commitUploaded(length);
progress.progress();
threw = false; threw = false;
return commitData; return commitData;
} finally { } finally {

View File

@ -712,7 +712,8 @@ protected int commitTaskInternal(final TaskAttemptContext context,
localFile, localFile,
destPath, destPath,
partition, partition,
uploadPartSize); uploadPartSize,
context);
LOG.debug("{}: adding pending commit {}", getRole(), commit); LOG.debug("{}: adding pending commit {}", getRole(), commit);
commits.add(commit); commits.add(commit);
}); });

View File

@ -550,6 +550,8 @@ public void testRestrictedCommitActions() throws Throwable {
conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true); conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
final int uploadPartSize = 5 * 1024 * 1024; final int uploadPartSize = 5 * 1024 * 1024;
ProgressCounter progress = new ProgressCounter();
progress.assertCount("Progress counter should be zero", 0);
Path basePath = methodPath(); Path basePath = methodPath();
Path readOnlyDir = new Path(basePath, "readOnlyDir"); Path readOnlyDir = new Path(basePath, "readOnlyDir");
Path writeableDir = new Path(basePath, "writeableDir"); Path writeableDir = new Path(basePath, "writeableDir");
@ -577,8 +579,9 @@ public void testRestrictedCommitActions() throws Throwable {
forbidden("initiate MultiPartUpload", forbidden("initiate MultiPartUpload",
() -> { () -> {
return operations.uploadFileToPendingCommit(localSrc, return operations.uploadFileToPendingCommit(localSrc,
uploadDest, "", uploadPartSize); uploadDest, "", uploadPartSize, progress);
}); });
progress.assertCount("progress counter not expected.", 0);
// delete the file // delete the file
localSrc.delete(); localSrc.delete();
// create a directory there // create a directory there
@ -596,11 +599,13 @@ public void testRestrictedCommitActions() throws Throwable {
writeCSVData(src); writeCSVData(src);
SinglePendingCommit pending = SinglePendingCommit pending =
fullOperations.uploadFileToPendingCommit(src, dest, "", fullOperations.uploadFileToPendingCommit(src, dest, "",
uploadPartSize); uploadPartSize, progress);
pending.save(fs, new Path(readOnlyDir, pending.save(fs, new Path(readOnlyDir,
name + CommitConstants.PENDING_SUFFIX), true); name + CommitConstants.PENDING_SUFFIX), true);
assertTrue(src.delete()); assertTrue(src.delete());
})); }));
progress.assertCount("Process counter is not expected",
range);
try { try {
// we expect to be able to list all the files here // we expect to be able to list all the files here

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import org.apache.hadoop.util.Progressable;
import static org.junit.Assert.assertEquals;
/**
* A progress callback for testing.
*/
public class ProgressCounter implements Progressable {
private long count;
public void progress() {
count++;
}
public long getCount() {
return count;
}
public void assertCount(String message, int expected) {
assertEquals(message, expected, getCount());
}
}

View File

@ -26,7 +26,6 @@
import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PartETag;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -39,6 +38,7 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker; import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter; import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
@ -69,6 +69,7 @@ public class ITestCommitOperations extends AbstractCommitITest {
private static final byte[] DATASET = dataset(1000, 'a', 32); private static final byte[] DATASET = dataset(1000, 'a', 32);
private static final String S3A_FACTORY_KEY = String.format( private static final String S3A_FACTORY_KEY = String.format(
COMMITTER_FACTORY_SCHEME_PATTERN, "s3a"); COMMITTER_FACTORY_SCHEME_PATTERN, "s3a");
private ProgressCounter progress;
/** /**
* A compile time flag which allows you to disable failure reset before * A compile time flag which allows you to disable failure reset before
@ -105,6 +106,8 @@ public void setup() throws Exception {
verifyIsMagicCommitFS(getFileSystem()); verifyIsMagicCommitFS(getFileSystem());
// abort,; rethrow on failure // abort,; rethrow on failure
setThrottling(HIGH_THROTTLE, STANDARD_FAILURE_LIMIT); setThrottling(HIGH_THROTTLE, STANDARD_FAILURE_LIMIT);
progress = new ProgressCounter();
progress.assertCount("progress", 0);
} }
@Test @Test
@ -366,7 +369,7 @@ private void commitOrFail(final Path destFile,
private void validateIntermediateAndFinalPaths(Path magicFilePath, private void validateIntermediateAndFinalPaths(Path magicFilePath,
Path destFile) Path destFile)
throws IOException { throws IOException {
assertPathDoesNotExist("dest file was found", destFile); assertPathDoesNotExist("dest file was created", destFile);
} }
/** /**
@ -452,8 +455,10 @@ public void testUploadEmptyFile() throws Throwable {
SinglePendingCommit pendingCommit = SinglePendingCommit pendingCommit =
actions.uploadFileToPendingCommit(tempFile, actions.uploadFileToPendingCommit(tempFile,
dest, null, dest,
DEFAULT_MULTIPART_SIZE); null,
DEFAULT_MULTIPART_SIZE,
progress);
resetFailures(); resetFailures();
assertPathDoesNotExist("pending commit", dest); assertPathDoesNotExist("pending commit", dest);
fullThrottle(); fullThrottle();
@ -461,6 +466,8 @@ public void testUploadEmptyFile() throws Throwable {
resetFailures(); resetFailures();
FileStatus status = verifyPathExists(fs, FileStatus status = verifyPathExists(fs,
"uploaded file commit", dest); "uploaded file commit", dest);
progress.assertCount("Progress counter should be 1.",
1);
assertEquals("File length in " + status, 0, status.getLen()); assertEquals("File length in " + status, 0, status.getLen());
} }
@ -477,10 +484,11 @@ public void testUploadSmallFile() throws Throwable {
assertPathDoesNotExist("test setup", dest); assertPathDoesNotExist("test setup", dest);
SinglePendingCommit pendingCommit = SinglePendingCommit pendingCommit =
actions.uploadFileToPendingCommit(tempFile, actions.uploadFileToPendingCommit(tempFile,
dest, null, dest,
DEFAULT_MULTIPART_SIZE); null,
DEFAULT_MULTIPART_SIZE,
progress);
resetFailures(); resetFailures();
LOG.debug("Precommit validation");
assertPathDoesNotExist("pending commit", dest); assertPathDoesNotExist("pending commit", dest);
fullThrottle(); fullThrottle();
LOG.debug("Postcommit validation"); LOG.debug("Postcommit validation");
@ -488,6 +496,8 @@ public void testUploadSmallFile() throws Throwable {
resetFailures(); resetFailures();
String s = readUTF8(fs, dest, -1); String s = readUTF8(fs, dest, -1);
assertEquals(text, s); assertEquals(text, s);
progress.assertCount("Progress counter should be 1.",
1);
} }
@Test(expected = FileNotFoundException.class) @Test(expected = FileNotFoundException.class)
@ -498,7 +508,9 @@ public void testUploadMissingFile() throws Throwable {
Path dest = methodPath("testUploadMissingile"); Path dest = methodPath("testUploadMissingile");
fullThrottle(); fullThrottle();
actions.uploadFileToPendingCommit(tempFile, dest, null, actions.uploadFileToPendingCommit(tempFile, dest, null,
DEFAULT_MULTIPART_SIZE); DEFAULT_MULTIPART_SIZE, progress);
progress.assertCount("Progress counter should be 1.",
1);
} }
@Test @Test
@ -598,7 +610,8 @@ public void testBulkCommitFiles() throws Throwable {
SinglePendingCommit commit1 = SinglePendingCommit commit1 =
actions.uploadFileToPendingCommit(localFile, actions.uploadFileToPendingCommit(localFile,
destination, null, destination, null,
DEFAULT_MULTIPART_SIZE); DEFAULT_MULTIPART_SIZE,
progress);
commits.add(commit1); commits.add(commit1);
} }
resetFailures(); resetFailures();