HADOOP-16080. hadoop-aws does not work with hadoop-client-api. (#2522)

Contributed by Chao Sun.

(Cherry-picked via PR #2575)
This commit is contained in:
Chao Sun 2021-03-09 12:01:29 -08:00 committed by GitHub
parent b2a565629d
commit 176bd88890
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 61 additions and 49 deletions

View File

@ -28,11 +28,11 @@
import java.util.HashMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -71,8 +71,8 @@ public class CosNFileSystem extends FileSystem {
private String owner = "Unknown";
private String group = "Unknown";
private ListeningExecutorService boundedIOThreadPool;
private ListeningExecutorService boundedCopyThreadPool;
private ExecutorService boundedIOThreadPool;
private ExecutorService boundedCopyThreadPool;
public CosNFileSystem() {
}

View File

@ -28,8 +28,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.classification.InterfaceAudience;
/**
@ -105,8 +103,7 @@ public Thread newThread(Runnable r) {
private BlockingThreadPoolExecutorService(int permitCount,
ThreadPoolExecutor eventProcessingExecutor) {
super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
permitCount, false);
super(eventProcessingExecutor, permitCount, false);
this.eventProcessingExecutor = eventProcessingExecutor;
}

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.util;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.statistics.DurationTracker;
@ -31,6 +29,7 @@
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -55,10 +54,10 @@
@SuppressWarnings("NullableProblems")
@InterfaceAudience.Private
public class SemaphoredDelegatingExecutor extends
ForwardingListeningExecutorService {
ForwardingExecutorService {
private final Semaphore queueingPermits;
private final ListeningExecutorService executorDelegatee;
private final ExecutorService executorDelegatee;
private final int permitCount;
private final DurationTrackerFactory trackerFactory;
@ -70,7 +69,7 @@ public class SemaphoredDelegatingExecutor extends
* @param trackerFactory duration tracker factory.
*/
public SemaphoredDelegatingExecutor(
ListeningExecutorService executorDelegatee,
ExecutorService executorDelegatee,
int permitCount,
boolean fair,
DurationTrackerFactory trackerFactory) {
@ -89,14 +88,14 @@ public SemaphoredDelegatingExecutor(
* @param fair should the semaphore be "fair"
*/
public SemaphoredDelegatingExecutor(
ListeningExecutorService executorDelegatee,
ExecutorService executorDelegatee,
int permitCount,
boolean fair) {
this(executorDelegatee, permitCount, fair, null);
}
@Override
protected ListeningExecutorService delegate() {
protected ExecutorService delegate() {
return executorDelegatee;
}
@ -127,7 +126,7 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
}
@Override
public <T> ListenableFuture<T> submit(Callable<T> task) {
public <T> Future<T> submit(Callable<T> task) {
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
@ -139,7 +138,7 @@ public <T> ListenableFuture<T> submit(Callable<T> task) {
}
@Override
public <T> ListenableFuture<T> submit(Runnable task, T result) {
public <T> Future<T> submit(Runnable task, T result) {
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();
@ -151,7 +150,7 @@ public <T> ListenableFuture<T> submit(Runnable task, T result) {
}
@Override
public ListenableFuture<?> submit(Runnable task) {
public Future<?> submit(Runnable task) {
try (DurationTracker ignored =
trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
queueingPermits.acquire();

View File

@ -27,6 +27,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -423,9 +424,10 @@ private void createFileSystems(final FileSystem.Cache cache, final int count)
// only one instance can be created at a time.
URI uri = new URI("blocking://a");
ListeningExecutorService pool =
BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
10, TimeUnit.SECONDS,
"creation-threads");
"creation-threads"));
// submit a set of requests to create an FS instance.
// the semaphore will block all but one, and that will block until

View File

@ -27,7 +27,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -78,8 +77,8 @@ public class AliyunOSSFileSystem extends FileSystem {
private int maxKeys;
private int maxReadAheadPartNumber;
private int maxConcurrentCopyTasksPerDir;
private ListeningExecutorService boundedThreadPool;
private ListeningExecutorService boundedCopyThreadPool;
private ExecutorService boundedThreadPool;
private ExecutorService boundedCopyThreadPool;
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override

View File

@ -84,4 +84,10 @@
<Bug pattern="VO_VOLATILE_INCREMENT"/>
</Match>
<!-- Ignore return value from this method call -->
<Match>
<Class name="org.apache.hadoop.fs.s3a.impl.StoreContext"/>
<Method name="submit"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
</Match>
</FindBugsFilter>

View File

@ -42,6 +42,7 @@
import java.util.Set;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -262,7 +263,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
private ListeningExecutorService boundedThreadPool;
private ExecutorService boundedThreadPool;
private ThreadPoolExecutor unboundedThreadPool;
private int executorCapacity;
private long multiPartThreshold;

View File

@ -28,6 +28,7 @@
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -207,7 +208,8 @@ public DeleteOperation(final StoreContext context,
"page size out of range: %s", pageSize);
this.pageSize = pageSize;
metadataStore = context.getMetadataStore();
executor = context.createThrottledExecutor(1);
executor = MoreExecutors.listeningDecorator(
context.createThrottledExecutor(1));
}
public long getFilesDeleted() {

View File

@ -23,9 +23,11 @@
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -127,7 +129,7 @@ public StoreContext(
final Configuration configuration,
final String username,
final UserGroupInformation owner,
final ListeningExecutorService executor,
final ExecutorService executor,
final int executorCapacity,
final Invoker invoker,
final S3AStatisticsContext instrumentation,
@ -144,7 +146,7 @@ public StoreContext(
this.configuration = configuration;
this.username = username;
this.owner = owner;
this.executor = executor;
this.executor = MoreExecutors.listeningDecorator(executor);
this.executorCapacity = executorCapacity;
this.invoker = invoker;
this.instrumentation = instrumentation;
@ -179,7 +181,7 @@ public String getUsername() {
return username;
}
public ListeningExecutorService getExecutor() {
public ExecutorService getExecutor() {
return executor;
}
@ -310,7 +312,7 @@ public void incrementGauge(Statistic statistic, long count) {
* @param capacity maximum capacity of this executor.
* @return an executor for submitting work.
*/
public ListeningExecutorService createThrottledExecutor(int capacity) {
public ExecutorService createThrottledExecutor(int capacity) {
return new SemaphoredDelegatingExecutor(executor,
capacity, true);
}
@ -320,7 +322,7 @@ public ListeningExecutorService createThrottledExecutor(int capacity) {
* {@link #executorCapacity}.
* @return a new executor for exclusive use by the caller.
*/
public ListeningExecutorService createThrottledExecutor() {
public ExecutorService createThrottledExecutor() {
return createThrottledExecutor(executorCapacity);
}

View File

@ -19,8 +19,7 @@
package org.apache.hadoop.fs.s3a.impl;
import java.net.URI;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Invoker;
@ -46,7 +45,7 @@ public class StoreContextBuilder {
private UserGroupInformation owner;
private ListeningExecutorService executor;
private ExecutorService executor;
private int executorCapacity;
@ -96,7 +95,7 @@ public StoreContextBuilder setOwner(final UserGroupInformation ugi) {
}
public StoreContextBuilder setExecutor(
final ListeningExecutorService ex) {
final ExecutorService ex) {
this.executor = ex;
return this;
}

View File

@ -67,6 +67,7 @@
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -454,7 +455,8 @@ void bindToOwnerFilesystem(final S3AFileSystem fs) {
instrumentation = context.getInstrumentation()
.getS3GuardInstrumentation();
username = context.getUsername();
executor = context.createThrottledExecutor();
executor = MoreExecutors.listeningDecorator(
context.createThrottledExecutor());
ttlTimeProvider = Preconditions.checkNotNull(
context.getTimeProvider(),
"ttlTimeProvider must not be null");
@ -509,13 +511,14 @@ public void initialize(Configuration config,
// the executor capacity for work.
int executorCapacity = intOption(conf,
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
executor = BlockingThreadPoolExecutorService.newInstance(
executorCapacity,
executorCapacity * 2,
longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0),
TimeUnit.SECONDS,
"s3a-ddb-" + tableName);
executor = MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance(
executorCapacity,
executorCapacity * 2,
longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0),
TimeUnit.SECONDS,
"s3a-ddb-" + tableName));
initDataAccessRetries(conf);
this.ttlTimeProvider = ttlTp;

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.StopWatch;
@ -33,6 +32,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@ -70,7 +70,7 @@ public static void afterClass() throws Exception {
@Test
public void testSubmitCallable() throws Exception {
ensureCreated();
ListenableFuture<Integer> f = tpe.submit(callableSleeper);
Future<Integer> f = tpe.submit(callableSleeper);
Integer v = f.get();
assertEquals(SOME_VALUE, v);
}

View File

@ -34,6 +34,7 @@
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -128,11 +129,12 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
* For submitting work.
*/
private static final ListeningExecutorService EXECUTOR =
BlockingThreadPoolExecutorService.newInstance(
EXECUTOR_THREAD_COUNT,
EXECUTOR_THREAD_COUNT * 2,
30, TimeUnit.SECONDS,
"test-operations");
MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance(
EXECUTOR_THREAD_COUNT,
EXECUTOR_THREAD_COUNT * 2,
30, TimeUnit.SECONDS,
"test-operations"));
/**