HDFS-17117. Print reconstructionQueuesInitProgress periodically when BlockManager processMisReplicatesAsync. (#5877). Contributed by Haiyang Hu.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
huhaiyang 2023-07-27 11:28:30 +08:00 committed by GitHub
parent 24f5f708df
commit 1d09dcc614
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 73 additions and 4 deletions

View File

@ -884,6 +884,11 @@ public int getPendingSPSPaths() {
return 0;
}
@Override
public float getReconstructionQueuesInitProgress() {
return 0;
}
private Router getRouter() throws IOException {
if (this.router == null) {
throw new IOException("Router is not initialized");

View File

@ -449,7 +449,7 @@ public int getPendingSPSPaths() {
/**
* Progress of the Reconstruction queues initialisation.
*/
private double reconstructionQueuesInitProgress = 0.0;
private float reconstructionQueuesInitProgress = 0.0f;
/** for block replicas placement */
private volatile BlockPlacementPolicies placementPolicies;
@ -3889,8 +3889,10 @@ private void processMisReplicatesAsync() throws InterruptedException {
totalProcessed += processed;
// there is a possibility that if any of the blocks deleted/added during
// initialisation, then progress might be different.
reconstructionQueuesInitProgress = Math.min((double) totalProcessed
/ totalBlocks, 1.0);
if (totalBlocks > 0) { // here avoid metrics appear as NaN.
reconstructionQueuesInitProgress = Math.min((float) totalProcessed
/ totalBlocks, 1.0f);
}
if (!blocksItr.hasNext()) {
LOG.info("Total number of blocks = {}", blocksMap.size());
@ -3910,6 +3912,8 @@ private void processMisReplicatesAsync() throws InterruptedException {
}
} finally {
namesystem.writeUnlock("processMisReplicatesAsync");
LOG.info("Reconstruction queues initialisation progress: {}, total number of blocks " +
"processed: {}/{}", reconstructionQueuesInitProgress, totalProcessed, totalBlocks);
// Make sure it is out of the write lock for sufficiently long time.
Thread.sleep(sleepDuration);
}
@ -3924,7 +3928,7 @@ private void processMisReplicatesAsync() throws InterruptedException {
*
* @return Returns values between 0 and 1 for the progress.
*/
public double getReconstructionQueuesInitProgress() {
public float getReconstructionQueuesInitProgress() {
return reconstructionQueuesInitProgress;
}

View File

@ -4888,6 +4888,15 @@ public int getPendingSPSPaths() {
return blockManager.getPendingSPSPaths();
}
/**
* Get the progress of the reconstruction queues initialisation.
*/
@Override // FSNamesystemMBean
@Metric
public float getReconstructionQueuesInitProgress() {
return blockManager.getReconstructionQueuesInitProgress();
}
/**
* Returns the length of the wait Queue for the FSNameSystemLock.
*

View File

@ -261,4 +261,11 @@ public interface FSNamesystemMBean {
* @return The number of paths to be processed by sps.
*/
int getPendingSPSPaths();
/**
* Get the progress of the reconstruction queues initialisation.
*
* @return Returns values between 0 and 1 for the progress.
*/
float getReconstructionQueuesInitProgress();
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
@ -33,9 +34,12 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.metrics2.impl.ConfigBuilder;
import org.apache.hadoop.metrics2.impl.TestMetricsConfig;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import org.eclipse.jetty.util.ajax.JSON;
@ -225,4 +229,44 @@ public void testFsEditLogMetrics() throws Exception {
}
}
}
/**
* Test metrics associated with reconstructionQueuesInitProgress.
*/
@Test
public void testReconstructionQueuesInitProgressMetrics() throws Exception {
Configuration conf = new Configuration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
cluster.waitActive();
final FSNamesystem fsNamesystem = cluster.getNamesystem();
final DistributedFileSystem fs = cluster.getFileSystem();
// Validate init reconstructionQueuesInitProgress value.
assertEquals(0.0, fsNamesystem.getReconstructionQueuesInitProgress(), 0);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName =
new ObjectName("Hadoop:service=NameNode,name=FSNamesystemState");
float reconstructionQueuesInitProgress =
(float) mbs.getAttribute(mxbeanName, "ReconstructionQueuesInitProgress");
assertEquals(0.0, reconstructionQueuesInitProgress, 0);
// Create file.
Path file = new Path("/test");
long fileLength = 1024 * 1024 * 3;
DFSTestUtil.createFile(fs, file, fileLength, (short) 1, 0L);
DFSTestUtil.waitReplication(fs, file, (short) 1);
// Restart nameNode to run processMisReplicatedBlocks.
cluster.restartNameNode(true);
// Validate reconstructionQueuesInitProgress value.
GenericTestUtils.waitFor(
() -> cluster.getNamesystem().getReconstructionQueuesInitProgress() == 1.0,
100, 5 * 1000);
reconstructionQueuesInitProgress =
(float) mbs.getAttribute(mxbeanName, "ReconstructionQueuesInitProgress");
assertEquals(1.0, reconstructionQueuesInitProgress, 0);
}
}
}