HDFS-8818. Changes the global moveExecutor to per datanode executors and changes MAX_SIZE_TO_MOVE to be configurable.
This commit is contained in:
parent
cf9d3c9256
commit
b56daff6a1
@ -772,6 +772,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-8772. Fix TestStandbyIsHot#testDatanodeRestarts which occasionally fails.
|
HDFS-8772. Fix TestStandbyIsHot#testDatanodeRestarts which occasionally fails.
|
||||||
(Walter Su via wang)
|
(Walter Su via wang)
|
||||||
|
|
||||||
|
HDFS-8818. Changes the global moveExecutor to per datanode executors and
|
||||||
|
changes MAX_SIZE_TO_MOVE to be configurable. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
@ -351,6 +351,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final int DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
|
public static final int DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
|
||||||
public static final String DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads";
|
public static final String DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads";
|
||||||
public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
|
public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
|
||||||
|
public static final String DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move";
|
||||||
|
public static final long DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024;
|
||||||
|
|
||||||
|
|
||||||
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
|
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
|
||||||
public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
|
public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
@ -167,9 +168,6 @@ public class Balancer {
|
|||||||
|
|
||||||
static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
||||||
|
|
||||||
private static final long GB = 1L << 30; //1GB
|
|
||||||
private static final long MAX_SIZE_TO_MOVE = 10*GB;
|
|
||||||
|
|
||||||
private static final String USAGE = "Usage: hdfs balancer"
|
private static final String USAGE = "Usage: hdfs balancer"
|
||||||
+ "\n\t[-policy <policy>]\tthe balancing policy: "
|
+ "\n\t[-policy <policy>]\tthe balancing policy: "
|
||||||
+ BalancingPolicy.Node.INSTANCE.getName() + " or "
|
+ BalancingPolicy.Node.INSTANCE.getName() + " or "
|
||||||
@ -192,6 +190,7 @@ public class Balancer {
|
|||||||
private final BalancingPolicy policy;
|
private final BalancingPolicy policy;
|
||||||
private final boolean runDuringUpgrade;
|
private final boolean runDuringUpgrade;
|
||||||
private final double threshold;
|
private final double threshold;
|
||||||
|
private final long maxSizeToMove;
|
||||||
|
|
||||||
// all data node lists
|
// all data node lists
|
||||||
private final Collection<Source> overUtilized = new LinkedList<Source>();
|
private final Collection<Source> overUtilized = new LinkedList<Source>();
|
||||||
@ -213,6 +212,24 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static long getLong(Configuration conf, String key, long defaultValue) {
|
||||||
|
final long v = conf.getLong(key, defaultValue);
|
||||||
|
LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
|
||||||
|
if (v <= 0) {
|
||||||
|
throw new HadoopIllegalArgumentException(key + " = " + v + " <= " + 0);
|
||||||
|
}
|
||||||
|
return v;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int getInt(Configuration conf, String key, int defaultValue) {
|
||||||
|
final int v = conf.getInt(key, defaultValue);
|
||||||
|
LOG.info(key + " = " + v + " (default=" + defaultValue + ")");
|
||||||
|
if (v <= 0) {
|
||||||
|
throw new HadoopIllegalArgumentException(key + " = " + v + " <= " + 0);
|
||||||
|
}
|
||||||
|
return v;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a balancer.
|
* Construct a balancer.
|
||||||
* Initialize balancer. It sets the value of the threshold, and
|
* Initialize balancer. It sets the value of the threshold, and
|
||||||
@ -221,16 +238,16 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
|
|||||||
* when connection fails.
|
* when connection fails.
|
||||||
*/
|
*/
|
||||||
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
|
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
|
||||||
final long movedWinWidth = conf.getLong(
|
final long movedWinWidth = getLong(conf,
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
|
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
|
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
|
||||||
final int moverThreads = conf.getInt(
|
final int moverThreads = getInt(conf,
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
|
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY,
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
|
DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT);
|
||||||
final int dispatcherThreads = conf.getInt(
|
final int dispatcherThreads = getInt(conf,
|
||||||
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
|
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
|
||||||
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
|
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
|
||||||
final int maxConcurrentMovesPerNode = conf.getInt(
|
final int maxConcurrentMovesPerNode = getInt(conf,
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
||||||
|
|
||||||
@ -241,6 +258,10 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
|
|||||||
this.threshold = p.threshold;
|
this.threshold = p.threshold;
|
||||||
this.policy = p.policy;
|
this.policy = p.policy;
|
||||||
this.runDuringUpgrade = p.runDuringUpgrade;
|
this.runDuringUpgrade = p.runDuringUpgrade;
|
||||||
|
|
||||||
|
this.maxSizeToMove = getLong(conf,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long getCapacity(DatanodeStorageReport report, StorageType t) {
|
private static long getCapacity(DatanodeStorageReport report, StorageType t) {
|
||||||
@ -294,7 +315,7 @@ private long init(List<DatanodeStorageReport> reports) {
|
|||||||
final double utilizationDiff = utilization - policy.getAvgUtilization(t);
|
final double utilizationDiff = utilization - policy.getAvgUtilization(t);
|
||||||
final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
|
final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
|
||||||
final long maxSize2Move = computeMaxSize2Move(capacity,
|
final long maxSize2Move = computeMaxSize2Move(capacity,
|
||||||
getRemaining(r, t), utilizationDiff, threshold);
|
getRemaining(r, t), utilizationDiff, threshold, maxSizeToMove);
|
||||||
|
|
||||||
final StorageGroup g;
|
final StorageGroup g;
|
||||||
if (utilizationDiff > 0) {
|
if (utilizationDiff > 0) {
|
||||||
@ -331,13 +352,13 @@ private long init(List<DatanodeStorageReport> reports) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static long computeMaxSize2Move(final long capacity, final long remaining,
|
private static long computeMaxSize2Move(final long capacity, final long remaining,
|
||||||
final double utilizationDiff, final double threshold) {
|
final double utilizationDiff, final double threshold, final long max) {
|
||||||
final double diff = Math.min(threshold, Math.abs(utilizationDiff));
|
final double diff = Math.min(threshold, Math.abs(utilizationDiff));
|
||||||
long maxSizeToMove = percentage2bytes(diff, capacity);
|
long maxSizeToMove = percentage2bytes(diff, capacity);
|
||||||
if (utilizationDiff < 0) {
|
if (utilizationDiff < 0) {
|
||||||
maxSizeToMove = Math.min(remaining, maxSizeToMove);
|
maxSizeToMove = Math.min(remaining, maxSizeToMove);
|
||||||
}
|
}
|
||||||
return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
|
return Math.min(max, maxSizeToMove);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long percentage2bytes(double percentage, long capacity) {
|
private static long percentage2bytes(double percentage, long capacity) {
|
||||||
@ -387,6 +408,7 @@ private void chooseStorageGroups(final Matcher matcher) {
|
|||||||
/* first step: match each overUtilized datanode (source) to
|
/* first step: match each overUtilized datanode (source) to
|
||||||
* one or more underUtilized datanodes (targets).
|
* one or more underUtilized datanodes (targets).
|
||||||
*/
|
*/
|
||||||
|
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
|
||||||
chooseStorageGroups(overUtilized, underUtilized, matcher);
|
chooseStorageGroups(overUtilized, underUtilized, matcher);
|
||||||
|
|
||||||
/* match each remaining overutilized datanode (source) to
|
/* match each remaining overutilized datanode (source) to
|
||||||
@ -394,6 +416,7 @@ private void chooseStorageGroups(final Matcher matcher) {
|
|||||||
* Note only overutilized datanodes that haven't had that max bytes to move
|
* Note only overutilized datanodes that haven't had that max bytes to move
|
||||||
* satisfied in step 1 are selected
|
* satisfied in step 1 are selected
|
||||||
*/
|
*/
|
||||||
|
LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
|
||||||
chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
|
chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
|
||||||
|
|
||||||
/* match each remaining underutilized datanode (target) to
|
/* match each remaining underutilized datanode (target) to
|
||||||
@ -401,6 +424,7 @@ private void chooseStorageGroups(final Matcher matcher) {
|
|||||||
* Note only underutilized datanodes that have not had that max bytes to
|
* Note only underutilized datanodes that have not had that max bytes to
|
||||||
* move satisfied in step 1 are selected.
|
* move satisfied in step 1 are selected.
|
||||||
*/
|
*/
|
||||||
|
LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
|
||||||
chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
|
chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,14 +112,39 @@ public class Dispatcher {
|
|||||||
|
|
||||||
private NetworkTopology cluster;
|
private NetworkTopology cluster;
|
||||||
|
|
||||||
private final ExecutorService moveExecutor;
|
|
||||||
private final ExecutorService dispatchExecutor;
|
private final ExecutorService dispatchExecutor;
|
||||||
|
|
||||||
|
private final Allocator moverThreadAllocator;
|
||||||
|
|
||||||
/** The maximum number of concurrent blocks moves at a datanode */
|
/** The maximum number of concurrent blocks moves at a datanode */
|
||||||
private final int maxConcurrentMovesPerNode;
|
private final int maxConcurrentMovesPerNode;
|
||||||
|
|
||||||
private final int ioFileBufferSize;
|
private final int ioFileBufferSize;
|
||||||
|
|
||||||
|
static class Allocator {
|
||||||
|
private final int max;
|
||||||
|
private int count = 0;
|
||||||
|
|
||||||
|
Allocator(int max) {
|
||||||
|
this.max = max;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized int allocate(int n) {
|
||||||
|
final int remaining = max - count;
|
||||||
|
if (remaining <= 0) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
final int allocated = remaining < n? remaining: n;
|
||||||
|
count += allocated;
|
||||||
|
return allocated;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void reset() {
|
||||||
|
count = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class GlobalBlockMap {
|
private static class GlobalBlockMap {
|
||||||
private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
|
private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
|
||||||
|
|
||||||
@ -285,9 +310,7 @@ private boolean addTo(StorageGroup g) {
|
|||||||
|
|
||||||
/** Dispatch the move to the proxy source & wait for the response. */
|
/** Dispatch the move to the proxy source & wait for the response. */
|
||||||
private void dispatch() {
|
private void dispatch() {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.info("Start moving " + this);
|
||||||
LOG.debug("Start moving " + this);
|
|
||||||
}
|
|
||||||
|
|
||||||
Socket sock = new Socket();
|
Socket sock = new Socket();
|
||||||
DataOutputStream out = null;
|
DataOutputStream out = null;
|
||||||
@ -502,7 +525,7 @@ public boolean equals(Object obj) {
|
|||||||
private final List<PendingMove> pendings;
|
private final List<PendingMove> pendings;
|
||||||
private volatile boolean hasFailure = false;
|
private volatile boolean hasFailure = false;
|
||||||
private volatile boolean hasSuccess = false;
|
private volatile boolean hasSuccess = false;
|
||||||
private final int maxConcurrentMoves;
|
private ExecutorService moveExecutor;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
@ -511,7 +534,6 @@ public String toString() {
|
|||||||
|
|
||||||
private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
|
private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) {
|
||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
this.maxConcurrentMoves = maxConcurrentMoves;
|
|
||||||
this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
|
this.pendings = new ArrayList<PendingMove>(maxConcurrentMoves);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -519,6 +541,21 @@ public DatanodeInfo getDatanodeInfo() {
|
|||||||
return datanode;
|
return datanode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
synchronized ExecutorService initMoveExecutor(int poolSize) {
|
||||||
|
return moveExecutor = Executors.newFixedThreadPool(poolSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized ExecutorService getMoveExecutor() {
|
||||||
|
return moveExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void shutdownMoveExecutor() {
|
||||||
|
if (moveExecutor != null) {
|
||||||
|
moveExecutor.shutdown();
|
||||||
|
moveExecutor = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static <G extends StorageGroup> void put(StorageType storageType,
|
private static <G extends StorageGroup> void put(StorageType storageType,
|
||||||
G g, EnumMap<StorageType, G> map) {
|
G g, EnumMap<StorageType, G> map) {
|
||||||
final StorageGroup existing = map.put(storageType, g);
|
final StorageGroup existing = map.put(storageType, g);
|
||||||
@ -539,6 +576,7 @@ public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d
|
|||||||
|
|
||||||
synchronized private void activateDelay(long delta) {
|
synchronized private void activateDelay(long delta) {
|
||||||
delayUntil = Time.monotonicNow() + delta;
|
delayUntil = Time.monotonicNow() + delta;
|
||||||
|
LOG.info(this + " activateDelay " + delta/1000.0 + " seconds");
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized private boolean isDelayActive() {
|
synchronized private boolean isDelayActive() {
|
||||||
@ -549,11 +587,6 @@ synchronized private boolean isDelayActive() {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check if the node can schedule more blocks to move */
|
|
||||||
synchronized boolean isPendingQNotFull() {
|
|
||||||
return pendings.size() < maxConcurrentMoves;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Check if all the dispatched moves are done */
|
/** Check if all the dispatched moves are done */
|
||||||
synchronized boolean isPendingQEmpty() {
|
synchronized boolean isPendingQEmpty() {
|
||||||
return pendings.isEmpty();
|
return pendings.isEmpty();
|
||||||
@ -561,7 +594,7 @@ synchronized boolean isPendingQEmpty() {
|
|||||||
|
|
||||||
/** Add a scheduled block move to the node */
|
/** Add a scheduled block move to the node */
|
||||||
synchronized boolean addPendingBlock(PendingMove pendingBlock) {
|
synchronized boolean addPendingBlock(PendingMove pendingBlock) {
|
||||||
if (!isDelayActive() && isPendingQNotFull()) {
|
if (!isDelayActive()) {
|
||||||
return pendings.add(pendingBlock);
|
return pendings.add(pendingBlock);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
@ -619,6 +652,11 @@ Iterator<DBlock> getBlockIterator() {
|
|||||||
private long getBlockList() throws IOException {
|
private long getBlockList() throws IOException {
|
||||||
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
|
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
|
||||||
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
|
final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
|
||||||
|
+ StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
|
||||||
|
+ ") returns " + newBlocks.getBlocks().length + " blocks.");
|
||||||
|
}
|
||||||
|
|
||||||
long bytesReceived = 0;
|
long bytesReceived = 0;
|
||||||
for (BlockWithLocations blk : newBlocks.getBlocks()) {
|
for (BlockWithLocations blk : newBlocks.getBlocks()) {
|
||||||
@ -640,7 +678,9 @@ private long getBlockList() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
|
if (!srcBlocks.contains(block) && isGoodBlockCandidate(block)) {
|
||||||
// filter bad candidates
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Add " + block + " to " + this);
|
||||||
|
}
|
||||||
srcBlocks.add(block);
|
srcBlocks.add(block);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -708,11 +748,9 @@ private void removeMovedBlocks() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final int SOURCE_BLOCKS_MIN_SIZE = 5;
|
|
||||||
|
|
||||||
/** @return if should fetch more blocks from namenode */
|
/** @return if should fetch more blocks from namenode */
|
||||||
private boolean shouldFetchMoreBlocks() {
|
private boolean shouldFetchMoreBlocks() {
|
||||||
return srcBlocks.size() < SOURCE_BLOCKS_MIN_SIZE && blocksToReceive > 0;
|
return blocksToReceive > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
|
private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
|
||||||
@ -732,6 +770,11 @@ private void dispatchBlocks() {
|
|||||||
int noPendingMoveIteration = 0;
|
int noPendingMoveIteration = 0;
|
||||||
while (!isTimeUp && getScheduledSize() > 0
|
while (!isTimeUp && getScheduledSize() > 0
|
||||||
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
|
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace(this + " blocksToReceive=" + blocksToReceive
|
||||||
|
+ ", scheduledSize=" + getScheduledSize()
|
||||||
|
+ ", srcBlocks#=" + srcBlocks.size());
|
||||||
|
}
|
||||||
final PendingMove p = chooseNextMove();
|
final PendingMove p = chooseNextMove();
|
||||||
if (p != null) {
|
if (p != null) {
|
||||||
// Reset no pending move counter
|
// Reset no pending move counter
|
||||||
@ -759,12 +802,16 @@ private void dispatchBlocks() {
|
|||||||
// in case no blocks can be moved for source node's task,
|
// in case no blocks can be moved for source node's task,
|
||||||
// jump out of while-loop after 5 iterations.
|
// jump out of while-loop after 5 iterations.
|
||||||
if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
|
if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
|
||||||
|
LOG.info("Failed to find a pending move " + noPendingMoveIteration
|
||||||
|
+ " times. Skipping " + this);
|
||||||
resetScheduledSize();
|
resetScheduledSize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if time is up or not
|
// check if time is up or not
|
||||||
if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
|
if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
|
||||||
|
LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
|
||||||
|
+ " seconds). Skipping " + this);
|
||||||
isTimeUp = true;
|
isTimeUp = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -801,9 +848,9 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
|||||||
|
|
||||||
this.cluster = NetworkTopology.getInstance(conf);
|
this.cluster = NetworkTopology.getInstance(conf);
|
||||||
|
|
||||||
this.moveExecutor = Executors.newFixedThreadPool(moverThreads);
|
|
||||||
this.dispatchExecutor = dispatcherThreads == 0? null
|
this.dispatchExecutor = dispatcherThreads == 0? null
|
||||||
: Executors.newFixedThreadPool(dispatcherThreads);
|
: Executors.newFixedThreadPool(dispatcherThreads);
|
||||||
|
this.moverThreadAllocator = new Allocator(moverThreads);
|
||||||
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
|
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
|
||||||
|
|
||||||
this.saslClient = new SaslDataTransferClient(conf,
|
this.saslClient = new SaslDataTransferClient(conf,
|
||||||
@ -888,8 +935,22 @@ public DDatanode newDatanode(DatanodeInfo datanode) {
|
|||||||
return new DDatanode(datanode, maxConcurrentMovesPerNode);
|
return new DDatanode(datanode, maxConcurrentMovesPerNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void executePendingMove(final PendingMove p) {
|
public void executePendingMove(final PendingMove p) {
|
||||||
// move the block
|
// move the block
|
||||||
|
final DDatanode targetDn = p.target.getDDatanode();
|
||||||
|
ExecutorService moveExecutor = targetDn.getMoveExecutor();
|
||||||
|
if (moveExecutor == null) {
|
||||||
|
final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
|
||||||
|
if (nThreads > 0) {
|
||||||
|
moveExecutor = targetDn.initMoveExecutor(nThreads);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (moveExecutor == null) {
|
||||||
|
LOG.warn("No mover threads available: skip moving " + p);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
moveExecutor.execute(new Runnable() {
|
moveExecutor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
@ -1081,6 +1142,11 @@ void reset(Configuration conf) {
|
|||||||
cluster = NetworkTopology.getInstance(conf);
|
cluster = NetworkTopology.getInstance(conf);
|
||||||
storageGroupMap.clear();
|
storageGroupMap.clear();
|
||||||
sources.clear();
|
sources.clear();
|
||||||
|
|
||||||
|
moverThreadAllocator.reset();
|
||||||
|
for(StorageGroup t : targets) {
|
||||||
|
t.getDDatanode().shutdownMoveExecutor();
|
||||||
|
}
|
||||||
targets.clear();
|
targets.clear();
|
||||||
globalBlocks.removeAllButRetain(movedBlocks);
|
globalBlocks.removeAllButRetain(movedBlocks);
|
||||||
movedBlocks.cleanup();
|
movedBlocks.cleanup();
|
||||||
@ -1102,7 +1168,6 @@ public void shutdownNow() {
|
|||||||
if (dispatchExecutor != null) {
|
if (dispatchExecutor != null) {
|
||||||
dispatchExecutor.shutdownNow();
|
dispatchExecutor.shutdownNow();
|
||||||
}
|
}
|
||||||
moveExecutor.shutdownNow();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static class Util {
|
static class Util {
|
||||||
|
@ -77,6 +77,11 @@ public Block getBlock() {
|
|||||||
public long getNumBytes() {
|
public long getNumBytes() {
|
||||||
return block.getNumBytes();
|
return block.getNumBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return block + " size=" + getNumBytes();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final int CUR_WIN = 0;
|
private static final int CUR_WIN = 0;
|
||||||
|
@ -19,7 +19,13 @@
|
|||||||
|
|
||||||
import static org.apache.hadoop.fs.StorageType.DEFAULT;
|
import static org.apache.hadoop.fs.StorageType.DEFAULT;
|
||||||
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
|
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
@ -29,8 +35,8 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.URI;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -45,6 +51,7 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
@ -59,12 +66,16 @@
|
|||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
import org.apache.hadoop.hdfs.protocol.*;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
|
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
|
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
|
||||||
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
|
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
|
||||||
@ -927,7 +938,7 @@ void testBalancer1Internal(Configuration conf) throws Exception {
|
|||||||
new String[] {RACK0, RACK1});
|
new String[] {RACK0, RACK1});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=100000)
|
@Test(expected=HadoopIllegalArgumentException.class)
|
||||||
public void testBalancerWithZeroThreadsForMove() throws Exception {
|
public void testBalancerWithZeroThreadsForMove() throws Exception {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
|
conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
|
||||||
|
Loading…
Reference in New Issue
Block a user