HDFS-16386. Reduce DataNode load when FsDatasetAsyncDiskService is working. (#3806)
This commit is contained in:
parent
32a78e0b9e
commit
746b328554
@ -143,6 +143,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
|
public static final long DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
|
||||||
public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
|
public static final String DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
|
||||||
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
|
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
|
||||||
|
public static final String DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY =
|
||||||
|
"dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume";
|
||||||
|
public static final int DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
|
||||||
public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
|
public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
|
||||||
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
|
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
|
||||||
public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
|
public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
|
||||||
|
@ -30,6 +30,8 @@
|
|||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.util.Preconditions;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
@ -65,7 +67,7 @@ class FsDatasetAsyncDiskService {
|
|||||||
// ThreadPool core pool size
|
// ThreadPool core pool size
|
||||||
private static final int CORE_THREADS_PER_VOLUME = 1;
|
private static final int CORE_THREADS_PER_VOLUME = 1;
|
||||||
// ThreadPool maximum pool size
|
// ThreadPool maximum pool size
|
||||||
private static final int MAXIMUM_THREADS_PER_VOLUME = 4;
|
private final int maxNumThreadsPerVolume;
|
||||||
// ThreadPool keep-alive time for threads over core pool size
|
// ThreadPool keep-alive time for threads over core pool size
|
||||||
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
|
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
|
||||||
|
|
||||||
@ -90,6 +92,12 @@ class FsDatasetAsyncDiskService {
|
|||||||
this.datanode = datanode;
|
this.datanode = datanode;
|
||||||
this.fsdatasetImpl = fsdatasetImpl;
|
this.fsdatasetImpl = fsdatasetImpl;
|
||||||
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
|
this.threadGroup = new ThreadGroup(getClass().getSimpleName());
|
||||||
|
maxNumThreadsPerVolume = datanode.getConf().getInt(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_DEFAULT);
|
||||||
|
Preconditions.checkArgument(maxNumThreadsPerVolume > 0,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_FSDATASETASYNCDISK_MAX_THREADS_PER_VOLUME_KEY +
|
||||||
|
" must be a positive integer.");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addExecutorForVolume(final FsVolumeImpl volume) {
|
private void addExecutorForVolume(final FsVolumeImpl volume) {
|
||||||
@ -110,7 +118,7 @@ public Thread newThread(Runnable r) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
||||||
CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
|
CORE_THREADS_PER_VOLUME, maxNumThreadsPerVolume,
|
||||||
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
|
THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
|
||||||
new LinkedBlockingQueue<Runnable>(), threadFactory);
|
new LinkedBlockingQueue<Runnable>(), threadFactory);
|
||||||
|
|
||||||
|
@ -2982,6 +2982,16 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume</name>
|
||||||
|
<value>4</value>
|
||||||
|
<description>
|
||||||
|
The maximum number of threads per volume used to process async disk
|
||||||
|
operations on the datanode. These threads consume I/O and CPU at the
|
||||||
|
same time. This will affect normal data node operations.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.cachereport.intervalMsec</name>
|
<name>dfs.cachereport.intervalMsec</name>
|
||||||
<value>10000</value>
|
<value>10000</value>
|
||||||
|
Loading…
Reference in New Issue
Block a user