HADOOP-16080. hadoop-aws does not work with hadoop-client-api. Contributed by Chao Sun (#2522)

This commit is contained in:
Chao Sun 2020-12-12 09:37:13 -08:00 committed by GitHub
parent 1a63df86e2
commit 81e533de8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 60 additions and 48 deletions

View File

@ -28,11 +28,11 @@ import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -71,8 +71,8 @@ public class CosNFileSystem extends FileSystem {
private String owner = "Unknown"; private String owner = "Unknown";
private String group = "Unknown"; private String group = "Unknown";
private ListeningExecutorService boundedIOThreadPool; private ExecutorService boundedIOThreadPool;
private ListeningExecutorService boundedCopyThreadPool; private ExecutorService boundedCopyThreadPool;
public CosNFileSystem() { public CosNFileSystem() {
} }

View File

@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
/** /**
@ -105,8 +103,7 @@ public final class BlockingThreadPoolExecutorService
private BlockingThreadPoolExecutorService(int permitCount, private BlockingThreadPoolExecutorService(int permitCount,
ThreadPoolExecutor eventProcessingExecutor) { ThreadPoolExecutor eventProcessingExecutor) {
super(MoreExecutors.listeningDecorator(eventProcessingExecutor), super(eventProcessingExecutor, permitCount, false);
permitCount, false);
this.eventProcessingExecutor = eventProcessingExecutor; this.eventProcessingExecutor = eventProcessingExecutor;
} }

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.util; package org.apache.hadoop.util;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ForwardingExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -29,6 +27,7 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -49,10 +48,10 @@ import java.util.concurrent.TimeoutException;
@SuppressWarnings("NullableProblems") @SuppressWarnings("NullableProblems")
@InterfaceAudience.Private @InterfaceAudience.Private
public class SemaphoredDelegatingExecutor extends public class SemaphoredDelegatingExecutor extends
ForwardingListeningExecutorService { ForwardingExecutorService {
private final Semaphore queueingPermits; private final Semaphore queueingPermits;
private final ListeningExecutorService executorDelegatee; private final ExecutorService executorDelegatee;
private final int permitCount; private final int permitCount;
/** /**
@ -62,7 +61,7 @@ public class SemaphoredDelegatingExecutor extends
* @param fair should the semaphore be "fair" * @param fair should the semaphore be "fair"
*/ */
public SemaphoredDelegatingExecutor( public SemaphoredDelegatingExecutor(
ListeningExecutorService executorDelegatee, ExecutorService executorDelegatee,
int permitCount, int permitCount,
boolean fair) { boolean fair) {
this.permitCount = permitCount; this.permitCount = permitCount;
@ -71,7 +70,7 @@ public class SemaphoredDelegatingExecutor extends
} }
@Override @Override
protected ListeningExecutorService delegate() { protected ExecutorService delegate() {
return executorDelegatee; return executorDelegatee;
} }
@ -102,7 +101,7 @@ public class SemaphoredDelegatingExecutor extends
} }
@Override @Override
public <T> ListenableFuture<T> submit(Callable<T> task) { public <T> Future<T> submit(Callable<T> task) {
try { try {
queueingPermits.acquire(); queueingPermits.acquire();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -113,7 +112,7 @@ public class SemaphoredDelegatingExecutor extends
} }
@Override @Override
public <T> ListenableFuture<T> submit(Runnable task, T result) { public <T> Future<T> submit(Runnable task, T result) {
try { try {
queueingPermits.acquire(); queueingPermits.acquire();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -124,7 +123,7 @@ public class SemaphoredDelegatingExecutor extends
} }
@Override @Override
public ListenableFuture<?> submit(Runnable task) { public Future<?> submit(Runnable task) {
try { try {
queueingPermits.acquire(); queueingPermits.acquire();
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -423,9 +424,10 @@ public class TestFileSystemCaching extends HadoopTestBase {
// only one instance can be created at a time. // only one instance can be created at a time.
URI uri = new URI("blocking://a"); URI uri = new URI("blocking://a");
ListeningExecutorService pool = ListeningExecutorService pool =
MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance(count * 2, 0, BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
10, TimeUnit.SECONDS, 10, TimeUnit.SECONDS,
"creation-threads"); "creation-threads"));
// submit a set of requests to create an FS instance. // submit a set of requests to create an FS instance.
// the semaphore will block all but one, and that will block until // the semaphore will block all but one, and that will block until

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -78,8 +77,8 @@ public class AliyunOSSFileSystem extends FileSystem {
private int maxKeys; private int maxKeys;
private int maxReadAheadPartNumber; private int maxReadAheadPartNumber;
private int maxConcurrentCopyTasksPerDir; private int maxConcurrentCopyTasksPerDir;
private ListeningExecutorService boundedThreadPool; private ExecutorService boundedThreadPool;
private ListeningExecutorService boundedCopyThreadPool; private ExecutorService boundedCopyThreadPool;
private static final PathFilter DEFAULT_FILTER = new PathFilter() { private static final PathFilter DEFAULT_FILTER = new PathFilter() {
@Override @Override

View File

@ -74,4 +74,10 @@
<Bug pattern="SF_SWITCH_FALLTHROUGH"/> <Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match> </Match>
<!-- Ignore return value from this method call -->
<Match>
<Class name="org.apache.hadoop.fs.s3a.impl.StoreContext"/>
<Method name="submit"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
</Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -42,6 +42,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -243,7 +244,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private long partSize; private long partSize;
private boolean enableMultiObjectsDelete; private boolean enableMultiObjectsDelete;
private TransferManager transfers; private TransferManager transfers;
private ListeningExecutorService boundedThreadPool; private ExecutorService boundedThreadPool;
private ThreadPoolExecutor unboundedThreadPool; private ThreadPoolExecutor unboundedThreadPool;
private int executorCapacity; private int executorCapacity;
private long multiPartThreshold; private long multiPartThreshold;

View File

@ -28,6 +28,7 @@ import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult; import com.amazonaws.services.s3.model.DeleteObjectsResult;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -207,7 +208,8 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
"page size out of range: %s", pageSize); "page size out of range: %s", pageSize);
this.pageSize = pageSize; this.pageSize = pageSize;
metadataStore = context.getMetadataStore(); metadataStore = context.getMetadataStore();
executor = context.createThrottledExecutor(1); executor = MoreExecutors.listeningDecorator(
context.createThrottledExecutor(1));
} }
public long getFilesDeleted() { public long getFilesDeleted() {

View File

@ -23,9 +23,11 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -126,7 +128,7 @@ public class StoreContext {
final Configuration configuration, final Configuration configuration,
final String username, final String username,
final UserGroupInformation owner, final UserGroupInformation owner,
final ListeningExecutorService executor, final ExecutorService executor,
final int executorCapacity, final int executorCapacity,
final Invoker invoker, final Invoker invoker,
final S3AInstrumentation instrumentation, final S3AInstrumentation instrumentation,
@ -143,7 +145,7 @@ public class StoreContext {
this.configuration = configuration; this.configuration = configuration;
this.username = username; this.username = username;
this.owner = owner; this.owner = owner;
this.executor = executor; this.executor = MoreExecutors.listeningDecorator(executor);
this.executorCapacity = executorCapacity; this.executorCapacity = executorCapacity;
this.invoker = invoker; this.invoker = invoker;
this.instrumentation = instrumentation; this.instrumentation = instrumentation;
@ -178,7 +180,7 @@ public class StoreContext {
return username; return username;
} }
public ListeningExecutorService getExecutor() { public ExecutorService getExecutor() {
return executor; return executor;
} }
@ -305,7 +307,7 @@ public class StoreContext {
* @param capacity maximum capacity of this executor. * @param capacity maximum capacity of this executor.
* @return an executor for submitting work. * @return an executor for submitting work.
*/ */
public ListeningExecutorService createThrottledExecutor(int capacity) { public ExecutorService createThrottledExecutor(int capacity) {
return new SemaphoredDelegatingExecutor(executor, return new SemaphoredDelegatingExecutor(executor,
capacity, true); capacity, true);
} }
@ -315,7 +317,7 @@ public class StoreContext {
* {@link #executorCapacity}. * {@link #executorCapacity}.
* @return a new executor for exclusive use by the caller. * @return a new executor for exclusive use by the caller.
*/ */
public ListeningExecutorService createThrottledExecutor() { public ExecutorService createThrottledExecutor() {
return createThrottledExecutor(executorCapacity); return createThrottledExecutor(executorCapacity);
} }

View File

@ -19,8 +19,7 @@
package org.apache.hadoop.fs.s3a.impl; package org.apache.hadoop.fs.s3a.impl;
import java.net.URI; import java.net.URI;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Invoker; import org.apache.hadoop.fs.s3a.Invoker;
@ -46,7 +45,7 @@ public class StoreContextBuilder {
private UserGroupInformation owner; private UserGroupInformation owner;
private ListeningExecutorService executor; private ExecutorService executor;
private int executorCapacity; private int executorCapacity;
@ -96,7 +95,7 @@ public class StoreContextBuilder {
} }
public StoreContextBuilder setExecutor( public StoreContextBuilder setExecutor(
final ListeningExecutorService ex) { final ExecutorService ex) {
this.executor = ex; this.executor = ex;
return this; return this;
} }

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -452,7 +453,8 @@ public class DynamoDBMetadataStore implements MetadataStore,
StoreContext context = owner.createStoreContext(); StoreContext context = owner.createStoreContext();
instrumentation = context.getInstrumentation().getS3GuardInstrumentation(); instrumentation = context.getInstrumentation().getS3GuardInstrumentation();
username = context.getUsername(); username = context.getUsername();
executor = context.createThrottledExecutor(); executor = MoreExecutors.listeningDecorator(
context.createThrottledExecutor());
ttlTimeProvider = Preconditions.checkNotNull( ttlTimeProvider = Preconditions.checkNotNull(
context.getTimeProvider(), context.getTimeProvider(),
"ttlTimeProvider must not be null"); "ttlTimeProvider must not be null");
@ -507,13 +509,14 @@ public class DynamoDBMetadataStore implements MetadataStore,
// the executor capacity for work. // the executor capacity for work.
int executorCapacity = intOption(conf, int executorCapacity = intOption(conf,
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1); EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
executor = BlockingThreadPoolExecutorService.newInstance( executor = MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance(
executorCapacity, executorCapacity,
executorCapacity * 2, executorCapacity * 2,
longOption(conf, KEEPALIVE_TIME, longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0), DEFAULT_KEEPALIVE_TIME, 0),
TimeUnit.SECONDS, TimeUnit.SECONDS,
"s3a-ddb-" + tableName); "s3a-ddb-" + tableName));
initDataAccessRetries(conf); initDataAccessRetries(conf);
this.ttlTimeProvider = ttlTp; this.ttlTimeProvider = ttlTp;

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.fs.s3a; package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.StopWatch; import org.apache.hadoop.util.StopWatch;
@ -33,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -70,7 +70,7 @@ public class ITestBlockingThreadPoolExecutorService {
@Test @Test
public void testSubmitCallable() throws Exception { public void testSubmitCallable() throws Exception {
ensureCreated(); ensureCreated();
ListenableFuture<Integer> f = tpe.submit(callableSleeper); Future<Integer> f = tpe.submit(callableSleeper);
Integer v = f.get(); Integer v = f.get();
assertEquals(SOME_VALUE, v); assertEquals(SOME_VALUE, v);
} }

View File

@ -34,6 +34,7 @@ import java.util.stream.Stream;
import com.amazonaws.services.s3.model.MultiObjectDeleteException; import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets; import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -126,11 +127,12 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
* For submitting work. * For submitting work.
*/ */
private static final ListeningExecutorService EXECUTOR = private static final ListeningExecutorService EXECUTOR =
MoreExecutors.listeningDecorator(
BlockingThreadPoolExecutorService.newInstance( BlockingThreadPoolExecutorService.newInstance(
EXECUTOR_THREAD_COUNT, EXECUTOR_THREAD_COUNT,
EXECUTOR_THREAD_COUNT * 2, EXECUTOR_THREAD_COUNT * 2,
30, TimeUnit.SECONDS, 30, TimeUnit.SECONDS,
"test-operations"); "test-operations"));
/** /**