HDFS-15665. Balancer logging improvements. Contributed by Konstantin V Shvachko.

(cherry picked from commit d07dc7afb4)
This commit is contained in:
Konstantin V Shvachko 2020-11-03 12:01:30 -08:00
parent dd1634ec3b
commit e48dd9daea
4 changed files with 55 additions and 14 deletions

View File

@ -282,6 +282,9 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
*/ */
Balancer(NameNodeConnector theblockpool, BalancerParameters p, Balancer(NameNodeConnector theblockpool, BalancerParameters p,
Configuration conf) { Configuration conf) {
// NameNode configuration parameters for balancing
getInt(conf, DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
final long movedWinWidth = getLong(conf, final long movedWinWidth = getLong(conf,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
@ -291,10 +294,6 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
final int dispatcherThreads = getInt(conf, final int dispatcherThreads = getInt(conf,
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT); DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
final int maxConcurrentMovesPerNode = getInt(conf,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
final long getBlocksSize = getLongBytes(conf, final long getBlocksSize = getLongBytes(conf,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY, DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT); DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT);
@ -311,6 +310,13 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT); DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
// DataNode configuration parameters for balancing
final int maxConcurrentMovesPerNode = getInt(conf,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
getLongBytes(conf, DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT);
this.nnc = theblockpool; this.nnc = theblockpool;
this.dispatcher = this.dispatcher =
new Dispatcher(theblockpool, p.getIncludedNodes(), new Dispatcher(theblockpool, p.getIncludedNodes(),
@ -603,12 +609,13 @@ static class Result {
this.bytesAlreadyMoved = bytesAlreadyMoved; this.bytesAlreadyMoved = bytesAlreadyMoved;
} }
void print(int iteration, PrintStream out) { void print(int iteration, NameNodeConnector nnc, PrintStream out) {
out.printf("%-24s %10d %19s %18s %17s%n", out.printf("%-24s %10d %19s %18s %17s %s%n",
DateFormat.getDateTimeInstance().format(new Date()), iteration, DateFormat.getDateTimeInstance().format(new Date()), iteration,
StringUtils.byteDesc(bytesAlreadyMoved), StringUtils.byteDesc(bytesAlreadyMoved),
StringUtils.byteDesc(bytesLeftToMove), StringUtils.byteDesc(bytesLeftToMove),
StringUtils.byteDesc(bytesBeingMoved)); StringUtils.byteDesc(bytesBeingMoved),
nnc.getNameNodeUri());
} }
} }
@ -653,8 +660,10 @@ Result runOneIteration() {
System.out.println("No block can be moved. Exiting..."); System.out.println("No block can be moved. Exiting...");
return newResult(ExitStatus.NO_MOVE_BLOCK, bytesLeftToMove, bytesBeingMoved); return newResult(ExitStatus.NO_MOVE_BLOCK, bytesLeftToMove, bytesBeingMoved);
} else { } else {
LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) + LOG.info("Will move {} in this iteration for {}",
" in this iteration"); StringUtils.byteDesc(bytesBeingMoved), nnc.toString());
LOG.info("Total target DataNodes in this iteration: {}",
dispatcher.moveTasksTotal());
} }
/* For each pair of <source, target>, start a thread that repeatedly /* For each pair of <source, target>, start a thread that repeatedly
@ -705,7 +714,9 @@ static private int doBalance(Collection<URI> namenodes,
LOG.info("excluded nodes = " + p.getExcludedNodes()); LOG.info("excluded nodes = " + p.getExcludedNodes());
LOG.info("source nodes = " + p.getSourceNodes()); LOG.info("source nodes = " + p.getSourceNodes());
checkKeytabAndInit(conf); checkKeytabAndInit(conf);
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); System.out.println("Time Stamp Iteration#"
+ " Bytes Already Moved Bytes Left To Move Bytes Being Moved"
+ " NameNode");
List<NameNodeConnector> connectors = Collections.emptyList(); List<NameNodeConnector> connectors = Collections.emptyList();
try { try {
@ -721,7 +732,7 @@ static private int doBalance(Collection<URI> namenodes,
|| p.getBlockPools().contains(nnc.getBlockpoolID())) { || p.getBlockPools().contains(nnc.getBlockpoolID())) {
final Balancer b = new Balancer(nnc, p, conf); final Balancer b = new Balancer(nnc, p, conf);
final Result r = b.runOneIteration(); final Result r = b.runOneIteration();
r.print(iteration, System.out); r.print(iteration, nnc, System.out);
// clean all lists // clean all lists
b.resetData(conf); b.resetData(conf);

View File

@ -392,7 +392,7 @@ private void dispatch() {
sendRequest(out, eb, accessToken); sendRequest(out, eb, accessToken);
receiveResponse(in); receiveResponse(in);
nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes()); nnc.addBytesMoved(reportedBlock.getNumBytes());
target.getDDatanode().setHasSuccess(); target.getDDatanode().setHasSuccess();
LOG.info("Successfully moved " + this); LOG.info("Successfully moved " + this);
} catch (IOException e) { } catch (IOException e) {
@ -1064,6 +1064,10 @@ long getBytesMoved() {
return nnc.getBytesMoved().get(); return nnc.getBytesMoved().get();
} }
long getBblocksMoved() {
return nnc.getBlocksMoved().get();
}
long bytesToMove() { long bytesToMove() {
Preconditions.checkState( Preconditions.checkState(
storageGroupMap.size() >= sources.size() + targets.size(), storageGroupMap.size() >= sources.size() + targets.size(),
@ -1083,6 +1087,14 @@ void add(Source source, StorageGroup target) {
targets.add(target); targets.add(target);
} }
public int moveTasksTotal() {
int b = 0;
for (Source src : sources) {
b += src.tasks.size();
}
return b;
}
private boolean shouldIgnore(DatanodeInfo dn) { private boolean shouldIgnore(DatanodeInfo dn) {
// ignore out-of-service nodes // ignore out-of-service nodes
final boolean outOfService = !dn.isInService(); final boolean outOfService = !dn.isInService();
@ -1164,12 +1176,13 @@ public boolean dispatchAndCheckContinue() throws InterruptedException {
*/ */
private long dispatchBlockMoves() throws InterruptedException { private long dispatchBlockMoves() throws InterruptedException {
final long bytesLastMoved = getBytesMoved(); final long bytesLastMoved = getBytesMoved();
final long blocksLastMoved = getBblocksMoved();
final Future<?>[] futures = new Future<?>[sources.size()]; final Future<?>[] futures = new Future<?>[sources.size()];
int concurrentThreads = Math.min(sources.size(), int concurrentThreads = Math.min(sources.size(),
((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize()); ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
assert concurrentThreads > 0 : "Number of concurrent threads is 0."; assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
LOG.debug("Balancer concurrent dispatcher threads = {}", concurrentThreads); LOG.info("Balancer concurrent dispatcher threads = {}", concurrentThreads);
// Determine the size of each mover thread pool per target // Determine the size of each mover thread pool per target
int threadsPerTarget = maxMoverThreads/targets.size(); int threadsPerTarget = maxMoverThreads/targets.size();
@ -1211,6 +1224,9 @@ public void run() {
// wait for all reportedBlock moving to be done // wait for all reportedBlock moving to be done
waitForMoveCompletion(targets); waitForMoveCompletion(targets);
LOG.info("Total bytes (blocks) moved in this iteration {} ({})",
StringUtils.byteDesc(getBytesMoved() - bytesLastMoved),
(getBblocksMoved() - blocksLastMoved));
return getBytesMoved() - bytesLastMoved; return getBytesMoved() - bytesLastMoved;
} }

View File

@ -162,6 +162,7 @@ public static void checkOtherInstanceRunning(boolean toCheck) {
private OutputStream out; private OutputStream out;
private final List<Path> targetPaths; private final List<Path> targetPaths;
private final AtomicLong bytesMoved = new AtomicLong(); private final AtomicLong bytesMoved = new AtomicLong();
private final AtomicLong blocksMoved = new AtomicLong();
private final int maxNotChangedIterations; private final int maxNotChangedIterations;
private int notChangedIterations = 0; private int notChangedIterations = 0;
@ -233,6 +234,19 @@ AtomicLong getBytesMoved() {
return bytesMoved; return bytesMoved;
} }
AtomicLong getBlocksMoved() {
return blocksMoved;
}
public void addBytesMoved(long numBytes) {
bytesMoved.addAndGet(numBytes);
blocksMoved.incrementAndGet();
}
public URI getNameNodeUri() {
return nameNodeUri;
}
/** @return blocks with locations. */ /** @return blocks with locations. */
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
minBlockSize) throws IOException { minBlockSize) throws IOException {

View File

@ -1018,7 +1018,7 @@ private static int runBalancer(Collection<URI> namenodes,
for(NameNodeConnector nnc : connectors) { for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf); final Balancer b = new Balancer(nnc, p, conf);
final Result r = b.runOneIteration(); final Result r = b.runOneIteration();
r.print(iteration, System.out); r.print(iteration, nnc, System.out);
// clean all lists // clean all lists
b.resetData(conf); b.resetData(conf);