HDFS-17067 Use BlockingThreadPoolExecutorService for nnProbingThreadPool in ObserverReadProxy (#5803)

This commit is contained in:
Xing Lin 2023-07-20 10:46:41 -07:00 committed by GitHub
parent c35f31640e
commit 80fefd093f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -27,11 +27,8 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.List;
@ -54,6 +51,7 @@
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -187,18 +185,8 @@ public class ObserverReadProxyProvider<T>
/**
* Threadpool to send the getHAServiceState requests.
*
* One thread running all the time, with up to 4 threads. Idle threads will be killed after
* 1 minute. At most 1024 requests can be submitted before they start to be rejected.
*
* Each hdfs client will have its own ObserverReadProxyProvider. Thus,
* having 1 thread running should be sufficient in most cases.
* We are not expecting to receive a lot of outstanding RPC calls
* from a single hdfs client, thus setting the queue size to 1024.
*/
private final ExecutorService nnProbingThreadPool =
new ThreadPoolExecutor(1, 4, 1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(1024));
private final BlockingThreadPoolExecutorService nnProbingThreadPool;
/**
* By default ObserverReadProxyProvider uses
@ -262,6 +250,15 @@ public ObserverReadProxyProvider(
+ "class does not implement {}", uri, ClientProtocol.class.getName());
this.observerReadEnabled = false;
}
/*
* At most 4 threads will be running and each thread will die after 10
* seconds of no use. Up to 132 tasks (4 active + 128 waiting) can be
* submitted simultaneously.
*/
nnProbingThreadPool =
BlockingThreadPoolExecutorService.newInstance(4, 128, 10L, TimeUnit.SECONDS,
"nn-ha-state-probing");
}
public AlignmentContext getAlignmentContext() {
@ -648,6 +645,7 @@ public synchronized void close() throws IOException {
}
}
failoverProxy.close();
nnProbingThreadPool.shutdown();
}
@Override