HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee

This commit is contained in:
Kihwal Lee 2017-07-21 09:14:19 -05:00
parent 3b48f81411
commit 8e3a992ecc

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@ -121,6 +122,7 @@ public class Dispatcher {
/** The maximum number of concurrent blocks moves at a datanode */
private final int maxConcurrentMovesPerNode;
private final int maxMoverThreads;
private final long getBlocksSize;
private final long getBlocksMinBlockSize;
@ -139,11 +141,13 @@ public class Dispatcher {
static class Allocator {
private final int max;
private int count = 0;
private int lotSize = 1;
Allocator(int max) {
this.max = max;
}
/** Allocate specified number of items */
synchronized int allocate(int n) {
final int remaining = max - count;
if (remaining <= 0) {
@ -155,9 +159,19 @@ synchronized int allocate(int n) {
}
}
/** Aloocate a single lot of items */
int allocate() {
return allocate(lotSize);
}
synchronized void reset() {
count = 0;
}
/** Set the lot size */
synchronized void setLotSize(int lotSize) {
this.lotSize = lotSize;
}
}
private static class GlobalBlockMap {
@ -1017,6 +1031,7 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
this.dispatchExecutor = dispatcherThreads == 0? null
: Executors.newFixedThreadPool(dispatcherThreads);
this.moverThreadAllocator = new Allocator(moverThreads);
this.maxMoverThreads = moverThreads;
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
this.getBlocksSize = getBlocksSize;
@ -1116,7 +1131,7 @@ public void executePendingMove(final PendingMove p) {
final DDatanode targetDn = p.target.getDDatanode();
ExecutorService moveExecutor = targetDn.getMoveExecutor();
if (moveExecutor == null) {
final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
final int nThreads = moverThreadAllocator.allocate();
if (nThreads > 0) {
moveExecutor = targetDn.initMoveExecutor(nThreads);
}
@ -1166,6 +1181,25 @@ private long dispatchBlockMoves() throws InterruptedException {
LOG.debug("Disperse Interval sec = " +
concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
}
// Determine the size of each mover thread pool per target
int threadsPerTarget = maxMoverThreads/targets.size();
if (threadsPerTarget == 0) {
// Some scheduled moves will get ignored as some targets won't have
// any threads allocated.
moverThreadAllocator.setLotSize(1);
LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
maxMoverThreads + " is too small for moving blocks to " +
targets.size() + " targets. Balancing may be slower.");
} else {
if (threadsPerTarget > maxConcurrentMovesPerNode) {
threadsPerTarget = maxConcurrentMovesPerNode;
LOG.info("Limiting threads per target to the specified max.");
}
moverThreadAllocator.setLotSize(threadsPerTarget);
LOG.info("Allocating " + threadsPerTarget + " threads per target.");
}
long dSec = 0;
final Iterator<Source> i = sources.iterator();
for (int j = 0; j < futures.length; j++) {