HDFS-14553. Make queue size of BlockReportProcessingThread configurable. Contributed by He Xiaoqiao.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
He Xiaoqiao 2019-06-10 17:20:50 -07:00 committed by Wei-Chiu Chuang
parent 9191e08f0a
commit bd46bdf9f9
3 changed files with 21 additions and 5 deletions

View File

@ -269,6 +269,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
= "dfs.namenode.storageinfo.defragment.ratio"; = "dfs.namenode.storageinfo.defragment.ratio";
public static final double public static final double
DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75; DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75;
public static final String DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY
= "dfs.namenode.blockreport.queue.size";
public static final int DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT
= 1024;
public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter"; public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
/* Phrased as below to avoid javac inlining as a constant, to match the behavior when /* Phrased as below to avoid javac inlining as a constant, to match the behavior when
this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you

View File

@ -317,8 +317,7 @@ public long getTotalECBlockGroups() {
new Daemon(new StorageInfoDefragmenter()); new Daemon(new StorageInfoDefragmenter());
/** Block report thread for handling async reports. */ /** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread = private final BlockReportProcessingThread blockReportThread;
new BlockReportProcessingThread();
/** /**
* Store blocks {@literal ->} datanodedescriptor(s) map of corrupt replicas. * Store blocks {@literal ->} datanodedescriptor(s) map of corrupt replicas.
@ -574,6 +573,11 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf); bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
int queueSize = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
blockReportThread = new BlockReportProcessingThread(queueSize);
LOG.info("defaultReplication = {}", defaultReplication); LOG.info("defaultReplication = {}", defaultReplication);
LOG.info("maxReplication = {}", maxReplication); LOG.info("maxReplication = {}", maxReplication);
LOG.info("minReplication = {}", minReplication); LOG.info("minReplication = {}", minReplication);
@ -4966,11 +4970,11 @@ private class BlockReportProcessingThread extends Thread {
private static final long MAX_LOCK_HOLD_MS = 4; private static final long MAX_LOCK_HOLD_MS = 4;
private long lastFull = 0; private long lastFull = 0;
private final BlockingQueue<Runnable> queue = private final BlockingQueue<Runnable> queue;
new ArrayBlockingQueue<Runnable>(1024);
BlockReportProcessingThread() { BlockReportProcessingThread(int size) {
super("Block report processor"); super("Block report processor");
queue = new ArrayBlockingQueue<>(size);
setDaemon(true); setDaemon(true);
} }

View File

@ -5337,4 +5337,12 @@
inter-DN QOP. inter-DN QOP.
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.blockreport.queue.size</name>
<value>1024</value>
<description>
The queue size of BlockReportProcessingThread in BlockManager.
</description>
</property>
</configuration> </configuration>