From b6b259066fd56cbeb0186adeba613d7841f2d61f Mon Sep 17 00:00:00 2001 From: Moditha Hewasinghage <33624668+modithah@users.noreply.github.com> Date: Wed, 19 Jul 2023 11:03:41 +0200 Subject: [PATCH] HADOOP-18757. S3A Committer only finalizes the commits in a single thread (#5706) Contributed by Moditha Hewasinghage --- .../org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java | 2 +- .../staging/integration/ITestStagingCommitProtocolFailure.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java index c93d2d8f73..2a529fc864 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java @@ -236,7 +236,7 @@ private ExecutorService buildThreadPool( .setDaemon(true) .setNameFormat(THREAD_PREFIX + jobId + "-%d") .build(); - return new HadoopThreadPoolExecutor(0, numThreads, + return new HadoopThreadPoolExecutor(numThreads, numThreads, THREAD_KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java index a6d2c57d1d..08b6c21a86 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; @@ -54,6 +55,7 @@ protected Configuration createConfiguration() { conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY); conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING); + disableFilesystemCaching(conf); return conf; }