HDFS-8899. Erasure Coding: use threadpool for EC recovery tasks on DataNode. Contributed by Rakesh R.

Change-Id: I9429706ae3c9b10a9274c07b98da6ed54cce192b
This commit is contained in:
Zhe Zhang 2015-09-15 10:43:13 -07:00
parent c2ebab67ac
commit ced438a4bf
3 changed files with 33 additions and 7 deletions

View File

@ -424,3 +424,6 @@
HDFS-7351. Document the HDFS Erasure Coding feature. HDFS-7351. Document the HDFS Erasure Coding feature.
(umamahesh and Zhe Zhang via wang) (umamahesh and Zhe Zhang via wang)
HDFS-8899. Erasure Coding: use threadpool for EC recovery tasks on DataNode.
(Rakesh R via zhz)

View File

@ -400,7 +400,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size"; public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024; public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024;
public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis"; public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis";
public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s
public static final String DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY = "dfs.datanode.striped.blockrecovery.threads.size";
public static final int DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT = 8;
public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface"; public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default"; public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver"; public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

View File

@ -38,6 +38,7 @@
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -92,6 +93,7 @@ public final class ErasureCodingWorker {
private final DataNode datanode; private final DataNode datanode;
private final Configuration conf; private final Configuration conf;
private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL;
private ThreadPoolExecutor STRIPED_READ_THREAD_POOL; private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final int STRIPED_READ_THRESHOLD_MILLIS; private final int STRIPED_READ_THRESHOLD_MILLIS;
private final int STRIPED_READ_BUFFER_SIZE; private final int STRIPED_READ_BUFFER_SIZE;
@ -109,6 +111,10 @@ public ErasureCodingWorker(Configuration conf, DataNode datanode) {
STRIPED_READ_BUFFER_SIZE = conf.getInt( STRIPED_READ_BUFFER_SIZE = conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT); DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
initializeStripedBlkRecoveryThreadPool(conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT));
} }
private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
@ -142,6 +148,25 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
} }
private void initializeStripedBlkRecoveryThreadPool(int num) {
if (LOG.isDebugEnabled()) {
LOG.debug("Using striped block recovery; pool threads=" + num);
}
STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIdx = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement());
return t;
}
});
STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true);
}
/** /**
* Handles the Erasure Coding recovery work commands. * Handles the Erasure Coding recovery work commands.
* *
@ -150,12 +175,8 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
*/ */
public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) { public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
for (BlockECRecoveryInfo recoveryInfo : ecTasks) { for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
try { STRIPED_BLK_RECOVERY_THREAD_POOL.submit(new ReconstructAndTransferBlock(
new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start(); recoveryInfo));
} catch (Throwable e) {
LOG.warn("Failed to recover striped block " +
recoveryInfo.getExtendedBlock().getLocalBlock(), e);
}
} }
} }