From ced438a4bf50fe0ac9072c128e18249e6742956a Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Tue, 15 Sep 2015 10:43:13 -0700 Subject: [PATCH] HDFS-8899. Erasure Coding: use threadpool for EC recovery tasks on DataNode. Contributed by Rakesh R. Change-Id: I9429706ae3c9b10a9274c07b98da6ed54cce192b --- .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 ++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++- .../erasurecode/ErasureCodingWorker.java | 33 +++++++++++++++---- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 39b5adc6a8..acf62cbe72 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -424,3 +424,6 @@ HDFS-7351. Document the HDFS Erasure Coding feature. (umamahesh and Zhe Zhang via wang) + + HDFS-8899. Erasure Coding: use threadpool for EC recovery tasks on DataNode. + (Rakesh R via zhz) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index c4dd496afe..f7cda187a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -400,7 +400,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size"; public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024; public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis"; - public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s + public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s + public static final String DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY = "dfs.datanode.striped.blockrecovery.threads.size"; + public static final int DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT = 8; public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface"; public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default"; public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index f6a5ece265..56b54f1590 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -38,6 +38,7 @@ import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -92,6 +93,7 @@ public final class ErasureCodingWorker { private final DataNode datanode; private final Configuration conf; + private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL; private ThreadPoolExecutor STRIPED_READ_THREAD_POOL; private final int STRIPED_READ_THRESHOLD_MILLIS; private final int STRIPED_READ_BUFFER_SIZE; @@ -109,6 +111,10 @@ public ErasureCodingWorker(Configuration conf, DataNode datanode) { STRIPED_READ_BUFFER_SIZE = conf.getInt( DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT); + + initializeStripedBlkRecoveryThreadPool(conf.getInt( + DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY, + DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT)); } private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { @@ -142,6 +148,25 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); } + private void initializeStripedBlkRecoveryThreadPool(int num) { + if (LOG.isDebugEnabled()) { + LOG.debug("Using striped block recovery; pool threads=" + num); + } + STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60, + TimeUnit.SECONDS, new LinkedBlockingQueue(), + new Daemon.DaemonFactory() { + private final AtomicInteger threadIdx = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread t = super.newThread(r); + t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement()); + return t; + } + }); + STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true); + } + /** * Handles the Erasure Coding recovery work commands. * @@ -150,12 +175,8 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) { */ public void processErasureCodingTasks(Collection ecTasks) { for (BlockECRecoveryInfo recoveryInfo : ecTasks) { - try { - new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start(); - } catch (Throwable e) { - LOG.warn("Failed to recover striped block " + - recoveryInfo.getExtendedBlock().getLocalBlock(), e); - } + STRIPED_BLK_RECOVERY_THREAD_POOL.submit(new ReconstructAndTransferBlock( + recoveryInfo)); } }