From 81e533de8f7075b920aa20f00c5d626d0f36e8e2 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Sat, 12 Dec 2020 09:37:13 -0800 Subject: [PATCH] HADOOP-16080. hadoop-aws does not work with hadoop-client-api. Contributed by Chao Sun (#2522) --- .../apache/hadoop/fs/cosn/CosNFileSystem.java | 6 +++--- .../BlockingThreadPoolExecutorService.java | 5 +---- .../util/SemaphoredDelegatingExecutor.java | 19 +++++++++---------- .../hadoop/fs/TestFileSystemCaching.java | 6 ++++-- .../fs/aliyun/oss/AliyunOSSFileSystem.java | 5 ++--- .../dev-support/findbugs-exclude.xml | 6 ++++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 3 ++- .../hadoop/fs/s3a/impl/DeleteOperation.java | 4 +++- .../hadoop/fs/s3a/impl/StoreContext.java | 12 +++++++----- .../fs/s3a/impl/StoreContextBuilder.java | 7 +++---- .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 19 +++++++++++-------- ...TestBlockingThreadPoolExecutorService.java | 4 ++-- .../s3a/impl/ITestPartialRenamesDeletes.java | 12 +++++++----- 13 files changed, 60 insertions(+), 48 deletions(-) diff --git a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java index 94b10ad440..4dda126073 100644 --- a/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java +++ b/hadoop-cloud-storage-project/hadoop-cos/src/main/java/org/apache/hadoop/fs/cosn/CosNFileSystem.java @@ -28,11 +28,11 @@ import java.util.Map; import java.util.HashMap; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -71,8 +71,8 @@ public class CosNFileSystem extends FileSystem { private String owner = "Unknown"; private String group = "Unknown"; - private ListeningExecutorService boundedIOThreadPool; - private ListeningExecutorService boundedCopyThreadPool; + private ExecutorService boundedIOThreadPool; + private ExecutorService boundedCopyThreadPool; public CosNFileSystem() { } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java index 451b5f5d6c..d08e84f99d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java @@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; - import org.apache.hadoop.classification.InterfaceAudience; /** @@ -105,8 +103,7 @@ public final class BlockingThreadPoolExecutorService private BlockingThreadPoolExecutorService(int permitCount, ThreadPoolExecutor eventProcessingExecutor) { - super(MoreExecutors.listeningDecorator(eventProcessingExecutor), - permitCount, false); + super(eventProcessingExecutor, permitCount, false); this.eventProcessingExecutor = eventProcessingExecutor; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java index 1f29ba8b5e..45b9a98c68 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java @@ -18,10 +18,8 @@ package org.apache.hadoop.util; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.classification.InterfaceAudience; @@ -29,6 +27,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -49,10 +48,10 @@ import java.util.concurrent.TimeoutException; @SuppressWarnings("NullableProblems") @InterfaceAudience.Private public class SemaphoredDelegatingExecutor extends - ForwardingListeningExecutorService { + ForwardingExecutorService { private final Semaphore queueingPermits; - private final ListeningExecutorService executorDelegatee; + private final ExecutorService executorDelegatee; private final int permitCount; /** @@ -62,7 +61,7 @@ public class SemaphoredDelegatingExecutor extends * @param fair should the semaphore be "fair" */ public SemaphoredDelegatingExecutor( - ListeningExecutorService executorDelegatee, + ExecutorService executorDelegatee, int permitCount, boolean fair) { this.permitCount = permitCount; @@ -71,7 +70,7 @@ public class SemaphoredDelegatingExecutor extends } @Override - protected ListeningExecutorService delegate() { + protected ExecutorService delegate() { return executorDelegatee; } @@ -102,7 +101,7 @@ public class SemaphoredDelegatingExecutor extends } @Override - public ListenableFuture submit(Callable task) { + public Future submit(Callable task) { try { queueingPermits.acquire(); } catch (InterruptedException e) { @@ -113,7 +112,7 @@ public class SemaphoredDelegatingExecutor extends } @Override - public ListenableFuture submit(Runnable task, T result) { + public Future submit(Runnable task, T result) { try { queueingPermits.acquire(); } catch (InterruptedException e) { @@ -124,7 +123,7 @@ public class SemaphoredDelegatingExecutor extends } @Override - public ListenableFuture submit(Runnable task) { + public Future submit(Runnable task) { try { queueingPermits.acquire(); } catch (InterruptedException e) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java index 01abeaaf57..67a933bb9e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -423,9 +424,10 @@ public class TestFileSystemCaching extends HadoopTestBase { // only one instance can be created at a time. URI uri = new URI("blocking://a"); ListeningExecutorService pool = - BlockingThreadPoolExecutorService.newInstance(count * 2, 0, + MoreExecutors.listeningDecorator( + BlockingThreadPoolExecutorService.newInstance(count * 2, 0, 10, TimeUnit.SECONDS, - "creation-threads"); + "creation-threads")); // submit a set of requests to create an FS instance. // the semaphore will block all but one, and that will block until diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java index 66fbd89b32..759484e423 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -78,8 +77,8 @@ public class AliyunOSSFileSystem extends FileSystem { private int maxKeys; private int maxReadAheadPartNumber; private int maxConcurrentCopyTasksPerDir; - private ListeningExecutorService boundedThreadPool; - private ListeningExecutorService boundedCopyThreadPool; + private ExecutorService boundedThreadPool; + private ExecutorService boundedCopyThreadPool; private static final PathFilter DEFAULT_FILTER = new PathFilter() { @Override diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml index b51053603f..2bdd63d9bf 100644 --- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml @@ -74,4 +74,10 @@ + + + + + + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a02947b015..beb4b97523 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -42,6 +42,7 @@ import java.util.Optional; import java.util.Set; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -243,7 +244,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private long partSize; private boolean enableMultiObjectsDelete; private TransferManager transfers; - private ListeningExecutorService boundedThreadPool; + private ExecutorService boundedThreadPool; private ThreadPoolExecutor unboundedThreadPool; private int executorCapacity; private long multiPartThreshold; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java index b47c7ad3aa..2292179b3f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsResult; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,7 +208,8 @@ public class DeleteOperation extends ExecutingStoreOperation { "page size out of range: %s", pageSize); this.pageSize = pageSize; metadataStore = context.getMetadataStore(); - executor = context.createThrottledExecutor(1); + executor = MoreExecutors.listeningDecorator( + context.createThrottledExecutor(1)); } public long getFilesDeleted() { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java index b375c31bb5..17eb27f426 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java @@ -23,9 +23,11 @@ import java.io.IOException; import java.net.URI; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -126,7 +128,7 @@ public class StoreContext { final Configuration configuration, final String username, final UserGroupInformation owner, - final ListeningExecutorService executor, + final ExecutorService executor, final int executorCapacity, final Invoker invoker, final S3AInstrumentation instrumentation, @@ -143,7 +145,7 @@ public class StoreContext { this.configuration = configuration; this.username = username; this.owner = owner; - this.executor = executor; + this.executor = MoreExecutors.listeningDecorator(executor); this.executorCapacity = executorCapacity; this.invoker = invoker; this.instrumentation = instrumentation; @@ -178,7 +180,7 @@ public class StoreContext { return username; } - public ListeningExecutorService getExecutor() { + public ExecutorService getExecutor() { return executor; } @@ -305,7 +307,7 @@ public class StoreContext { * @param capacity maximum capacity of this executor. * @return an executor for submitting work. */ - public ListeningExecutorService createThrottledExecutor(int capacity) { + public ExecutorService createThrottledExecutor(int capacity) { return new SemaphoredDelegatingExecutor(executor, capacity, true); } @@ -315,7 +317,7 @@ public class StoreContext { * {@link #executorCapacity}. * @return a new executor for exclusive use by the caller. */ - public ListeningExecutorService createThrottledExecutor() { + public ExecutorService createThrottledExecutor() { return createThrottledExecutor(executorCapacity); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java index 44353b9943..4d789d417d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContextBuilder.java @@ -19,8 +19,7 @@ package org.apache.hadoop.fs.s3a.impl; import java.net.URI; - -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.Invoker; @@ -46,7 +45,7 @@ public class StoreContextBuilder { private UserGroupInformation owner; - private ListeningExecutorService executor; + private ExecutorService executor; private int executorCapacity; @@ -96,7 +95,7 @@ public class StoreContextBuilder { } public StoreContextBuilder setExecutor( - final ListeningExecutorService ex) { + final ExecutorService ex) { this.executor = ex; return this; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java index 8ec848839e..976c661483 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java @@ -66,6 +66,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -452,7 +453,8 @@ public class DynamoDBMetadataStore implements MetadataStore, StoreContext context = owner.createStoreContext(); instrumentation = context.getInstrumentation().getS3GuardInstrumentation(); username = context.getUsername(); - executor = context.createThrottledExecutor(); + executor = MoreExecutors.listeningDecorator( + context.createThrottledExecutor()); ttlTimeProvider = Preconditions.checkNotNull( context.getTimeProvider(), "ttlTimeProvider must not be null"); @@ -507,13 +509,14 @@ public class DynamoDBMetadataStore implements MetadataStore, // the executor capacity for work. int executorCapacity = intOption(conf, EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1); - executor = BlockingThreadPoolExecutorService.newInstance( - executorCapacity, - executorCapacity * 2, - longOption(conf, KEEPALIVE_TIME, - DEFAULT_KEEPALIVE_TIME, 0), - TimeUnit.SECONDS, - "s3a-ddb-" + tableName); + executor = MoreExecutors.listeningDecorator( + BlockingThreadPoolExecutorService.newInstance( + executorCapacity, + executorCapacity * 2, + longOption(conf, KEEPALIVE_TIME, + DEFAULT_KEEPALIVE_TIME, 0), + TimeUnit.SECONDS, + "s3a-ddb-" + tableName)); initDataAccessRetries(conf); this.ttlTimeProvider = ttlTp; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java index ce20cc3aa2..55423273b9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.s3a; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.StopWatch; @@ -33,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -70,7 +70,7 @@ public class ITestBlockingThreadPoolExecutorService { @Test public void testSubmitCallable() throws Exception { ensureCreated(); - ListenableFuture f = tpe.submit(callableSleeper); + Future f = tpe.submit(callableSleeper); Integer v = f.get(); assertEquals(SOME_VALUE, v); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java index d6c0b1deae..3c67ddb4f1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java @@ -34,6 +34,7 @@ import java.util.stream.Stream; import com.amazonaws.services.s3.model.MultiObjectDeleteException; import org.apache.hadoop.thirdparty.com.google.common.base.Charsets; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;; import org.assertj.core.api.Assertions; import org.junit.Test; import org.junit.runner.RunWith; @@ -126,11 +127,12 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase { * For submitting work. */ private static final ListeningExecutorService EXECUTOR = - BlockingThreadPoolExecutorService.newInstance( - EXECUTOR_THREAD_COUNT, - EXECUTOR_THREAD_COUNT * 2, - 30, TimeUnit.SECONDS, - "test-operations"); + MoreExecutors.listeningDecorator( + BlockingThreadPoolExecutorService.newInstance( + EXECUTOR_THREAD_COUNT, + EXECUTOR_THREAD_COUNT * 2, + 30, TimeUnit.SECONDS, + "test-operations")); /**