diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 93c0ff09b3..0128b07bf9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -2800,37 +2800,17 @@ public void rejectedExecution(Runnable runnable, /** * Create thread pool for parallel reading in striped layout, * STRIPED_READ_THREAD_POOL, if it does not already exist. - * @param num Number of threads for striped reads thread pool. + * @param numThreads Number of threads for striped reads thread pool. */ - private void initThreadsNumForStripedReads(int num) { - assert num > 0; + private void initThreadsNumForStripedReads(int numThreads) { + assert numThreads > 0; if (STRIPED_READ_THREAD_POOL != null) { return; } synchronized (DFSClient.class) { if (STRIPED_READ_THREAD_POOL == null) { - STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, - TimeUnit.SECONDS, new SynchronousQueue(), - new Daemon.DaemonFactory() { - private final AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = super.newThread(r); - t.setName("stripedRead-" + threadIndex.getAndIncrement()); - return t; - } - }, - new ThreadPoolExecutor.CallerRunsPolicy() { - @Override - public void rejectedExecution(Runnable runnable, - ThreadPoolExecutor e) { - LOG.info("Execution for striped reading rejected, " - + "Executing in current thread"); - // will run in the current thread - super.rejectedExecution(runnable, e); - } - }); + STRIPED_READ_THREAD_POOL = DFSUtilClient.getThreadPoolExecutor(1, + numThreads, 60, "StripedRead-", true); STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index b93632aaec..d267530106 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.primitives.SignedBytes; import org.apache.hadoop.conf.Configuration; @@ -51,6 +52,7 @@ import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.KMSUtil; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -81,6 +83,10 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY; @@ -776,4 +782,48 @@ private static boolean getClientDataTransferTcpNoDelay(Configuration conf) { DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY, DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT); } + + /** + * Utility to create a {@link ThreadPoolExecutor}. + * + * @param corePoolSize - min threads in the pool, even if idle + * @param maxPoolSize - max threads in the pool + * @param keepAliveTimeSecs - max seconds beyond which excess idle threads + * will be terminated + * @param threadNamePrefix - name prefix for the pool threads + * @param runRejectedExec - when true, rejected tasks from + * ThreadPoolExecutor are run in the context of calling thread + * @return ThreadPoolExecutor + */ + public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize, + int maxPoolSize, long keepAliveTimeSecs, String threadNamePrefix, + boolean runRejectedExec) { + Preconditions.checkArgument(corePoolSize > 0); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, + maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS, + new SynchronousQueue(), new Daemon.DaemonFactory() { + private final AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName(threadNamePrefix + threadIndex.getAndIncrement()); + return t; + } + }); + if (runRejectedExec) { + threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor + .CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable runnable, + ThreadPoolExecutor e) { + LOG.info(threadNamePrefix + " task is rejected by " + + "ThreadPoolExecutor. Executing it in current thread."); + // will run in the current thread + super.rejectedExecution(runnable, e); + } + }); + } + return threadPoolExecutor; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index aacbb2d0e2..1492e5dd44 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -20,13 +20,13 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import java.util.Collection; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -89,22 +89,11 @@ public void rejectedExecution(Runnable runnable, stripedReadPool.allowCoreThreadTimeOut(true); } - private void initializeStripedBlkReconstructionThreadPool(int num) { - LOG.debug("Using striped block reconstruction; pool threads={}", num); - stripedReconstructionPool = new ThreadPoolExecutor(2, num, 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue(), - new Daemon.DaemonFactory() { - private final AtomicInteger threadIdx = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread t = super.newThread(r); - t.setName("stripedBlockReconstruction-" - + threadIdx.getAndIncrement()); - return t; - } - }); + private void initializeStripedBlkReconstructionThreadPool(int numThreads) { + LOG.debug("Using striped block reconstruction; pool threads={}", + numThreads); + stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(2, + numThreads, 60, "StripedBlockReconstruction-", false); stripedReconstructionPool.allowCoreThreadTimeOut(true); }