HDFS-10909. De-duplicate code in ErasureCodingWorker#initializeStripedReadThreadPool and DFSClient#initThreadsNumForStripedReads. (Manoj Govindassamy via lei)

commit b59206190e (parent 9b0c17f852)
Author: Lei Xu
Date: 2016-11-02 16:45:25 -07:00
3 changed files with 61 additions and 42 deletions

DFSClient.java

@@ -2800,37 +2800,17 @@ public void rejectedExecution(Runnable runnable,
   /**
    * Create thread pool for parallel reading in striped layout,
    * STRIPED_READ_THREAD_POOL, if it does not already exist.
-   * @param num Number of threads for striped reads thread pool.
+   * @param numThreads Number of threads for striped reads thread pool.
    */
-  private void initThreadsNumForStripedReads(int num) {
-    assert num > 0;
+  private void initThreadsNumForStripedReads(int numThreads) {
+    assert numThreads > 0;
     if (STRIPED_READ_THREAD_POOL != null) {
       return;
     }
     synchronized (DFSClient.class) {
       if (STRIPED_READ_THREAD_POOL == null) {
-        STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
-            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-            new Daemon.DaemonFactory() {
-              private final AtomicInteger threadIndex = new AtomicInteger(0);
-              @Override
-              public Thread newThread(Runnable r) {
-                Thread t = super.newThread(r);
-                t.setName("stripedRead-" + threadIndex.getAndIncrement());
-                return t;
-              }
-            },
-            new ThreadPoolExecutor.CallerRunsPolicy() {
-              @Override
-              public void rejectedExecution(Runnable runnable,
-                  ThreadPoolExecutor e) {
-                LOG.info("Execution for striped reading rejected, "
-                    + "Executing in current thread");
-                // will run in the current thread
-                super.rejectedExecution(runnable, e);
-              }
-            });
+        STRIPED_READ_THREAD_POOL = DFSUtilClient.getThreadPoolExecutor(1,
+            numThreads, 60, "StripedRead-", true);
+        STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
       }
     }
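The DFSClient change above keeps the method's existing double-checked locking: a fast null check outside the lock, then a second check while synchronized on DFSClient.class, so STRIPED_READ_THREAD_POOL is built at most once. A minimal standalone sketch of that idiom, with a hypothetical holder class and a plain JDK executor standing in for the HDFS pool:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: mirrors the lazy, once-only initialization pattern
// used for STRIPED_READ_THREAD_POOL in the hunk above.
class LazyPoolHolder {
  // volatile so the unsynchronized fast-path read observes a fully
  // constructed executor (the usual requirement for double-checked locking).
  private static volatile ExecutorService pool;

  static void init(int numThreads) {
    assert numThreads > 0;
    if (pool != null) {            // fast path: already initialized
      return;
    }
    synchronized (LazyPoolHolder.class) {
      if (pool == null) {          // re-check while holding the lock
        pool = Executors.newFixedThreadPool(numThreads);
      }
    }
  }
}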

DFSUtilClient.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.SignedBytes;
 import org.apache.hadoop.conf.Configuration;
@@ -51,6 +52,7 @@
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.KMSUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
@@ -81,6 +83,10 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
@@ -776,4 +782,48 @@ private static boolean getClientDataTransferTcpNoDelay(Configuration conf) {
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY,
         DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT);
   }
+
+  /**
+   * Utility to create a {@link ThreadPoolExecutor}.
+   *
+   * @param corePoolSize - min threads in the pool, even if idle
+   * @param maxPoolSize - max threads in the pool
+   * @param keepAliveTimeSecs - max seconds beyond which excess idle threads
+   *          will be terminated
+   * @param threadNamePrefix - name prefix for the pool threads
+   * @param runRejectedExec - when true, rejected tasks from
+   *          ThreadPoolExecutor are run in the context of calling thread
+   * @return ThreadPoolExecutor
+   */
+  public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
+      int maxPoolSize, long keepAliveTimeSecs, String threadNamePrefix,
+      boolean runRejectedExec) {
+    Preconditions.checkArgument(corePoolSize > 0);
+    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
+        maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(), new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIndex = new AtomicInteger(0);
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName(threadNamePrefix + threadIndex.getAndIncrement());
+            return t;
+          }
+        });
+    if (runRejectedExec) {
+      threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor
+          .CallerRunsPolicy() {
+        @Override
+        public void rejectedExecution(Runnable runnable,
+            ThreadPoolExecutor e) {
+          LOG.info(threadNamePrefix + " task is rejected by " +
+              "ThreadPoolExecutor. Executing it in current thread.");
+          // will run in the current thread
+          super.rejectedExecution(runnable, e);
+        }
+      });
+    }
+    return threadPoolExecutor;
+  }
 }
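For reference, a minimal usage sketch of the new DFSUtilClient#getThreadPoolExecutor added above. The class name, pool sizes, and task are arbitrary examples, not values from the patch; the helper returns a daemon-thread pool backed by a SynchronousQueue, and passing true for runRejectedExec makes overflow tasks run on the submitting thread:

import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hdfs.DFSUtilClient;

public class StripedPoolExample {
  public static void main(String[] args) {
    // corePoolSize=1, maxPoolSize=8, keepAlive=60s, threads named
    // "Example-0", "Example-1", ... and caller-runs fallback enabled.
    ThreadPoolExecutor pool =
        DFSUtilClient.getThreadPoolExecutor(1, 8, 60, "Example-", true);
    // Same follow-up call the patch makes on STRIPED_READ_THREAD_POOL.
    pool.allowCoreThreadTimeOut(true);

    pool.execute(() ->
        System.out.println("running on " + Thread.currentThread().getName()));

    pool.shutdown();
  }
}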

ErasureCodingWorker.java

@@ -20,13 +20,13 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 
 import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -89,22 +89,11 @@ public void rejectedExecution(Runnable runnable,
     stripedReadPool.allowCoreThreadTimeOut(true);
   }
 
-  private void initializeStripedBlkReconstructionThreadPool(int num) {
-    LOG.debug("Using striped block reconstruction; pool threads={}", num);
-    stripedReconstructionPool = new ThreadPoolExecutor(2, num, 60,
-        TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(),
-        new Daemon.DaemonFactory() {
-          private final AtomicInteger threadIdx = new AtomicInteger(0);
-          @Override
-          public Thread newThread(Runnable r) {
-            Thread t = super.newThread(r);
-            t.setName("stripedBlockReconstruction-"
-                + threadIdx.getAndIncrement());
-            return t;
-          }
-        });
+  private void initializeStripedBlkReconstructionThreadPool(int numThreads) {
+    LOG.debug("Using striped block reconstruction; pool threads={}",
+        numThreads);
+    stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(2,
+        numThreads, 60, "StripedBlockReconstruction-", false);
     stripedReconstructionPool.allowCoreThreadTimeOut(true);
   }
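The reconstruction pool above passes false for runRejectedExec, so the helper leaves ThreadPoolExecutor's default AbortPolicy in place; because the pool is fed by a SynchronousQueue, a task submitted while all maxPoolSize workers are busy is rejected immediately rather than run on the caller (the striped-read pool in DFSClient, which passes true, falls back to the calling thread instead). A small sketch of that behavior, using a hypothetical demo class and an intentionally tiny pool:

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hdfs.DFSUtilClient;

public class RejectionDemo {
  public static void main(String[] args) {
    // runRejectedExec=false: no CallerRunsPolicy is installed.
    ThreadPoolExecutor pool =
        DFSUtilClient.getThreadPoolExecutor(1, 1, 60, "Demo-", false);

    // Occupies the single worker thread for a while.
    pool.execute(() -> sleep(2000));
    try {
      // SynchronousQueue has no capacity and the pool is at maxPoolSize,
      // so this submission fails fast with the default AbortPolicy.
      pool.execute(() -> sleep(2000));
    } catch (RejectedExecutionException e) {
      System.out.println("second task rejected: " + e);
    }
    pool.shutdown();
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
  }
}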